GitHub

romtools.workflows.sampling.sampling

  1#
  2# ************************************************************************
  3#
  4#                         ROM Tools and Workflows
  5# Copyright 2019 National Technology & Engineering Solutions of Sandia, LLC
  6#                              (NTESS)
  7#
  8# Under the terms of Contract DE-NA0003525 with NTESS, the
  9# U.S. Government retains certain rights in this software.
 10#
 11# ROM Tools and Workflows is licensed under BSD-3-Clause terms of use:
 12#
 13# Redistribution and use in source and binary forms, with or without
 14# modification, are permitted provided that the following conditions
 15# are met:
 16#
 17# 1. Redistributions of source code must retain the above copyright
 18# notice, this list of conditions and the following disclaimer.
 19#
 20# 2. Redistributions in binary form must reproduce the above copyright
 21# notice, this list of conditions and the following disclaimer in the
 22# documentation and/or other materials provided with the distribution.
 23#
 24# 3. Neither the name of the copyright holder nor the names of its
 25# contributors may be used to endorse or promote products derived
 26# from this software without specific prior written permission.
 27#
 28# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 29# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 30# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 31# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 32# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 33# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 34# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 35# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 36# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 37# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 38# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 39# POSSIBILITY OF SUCH DAMAGE.
 40#
 41# Questions? Contact Eric Parish (ejparis@sandia.gov)
 42#
 43# ************************************************************************
 44#
 45
 46import os
 47import time
 48import numpy as np
 49import concurrent.futures
 50import multiprocessing
 51
 52from romtools.workflows.workflow_utils import create_empty_dir
 53from romtools.workflows.models import Model
 54from romtools.workflows.parameter_spaces import ParameterSpace
 55
 56
 57def _get_run_id_from_run_dir(run_dir):
 58    return int(run_dir.split('_')[-1])
 59
 60
 61def _create_parameter_dict(parameter_names, parameter_values):
 62    return dict(zip(parameter_names, parameter_values))
 63
 64
 65def run_sampling(model: Model,
 66                 parameter_space: ParameterSpace,
 67                 absolute_sampling_directory: str,
 68                 evaluation_concurrency = 1,
 69                 number_of_samples: int = 10,
 70                 random_seed: int = 1,
 71                 dry_run: bool = False,
 72                 overwrite: bool = False):
 73    '''
 74    Core algorithm
 75    '''
 76
 77    # we use here spawn because the default fork causes issues with mpich,
 78    # see here: https://github.com/Pressio/rom-tools-and-workflows/pull/206
 79    #
 80    # to read more about fork/spawn:
 81    #   https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 82    #
 83    # and
 84    #   https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
 85    #
 86    mp_cntxt=multiprocessing.get_context("spawn")
 87
 88    np.random.seed(random_seed)
 89
 90    # Create folder if it doesn't exist
 91    create_empty_dir(absolute_sampling_directory)
 92
 93    # create parameter samples
 94    parameter_samples = parameter_space.generate_samples(number_of_samples)
 95    parameter_names = parameter_space.get_names()
 96
 97    # Save parameter samples
 98    np.savetxt(f'{absolute_sampling_directory}/sample_parameters.txt', parameter_samples, fmt="%s "*parameter_space.get_dimensionality())
 99
100    # Set up model directories
101    run_directory_base = f'{absolute_sampling_directory}/run_'
102    run_directories = []
103    starting_sample_index = 0
104    end_sample_index = starting_sample_index + parameter_samples.shape[0]
105    for sample_index in range(starting_sample_index, end_sample_index):
106        run_directory = f'{run_directory_base}{sample_index}'
107        create_empty_dir(run_directory)
108        parameter_dict = _create_parameter_dict(parameter_names, parameter_samples[sample_index - starting_sample_index])
109        model.populate_run_directory(run_directory, parameter_dict)
110        run_directories.append(run_directory)
111
112    # Print MPI warnings
113    print("""
114    Warning: If you are using your model with MPI via a direct call to `mpirun -n ...`,
115    be aware that this may or may not work for issues that are purely related to MPI.
116    """)
117    if not dry_run:
118        # Run cases
119        if evaluation_concurrency == 1:
120            run_times = np.zeros(number_of_samples)
121            for sample_index in range(0, number_of_samples):
122                print("=======  Sample " + str(sample_index) + " ============")
123                run_directory = f'{run_directory_base}{sample_index}'
124                if "passed.txt" in os.listdir(run_directory) and not overwrite:
125                    print("Skipping (Sample has already run successfully)")
126                else:
127                    print("Running")
128                    parameter_dict = _create_parameter_dict(parameter_names, parameter_samples[sample_index])
129                    run_times[sample_index] = run_sample(run_directory, model, parameter_dict)
130                    sample_stats_save_directory = f'{run_directory_base}{sample_index}/../'
131                    np.savez(f'{sample_stats_save_directory}/sampling_stats',
132                             run_times=run_times)
133        else:
134            #Identify samples to run
135            samples_to_run = []
136            for sample_index in range(0, number_of_samples):
137                run_directory = f'{run_directory_base}{sample_index}'
138                if "passed.txt" in os.listdir(run_directory) and not overwrite:
139                    print(f"Skipping sample {sample_index} (Sample has already run successfully)")
140                    pass
141                else:
142                    samples_to_run.append(sample_index)
143            with concurrent.futures.ProcessPoolExecutor(max_workers = evaluation_concurrency, mp_context=mp_cntxt) as executor:
144                these_futures = [executor.submit(run_sample,
145                                 f'{run_directory_base}{sample_id}', model,
146                                 _create_parameter_dict(parameter_names, parameter_samples[sample_id]))
147                                 for sample_id in samples_to_run]
148
149                # Wait for all processes to finish
150                concurrent.futures.wait(these_futures)
151
152            run_times = [future.result() for future in these_futures]
153            sample_stats_save_directory = f'{run_directory_base}{sample_index}/../'
154            np.savez(f'{sample_stats_save_directory}/sampling_stats', run_times=run_times)
155
156    return run_directories
157
158
def run_sample(run_directory: str, model: Model, parameter_sample: dict):
    '''
    Evaluate the model once in `run_directory` with `parameter_sample`.

    On success (model flag 0) a `passed.txt` marker file is written into
    the run directory so the sample can be skipped on a re-run. Returns
    the wall-clock run time in seconds.
    '''
    # The run id is the trailing '_<id>' suffix of the directory name.
    run_id = int(run_directory.split('_')[-1])

    start = time.time()
    flag = model.run_model(run_directory, parameter_sample)
    run_time = time.time() - start

    if flag == 0:
        print(f"Sample {run_id} is complete, run time = {run_time}")
        np.savetxt(os.path.join(run_directory, 'passed.txt'), np.array([0]), '%i')
    else:
        print(f"Sample {run_id} failed, run time = {run_time}")
    print(" ")
    return run_time
def run_sampling( model: romtools.workflows.models.Model, parameter_space: romtools.workflows.parameter_spaces.ParameterSpace, absolute_sampling_directory: str, evaluation_concurrency=1, number_of_samples: int = 10, random_seed: int = 1, dry_run: bool = False, overwrite: bool = False):
 66def run_sampling(model: Model,
 67                 parameter_space: ParameterSpace,
 68                 absolute_sampling_directory: str,
 69                 evaluation_concurrency = 1,
 70                 number_of_samples: int = 10,
 71                 random_seed: int = 1,
 72                 dry_run: bool = False,
 73                 overwrite: bool = False):
 74    '''
 75    Core algorithm
 76    '''
 77
 78    # we use here spawn because the default fork causes issues with mpich,
 79    # see here: https://github.com/Pressio/rom-tools-and-workflows/pull/206
 80    #
 81    # to read more about fork/spawn:
 82    #   https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 83    #
 84    # and
 85    #   https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
 86    #
 87    mp_cntxt=multiprocessing.get_context("spawn")
 88
 89    np.random.seed(random_seed)
 90
 91    # Create folder if it doesn't exist
 92    create_empty_dir(absolute_sampling_directory)
 93
 94    # create parameter samples
 95    parameter_samples = parameter_space.generate_samples(number_of_samples)
 96    parameter_names = parameter_space.get_names()
 97
 98    # Save parameter samples
 99    np.savetxt(f'{absolute_sampling_directory}/sample_parameters.txt', parameter_samples, fmt="%s "*parameter_space.get_dimensionality())
100
101    # Set up model directories
102    run_directory_base = f'{absolute_sampling_directory}/run_'
103    run_directories = []
104    starting_sample_index = 0
105    end_sample_index = starting_sample_index + parameter_samples.shape[0]
106    for sample_index in range(starting_sample_index, end_sample_index):
107        run_directory = f'{run_directory_base}{sample_index}'
108        create_empty_dir(run_directory)
109        parameter_dict = _create_parameter_dict(parameter_names, parameter_samples[sample_index - starting_sample_index])
110        model.populate_run_directory(run_directory, parameter_dict)
111        run_directories.append(run_directory)
112
113    # Print MPI warnings
114    print("""
115    Warning: If you are using your model with MPI via a direct call to `mpirun -n ...`,
116    be aware that this may or may not work for issues that are purely related to MPI.
117    """)
118    if not dry_run:
119        # Run cases
120        if evaluation_concurrency == 1:
121            run_times = np.zeros(number_of_samples)
122            for sample_index in range(0, number_of_samples):
123                print("=======  Sample " + str(sample_index) + " ============")
124                run_directory = f'{run_directory_base}{sample_index}'
125                if "passed.txt" in os.listdir(run_directory) and not overwrite:
126                    print("Skipping (Sample has already run successfully)")
127                else:
128                    print("Running")
129                    parameter_dict = _create_parameter_dict(parameter_names, parameter_samples[sample_index])
130                    run_times[sample_index] = run_sample(run_directory, model, parameter_dict)
131                    sample_stats_save_directory = f'{run_directory_base}{sample_index}/../'
132                    np.savez(f'{sample_stats_save_directory}/sampling_stats',
133                             run_times=run_times)
134        else:
135            #Identify samples to run
136            samples_to_run = []
137            for sample_index in range(0, number_of_samples):
138                run_directory = f'{run_directory_base}{sample_index}'
139                if "passed.txt" in os.listdir(run_directory) and not overwrite:
140                    print(f"Skipping sample {sample_index} (Sample has already run successfully)")
141                    pass
142                else:
143                    samples_to_run.append(sample_index)
144            with concurrent.futures.ProcessPoolExecutor(max_workers = evaluation_concurrency, mp_context=mp_cntxt) as executor:
145                these_futures = [executor.submit(run_sample,
146                                 f'{run_directory_base}{sample_id}', model,
147                                 _create_parameter_dict(parameter_names, parameter_samples[sample_id]))
148                                 for sample_id in samples_to_run]
149
150                # Wait for all processes to finish
151                concurrent.futures.wait(these_futures)
152
153            run_times = [future.result() for future in these_futures]
154            sample_stats_save_directory = f'{run_directory_base}{sample_index}/../'
155            np.savez(f'{sample_stats_save_directory}/sampling_stats', run_times=run_times)
156
157    return run_directories

Core algorithm

def run_sample( run_directory: str, model: romtools.workflows.models.Model, parameter_sample: dict):
160def run_sample(run_directory: str, model: Model, parameter_sample: dict):
161    run_id = _get_run_id_from_run_dir(run_directory)
162    ts = time.time()
163    flag = model.run_model(run_directory, parameter_sample)
164    tf = time.time()
165    run_time = tf - ts
166
167    if flag == 0:
168        print(f"Sample {run_id} is complete, run time = {run_time}")
169        np.savetxt(os.path.join(run_directory, 'passed.txt'), np.array([0]), '%i')
170    else:
171        print(f"Sample {run_id} failed, run time = {run_time}")
172    print(" ")
173    return run_time