import concurrent.futures
import multiprocessing
import numpy as np
import os
import time
def _create_parameter_dict(parameter_names, parameter_values):
return dict(zip(parameter_names, parameter_values))
[docs]
def prepare_and_run(model, observations, run_directory, parameter_names, parameter_sample):
"""Prepare the model run and compute the QoI and error."""
os.makedirs(run_directory, exist_ok=True)
parameter_dict = _create_parameter_dict(parameter_names, parameter_sample)
model.populate_run_directory(run_directory, parameter_dict)
ts = time.time()
flag = model.run_model(run_directory, parameter_dict)
qoi = model.compute_qoi(run_directory, parameter_dict)
assert isinstance(qoi,np.ndarray), "Error, compute_qoi must return an np.ndarray"
error = observations - qoi
run_time = time.time() - ts
return qoi, error, run_time
def bound_samples(samples,samples_min,samples_max):
if samples_max is not None:
samples[:,:] = np.fmin(samples[:,:],samples_max)
if samples_min is not None:
samples[:,:] = np.fmax(samples[:,:],samples_min)
return samples
[docs]
def run_eki_iteration(model, observations, run_directory_base, parameter_names, parameter_samples, evaluation_concurrency):
"""Run the EKI iteration for the specified parameters."""
mp_cntxt = multiprocessing.get_context("fork")
ensemble_size = np.shape(parameter_samples)[0]
if evaluation_concurrency == 1:
# Run at parameter mean
run_directory = f'{run_directory_base}mean'
parameter_means = np.mean(parameter_samples, axis=0)
mean_qoi, mean_error, run_time = prepare_and_run(model, observations, run_directory, parameter_names, parameter_means)
qois = np.zeros((mean_qoi.size, ensemble_size))
errors = np.zeros((mean_qoi.size, ensemble_size))
for ensemble_member in range(ensemble_size):
run_directory = f'{run_directory_base}{ensemble_member}'
qois[:, ensemble_member], errors[:, ensemble_member], run_time = prepare_and_run(model, observations, run_directory, parameter_names, parameter_samples[ensemble_member])
else:
samples_to_run = list(range(ensemble_size))
with concurrent.futures.ProcessPoolExecutor(max_workers=evaluation_concurrency, mp_context=mp_cntxt) as executor:
#these_futures = [executor.submit(my_print,ensemble_member) for ensemble_member in samples_to_run]
parameter_means = np.mean(parameter_samples, axis=0)
run_directory = f'{run_directory_base}mean'
these_futures = [executor.submit(prepare_and_run, model, observations, run_directory, parameter_names, parameter_means)]
these_futures.extend([executor.submit(prepare_and_run, model, observations, f'{run_directory_base}{ensemble_member}', parameter_names, parameter_samples[ensemble_member]) for ensemble_member in samples_to_run])
concurrent.futures.wait(these_futures)
for i, future in enumerate(these_futures):
if i == 0:
mean_qoi, mean_error, run_time = future.result()
qois = np.zeros((mean_qoi.size, ensemble_size))
errors = np.zeros((mean_qoi.size, ensemble_size))
else:
qois[:, i-1], errors[:, i-1], run_time = future.result()
results = {}
results['qois'] = qois
results['mean-qoi'] = mean_qoi
results['errors'] = errors
return results