1D adv-diff: LSPG with nonlinear manifold projection via kPCA

Overview

This demo solves the same problem as the one here, but instead of using POD modes, we show here how to use a nonlinear manifold computed via kernel PCA.

Main function

The main function of the demo is the following:

logger.initialize(logger.logto.terminal)
logger.setVerbosity([logger.loglevel.info])

# create fom object
fomObj = AdvDiff1d(nGrid=120, adv_coef=2.0)

# the final time to integrate to
finalTime = .05

#--- 1. FOM ---#
fomTimeStepSize  = 1e-5
fomNumberOfSteps = int(finalTime/fomTimeStepSize)
sampleEvery      = 200
[fomFinalState, snapshots] = doFom(fomObj, fomTimeStepSize, fomNumberOfSteps, sampleEvery)

#--- 2. train a nonlinear mapping using kPCA ---#
# here we use 3 modes, change this to try different modes
myNonLinearMapper = MyMapperKPCA(snapshots.T, numModes=3)

#--- 3. LSPG ROM ---#
romTimeStepSize  = 3e-4
romNumberOfSteps = int(finalTime/romTimeStepSize)
approximatedState = runLspg(fomObj, romTimeStepSize, romNumberOfSteps, myNonLinearMapper)

# compute l2-error between fom and approximate state
fomNorm = linalg.norm(fomFinalState)
err = linalg.norm(fomFinalState-approximatedState)
print("Final state relative l2 error: {}".format(err/fomNorm))

logger.finalize()

1. Run FOM and collect snapshots

This step is the same as described here,

2. Setup and train the nonlinear kPCA mapper

It is important to note that while the mapper class below has the API required by pressio4py, it can encapsulate any arbitrary mapping function. In this case we show how to create a kPCA-based representation, but one can use, e.g., autoencoder, and any other types of mapping. This is how we enable support for testing various methods.

class MyMapperKPCA:
  def __init__(self, snapshots, numModes):
    self.transformer_ = skd.KernelPCA(n_components=numModes,\
                                      kernel='poly',
                                      degree=3,
                                      fit_inverse_transform=True)
    # do training using provided snapshots
    self.transformer_.fit(snapshots)

    self.numModes_ = numModes
    fomSize = snapshots.shape[1]
    self.fomState0 = np.zeros(fomSize)
    self.fomState1 = np.zeros(fomSize)
    # attention: the jacobian of the mapping must be column-major oder
    # so that pressio can view it without deep copying it, this enables
    # to keep only one jacobian object around and to call the update
    # method below correctly
    self.jacobian_ = np.zeros((fomSize,numModes), order='F')

  def numModes(self): return self.numModes_

  def jacobian(self): return self.jacobian_

  def applyMapping(self, romState, fomState):
    fomState[:] = np.squeeze(self.transformer_.inverse_transform(romState.reshape(1,-1)))

  def applyInverseMapping(self, romState, fomState):
    romState[:] = np.squeeze(self.transformer_.transform(fomState.reshape(1,-1)))

  def updateJacobian(self, romState):
    romStateLocal = romState.copy()
    # finite difference to approximate jacobian of the mapping
    self.applyMapping(romStateLocal,self.fomState0)
    eps = 0.001
    for i in range(self.numModes_):
        romStateLocal[i] += eps
        self.applyMapping(romStateLocal, self.fomState1)
        self.jacobian_[:,i] = (self.fomState1 - self.fomState0) / eps
        romStateLocal[i] -= eps

3. Construct and run LSPG

def runLspg(fomObj, dt, nsteps, customMapper):
  # this is an auxiliary class that can be passed to solve
  # LSPG to monitor the rom state.
  class RomStateObserver:
    def __call__(self, timeStep, time, state): pass

  # this linear solver is used at each gauss-newton iteration
  class MyLinSolver:
    def solve(self, A,b,x):
      lumat, piv, info = linalg.lapack.dgetrf(A, overwrite_a=True)
      x[:], info = linalg.lapack.dgetrs(lumat, piv, b, 0, 0)

  #----------------------------------------
  # create a custom decoder using the mapper passed as argument
  customDecoder = rom.Decoder(customMapper, "kPCAMapper")

  # fom reference state: here it is zero
  fomReferenceState = np.zeros(fomObj.nGrid)

  # create ROM state by projecting the fom initial condition
  fomInitialState = fomObj.u0.copy()
  romState = np.zeros(customMapper.numModes())
  customMapper.applyInverseMapping(romState, fomInitialState)

  # create LSPG problem
  scheme = ode.stepscheme.BDF1
  problem = rom.lspg.unsteady.DefaultProblem(scheme, fomObj, customDecoder, romState, fomReferenceState)

  # create the Gauss-Newton solver
  nonLinSolver = solvers.create_gauss_newton(problem, romState, MyLinSolver())
  # set tolerance and convergence criteria
  nlsTol, nlsMaxIt = 1e-7, 10
  nonLinSolver.setMaxIterations(nlsMaxIt)
  nonLinSolver.setStoppingCriterion(solvers.stop.WhenCorrectionAbsoluteNormBelowTolerance)

  # create object to monitor the romState at every iteration
  myObs = RomStateObserver()
  # solve problem
  ode.advance_n_steps_and_observe(problem, romState, 0., dt, nsteps, myObs, nonLinSolver)

  # after we are done, use the reconstructor object to reconstruct the fom state
  # get the reconstructor object: this allows to map romState to fomState
  fomRecon = problem.fomStateReconstructor()
  return fomRecon(romState)

Results

If everything works fine, the following plot shows the result.

Image