Source code for romtools.workflows.workflow_utils

from typing import Iterable
import os
import shutil
import subprocess


[docs] def create_empty_dir(dir_name: str): ''' Create empty directory ''' if os.path.exists(dir_name): pass else: os.makedirs(dir_name)
[docs] def setup_directory(source_dir: str, target_dir: str, files2link: Iterable = (), files2copy: Iterable = ()): ''' Create new directory and populate with files ''' create_empty_dir(target_dir) for file in files2copy: shutil.copy(f'{source_dir}/{file}', f'{target_dir}/{os.path.basename(file)}') for file in files2link: os.symlink(f'{source_dir}/{file}', f'{target_dir}/{os.path.basename(file)}')
[docs] def run_model(module: str = None, pre_script: str = None, executable: str = 'bash', num_procs: int = 1, directory: str = '.', **kwargs): ''' Execute external model ''' execution_str = f'module load {module};' if module is not None else '' execution_str += pre_script if pre_script is not None else '' execution_str += f'mpirun -n {num_procs} {executable}' for flag, value in kwargs: execution_str += f' {flag} {value}' subprocess.run(execution_str, shell=True, check=True, cwd=directory)