# Source code for romtools.workflows.inverse.bfgs_drivers

import numpy as np
import os
import time
from romtools.workflows.models import QoiModel
from romtools.workflows.parameter_spaces import ParameterSpace
from romtools.workflows.inverse._inverse_utils import prepare_and_run

def _normalize_finite_difference_method(finite_difference_method: str):
    '''Map user-facing finite-difference method aliases onto canonical names.'''
    method = finite_difference_method.strip().lower()
    aliases = {
        "first_order": "first_order",
        "forward_first_order": "first_order",
        "forward": "first_order",
        "second_order": "second_order",
        "central_second_order": "second_order",
        "central": "second_order",
        "fourth_order": "fourth_order",
        "central_fourth_order": "fourth_order",
    }
    if method not in aliases:
        raise ValueError(
            f"Unsupported finite_difference_method '{finite_difference_method}'. "
            "Supported methods are 'first_order', 'second_order', and 'fourth_order'."
        )
    return aliases[method]

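# A minimal sketch of the normalizer above (whitespace and case are stripped
# before the alias lookup):
#
#     _normalize_finite_difference_method('Central ')    # -> 'second_order'
#     _normalize_finite_difference_method('forward')     # -> 'first_order'
#     _normalize_finite_difference_method('fifth_order') # raises ValueError
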

def _project_parameter_values(parameter_values: np.ndarray,
                              parameter_mins: np.ndarray = None,
                              parameter_maxes: np.ndarray = None):
    '''Project parameter values onto the box defined by the optional bounds.'''
    projected_values = parameter_values.copy()
    if parameter_maxes is not None:
        projected_values = np.fmin(projected_values, parameter_maxes)
    if parameter_mins is not None:
        projected_values = np.fmax(projected_values, parameter_mins)
    return projected_values

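# Illustration of the box projection above: values are clipped component-wise
# to [parameter_mins, parameter_maxes], and missing bounds are skipped:
#
#     _project_parameter_values(np.array([0.5, 2.0, -3.0]),
#                               parameter_mins=np.zeros(3),
#                               parameter_maxes=np.ones(3))
#     # -> array([0.5, 1., 0.])
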

def _compute_fractional_fd_steps(parameter_values: np.ndarray,
                                 parameter_step_fraction,
                                 minimum_parameter_step: float,
                                 parameter_mins: np.ndarray = None,
                                 parameter_maxes: np.ndarray = None):
    '''Compute per-parameter finite-difference steps as a fraction of each
    parameter's scale, floored at minimum_parameter_step.'''
    dimensionality = parameter_values.size
    if np.isscalar(parameter_step_fraction):
        fractions = np.full(dimensionality, float(parameter_step_fraction), dtype=float)
    else:
        fractions = np.asarray(parameter_step_fraction, dtype=float)
        if fractions.shape != (dimensionality,):
            raise ValueError(
                f"parameter_step_fraction shape {fractions.shape} does not match "
                f"the parameter dimensionality ({dimensionality},)."
            )
    if np.any(fractions <= 0.0):
        raise ValueError("parameter_step_fraction must be strictly positive.")

    scales = np.abs(parameter_values)
    if parameter_mins is not None and parameter_maxes is not None:
        bounded_scales = np.maximum(np.abs(parameter_maxes - parameter_mins), 1.0)
        scales = np.where(scales > 0.0, scales, bounded_scales)
    else:
        scales = np.where(scales > 0.0, scales, 1.0)
    step_sizes = fractions * scales
    return np.maximum(step_sizes, minimum_parameter_step)

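# Worked example of the step rule above: each step is fraction * scale, where
# the scale is |p_i| for nonzero parameters and otherwise the bound range
# (floored at 1.0), or 1.0 when unbounded; all steps are then floored at
# minimum_parameter_step:
#
#     _compute_fractional_fd_steps(np.array([10.0, 0.0]), 1e-2, 1e-8)
#     # -> array([0.1 , 0.01])   (the zero entry falls back to the unit scale)
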

def _compute_weighted_error_objective(error: np.ndarray,
                                      observations_covariance: np.ndarray,
                                      covariance_regularization: float):
    '''Evaluate the weighted misfit 0.5 * error^T C^{-1} error; with no
    covariance, fall back to the unweighted 0.5 * ||error||^2.'''
    if observations_covariance is None:
        return 0.5 * float(error @ error)
    regularized_covariance = observations_covariance + covariance_regularization * np.eye(error.size)
    try:
        weighted_error = np.linalg.solve(regularized_covariance, error)
    except np.linalg.LinAlgError:
        weighted_error = np.linalg.pinv(regularized_covariance) @ error
    return 0.5 * float(error @ weighted_error)

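# Worked example of the weighted misfit above: with error e = [1, 2] and
# C = diag(1, 4), the objective is 0.5 * (1/1 + 4/4) = 1.0; the
# covariance_regularization term only shifts the diagonal of C:
#
#     _compute_weighted_error_objective(np.array([1.0, 2.0]),
#                                       np.diag([1.0, 4.0]), 0.0)
#     # -> 1.0
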

def _perturb_parameter(parameter_values: np.ndarray,
                       parameter_index: int,
                       perturbation: float,
                       parameter_mins: np.ndarray = None,
                       parameter_maxes: np.ndarray = None):
    '''Perturb one parameter, clip to the bounds, and return the realized step.'''
    perturbed_values = parameter_values.copy()
    perturbed_values[parameter_index] += perturbation
    if parameter_maxes is not None:
        perturbed_values[parameter_index] = min(perturbed_values[parameter_index], parameter_maxes[parameter_index])
    if parameter_mins is not None:
        perturbed_values[parameter_index] = max(perturbed_values[parameter_index], parameter_mins[parameter_index])
    actual_perturbation = perturbed_values[parameter_index] - parameter_values[parameter_index]
    return perturbed_values, actual_perturbation

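# Sketch of the bound-aware perturbation above: a step that would leave the
# box is clipped to the bound, and the realized step is reported so the
# difference quotients below stay consistent:
#
#     _perturb_parameter(np.array([0.95]), 0, 0.1,
#                        parameter_mins=np.array([0.0]),
#                        parameter_maxes=np.array([1.0]))
#     # -> (array([1.]), ~0.05)   (requested 0.1, realized 0.05)
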

def _compute_finite_difference_gradient(parameter_values: np.ndarray,
                                        objective_at_parameters: float,
                                        finite_difference_method: str,
                                        step_sizes: np.ndarray,
                                        parameter_mins: np.ndarray,
                                        parameter_maxes: np.ndarray,
                                        evaluate_objective):
    '''Approximate the objective gradient with bound-aware finite differences,
    downgrading the stencil order when a bound truncates a perturbation.'''
    gradient = np.zeros_like(parameter_values, dtype=float)
    dimensionality = parameter_values.size

    for parameter_index in range(dimensionality):
        step_size = step_sizes[parameter_index]

        plus_h_parameters, plus_h_delta = _perturb_parameter(
            parameter_values,
            parameter_index,
            step_size,
            parameter_mins,
            parameter_maxes,
        )
        minus_h_parameters, minus_h_delta = _perturb_parameter(
            parameter_values,
            parameter_index,
            -step_size,
            parameter_mins,
            parameter_maxes,
        )
        plus_h_objective = None
        minus_h_objective = None
        if np.abs(plus_h_delta) > 0.0:
            plus_h_objective = evaluate_objective(
                plus_h_parameters,
                f'fd_{parameter_index}_plus_h',
            )
        if np.abs(minus_h_delta) > 0.0:
            minus_h_objective = evaluate_objective(
                minus_h_parameters,
                f'fd_{parameter_index}_minus_h',
            )

        if finite_difference_method == 'first_order':
            if plus_h_objective is not None:
                gradient[parameter_index] = (plus_h_objective - objective_at_parameters) / plus_h_delta
            elif minus_h_objective is not None:
                gradient[parameter_index] = (minus_h_objective - objective_at_parameters) / minus_h_delta
            else:
                gradient[parameter_index] = 0.0
            continue

        if finite_difference_method == 'second_order':
            if plus_h_objective is not None and minus_h_objective is not None:
                gradient[parameter_index] = (plus_h_objective - minus_h_objective) / (plus_h_delta - minus_h_delta)
            elif plus_h_objective is not None:
                gradient[parameter_index] = (plus_h_objective - objective_at_parameters) / plus_h_delta
            elif minus_h_objective is not None:
                gradient[parameter_index] = (minus_h_objective - objective_at_parameters) / minus_h_delta
            else:
                gradient[parameter_index] = 0.0
            continue

        plus_2h_parameters, plus_2h_delta = _perturb_parameter(
            parameter_values,
            parameter_index,
            2.0 * step_size,
            parameter_mins,
            parameter_maxes,
        )
        minus_2h_parameters, minus_2h_delta = _perturb_parameter(
            parameter_values,
            parameter_index,
            -2.0 * step_size,
            parameter_mins,
            parameter_maxes,
        )
        can_use_fourth_order = (
            plus_h_objective is not None
            and minus_h_objective is not None
            and np.abs(plus_2h_delta) > 0.0
            and np.abs(minus_2h_delta) > 0.0
            and np.isclose(plus_h_delta, step_size)
            and np.isclose(minus_h_delta, -step_size)
            and np.isclose(plus_2h_delta, 2.0 * step_size)
            and np.isclose(minus_2h_delta, -2.0 * step_size)
        )
        if can_use_fourth_order:
            plus_2h_objective = evaluate_objective(
                plus_2h_parameters,
                f'fd_{parameter_index}_plus_2h',
            )
            minus_2h_objective = evaluate_objective(
                minus_2h_parameters,
                f'fd_{parameter_index}_minus_2h',
            )
            gradient[parameter_index] = (
                -plus_2h_objective
                + 8.0 * plus_h_objective
                - 8.0 * minus_h_objective
                + minus_2h_objective
            ) / (12.0 * step_size)
        elif plus_h_objective is not None and minus_h_objective is not None:
            gradient[parameter_index] = (plus_h_objective - minus_h_objective) / (plus_h_delta - minus_h_delta)
        elif plus_h_objective is not None:
            gradient[parameter_index] = (plus_h_objective - objective_at_parameters) / plus_h_delta
        elif minus_h_objective is not None:
            gradient[parameter_index] = (minus_h_objective - objective_at_parameters) / minus_h_delta
        else:
            gradient[parameter_index] = 0.0

    return gradient

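# Self-contained check of the gradient routine on an unconstrained quadratic,
# where central differences are exact up to rounding; the lambda stands in for
# the model-based objective and ignores the label argument:
#
#     f = lambda params, label: float(params @ params)
#     p0 = np.array([1.0, -2.0])
#     g = _compute_finite_difference_gradient(
#         p0, f(p0, 'base'), 'second_order',
#         step_sizes=np.array([1e-3, 1e-3]),
#         parameter_mins=None, parameter_maxes=None,
#         evaluate_objective=f,
#     )
#     # g is [2.0, -4.0], matching the exact gradient of params @ params
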

def run_bfgs(model: QoiModel,
             parameter_space: ParameterSpace,
             observations: np.ndarray,
             observations_covariance: np.ndarray,
             parameter_mins: np.ndarray = None,
             parameter_maxes: np.ndarray = None,
             absolute_bfgs_directory: str = os.getcwd() + "/work/",
             initial_parameters: np.ndarray = None,
             finite_difference_method: str = 'second_order',
             parameter_step_fraction=1e-2,
             minimum_parameter_step: float = 1e-8,
             initial_line_search_step_size: float = 1.0,
             line_search_decay_factor: float = 2.0,
             armijo_sufficient_decrease: float = 1e-4,
             covariance_regularization: float = 1e-12,
             bfgs_curvature_tolerance: float = 1e-12,
             gradient_norm_tolerance: float = 1e-6,
             delta_params_tolerance: float = 1e-8,
             objective_tolerance: float = 1e-12,
             max_line_search_iterations: int = 20,
             max_iterations: int = 100,
             random_seed: int = 1,
             restart_file: str = None):
    '''
    Run a finite-difference BFGS optimizer for inverse problems.

    The objective is a weighted observation misfit:
    0.5 * (observations - qoi)^T * C^{-1} * (observations - qoi),
    where C is observations_covariance (with optional regularization).

    Args:
        model: QoiModel to evaluate.
        parameter_space: ParameterSpace used for initialization if
            initial_parameters is not provided.
        observations: Observed QoI vector.
        observations_covariance: Observation covariance matrix used in the
            objective.
        parameter_mins: Optional lower bounds.
        parameter_maxes: Optional upper bounds.
        absolute_bfgs_directory: Absolute path for optimizer run directories.
        initial_parameters: Optional initial parameter vector.
        finite_difference_method: One of 'first_order', 'second_order', or
            'fourth_order'.
        parameter_step_fraction: Scalar or per-parameter array defining the
            finite-difference step size as a fraction of each parameter value.
        minimum_parameter_step: Lower bound on the finite-difference step size.
        initial_line_search_step_size: Initial step size for the line search.
        line_search_decay_factor: Backtracking divisor (> 1).
        armijo_sufficient_decrease: Armijo sufficient-decrease constant.
        covariance_regularization: Diagonal regularization for the covariance
            inversion.
        bfgs_curvature_tolerance: Minimum s^T y for the BFGS Hessian update.
        gradient_norm_tolerance: Convergence tolerance for the gradient norm.
        delta_params_tolerance: Convergence tolerance for the parameter update
            norm.
        objective_tolerance: Convergence tolerance for the objective value.
        max_line_search_iterations: Maximum backtracking iterations.
        max_iterations: Maximum BFGS iterations.
        random_seed: RNG seed for initialization.
        restart_file: Optional restart file created by this routine.

    Returns:
        Tuple of (parameter_values, qoi) at termination.
    '''
    start_time = time.time()
    assert os.path.isabs(absolute_bfgs_directory), (
        f"bfgs_directory is not an absolute path ({absolute_bfgs_directory})"
    )
    if line_search_decay_factor <= 1.0:
        raise ValueError("line_search_decay_factor must be greater than 1.0")
    if initial_line_search_step_size <= 0.0:
        raise ValueError("initial_line_search_step_size must be positive.")
    if minimum_parameter_step <= 0.0:
        raise ValueError("minimum_parameter_step must be positive.")
    if covariance_regularization < 0.0:
        raise ValueError("covariance_regularization cannot be negative.")
    if max_line_search_iterations <= 0:
        raise ValueError("max_line_search_iterations must be positive.")
    if max_iterations <= 0:
        raise ValueError("max_iterations must be positive.")

    finite_difference_method = _normalize_finite_difference_method(finite_difference_method)
    dimensionality = parameter_space.get_dimensionality()
    parameter_names = parameter_space.get_names()
    if parameter_mins is not None and np.size(parameter_mins) != dimensionality:
        raise ValueError(
            f"parameter_mins of size {np.size(parameter_mins)} is inconsistent "
            f"with parameter dimensionality ({dimensionality})."
        )
    if parameter_maxes is not None and np.size(parameter_maxes) != dimensionality:
        raise ValueError(
            f"parameter_maxes of size {np.size(parameter_maxes)} is inconsistent "
            f"with parameter dimensionality ({dimensionality})."
        )

    np.random.seed(random_seed)
    if restart_file is None:
        if initial_parameters is None:
            parameter_values = parameter_space.generate_samples(1)[0]
        else:
            parameter_values = np.asarray(initial_parameters, dtype=float)
            if parameter_values.shape != (dimensionality,):
                raise ValueError(
                    f"initial_parameters shape {parameter_values.shape} does not match "
                    f"required shape ({dimensionality},)."
                )
        parameter_values = _project_parameter_values(
            parameter_values,
            parameter_mins,
            parameter_maxes,
        )
        inverse_hessian = np.eye(dimensionality)
        iteration = 0
    else:
        restart_data = np.load(restart_file)
        parameter_values = restart_data['parameter_values']
        if parameter_values.shape != (dimensionality,):
            raise ValueError(
                f"Restart parameter_values shape {parameter_values.shape} does not match "
                f"required shape ({dimensionality},)."
            )
        parameter_values = _project_parameter_values(
            parameter_values,
            parameter_mins,
            parameter_maxes,
        )
        inverse_hessian = (
            restart_data['inverse_hessian']
            if 'inverse_hessian' in restart_data
            else np.eye(dimensionality)
        )
        iteration = int(restart_data['iteration']) if 'iteration' in restart_data else 0

    evaluation_cache = {}
    evaluation_counter = 0

    def _evaluate_at(candidate_parameters: np.ndarray, iteration_index: int, label: str):
        nonlocal evaluation_counter
        candidate_parameters = _project_parameter_values(
            np.asarray(candidate_parameters, dtype=float),
            parameter_mins,
            parameter_maxes,
        )
        cache_key = tuple(np.round(candidate_parameters, 15))
        if cache_key in evaluation_cache:
            return evaluation_cache[cache_key]
        run_directory = (
            f'{absolute_bfgs_directory}/iteration_{iteration_index}/'
            f'eval_{evaluation_counter}_{label}'
        )
        qoi, error, _ = prepare_and_run(
            model,
            observations,
            run_directory,
            parameter_names,
            candidate_parameters,
        )
        objective = _compute_weighted_error_objective(
            error,
            observations_covariance,
            covariance_regularization,
        )
        result = {
            'parameter_values': candidate_parameters.copy(),
            'qoi': qoi,
            'error': error,
            'objective': objective,
        }
        evaluation_cache[cache_key] = result
        evaluation_counter += 1
        return result

    current_state = _evaluate_at(parameter_values, iteration, 'base')
    parameter_values = current_state['parameter_values']
    qoi = current_state['qoi']
    error = current_state['error']
    objective = current_state['objective']

    fd_step_sizes = _compute_fractional_fd_steps(
        parameter_values,
        parameter_step_fraction,
        minimum_parameter_step,
        parameter_mins,
        parameter_maxes,
    )
    gradient = _compute_finite_difference_gradient(
        parameter_values,
        objective,
        finite_difference_method,
        fd_step_sizes,
        parameter_mins,
        parameter_maxes,
        lambda params, label: _evaluate_at(params, iteration, label)['objective'],
    )
    gradient_norm = np.linalg.norm(gradient)
    parameter_update_norm = np.inf

    wall_time = time.time() - start_time
    print(
        f'Iteration: {iteration}, Objective: {objective:.6e}, '
        f'Gradient 2-norm: {gradient_norm:.6e}, Delta p: {parameter_update_norm:.6e}, '
        f'Wall time: {wall_time:.6f}'
    )
    np.savez(
        f'{absolute_bfgs_directory}/iteration_{iteration}/restart.npz',
        parameter_values=parameter_values,
        qoi=qoi,
        error=error,
        objective=objective,
        gradient=gradient,
        inverse_hessian=inverse_hessian,
        iteration=iteration,
    )

    line_search_failed = False
    while (
        iteration < max_iterations
        and objective > objective_tolerance
        and gradient_norm > gradient_norm_tolerance
        and parameter_update_norm > delta_params_tolerance
    ):
        search_direction = -inverse_hessian @ gradient
        directional_derivative = float(gradient @ search_direction)
        if directional_derivative >= 0.0:
            inverse_hessian = np.eye(dimensionality)
            search_direction = -gradient.copy()
            directional_derivative = float(gradient @ search_direction)

        trial_step_size = initial_line_search_step_size
        accepted_trial_state = None
        accepted_parameter_update = None
        best_trial_state = None
        best_parameter_update = None
        best_trial_objective = np.inf
        for line_search_iteration in range(max_line_search_iterations):
            trial_parameters = _project_parameter_values(
                parameter_values + trial_step_size * search_direction,
                parameter_mins,
                parameter_maxes,
            )
            trial_update = trial_parameters - parameter_values
            if np.linalg.norm(trial_update) <= 0.0:
                trial_step_size /= line_search_decay_factor
                continue
            trial_state = _evaluate_at(
                trial_parameters,
                iteration + 1,
                f'line_search_{line_search_iteration}',
            )
            if trial_state['objective'] < best_trial_objective:
                best_trial_objective = trial_state['objective']
                best_trial_state = trial_state
                best_parameter_update = trial_update
            if trial_state['objective'] <= (
                objective
                + armijo_sufficient_decrease * trial_step_size * directional_derivative
            ):
                accepted_trial_state = trial_state
                accepted_parameter_update = trial_update
                break
            trial_step_size /= line_search_decay_factor

        if accepted_trial_state is None:
            if best_trial_state is not None and best_trial_state['objective'] < objective:
                accepted_trial_state = best_trial_state
                accepted_parameter_update = best_parameter_update
                print(
                    f' Warning, Armijo condition failed after {max_line_search_iterations} '
                    f'tries at iteration {iteration}; accepting best objective-reducing trial step.'
                )
            else:
                inverse_hessian = np.eye(dimensionality)
                search_direction = -gradient.copy()
                directional_derivative = float(gradient @ search_direction)
                trial_step_size = initial_line_search_step_size
                fallback_best_state = None
                fallback_best_update = None
                fallback_best_objective = np.inf
                for line_search_iteration in range(max_line_search_iterations):
                    trial_parameters = _project_parameter_values(
                        parameter_values + trial_step_size * search_direction,
                        parameter_mins,
                        parameter_maxes,
                    )
                    trial_update = trial_parameters - parameter_values
                    if np.linalg.norm(trial_update) <= 0.0:
                        trial_step_size /= line_search_decay_factor
                        continue
                    trial_state = _evaluate_at(
                        trial_parameters,
                        iteration + 1,
                        f'fallback_line_search_{line_search_iteration}',
                    )
                    if trial_state['objective'] < fallback_best_objective:
                        fallback_best_objective = trial_state['objective']
                        fallback_best_state = trial_state
                        fallback_best_update = trial_update
                    if trial_state['objective'] <= (
                        objective
                        + armijo_sufficient_decrease * trial_step_size * directional_derivative
                    ):
                        accepted_trial_state = trial_state
                        accepted_parameter_update = trial_update
                        break
                    trial_step_size /= line_search_decay_factor
                if (
                    accepted_trial_state is None
                    and fallback_best_state is not None
                    and fallback_best_state['objective'] < objective
                ):
                    accepted_trial_state = fallback_best_state
                    accepted_parameter_update = fallback_best_update
                    print(
                        f' Warning, fallback Armijo condition failed after {max_line_search_iterations} '
                        f'tries at iteration {iteration}; accepting best objective-reducing trial step.'
                    )

        if accepted_trial_state is None:
            print(
                f' Warning, failed line search after {max_line_search_iterations} '
                f'tries at iteration {iteration}.'
            )
            line_search_failed = True
            break

        previous_parameters = parameter_values
        previous_gradient = gradient
        parameter_values = accepted_trial_state['parameter_values']
        qoi = accepted_trial_state['qoi']
        error = accepted_trial_state['error']
        objective = accepted_trial_state['objective']
        parameter_update_norm = np.linalg.norm(accepted_parameter_update)

        fd_step_sizes = _compute_fractional_fd_steps(
            parameter_values,
            parameter_step_fraction,
            minimum_parameter_step,
            parameter_mins,
            parameter_maxes,
        )
        gradient = _compute_finite_difference_gradient(
            parameter_values,
            objective,
            finite_difference_method,
            fd_step_sizes,
            parameter_mins,
            parameter_maxes,
            lambda params, label: _evaluate_at(params, iteration + 1, label)['objective'],
        )
        gradient_norm = np.linalg.norm(gradient)

        s_vector = parameter_values - previous_parameters
        y_vector = gradient - previous_gradient
        ys_dot = float(y_vector @ s_vector)
        if ys_dot > bfgs_curvature_tolerance:
            rho = 1.0 / ys_dot
            identity = np.eye(dimensionality)
            sy_outer = np.outer(s_vector, y_vector)
            ys_outer = np.outer(y_vector, s_vector)
            ss_outer = np.outer(s_vector, s_vector)
            inverse_hessian = (
                (identity - rho * sy_outer)
                @ inverse_hessian
                @ (identity - rho * ys_outer)
                + rho * ss_outer
            )
        else:
            inverse_hessian = np.eye(dimensionality)

        iteration += 1
        wall_time = time.time() - start_time
        print(
            f'Iteration: {iteration}, Objective: {objective:.6e}, '
            f'Gradient 2-norm: {gradient_norm:.6e}, Delta p: {parameter_update_norm:.6e}, '
            f'Wall time: {wall_time:.6f}'
        )
        np.savez(
            f'{absolute_bfgs_directory}/iteration_{iteration}/restart.npz',
            parameter_values=parameter_values,
            qoi=qoi,
            error=error,
            objective=objective,
            gradient=gradient,
            inverse_hessian=inverse_hessian,
            iteration=iteration,
        )

    if line_search_failed:
        print('Terminating due to line-search failure.')
    elif iteration >= max_iterations:
        print('Max iterations reached, terminating.')
    elif objective <= objective_tolerance:
        print('Objective dropped below tolerance.')
    elif gradient_norm <= gradient_norm_tolerance:
        print('Gradient norm dropped below tolerance.')
    elif parameter_update_norm <= delta_params_tolerance:
        print('Parameter update dropped below tolerance.')
    return parameter_values, qoi
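

# Hedged usage sketch for run_bfgs: `my_model`, `my_parameter_space`, and
# 'observations.npy' are hypothetical stand-ins for a concrete QoiModel,
# a ParameterSpace, and observation data; only the calling convention is shown.
#
#     observations = np.load('observations.npy')
#     covariance = np.eye(observations.size)
#     best_parameters, best_qoi = run_bfgs(
#         my_model, my_parameter_space, observations, covariance,
#         absolute_bfgs_directory='/tmp/bfgs_work',
#         finite_difference_method='second_order',
#         max_iterations=50,
#     )
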

def run_steepest_descent(model: QoiModel,
                         parameter_space: ParameterSpace,
                         observations: np.ndarray,
                         observations_covariance: np.ndarray,
                         parameter_mins: np.ndarray = None,
                         parameter_maxes: np.ndarray = None,
                         absolute_steepest_descent_directory: str = os.getcwd() + "/work/",
                         initial_parameters: np.ndarray = None,
                         finite_difference_method: str = 'second_order',
                         parameter_step_fraction=1e-2,
                         minimum_parameter_step: float = 1e-8,
                         initial_line_search_step_size: float = 1.0,
                         line_search_decay_factor: float = 2.0,
                         armijo_sufficient_decrease: float = 1e-4,
                         covariance_regularization: float = 1e-12,
                         gradient_norm_tolerance: float = 1e-6,
                         delta_params_tolerance: float = 1e-8,
                         objective_tolerance: float = 1e-12,
                         max_line_search_iterations: int = 20,
                         max_iterations: int = 100,
                         random_seed: int = 1,
                         restart_file: str = None):
    '''
    Run a finite-difference steepest-descent optimizer for inverse problems.

    The objective is a weighted observation misfit:
    0.5 * (observations - qoi)^T * C^{-1} * (observations - qoi),
    where C is observations_covariance (with optional regularization).

    Args:
        model: QoiModel to evaluate.
        parameter_space: ParameterSpace used for initialization if
            initial_parameters is not provided.
        observations: Observed QoI vector.
        observations_covariance: Observation covariance matrix used in the
            objective.
        parameter_mins: Optional lower bounds.
        parameter_maxes: Optional upper bounds.
        absolute_steepest_descent_directory: Absolute path for optimizer runs.
        initial_parameters: Optional initial parameter vector.
        finite_difference_method: One of 'first_order', 'second_order', or
            'fourth_order'.
        parameter_step_fraction: Scalar or per-parameter array defining the
            finite-difference step size as a fraction of each parameter value.
        minimum_parameter_step: Lower bound on the finite-difference step size.
        initial_line_search_step_size: Initial step size for the line search.
        line_search_decay_factor: Backtracking divisor (> 1).
        armijo_sufficient_decrease: Armijo sufficient-decrease constant.
        covariance_regularization: Diagonal regularization for the covariance
            inversion.
        gradient_norm_tolerance: Convergence tolerance for the gradient norm.
        delta_params_tolerance: Convergence tolerance for the parameter update
            norm.
        objective_tolerance: Convergence tolerance for the objective value.
        max_line_search_iterations: Maximum backtracking iterations.
        max_iterations: Maximum steepest-descent iterations.
        random_seed: RNG seed for initialization.
        restart_file: Optional restart file created by this routine.

    Returns:
        Tuple of (parameter_values, qoi) at termination.
    '''
    start_time = time.time()
    assert os.path.isabs(absolute_steepest_descent_directory), (
        "steepest_descent_directory is not an absolute path "
        f"({absolute_steepest_descent_directory})"
    )
    if line_search_decay_factor <= 1.0:
        raise ValueError("line_search_decay_factor must be greater than 1.0")
    if initial_line_search_step_size <= 0.0:
        raise ValueError("initial_line_search_step_size must be positive.")
    if minimum_parameter_step <= 0.0:
        raise ValueError("minimum_parameter_step must be positive.")
    if covariance_regularization < 0.0:
        raise ValueError("covariance_regularization cannot be negative.")
    if max_line_search_iterations <= 0:
        raise ValueError("max_line_search_iterations must be positive.")
    if max_iterations <= 0:
        raise ValueError("max_iterations must be positive.")

    finite_difference_method = _normalize_finite_difference_method(finite_difference_method)
    dimensionality = parameter_space.get_dimensionality()
    parameter_names = parameter_space.get_names()
    if parameter_mins is not None and np.size(parameter_mins) != dimensionality:
        raise ValueError(
            f"parameter_mins of size {np.size(parameter_mins)} is inconsistent "
            f"with parameter dimensionality ({dimensionality})."
        )
    if parameter_maxes is not None and np.size(parameter_maxes) != dimensionality:
        raise ValueError(
            f"parameter_maxes of size {np.size(parameter_maxes)} is inconsistent "
            f"with parameter dimensionality ({dimensionality})."
        )

    np.random.seed(random_seed)
    if restart_file is None:
        if initial_parameters is None:
            parameter_values = parameter_space.generate_samples(1)[0]
        else:
            parameter_values = np.asarray(initial_parameters, dtype=float)
            if parameter_values.shape != (dimensionality,):
                raise ValueError(
                    f"initial_parameters shape {parameter_values.shape} does not match "
                    f"required shape ({dimensionality},)."
                )
        parameter_values = _project_parameter_values(
            parameter_values,
            parameter_mins,
            parameter_maxes,
        )
        iteration = 0
    else:
        restart_data = np.load(restart_file)
        parameter_values = restart_data['parameter_values']
        if parameter_values.shape != (dimensionality,):
            raise ValueError(
                f"Restart parameter_values shape {parameter_values.shape} does not match "
                f"required shape ({dimensionality},)."
            )
        parameter_values = _project_parameter_values(
            parameter_values,
            parameter_mins,
            parameter_maxes,
        )
        iteration = int(restart_data['iteration']) if 'iteration' in restart_data else 0

    evaluation_cache = {}
    evaluation_counter = 0

    def _evaluate_at(candidate_parameters: np.ndarray, iteration_index: int, label: str):
        nonlocal evaluation_counter
        candidate_parameters = _project_parameter_values(
            np.asarray(candidate_parameters, dtype=float),
            parameter_mins,
            parameter_maxes,
        )
        cache_key = tuple(np.round(candidate_parameters, 15))
        if cache_key in evaluation_cache:
            return evaluation_cache[cache_key]
        run_directory = (
            f'{absolute_steepest_descent_directory}/iteration_{iteration_index}/'
            f'eval_{evaluation_counter}_{label}'
        )
        qoi, error, _ = prepare_and_run(
            model,
            observations,
            run_directory,
            parameter_names,
            candidate_parameters,
        )
        objective = _compute_weighted_error_objective(
            error,
            observations_covariance,
            covariance_regularization,
        )
        result = {
            'parameter_values': candidate_parameters.copy(),
            'qoi': qoi,
            'error': error,
            'objective': objective,
        }
        evaluation_cache[cache_key] = result
        evaluation_counter += 1
        return result

    current_state = _evaluate_at(parameter_values, iteration, 'base')
    parameter_values = current_state['parameter_values']
    qoi = current_state['qoi']
    error = current_state['error']
    objective = current_state['objective']

    fd_step_sizes = _compute_fractional_fd_steps(
        parameter_values,
        parameter_step_fraction,
        minimum_parameter_step,
        parameter_mins,
        parameter_maxes,
    )
    gradient = _compute_finite_difference_gradient(
        parameter_values,
        objective,
        finite_difference_method,
        fd_step_sizes,
        parameter_mins,
        parameter_maxes,
        lambda params, label: _evaluate_at(params, iteration, label)['objective'],
    )
    gradient_norm = np.linalg.norm(gradient)
    parameter_update_norm = np.inf

    wall_time = time.time() - start_time
    print(
        f'Iteration: {iteration}, Objective: {objective:.6e}, '
        f'Gradient 2-norm: {gradient_norm:.6e}, Delta p: {parameter_update_norm:.6e}, '
        f'Wall time: {wall_time:.6f}'
    )
    np.savez(
        f'{absolute_steepest_descent_directory}/iteration_{iteration}/restart.npz',
        parameter_values=parameter_values,
        qoi=qoi,
        error=error,
        objective=objective,
        gradient=gradient,
        iteration=iteration,
    )

    line_search_failed = False
    while (
        iteration < max_iterations
        and objective > objective_tolerance
        and gradient_norm > gradient_norm_tolerance
        and parameter_update_norm > delta_params_tolerance
    ):
        search_direction = -gradient.copy()
        directional_derivative = float(gradient @ search_direction)

        trial_step_size = initial_line_search_step_size
        accepted_trial_state = None
        accepted_parameter_update = None
        best_trial_state = None
        best_parameter_update = None
        best_trial_objective = np.inf
        for line_search_iteration in range(max_line_search_iterations):
            trial_parameters = _project_parameter_values(
                parameter_values + trial_step_size * search_direction,
                parameter_mins,
                parameter_maxes,
            )
            trial_update = trial_parameters - parameter_values
            if np.linalg.norm(trial_update) <= 0.0:
                trial_step_size /= line_search_decay_factor
                continue
            trial_state = _evaluate_at(
                trial_parameters,
                iteration + 1,
                f'line_search_{line_search_iteration}',
            )
            if trial_state['objective'] < best_trial_objective:
                best_trial_objective = trial_state['objective']
                best_trial_state = trial_state
                best_parameter_update = trial_update
            if trial_state['objective'] <= (
                objective
                + armijo_sufficient_decrease * trial_step_size * directional_derivative
            ):
                accepted_trial_state = trial_state
                accepted_parameter_update = trial_update
                break
            trial_step_size /= line_search_decay_factor

        if accepted_trial_state is None:
            if best_trial_state is not None and best_trial_state['objective'] < objective:
                accepted_trial_state = best_trial_state
                accepted_parameter_update = best_parameter_update
                print(
                    f' Warning, Armijo condition failed after {max_line_search_iterations} '
                    f'tries at iteration {iteration}; accepting best objective-reducing trial step.'
                )
            else:
                print(
                    f' Warning, failed line search after {max_line_search_iterations} '
                    f'tries at iteration {iteration}.'
                )
                line_search_failed = True
                break

        parameter_values = accepted_trial_state['parameter_values']
        qoi = accepted_trial_state['qoi']
        error = accepted_trial_state['error']
        objective = accepted_trial_state['objective']
        parameter_update_norm = np.linalg.norm(accepted_parameter_update)

        fd_step_sizes = _compute_fractional_fd_steps(
            parameter_values,
            parameter_step_fraction,
            minimum_parameter_step,
            parameter_mins,
            parameter_maxes,
        )
        gradient = _compute_finite_difference_gradient(
            parameter_values,
            objective,
            finite_difference_method,
            fd_step_sizes,
            parameter_mins,
            parameter_maxes,
            lambda params, label: _evaluate_at(params, iteration + 1, label)['objective'],
        )
        gradient_norm = np.linalg.norm(gradient)

        iteration += 1
        wall_time = time.time() - start_time
        print(
            f'Iteration: {iteration}, Objective: {objective:.6e}, '
            f'Gradient 2-norm: {gradient_norm:.6e}, Delta p: {parameter_update_norm:.6e}, '
            f'Wall time: {wall_time:.6f}'
        )
        np.savez(
            f'{absolute_steepest_descent_directory}/iteration_{iteration}/restart.npz',
            parameter_values=parameter_values,
            qoi=qoi,
            error=error,
            objective=objective,
            gradient=gradient,
            iteration=iteration,
        )

    if line_search_failed:
        print('Terminating due to line-search failure.')
    elif iteration >= max_iterations:
        print('Max iterations reached, terminating.')
    elif objective <= objective_tolerance:
        print('Objective dropped below tolerance.')
    elif gradient_norm <= gradient_norm_tolerance:
        print('Gradient norm dropped below tolerance.')
    elif parameter_update_norm <= delta_params_tolerance:
        print('Parameter update dropped below tolerance.')
    return parameter_values, qoi


def run_steepest_decent(*args, **kwargs):
    """Compatibility alias for the common misspelling 'decent'."""
    return run_steepest_descent(*args, **kwargs)
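

# run_steepest_descent mirrors run_bfgs's calling convention (minus
# bfgs_curvature_tolerance and with absolute_steepest_descent_directory), so
# the hypothetical names from the run_bfgs sketch above carry over:
#
#     best_parameters, best_qoi = run_steepest_descent(
#         my_model, my_parameter_space, observations, covariance,
#         absolute_steepest_descent_directory='/tmp/sd_work',
#     )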