from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, List, Optional, Sequence
import numpy as np
@dataclass
class GaussianProcessKernel:
    """Squared-exponential (RBF) covariance kernel.

    k(x1, x2) = signal_variance * exp(-0.5 * ||x1 - x2||^2 / length_scale^2)
    """

    length_scale: float = 1.0     # correlation length of the RBF
    signal_variance: float = 1.0  # amplitude: kernel value at zero distance

    def __call__(self, x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
        """Return the (n1, n2) kernel matrix between row-wise point sets.

        1-D inputs are treated as a single point via ``np.atleast_2d``.
        """
        x1 = np.atleast_2d(x1)
        x2 = np.atleast_2d(x2)
        x1_sq = np.sum(x1 * x1, axis=1)[:, None]
        x2_sq = np.sum(x2 * x2, axis=1)[None, :]
        # Pairwise squared distances via the ||a||^2 + ||b||^2 - 2 a.b
        # expansion; clamp at zero because floating-point cancellation can
        # produce tiny negative values (which would inflate exp(...) above
        # signal_variance on the diagonal).
        sq_dist = np.maximum(x1_sq + x2_sq - 2.0 * x1 @ x2.T, 0.0)
        return self.signal_variance * np.exp(-0.5 * sq_dist / (self.length_scale ** 2))
class GaussianProcessRegressorLite:
    """Minimal exact Gaussian-process regressor (posterior mean only).

    Fits GP regression through a Cholesky factorization of the regularized
    training kernel matrix and predicts the posterior mean at query points.
    """

    def __init__(self,
                 kernel: Optional[GaussianProcessKernel] = None,
                 noise_variance: float = 1e-10,
                 jitter: float = 1e-12) -> None:
        """
        Parameters
        ----------
        kernel : covariance function; defaults to a unit RBF kernel.
        noise_variance : observation-noise variance added to the diagonal.
        jitter : extra diagonal regularization for numerical stability.
        """
        self.kernel = kernel if kernel is not None else GaussianProcessKernel()
        self.noise_variance = noise_variance
        self.jitter = jitter
        self._x_train: Optional[np.ndarray] = None
        # alpha = K^{-1} y, cached so predict() is a single mat-vec product.
        self._alpha: Optional[np.ndarray] = None
        # Lower-triangular Cholesky factor of the regularized kernel matrix.
        self._chol: Optional[np.ndarray] = None

    def fit(self, x_train: np.ndarray, y_train: np.ndarray) -> None:
        """Factorize K(X, X) + (noise + jitter) I and cache alpha = K^{-1} y.

        Raises ``numpy.linalg.LinAlgError`` if the regularized kernel matrix
        is not positive definite.
        """
        x_train = np.asarray(x_train, dtype=float)
        y_train = np.asarray(y_train, dtype=float).reshape(-1, 1)
        kxx = self.kernel(x_train, x_train)
        kxx = kxx + (self.noise_variance + self.jitter) * np.eye(kxx.shape[0])
        self._chol = np.linalg.cholesky(kxx)
        # Two triangular solves: L z = y, then L^T alpha = z.
        self._alpha = np.linalg.solve(self._chol.T, np.linalg.solve(self._chol, y_train))
        self._x_train = x_train

    def predict(self, x_query: np.ndarray) -> np.ndarray:
        """Return the 1-D posterior mean k(x*, X) @ alpha at the query points."""
        if self._x_train is None or self._alpha is None:
            raise RuntimeError("GaussianProcessRegressorLite has not been fit yet.")
        x_query = np.asarray(x_query, dtype=float)
        kx = self.kernel(x_query, self._x_train)
        y_mean = kx @ self._alpha
        return y_mean.ravel()

    @staticmethod
    def log_marginal_likelihood(x_train: np.ndarray,
                                y_train: np.ndarray,
                                kernel: GaussianProcessKernel,
                                noise_variance: float,
                                jitter: float = 1e-12) -> float:
        """Return the GP log marginal likelihood of (x_train, y_train).

        Returns ``-inf`` when the regularized kernel matrix is not positive
        definite, so hyperparameter grid searches can simply skip that point.
        """
        x_train = np.asarray(x_train, dtype=float)
        y_train = np.asarray(y_train, dtype=float).reshape(-1, 1)
        kxx = kernel(x_train, x_train)
        kxx = kxx + (noise_variance + jitter) * np.eye(kxx.shape[0])
        try:
            chol = np.linalg.cholesky(kxx)
        except np.linalg.LinAlgError:
            return float("-inf")
        alpha = np.linalg.solve(chol.T, np.linalg.solve(chol, y_train))
        # -0.5 * log|K| = -sum(log diag(L)) since log|K| = 2 sum(log diag(L)).
        log_det = np.sum(np.log(np.diag(chol)))
        n = y_train.shape[0]
        # .item() extracts the scalar from the (1, 1) product; float() on an
        # ndarray of size 1 is deprecated in NumPy >= 1.25.
        lml = -0.5 * (y_train.T @ alpha).item() - log_det - 0.5 * n * np.log(2.0 * np.pi)
        return float(lml)
class GaussianProcessQoiModel:
    """
    Gaussian Process QoI surrogate model that follows the QoiModel API.
    For vector QoIs, a POD is performed and a GP is trained for each
    reduced coefficient.
    """

    def __init__(self,
                 parameters: np.ndarray,
                 qois: np.ndarray,
                 parameter_names: Optional[Sequence[str]] = None,
                 pod_energy_fraction: float = 0.999999,
                 max_pod_modes: Optional[int] = None,
                 kernel: Optional[GaussianProcessKernel] = None,
                 noise_variance: Optional[float] = None,
                 auto_noise_variance: bool = False,
                 noise_variance_fraction: float = 1e-6,
                 tune_hyperparameters: bool = False,
                 length_scale_grid: Optional[Sequence[float]] = None,
                 signal_variance_grid: Optional[Sequence[float]] = None,
                 normalize_parameters: bool = False,
                 normalize_targets: bool = False) -> None:
        """Store the training data and configuration, then fit immediately.

        Parameters
        ----------
        parameters : (n_samples, n_params) training parameter values.
        qois : (n_samples,) or (n_samples, n_qoi) training QoI values.
        parameter_names : names used to map dict-valued samples to columns.
        pod_energy_fraction : cumulative singular-value energy retained
            when selecting the number of POD modes.
        max_pod_modes : optional hard cap on the number of POD modes.
        kernel : base kernel used for every GP (replaced per-GP if tuning).
        noise_variance : fixed GP noise variance; takes precedence over
            the automatic selection below when not None.
        auto_noise_variance : if True, derive the noise variance from the
            (scaled) target variance.
        noise_variance_fraction : fraction of target variance used when
            auto_noise_variance is enabled.
        tune_hyperparameters : if True, grid-search kernel hyperparameters
            per GP by maximizing the log marginal likelihood.
        length_scale_grid, signal_variance_grid : optional custom grids
            for the hyperparameter search.
        normalize_parameters, normalize_targets : enable min-max scaling
            of the inputs / regression targets.
        """
        self.parameter_names = list(parameter_names) if parameter_names is not None else None
        self.parameters = np.asarray(parameters, dtype=float)
        self.qois = np.asarray(qois, dtype=float)
        self.pod_energy_fraction = pod_energy_fraction
        self.max_pod_modes = max_pod_modes
        self.kernel = kernel if kernel is not None else GaussianProcessKernel()
        self.noise_variance = noise_variance
        self.auto_noise_variance = auto_noise_variance
        self.noise_variance_fraction = noise_variance_fraction
        self.tune_hyperparameters = tune_hyperparameters
        self.length_scale_grid = length_scale_grid
        self.signal_variance_grid = signal_variance_grid
        self.normalize_parameters = normalize_parameters
        self.normalize_targets = normalize_targets
        # Fitted state, populated by _fit() below.
        self._mean_qoi: Optional[np.ndarray] = None
        self._pod_modes: Optional[np.ndarray] = None
        self._gps: List[GaussianProcessRegressorLite] = []
        # Min-max scaling state; all stay None when scaling is disabled.
        self._param_min: Optional[np.ndarray] = None
        self._param_scale: Optional[np.ndarray] = None
        self._scaled_parameters: Optional[np.ndarray] = None
        self._target_min: Optional[np.ndarray] = None
        self._target_scale: Optional[np.ndarray] = None
        self._fit()

    def populate_run_directory(self, run_directory: str, parameter_sample: dict) -> None:
        # No-op: the surrogate needs no run directory (QoiModel API stub).
        return None

    def run_model(self, run_directory: str, parameter_sample: dict) -> int:
        # No-op "success" return code: evaluation happens in compute_qoi.
        return 0

    def compute_qoi(self, run_directory: str, parameter_sample: dict) -> np.ndarray:
        """Evaluate the surrogate at one parameter sample.

        ``run_directory`` is unused; it exists only for API compatibility.
        Returns a 1-D array: the scalar prediction for scalar QoIs, or the
        POD reconstruction mean + modes @ coefficients for vector QoIs.
        """
        x = self._parameter_sample_to_array(parameter_sample)
        # One GP per (scaled) reduced coefficient.
        coeffs = np.array([gp.predict(x[None, :])[0] for gp in self._gps], dtype=float)
        coeffs = self._unscale_targets(coeffs)
        if self._pod_modes is None:
            # Scalar-QoI case: the single GP predicts the QoI directly.
            return np.asarray([coeffs[0]])
        qoi = self._mean_qoi + self._pod_modes @ coeffs
        return np.asarray(qoi).ravel()

    def _parameter_sample_to_array(self, parameter_sample: dict) -> np.ndarray:
        """Convert a dict or array-like sample into a scaled 1-D parameter array."""
        if isinstance(parameter_sample, dict):
            if self.parameter_names is None:
                raise ValueError("parameter_names must be provided to map dict inputs.")
            # Order the values according to the training parameter columns.
            params = np.array([parameter_sample[name] for name in self.parameter_names], dtype=float)
        else:
            params = np.asarray(parameter_sample, dtype=float).ravel()
        return self._scale_parameters(params)

    def _fit(self) -> None:
        """Train the surrogate: optional scaling, POD reduction, one GP per target."""
        self._initialize_parameter_scaling()
        parameters = self._scaled_parameters if self._scaled_parameters is not None else self.parameters
        if self.qois.ndim == 1 or self.qois.shape[1] == 1:
            # Scalar QoI: no POD; a single GP regresses the QoI directly.
            y = self.qois.reshape(-1, 1)
            self._mean_qoi = np.zeros(1, dtype=float)
            self._pod_modes = None
            # NOTE: _scale_targets lazily initializes _target_min/_target_scale
            # here when target normalization is enabled.
            y_scaled = self._scale_targets(y.ravel())
            noise_variance = self._resolve_noise_variance(y_scaled)
            kernel = self._tuned_kernel(parameters, y_scaled, noise_variance)
            gp = GaussianProcessRegressorLite(kernel=kernel, noise_variance=noise_variance)
            gp.fit(parameters, y_scaled)
            self._gps = [gp]
            return
        # Vector QoI: POD (SVD of the centered snapshots) for dimension reduction.
        mean_qoi = np.mean(self.qois, axis=0)
        centered = self.qois - mean_qoi[None, :]
        _, svals, vt = np.linalg.svd(centered, full_matrices=False)
        energy = np.cumsum(svals ** 2)
        total_energy = energy[-1] if energy.size > 0 else 0.0
        if total_energy == 0.0:
            # Degenerate case: all snapshots equal the mean; keep one zero mode.
            modes = np.zeros((self.qois.shape[1], 1))
        else:
            # Smallest k whose cumulative energy fraction reaches the threshold.
            k = int(np.searchsorted(energy / total_energy, self.pod_energy_fraction) + 1)
            if self.max_pod_modes is not None:
                k = min(k, self.max_pod_modes)
            k = max(k, 1)
            modes = vt[:k, :].T
        # Reduced coordinates of each snapshot: one column per POD mode.
        coeffs = centered @ modes
        if self.normalize_targets:
            # Per-mode min-max scaling set up front so _scale_targets can
            # dispatch on the mode index.
            coeff_min = np.min(coeffs, axis=0)
            coeff_max = np.max(coeffs, axis=0)
            scale = coeff_max - coeff_min
            scale[scale == 0.0] = 1.0  # avoid division by zero for constant modes
            self._target_min = coeff_min
            self._target_scale = scale
        self._mean_qoi = mean_qoi
        self._pod_modes = modes
        self._gps = []
        for i in range(coeffs.shape[1]):
            target = coeffs[:, i]
            target_scaled = self._scale_targets(target, index=i)
            noise_variance = self._resolve_noise_variance(target_scaled)
            kernel = self._tuned_kernel(parameters, target_scaled, noise_variance)
            gp = GaussianProcessRegressorLite(kernel=kernel, noise_variance=noise_variance)
            gp.fit(parameters, target_scaled)
            self._gps.append(gp)

    def _resolve_noise_variance(self, target: np.ndarray) -> float:
        """Pick the GP noise variance: explicit > automatic > tiny default."""
        if self.noise_variance is not None:
            return float(self.noise_variance)
        if self.auto_noise_variance:
            variance = float(np.var(target))
            return self.noise_variance_fraction * variance
        return 1e-10

    def _initialize_parameter_scaling(self) -> None:
        """Compute per-column min-max scaling of the training parameters."""
        if not self.normalize_parameters:
            self._param_min = None
            self._param_scale = None
            self._scaled_parameters = None
            return
        param_min = np.min(self.parameters, axis=0)
        param_max = np.max(self.parameters, axis=0)
        scale = param_max - param_min
        scale[scale == 0.0] = 1.0  # constant columns map to zero, not NaN
        self._param_min = param_min
        self._param_scale = scale
        self._scaled_parameters = (self.parameters - param_min) / scale

    def _scale_parameters(self, params: np.ndarray) -> np.ndarray:
        """Apply the stored min-max scaling to a parameter vector (if enabled)."""
        if not self.normalize_parameters or self._param_min is None or self._param_scale is None:
            return params
        return (params - self._param_min) / self._param_scale

    def _scale_targets(self, target: np.ndarray, index: Optional[int] = None) -> np.ndarray:
        """Min-max scale a regression target vector.

        In the scalar-QoI path the scaling statistics are computed lazily
        from the first target seen; in the POD path they were precomputed
        per mode in _fit and are selected via ``index``.
        """
        if not self.normalize_targets:
            return np.asarray(target, dtype=float).ravel()
        target = np.asarray(target, dtype=float).ravel()
        if self._target_min is None or self._target_scale is None:
            # Lazy initialization (scalar-QoI path).
            target_min = np.min(target)
            target_max = np.max(target)
            scale = target_max - target_min
            if scale == 0.0:
                scale = 1.0  # constant target: avoid division by zero
            self._target_min = np.array([target_min], dtype=float)
            self._target_scale = np.array([scale], dtype=float)
        if index is not None and self._target_min.size > 1:
            # Per-POD-mode statistics.
            target_min = self._target_min[index]
            scale = self._target_scale[index]
        else:
            target_min = float(self._target_min[0])
            scale = float(self._target_scale[0])
        return (target - target_min) / scale

    def _unscale_targets(self, values: np.ndarray) -> np.ndarray:
        """Invert _scale_targets on predicted coefficients (broadcast per mode)."""
        values = np.asarray(values, dtype=float).ravel()
        if not self.normalize_targets or self._target_min is None or self._target_scale is None:
            return values
        if self._target_min.size == 1:
            return values * self._target_scale[0] + self._target_min[0]
        return values * self._target_scale + self._target_min

    def _tuned_kernel(self,
                      parameters: np.ndarray,
                      target: np.ndarray,
                      noise_variance: float) -> GaussianProcessKernel:
        """Grid-search kernel hyperparameters by log marginal likelihood.

        Returns the shared base kernel unchanged when tuning is disabled or
        when no grid point beats the -inf starting score.
        """
        if not self.tune_hyperparameters:
            return self.kernel
        length_scales = (self.length_scale_grid
                         if self.length_scale_grid is not None
                         else np.logspace(-2, 2, 9))
        signal_variances = (self.signal_variance_grid
                            if self.signal_variance_grid is not None
                            else np.logspace(-6, 2, 9))
        best_kernel = self.kernel
        best_lml = float("-inf")
        for length_scale in length_scales:
            for signal_variance in signal_variances:
                candidate = GaussianProcessKernel(length_scale=float(length_scale),
                                                  signal_variance=float(signal_variance))
                lml = GaussianProcessRegressorLite.log_marginal_likelihood(
                    parameters, target, candidate, noise_variance
                )
                if lml > best_lml:
                    best_lml = lml
                    best_kernel = candidate
        return best_kernel