romtools.workflows.sampling.sampling
#
# ************************************************************************
#
# ROM Tools and Workflows
# Copyright 2019 National Technology & Engineering Solutions of Sandia, LLC
# (NTESS)
#
# Under the terms of Contract DE-NA0003525 with NTESS, the
# U.S. Government retains certain rights in this software.
#
# ROM Tools and Workflows is licensed under BSD-3-Clause terms of use:
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Questions? Contact Eric Parish (ejparis@sandia.gov)
#
# ************************************************************************
#

import os
import time
import numpy as np
import concurrent.futures
import multiprocessing

from romtools.workflows.workflow_utils import create_empty_dir
from romtools.workflows.models import Model
from romtools.workflows.parameter_spaces import ParameterSpace


def _get_run_id_from_run_dir(run_dir):
    # Run directories are named .../run_<id>; recover the integer id from
    # the final underscore-delimited token.
    return int(run_dir.split('_')[-1])


def _create_parameter_dict(parameter_names, parameter_values):
    # Pair each parameter name with its sampled value for the model API.
    return dict(zip(parameter_names, parameter_values))


def _sample_has_passed(run_directory):
    # run_sample drops a passed.txt marker into the run directory on success.
    return "passed.txt" in os.listdir(run_directory)


def _run_samples_serially(model, run_directory_base, parameter_names,
                          parameter_samples, number_of_samples, overwrite):
    # Evaluate each sample one after the other in the current process.
    # Returns an array of run times (0 for samples that were skipped).
    run_times = np.zeros(number_of_samples)
    for sample_index in range(0, number_of_samples):
        print("======= Sample " + str(sample_index) + " ============")
        run_directory = f'{run_directory_base}{sample_index}'
        if _sample_has_passed(run_directory) and not overwrite:
            print("Skipping (Sample has already run successfully)")
        else:
            print("Running")
            parameter_dict = _create_parameter_dict(parameter_names,
                                                    parameter_samples[sample_index])
            run_times[sample_index] = run_sample(run_directory, model, parameter_dict)
    return run_times


def _run_samples_concurrently(model, run_directory_base, parameter_names,
                              parameter_samples, number_of_samples, overwrite,
                              evaluation_concurrency):
    # Evaluate the pending samples with a pool of worker processes.
    # Returns the list of run times for the samples that were evaluated.
    #
    # we use here spawn because the default fork causes issues with mpich,
    # see here: https://github.com/Pressio/rom-tools-and-workflows/pull/206
    #
    # to read more about fork/spawn:
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
    # and
    # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
    mp_cntxt = multiprocessing.get_context("spawn")

    # Identify samples to run (skip ones that already passed unless overwriting)
    samples_to_run = []
    for sample_index in range(0, number_of_samples):
        run_directory = f'{run_directory_base}{sample_index}'
        if _sample_has_passed(run_directory) and not overwrite:
            print(f"Skipping sample {sample_index} (Sample has already run successfully)")
        else:
            samples_to_run.append(sample_index)

    with concurrent.futures.ProcessPoolExecutor(max_workers=evaluation_concurrency,
                                                mp_context=mp_cntxt) as executor:
        these_futures = [executor.submit(run_sample,
                                         f'{run_directory_base}{sample_id}', model,
                                         _create_parameter_dict(parameter_names,
                                                                parameter_samples[sample_id]))
                         for sample_id in samples_to_run]
        # Wait for all processes to finish
        concurrent.futures.wait(these_futures)
    return [future.result() for future in these_futures]


def run_sampling(model: Model,
                 parameter_space: ParameterSpace,
                 absolute_sampling_directory: str,
                 evaluation_concurrency: int = 1,
                 number_of_samples: int = 10,
                 random_seed: int = 1,
                 dry_run: bool = False,
                 overwrite: bool = False):
    '''
    Core sampling algorithm.

    Draws `number_of_samples` samples from `parameter_space`, creates one
    run directory per sample under `absolute_sampling_directory`, populates
    each directory via `model.populate_run_directory`, and — unless
    `dry_run` is set — evaluates the model in every directory, serially
    when `evaluation_concurrency == 1` or with that many worker processes
    otherwise. Samples whose directory already holds a passed.txt marker
    are skipped unless `overwrite` is set. Run times are saved to
    sampling_stats.npz inside `absolute_sampling_directory`.

    Returns the list of run directories.
    '''
    np.random.seed(random_seed)

    # Create folder if it doesn't exist
    create_empty_dir(absolute_sampling_directory)

    # create parameter samples
    parameter_samples = parameter_space.generate_samples(number_of_samples)
    parameter_names = parameter_space.get_names()

    # Save parameter samples
    np.savetxt(f'{absolute_sampling_directory}/sample_parameters.txt',
               parameter_samples,
               fmt="%s "*parameter_space.get_dimensionality())

    # Set up model directories
    run_directory_base = f'{absolute_sampling_directory}/run_'
    run_directories = []
    for sample_index in range(parameter_samples.shape[0]):
        run_directory = f'{run_directory_base}{sample_index}'
        create_empty_dir(run_directory)
        parameter_dict = _create_parameter_dict(parameter_names,
                                                parameter_samples[sample_index])
        model.populate_run_directory(run_directory, parameter_dict)
        run_directories.append(run_directory)

    # Print MPI warnings
    print("""
    Warning: If you are using your model with MPI via a direct call to `mpirun -n ...`,
    be aware that this may or may not work for issues that are purely related to MPI.
    """)
    if not dry_run:
        # Run cases
        if evaluation_concurrency == 1:
            run_times = _run_samples_serially(model, run_directory_base,
                                              parameter_names, parameter_samples,
                                              number_of_samples, overwrite)
        else:
            run_times = _run_samples_concurrently(model, run_directory_base,
                                                  parameter_names, parameter_samples,
                                                  number_of_samples, overwrite,
                                                  evaluation_concurrency)
        # Save stats directly into the sampling directory. (Previously the
        # path was rebuilt from a loop variable as f'{run_dir}/../', which
        # resolved to the same location but raised NameError whenever zero
        # samples were evaluated.)
        np.savez(f'{absolute_sampling_directory}/sampling_stats',
                 run_times=run_times)

    return run_directories


def run_sample(run_directory: str, model: Model, parameter_sample: dict):
    '''
    Run the model once in `run_directory` with `parameter_sample`.

    On success (model returns 0) a passed.txt marker is written into the
    run directory so later sweeps can skip the sample. Returns the
    wall-clock run time in seconds.
    '''
    run_id = _get_run_id_from_run_dir(run_directory)
    # perf_counter is monotonic; time.time() can jump with clock adjustments.
    ts = time.perf_counter()
    flag = model.run_model(run_directory, parameter_sample)
    run_time = time.perf_counter() - ts

    if flag == 0:
        print(f"Sample {run_id} is complete, run time = {run_time}")
        np.savetxt(os.path.join(run_directory, 'passed.txt'), np.array([0]), '%i')
    else:
        print(f"Sample {run_id} failed, run time = {run_time}")
    print(" ")
    return run_time
def
run_sampling( model: romtools.workflows.models.Model, parameter_space: romtools.workflows.parameter_spaces.ParameterSpace, absolute_sampling_directory: str, evaluation_concurrency=1, number_of_samples: int = 10, random_seed: int = 1, dry_run: bool = False, overwrite: bool = False):
def run_sampling(model: Model,
                 parameter_space: ParameterSpace,
                 absolute_sampling_directory: str,
                 evaluation_concurrency = 1,
                 number_of_samples: int = 10,
                 random_seed: int = 1,
                 dry_run: bool = False,
                 overwrite: bool = False):
    '''
    Core algorithm
    '''
    # Use "spawn" rather than the default "fork": fork causes issues with
    # mpich, see https://github.com/Pressio/rom-tools-and-workflows/pull/206
    #
    # Background reading on fork/spawn:
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
    # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
    spawn_context = multiprocessing.get_context("spawn")

    np.random.seed(random_seed)

    # Make sure the top-level sampling directory exists
    create_empty_dir(absolute_sampling_directory)

    # Draw the parameter samples and record them to disk
    samples = parameter_space.generate_samples(number_of_samples)
    names = parameter_space.get_names()
    np.savetxt(f'{absolute_sampling_directory}/sample_parameters.txt',
               samples,
               fmt="%s "*parameter_space.get_dimensionality())

    # One run directory per sample, populated by the model
    run_directory_base = f'{absolute_sampling_directory}/run_'
    run_directories = []
    for idx in range(samples.shape[0]):
        directory = f'{run_directory_base}{idx}'
        create_empty_dir(directory)
        model.populate_run_directory(directory, _create_parameter_dict(names, samples[idx]))
        run_directories.append(directory)

    # Warn about MPI-launched models
    print("""
    Warning: If you are using your model with MPI via a direct call to `mpirun -n ...`,
    be aware that this may or may not work for issues that are purely related to MPI.
    """)
    if not dry_run:
        if evaluation_concurrency == 1:
            # Serial path: evaluate each sample in this process
            run_times = np.zeros(number_of_samples)
            for idx in range(number_of_samples):
                print("======= Sample " + str(idx) + " ============")
                directory = f'{run_directory_base}{idx}'
                if not overwrite and "passed.txt" in os.listdir(directory):
                    print("Skipping (Sample has already run successfully)")
                else:
                    print("Running")
                    run_times[idx] = run_sample(directory, model,
                                                _create_parameter_dict(names, samples[idx]))
            stats_directory = f'{run_directory_base}{idx}/../'
            np.savez(f'{stats_directory}/sampling_stats',
                     run_times=run_times)
        else:
            # Concurrent path: first collect the samples that still need a run
            pending = []
            for idx in range(number_of_samples):
                directory = f'{run_directory_base}{idx}'
                if not overwrite and "passed.txt" in os.listdir(directory):
                    print(f"Skipping sample {idx} (Sample has already run successfully)")
                else:
                    pending.append(idx)
            with concurrent.futures.ProcessPoolExecutor(max_workers=evaluation_concurrency,
                                                        mp_context=spawn_context) as pool:
                futures = [pool.submit(run_sample,
                                       f'{run_directory_base}{sample_id}', model,
                                       _create_parameter_dict(names, samples[sample_id]))
                           for sample_id in pending]
                # Block until every worker has finished
                concurrent.futures.wait(futures)
            run_times = [future.result() for future in futures]
            stats_directory = f'{run_directory_base}{idx}/../'
            np.savez(f'{stats_directory}/sampling_stats', run_times=run_times)

    return run_directories
Core algorithm
def
run_sample( run_directory: str, model: romtools.workflows.models.Model, parameter_sample: dict):
160def run_sample(run_directory: str, model: Model, parameter_sample: dict): 161 run_id = _get_run_id_from_run_dir(run_directory) 162 ts = time.time() 163 flag = model.run_model(run_directory, parameter_sample) 164 tf = time.time() 165 run_time = tf - ts 166 167 if flag == 0: 168 print(f"Sample {run_id} is complete, run time = {run_time}") 169 np.savetxt(os.path.join(run_directory, 'passed.txt'), np.array([0]), '%i') 170 else: 171 print(f"Sample {run_id} failed, run time = {run_time}") 172 print(" ") 173 return run_time