diff --git a/binarycpython/tests/test_ensemble_manager.py b/binarycpython/tests/test_ensemble_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..ff8cc1f20a15a4fa96a88f11216d5dd6b7a9169d --- /dev/null +++ b/binarycpython/tests/test_ensemble_manager.py @@ -0,0 +1,3 @@ +""" +Test routines for the ensemble_manager class +""" \ No newline at end of file diff --git a/binarycpython/utils/ensemble_manager.py b/binarycpython/utils/ensemble_manager.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..6bbf2af5f74aa60e959e6569bc498a81de481c05 100644 --- a/binarycpython/utils/ensemble_manager.py +++ b/binarycpython/utils/ensemble_manager.py @@ -0,0 +1,2108 @@ +""" +Main script containing the class for the ensemble manager +""" + + +############################################################ +# +# Functions used in the ensemble_manager.py script, +# part of binary_c using binary_c-python +# +# (c) Robert Izzard 2022+ +# +############################################################ + +import binarycpython +import colorama +import git +import itertools +import json +import os +import pandas as pd +import pathlib +import re +import socket +import sqlite3 +import subprocess +import sys +import uuid +from binarycpython.utils.functions import ( + get_defaults, + get_ANSI_colours, + ) +from colorama import Fore, Back, Style +from prettytable import PrettyTable +colorama.init() + +class binary_c_ensemble_manager: + """ + Binary_c's ensemble manager class. The ensemble manager helps you to launch, generate and manage sets of stellar population ensembles using binary_c and binary_c-python, both on your PC and HPC (Slurm and HTCondor) clusters. + + Please see the file $BINARY_C/src/python/ensemble_manager.py for a usage example, where $BINARY_C is the root of your binary_c installation. + """ + + ############################################################ + # + # class variables + # + # These are (very) unlikely to be changed by the user. + # + ############################################################ + default_ID_keys = [ + 'version', + 'git_branch', + 'git_revision', + 'git_url' + ] + encoding = 'utf-8' + sources = { + 'ensemble' : { + 'prefix' : 'e_', + 'dict' : 'ensemble_args', + }, + 'binary_c' : { + 'prefix' : 'b_', + 'dict' : 'binary_c_args', + }, + 'build' : { + 'prefix' : 'B_', + 'dict' : 'build_info', + }, + } + # unique keys skipped DB looked + skip_unique_key_regexes = [ + 'status', + 'uuid', + 'outdir', + 'tmpdir', + 'condor', + 'slurm' + ] + job_types = ( + 'pending', + 'launched', + 'finished', + 'finished broken', + 'stopped' + ) + colours = get_ANSI_colours() + HPC_types = ('condor','slurm') + + def __init__(self): + # set up the ensemble manager class + + # variables that may, or should, be changed by the user + self.sqlfile = 'ensembles.sql' + self.dbchunkfile = 'db_chunk_JSON' + self.inlist = None + + # get either inlist or command from command line, and check it + self.command,self.inlist,self.cmdline_args = self.parse_cmdline() + commands = self.commands() + if not self.command: + print("Please give a command to ensemble_manager, one of:",' '.join(commands),'. Or run "ensemble_manager.py help" to see some help text.') + sys.exit() + elif not self.command in commands: + print("Command",self.command,"unknown to ensemble_manager, one of:",' '.join(commands),'. Or run "ensemble_manager.py help" to see some help text.') + sys.exit() + + self.runtime_vars = {} + self.binary_c_args = {} + self.prefixes = [] + for s in self.sources.values(): + self.prefixes.append(s['prefix']) + + return None + + ############################################################ + def setup(self,cluster_options=None): + """ + Ensemble manager setup function + """ + self.set_runtime_vars() + if cluster_options: + # call custom cluster options function + cluster_options(self) + else: + # call the default cluster options function + self.add_cluster_options() + # reread inlist to overwrite options if required + self.read_inlist() + # make output directory + self.make_output_directory() + + def make_output_directory(self): + """ + Function to make the output directory for the + ensemble. + """ + try: + os.makedirs(self.runtime_vars['miscellaneous']['outdir'], + exist_ok=True) + except Exception: + print(f"Failed to make output directory at {self.runtime_vars['miscellaneous']['outdir']}") + sys.exit(1) + + @staticmethod + def run_cmd(cmd,show=True,show_flush=False,flush=True,header=None): + """ + Function to run an external shell command and + show the output as soon as it arrives. + """ + if isinstance(cmd,list): + # convert command list to string + cmd = binary_c_ensemble_manager._make_cmd(cmd) + if show and not show_flush: + # print the command before running it + print("Run : ",cmd) + if flush: + # force flushing with stdbuf + cmd = f'stdbuf -i 0 -e 0 -o 0 {cmd}' + if show_flush: + print("Run : ",cmd) + + proc = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE + ) + + for line in iter(proc.stdout.readline,b''): + try: + if header: + sys.stdout.write(f"{str(header)} : {line.decode(sys.stdout.encoding)}") + else: + sys.stdout.write(line.decode(sys.stdout.encoding)) + + except Exception: + raise + + proc.communicate() + return_code = proc.returncode + if return_code != 0: + print(f"\nSomething went wrong when submitting the HPC job, the command \"{cmd}\" returned code {return_code}: please check above for any errors") + sys.exit() + + @staticmethod + def binary_c_version(): + """ + return sorted binary_c version dict + """ + version_info = binarycpython.utils.population_extensions.version_info.version_info().return_binary_c_version_info() + return json.loads(json.dumps(version_info,sort_keys=True)) + + @staticmethod + def binary_c_git_revision(): + """ + return the binary_c git revision + """ + v = binary_c_ensemble_manager.binary_c_version()['miscellaneous'] + s = 'git_revision' + if s in v: + return v[s] + else: + return None + + @staticmethod + def check_binary_c_git_revision(exit_on_error=False): + """ + Check the current git revision compared to the binary_c build. + """ + try: + v_binary_c = binary_c_ensemble_manager.binary_c_version()['miscellaneous']['git_revision'] + except: + print("Warning: we do not know binary_c's git revision.") + v_binary_c = None + try: + repo = git.Repo(search_parent_directories=True) + if not repo: + print("Warning: we seem not to be running from a git checkout.") + v_repo = None + else: + v_repo = repo.head.object.hexsha[0:9] + except: + v_repo = None + + if v_repo and v_binary_c: + if not v_binary_c.endswith(v_repo): + if not os.getenv('ENSEMBLE_MANAGER_WRONG_REPO_WARNING'): + print(f"Warning: the binary_c repository is revision {v_binary_c} while the git repository is revision {v_repo}, these are inconsistent, please check that this is what you meant to do.") + if exit_on_error: + sys.exit(1) + elif exit_on_error: + print("Exit because we could not verify that binary_c's git version matched the repostiory.") + sys.exit(1) + + @staticmethod + def binary_c_version_number(): + """ + return the binary_c version number + """ + v = binary_c_ensemble_manager.binary_c_version()['miscellaneous'] + s = 'version' + if s in v: + return v[s] + else: + return None + + @staticmethod + def _quotewrap(list): + """ + Given a list, wrap each item in double quotes and return the new list + """ + return [f'"{_x}"' for _x in list] + + def _make_cmd(list): + """ + Given a list of arguments, build a command-line string + """ + return ' '.join(binary_c_ensemble_manager._quotewrap(list)) + + def set_runtime_vars(self): + """ + Function to setup the runtime_vars dict and make sure + the inlist options are loaded + """ + my_setup = self.system_setup(__file__) + self.runtime_vars = { + ############################################################ + # general options for run_populations.py + 'miscellaneous' : { + 'script' : self.manager_path('script'), + 'scriptdir' : my_setup['scriptdir'], + 'plotscript' : self.manager_path('plotscript'), + 'project' : None, + 'ensemble file' : self.manager_path('ensemble_file'), + }, + + ############################################################ + # build information + 'build_info' : self.binary_c_build_info_dict(), + + ############################################################ + # default HPC options + 'HPC_options' : { + 'njobs' : 1, + 'memory' : '512MB', + 'warn_max_memory' : '1024MB', + 'max_time' : 0, + 'directory' : None, + 'condor_requirements' : '', + 'slurm_partition' : None, + 'cluster_type' : None, + }, + + ############################################################ + # args passed to ensemble.py + 'ensemble_args' : { + ############################################################ + # ensemble.py's options + 'dists' : 'Moe', # Moe, 2008(+), 2018(+) + 'binaries' : True, + 'r' : 100, + 'M1spacing' : 'const_dt', # const_dt, logM1 + 'fsample' : 0.25, + 'normalize' : True, + 'mmax' : 80.0, + 'log_runtime_systems' : 0, + 'run_zero_probability_system' : 0, + 'combine_ensemble_with_thread_joining' : True, + + ################################################## + 'verbosity' : 1, + 'log_dt' : 300, # 300, # log to stdout every five minutes + 'log_args' : 0, + 'log_args_dir' : None, + + ############################################################ + # binary_c runtime options should be set in the inlist + 'max_evolution_time' : 15000, + 'minimum_timestep' : 1e-6, + 'ensemble' : 1, + 'ensemble_defer' : 1, + 'ensemble_logtimes' : 1, + 'ensemble_startlogtime' : 0.1, + ### 'ensemble_dt' : 1000.0, # not required when ensemble_logtimes==1 + 'ensemble_logdt' : 0.1, + 'ensemble_filters_off' : 1, + } + } + + # load the given inlist to obtain parameters for the run + self.read_inlist() + + # set output dir + self.runtime_vars['miscellaneous']['outdir'] = self.manager_path('outdir') + + return + + def inlist_path(self,file): + """ + Function to find the path of the inlist. + + file is a required parameter: it is the file name of the inlist + """ + + # first see if the inlist has already been set, in which case, use it + if self.inlist: + return self.inlist + + # check we have a file + if not file: + print("Trying to construct a path to the inlist, but no inlist file has been given. Please specify one as one of the first two arguments to ensemble_manager.py.") + sys.exit(1) + + # search paths for the inlist + # + # we use the file, and $BINARY_C/src/inlists, $BINARY_C/src/python, + # by default, unless the environment variable + # ENSEMBLE_MANAGER_INLIST_PATHS is set, in which case + # use the paths specified therein. + if os.getenv('ENSEMBLE_MANAGER_INLIST_PATHS'): + inlist_paths = os.getenv('ENSEMBLE_MANAGER_INLIST_PATHS').split(':') + else: + inlist_paths = ( + file, + os.path.join(os.getenv('BINARY_C'), + 'src', + 'inlists', + file), + os.path.join(os.getenv('BINARY_C'), + 'src', + 'python', + file), + ) + + # check all possible locations, return if found + inlist = None + for f in inlist_paths: + if os.path.isfile(f): + inlist = os.path.abspath(f) + break + if inlist: + print(f"Using inlist at {inlist}") + return inlist + + ############################################################ + def read_inlist(self): + """ + Read inlist and set appropriate variables + """ + if not self.inlist or self.inlist is None: + command,self.inlist,args = self.parse_cmdline() + self.inlist = self.inlist_path(self.inlist) + print(f"Using inlist at {self.inlist}") + + if not self.inlist: + print(f"Failed to load the inlist at {self.inlist} : please give me an inlist that sets runtime_Vars, binary_c_args, ensemble_args, project and possibly HPC_options. Check out the examples in $BINARY_C/src/inlists. Thanks!") + sys.exit(1) + + localdict = { + 'runtime_vars' : self.runtime_vars + } + + try: + exec(open(self.inlist).read(), + globals(), + localdict) + + except Exception as e: + exception_type, exception_object, exception_traceback = sys.exc_info() + print("Exception type: ", exception_type) + print(f"{e} : Failed to load the inlist at {self.inlist} : please give me an inlist that sets runtime_vars, binary_c_args, ensemble_args, project and possibly HPC_options. Check out the examples in $BINARY_C/src/inlists. Thanks!") + sys.exit(1) + + if not 'runtime_vars' in localdict: + print(f"Failed to load the inlist at {self.inlist} : please give me an inlist that sets runtime_vars, binary_c_args, ensemble_args, project and possibly HPC_options. Check out the examples in $BINARY_C/src/inlists. Thanks!") + sys.exit(1) + + if 'binary_c_args' in localdict: + self.binary_c_args = localdict['binary_c_args'].copy() + for x in self.runtime_vars['miscellaneous']: + if x in localdict: + self.runtime_vars['miscellaneous'][x] = str(localdict[x]) + for x in ('ensemble_args','HPC_options'): + if x in localdict: + self.runtime_vars[x].update(localdict[x]) + return + + def parse_cmdline(self): + """ + Parse the command line and return the command, inlist and remaining args, + if given. Also check the BINARY_C env var. + """ + if not os.getenv('BINARY_C'): + print("You need to set the BINARY_C environment variable to point to your binary_c installation.") + sys.exit() + + command = None + inlist = None + args = [] + l = len(sys.argv) + if l > 1: + if l > 2: + f1 = self.inlist_path(sys.argv[1]) + f2 = self.inlist_path(sys.argv[2]) + if f1: + inlist = f1 + command = sys.argv[2] + args = sys.argv[3:] + elif f2: + command = sys.argv[1] + inlist = f2 + args = sys.argv[3:] + else: + command = sys.argv[1] + args = sys.argv[2:] + # no inlist file + else: + f = self.inlist_path(sys.argv[1]) + if f: + inlist = f + command = None + else: + command = sys.argv[1] + + return command,inlist,args + + def manager_path(self, + name=None, + path=None): + """ + Function to return a default file or directory name, given the "name" + of the path type + """ + if name == 'outdir': + return self.populations_dir(path) + elif name == 'script': + return os.getenv('ENSEMBLE_MANAGER_SCRIPT') or 'ensemble.py' + elif name == 'ensemble_file': + return 'ensemble_output.json.bz2' + elif name == 'plotscript': + return os.getenv('ENSEMBLE_MANAGER_PLOTSCRIPT') or 'ensemble_plotter_threaded.py' + + def populations_dir(self, + path=None): + """ + return a path to the population directory for output + + if ENSEMBLE_MANAGER_DIR is set, just use this + + Otherwise, append the stub to the default + directory name, where the stub is like + -BINARY_C_VERSION-GIT_REVISION + + """ + envdir = os.getenv('ENSEMBLE_MANAGER_DIR') + if envdir: + return envdir + else: + # set up the stub that appends onto the main path + stub = \ + f'-{str(self.binary_c_version_number())}-{str(self.binary_c_git_revision())}' + + # append the project name if given + if self.runtime_vars and \ + 'miscellaneous' in self.runtime_vars and \ + 'project' in self.runtime_vars['miscellaneous']: + stub += f"-{self.runtime_vars['miscellaneous']['project']}" + + if path: + return path + stub + else: + home = str(pathlib.Path.home()) + return os.path.join(home, 'data', f'populations{stub}') + + ############################################################ + # + # Column prefix functions: we use the prefixes (e.g. 'b_' + # and 'e_') to map nested dicts to the non-nested sqlite + # database. + # + ############################################################ + def col_is_prefixed(self,col): + """ + Detect if column (col) is prefixed with one of our + above-defined prefixes + """ + for prefix in self.prefixes: + if col.startswith(prefix): + return True + return False + + def strip_prefix(self,col): + """ + Function to strip the prefix of a column. + + If the column (col) is prefixed, e.g. with 'e_' or 'b_', + remove the prefix and return the true column name. + """ + if self.col_is_prefixed(col): + return col[2:] + else: + return col + + def binary_c_build_info_dict(self): + """ + Function to return the binary_c build information as a dictionary + """ + v = self.binary_c_version() + d = {} + for k in self.default_ID_keys: + if k in v['miscellaneous']: + d[k] = v['miscellaneous'][k] + return d + + @staticmethod + def expand_args(d, + _list=None, + opts=None, + first=True): + """ + Recursive function to expand a set of binary_c_args + and return a list of argument lists. + """ + if _list is None: + _list = [] + if opts is None: + opts = {} + for key,value in d.items(): + t = type(value) + if t is tuple: + # tuple contains: [], {} : + # we loop over the list and ally the dict for each + vals,subdict = value + for val in vals: + newopts = opts.copy() + newopts[key] = val + binary_c_ensemble_manager.expand_args(subdict, + _list=_list, + opts=newopts, + first=False) + elif t is list: + # list of values over which we loop + for val in value: + newopts = opts.copy() + newopts[key] = val + _list.append(newopts) + + elif t is int or t is float or t is str: + # single number or string + newopts = opts.copy() + newopts[key] = value + _list.append(newopts) + + else: + print("Error in parsing binary_c_args dict: sub-dict of type ",t," (key=",key,",value=",value,") should be tuples or lists, not type",t) + sys.exit() + + if first: + # all done: return the list of lists, + # or an empty list if we have nothing + if len(_list)==0: + return [[]] + return _list + + def launch(self, + opts, + dry_run=False): + """ + Given a set of options, launch populations using the + local HPC. + + If dry_run is True, we don't actually launch or change the + database. + """ + opts = opts.copy() # work on a copy + + v = opts['binary_c_args'] + + # get binary_c defaults + d = get_defaults() + + # construct dict of vars that changed relative to the default + c = {} + for var in v: + if var in d: + if not self.vars_same(v[var],d[var]): + c[var] = v[var] + + # replace binary_c args with only those we need to change + opts['binary_c_args'] = c + + # open the database + db = self.connect_db(opts) + + # make ID for this population in the database + db_id = self.binary_c_ensemble_id_dict(opts=opts, + prefix=True, + force_strings=True) + + result = self.search_db_dict(db,db_id) + + # check if we've already run this population + if not result: + # we haven't already run, so we should + + # make a unique ID hash string + uuid = self.ensemble_uuid() + + # easier typing + prefix = self.sources['ensemble']['prefix'] + + # save the UUID in the database + db_id[f"{prefix}uuid"] = uuid + + # make output directory + db_id[f"{prefix}outdir"] = \ + os.path.join(opts['miscellaneous']['outdir'], + f'ensemble-{uuid}') + + if dry_run == False: + os.makedirs(db_id[f"{prefix}outdir"], + exist_ok=True) + + cluster = None + + for c in ['slurm','condor']: + cc = f"{prefix}{c}" + if cc in db_id and self.try_number(db_id[cc]) == 1: + cluster = c + + if cluster: + cluster_dir = os.path.join(db_id['e_outdir'], + cluster) + else: + cluster_dir = None + + # enter the data into the sqlite database + if dry_run == False: + self.enter_db_dict(db,db_id) + + # and launch the population + if dry_run: + print("[Dry run] ",end='') + print("Launch in",db_id['e_outdir'],':', + self.colours['red']+repr(opts)) + + cmd = [ + os.path.join(opts['miscellaneous']['scriptdir'], + opts['miscellaneous']['script']), + ] + for x in (opts['ensemble_args'],opts['binary_c_args']): + for k,v in x.items(): + cmd.append(f"{str(k)}={str(v)}") + + cmd += [ + f'outdir={db_id["e_outdir"]}' + ] + + if cluster: + cmd.append(f"{cluster}_dir={cluster_dir}") + + # build command string + cmd = binary_c_ensemble_manager._make_cmd(cmd) + + if dry_run: + print("[Dry run] ",end='') + print(f"Run command: {self.colours['yellow']}{repr(cmd)}") + + # run the command to launch the job + if not dry_run: + self.run_cmd(cmd,header=uuid) + + # set status + self.set_status(db=db, + uuid=uuid, + status="launched") + print(f"{self.colours['green']}... ensemble has been launched.{self.colours['reset']}") + + else: + print(f"[Dry run] ... {self.colours['green']}ensemble would have launched{self.colours['reset']}") + else: + try: + uuid = f" as {self.colours['yellow']}{result[0][0]}{self.colours['cyan']}," + except Exception: + uuid = '' + + print(f"{self.colours['cyan']}This ensemble has already launched{uuid} skipping it.") + + # close the database + self.disconnect_db(db) + + ############################################################ + @staticmethod + def system_setup(scriptpath): + """ + Return a dict containing useful system setup information + + You should call this with + + manager.system_setup(__file__) + + so we know the location of the parent path which + contains the various Python parts of binary_c's ensemble + handling. + + """ + + # sorted version info + v = binary_c_ensemble_manager.binary_c_version() + + e = { + 'home' : str(pathlib.Path.home()), + 'version' : v['miscellaneous']['version'], + 'scriptpath' : os.path.realpath(scriptpath), + 'scriptdir' : os.path.dirname(os.path.realpath(scriptpath)), + } + + return e + + def binary_c_ensemble_id_dict(self, + opts=None, + prefix=False, + force_strings=False): + + """ + Make a unique non-nested dict that defines an ensemble run + based on the opts dict. + + If prefix is True, we apply the source prefixes as required + for the SQLite database (e.g. 'b_' for binary_c args, + 'e_' for ensemble args, 'B_' for build information, + see the sources dict defined at the top of this file). + + If force_strings is True, we make sure all stored values + are strings (required for the SQLite database). + + """ + if opts is None: + opts = {} + id = {} + for source in self.sources: + s = self.sources[source] + _prefix = s['prefix'] + _dict = opts[s['dict']] + for k in _dict: + col = k + if prefix: + col = f"{_prefix}{col}" + val = _dict[k] + if force_strings: + val = str(val) + id[col] = val + + # return dict + return id + + def binary_c_ensemble_identifier_JSON(self, + opts): + """ + wrapper to return a JSON identifier + """ + d = self.binary_c_ensemble_id_dict(opts) + return self.binary_c_ensemble_identifier_JSON_from_dict(d) + + ############################################################ + # functions to convert JSON <> dict + ############################################################ + @staticmethod + def binary_c_ensemble_identifier_dict_from_JSON(id_JSON): + """ + convert ID in JSON format to dict + """ + return json.loads(id_JSON) + + @staticmethod + def binary_c_ensemble_identifier_JSON_from_dict(id_dict): + """ + convert ID in dict format to JSON + """ + return json.dumps(id_dict) + + ############################################################ + # + # Database administration functions + # + ############################################################ + + def db_file(self, + opts, + outdir=None): + """ + return the ensemble_manager SQLite database filename + """ + if not outdir: + outdir = opts['miscellaneous']['outdir'] + f = os.path.join(outdir, + self.sqlfile) + return f + + def make_new_db(self, + opts): + """ + make a new binary_c_ensemble_manager sqlite3 database + at the default directory location (given by self.db_file()). + """ + f = self.db_file(opts) + print("Make new SQLite database at",f) + db = sqlite3.connect(f) + + sql = [] + for k in self.default_ID_keys: + x = f'B_{k} text' + sql.append(x) + sql_string = f'create table if not exists ensembles ("status",{", ".join(sql)})' + self.db_do_sql(db, + sql_string) + + # make a table of binary_c defaults + d = get_defaults() + sql_string = 'create table if not exists defaults ("variable","value")' + self.db_do_sql(db, + sql_string) + s = [] + for var in d: + s.append(f'("{var}","{d[var]}")') + self.db_do_sql(db, + f'insert into defaults("variable","value") values {", ".join(s)}') + + + def connect_db(self, + opts, + outdir=None, + file=None, + vb=False): + """ + Connect to the ensemble managers SQLite database at file + """ + if not file: + file = self.db_file(opts, + outdir=outdir) + + # check if the database exists, if not, create it + if not os.path.exists(file): + if vb: + print("No existing database : make new") + self.make_new_db(opts) + + if vb: + print("Connect to database",file) + try: + db = sqlite3.connect(file) + + except Error as e: + print(e) + return db + + def disconnect_db(self,db,vb=False): + """ + disconnect from binary_c_ensemble_manager's SQLite database + """ + if vb: + print("Disconnect from database",db) + if db: + try: + db.close() + except Error as e: + print("Failed to disconnect SQLite database",e) + return db + + ############################################################ + # + # Database search functions + # + ############################################################ + + def search_db_dict(self, + db, + id_dict, + strip_uuids=False + ): + """ + + Given an identifier dict, search for a matching ensemble + in the database. Returns the matching ensemble identifier, + or None if not found. + + We skip keys that match a string skip in skip_key_regexes. + By default this skips the UUID and directories that are + unique and may change. + + """ + keys = [] + wheres = [] + rlist = list(map(re.compile, self.skip_unique_key_regexes)) + for k in id_dict: + match = False + for r in rlist: + if r.search(k): + match = True + if match == False: + keys.append(k) + wheres.append(f'{k} = "{id_dict[k]}"') + + sql = f'select e_uuid, {", ".join(keys)} from ensembles where {" and ".join(wheres)} ;' + result = self.db_do_sql(db, + sql, + vb=False, + allow_error=True) + + if result: + lines = result.fetchall() + if len(lines) == 0: + return None + else: + return lines + else: + return None + + def search_db_JSON(self,db,id_JSON): + """ + Given an identifier in JSON format, search for a match in + the database. + """ + id_dict = self.binary_c_ensemble_identifier_dict_from_JSON(id_JSON) + return self.search_db_dict(db,id_dict,strip_uuids=True) + + + def db_columns(self,db,table): + """ + Return a list of columns in the given table in sqlite3 database db. + """ + table_info = self.db_do_sql(db,f'pragma table_info({table})') + cols = [] + if table_info: + for x in table_info: + cols.append(x[1]) + return cols + + ############################################################ + # + # Database data-entry functions + # + ############################################################ + + def enter_db_dict(self,db,db_id): + """ + Enter an ensemble run corresponding to the db_id dict + into the database + + Note: we assume it is not already in the database, you + need to do the checking to make sure it is not + doubled. + """ + + + db_cols = self.db_columns(db,'ensembles') + d = get_defaults() + + # add missing columns + for col in db_id: + if not col in db_cols: + sql = f'alter table ensembles add column {col} text ' + # set a default for the column + # if it's a binary_c argument + if col.startswith('b_'): + arg = col[2:] + if arg in d: + sql += f'default "{d[arg]}"' + sql += ';' + self.db_do_sql(db,sql) + + + # add this run to database + sql = 'insert into ensembles (status, ' + \ + ', '.join(db_id.keys()) + ') ' + \ + 'values' + \ + ' ("pending", "' + '", "'.join(db_id.values()) + '")' + + self.db_do_sql(db,sql,allow_error=False) + + # save a JSON version of the database information in the directory + # in case we need to reconstruct it + j = self.binary_c_ensemble_identifier_JSON_from_dict(db_id) + p = os.path.join(db_id[f"{self.sources['ensemble']['prefix']}outdir"], + self.dbchunkfile) + with open(p,"w",encoding=self.encoding) as f: + f.write(j) + + return + + def db_do_sql(self, + db, + sql_string, + commit=True, + allow_error=False, + vb=False): + """ + Execute sql_string in the database + + If allow_error is True (default) on error we return None + """ + cur = db.cursor() + if vb: + print(f"SQLite DO: {self.colours['magenta']}{sql_string}{self.colours['reset']}") + + try: + result = cur.execute(sql_string) + except Exception: + if allow_error: + return None + else: + raise + + if commit: + db.commit() + + return result + + def enter_db_JSON(self,db,id_JSON): + """ + Wrapper to enter a JSON string in db + """ + return self.enter_db_dict(db, + binary_c_ensemble_identifier_dict_from_JSON(id_JSON)) + + ############################################################ + # + # HPC Cluster-specific options + # + ############################################################ + + def add_cluster_options(self,vb=False): + """ + Function to add to launch_population.py's + runtime_vars according to the (auto) deterined local + cluster. + + Of course, you will want to do something different for + your cluster... but you can use this as a basis for your + function, or override the parameters passed in. + """ + if vb: + print("Check cluster") + + # use getfqdn() to acquire the full hostname + hostname = socket.getfqdn() + if vb: + print("Full hostname",hostname) + + cluster_type = self.runtime_vars['HPC_options']['cluster_type'] + + # if we have no cluster type, set it by according + # to the hostname + if cluster_type is None: + if re.search('eureka', hostname): + # Surrey's Eureka(2) Slurm cluster + if re.search('eureka2', hostname): + cluster_name = 'Eureka2' + else: + cluster_name = 'Eureka' + + cluster_type = 'slurm' + if not self.runtime_vars['HPC_options']['slurm_partition']: + self.runtime_vars['HPC_options']['slurm_partition'] = 'all' + + elif re.search('orca', hostname): + # Surrey's HTCondor cluster + cluster_name = 'Orca' + cluster_type = 'condor' + + if cluster_type: + print("On cluster",cluster_name,"of type",cluster_type) + x = {} + h = self.runtime_vars['HPC_options'] + + # set ensemble_args from HPC_options + if cluster_type == 'slurm': + x = { + 'slurm' : 1, + 'slurm_dir' : h['directory'], + 'slurm_njobs' : h['njobs'], + 'slurm_memory' : h['memory'], + 'slurm_warn_max_memory' : h['warn_max_memory'], + 'slurm_time' : h['max_time'], + 'slurm_partition' : h['slurm_partition'], + } + + elif cluster_type == 'condor': + x = { + 'condor' : 1, + 'condor_dir' : h['directory'], + 'condor_njobs' : h['njobs'], + 'condor_memory' : h['memory'], + 'condor_warn_max_memory' : h['warn_max_memory'], + 'condor_requirements' : h['condor_requirements'], + } + + self.runtime_vars['ensemble_args'].update(x) + + ############################################################ + # + # Status functions + # + ############################################################ + def set_status(self, + status=None, + uuid=None, + db=None, + outdir=None): + """ + Function to set the status of job given by UUID. + + You can pass in either the open database (db) or + the outdir in which it is stored. + """ + db,outdir = self.default_db(None,db=db,outdir=outdir) + + # check UUID exists in the database + sql = f'select * from ensembles where "e_uuid" = "{uuid}";' + result = self.db_do_sql(db,sql) + if result: + lines = result.fetchall() + if len(lines) == 0: + print(f"Failed to find jobs in the databse matching UUID {uuid} when trying to set_status, this is an error ") + sys.exit(1) + else: + print(f"Failed to find jobs in the databse matching UUID {uuid} when trying to set_status, this is an error ") + sys.exit(1) + + # set the status + sql = f'update ensembles set status = "{status}" where "e_uuid" = "{uuid}";' + self.db_do_sql(db,sql) + + def get_status(self, + uuid=None, + db=None, + outdir=None): + """ + Function to get the status of a job given its UUID. + + You can pass in either the open database (db) or + the outdir in which it is stored, but one of the two must + be passed in. + + Returns None on failure. + """ + if not db and not outdir: + print("in get_status you must either pass in an open database, or an outdir where it is stored") + sys.exit(1) + + # set the status + sql = f'select status from ensembles where "e_uuid" = "{uuid}"' + result = self.db_do_sql(db,sql) + if result: + lines = result.fetchall() + if len(lines) == 0: + print("Failed to get status of job with UUID=",uuid,"(no data available in the database") + return None + else: + return str(lines[0][0]) + else: + print("Failed to get status of job with UUID=",uuid) + return None + + @staticmethod + def _unique_list(list): + """ + Return a unique list based on list passed in, keeping the + original sequence order. + """ + s = set() + return [x for x in list if x not in s and not s.add(x)] + + def status(self, + opts, + db=None, + outdir=None, + formatting='pandas', + colour='True', + alldata=False, + sqlfilters=None): + """ + Report status of all binary_c_ensemble_manager jobs + + Options: + db = database, auto connected if None + outdir = output directory to be search, auto detected if None + formatting : Module to use for output, 'pandas' or 'prettytable' + colour : whether to use colour in the output (True) + alldata : if True forces pandas to set maxrows/cols to None + so all the data is output whatever the terminal size + sqlfilters : default sqlfilters for the output + """ + if sqlfilters is None: + sqlfilters = ['e_uuid', 'status'] + + db,outdir = self.default_db(opts,db=db,outdir=outdir) + + d = get_defaults() + cols = self.db_columns(db,'ensembles') + + self.check_binary_c_git_revision() + + # add extra filters from command line + wheres = [] + if len(self.cmdline_args) > 0: + for arg in self.cmdline_args: + if arg in cols: + sqlfilters.append(x) + else: + # arg not found + # + # first check for a special command + if arg == 'prettytable' or arg == 'pandas': + # change table formatting + formatting = arg + elif arg == 'alldata': + # force pandas to show all rows/columns + pd.set_option('display.max_rows',None) + pd.set_option('display.max_columns',None) + elif arg == 'binary_c_args': + # add binary_c args that are in the table + for col in cols: + if col.startswith('b_'): + sqlfilters.append(col) + elif arg == 'colour': + # turn colours on + colour = True + elif arg == 'nocolour': + # turn colours off + colour = False + else: + # then check for it with a prefix + for x in cols: + if x.endswith(arg): + sqlfilters.append(x) + break + + sqlfilters = self._unique_list(sqlfilters) + + # construct SQLite command + sql = f'select {", ".join(sqlfilters)} from ensembles' + if len(wheres)>0: + sql += f' where {" and ".join(wheres)}' + sql += ';' + + # set up formatting for output + if colour: + self.colours['default'] = self.colours['yellow'] + self.colours['nondefault'] = self.colours['red'] + else: + self.colours['default'] = '' + self.colours['nondefault'] = '' + + # do it + result = self.db_do_sql(db,sql,allow_error=True) + if result: + # get the result in a 2D list (NOT a 2D tuple) + lines = [] + for l in result.fetchall(): + lines.append(list(l)) + + if len(lines) == 0: + print("No job status data available") + return + else: + separator = ' : ' + table = [sqlfilters] + for line in lines: + for i,col in enumerate(sqlfilters): + if col.startswith('b_'): + # binary_c arg column + barg = col[2:] + if barg in d and self.vars_same(line[i], d[barg]): + line[i] = f"[{self.colours['default']}{line[i]}{self.colours['reset']}]" + else: + line[i] = f"[{self.colours['nondefault']}{line[i]}{self.colours['reset']}]" + + table.append(line) + + # format header: remove prefixes + for i,col in enumerate(table[0]): + table[0][i] = self.strip_prefix(col) + + if formatting == 'pandas': + print(pd.DataFrame(table)) + elif formatting == 'prettytable': + pretty = PrettyTable() + pretty.field_names = table[0] + pretty.add_rows(table[1:]) + pretty.align = 'r' + print(pretty) + else: + print(table) + + else: + print("No data available (have you run any ensembles? have you set the inlist correctly?)") + + return + + def long_status(self, + opts, + db=None, + outdir=None): + """ + Like manager.status() but outputs all information + """ + db,outdir = self.default_db(opts,db=db,outdir=outdir) + + # default sql filters: don't want these twice + sqlfilters=['e_uuid', 'status'] + + cols = self.db_columns(db,'ensembles') + for col in cols: + if not col in sqlfilters: + sqlfilters.append(col) + + return self.status(opts,db=db,sqlfilters=sqlfilters) + + + + def default_db(self, + opts, + db=None, + outdir=None): + """ + Set outdir and db to defaults, if not given + """ + # get outdir if not given, but we need opts for it + if opts and not outdir: + outdir = opts['miscellaneous']['outdir'] + + # connect to database if not given + if not db: + file = os.path.join(outdir, + self.sqlfile) + db = self.connect_db(opts, + outdir=outdir, + file=file) + + return db,outdir + + ############################################################ + # + # Database action functions + # + ############################################################ + def stop_all_of(self, + opts, + option, # the type of job to delete + db=None, + outdir=None): + db,outdir = self.default_db(opts,db=db,outdir=outdir) + """ + stop all jobs of a the type given in option + """ + if option in self.job_types: + # get a list of launched jobs + sql = f'select e_uuid from ensembles where status = "{option}";' + elif option == 'all': + # get a list of all jobs + sql = 'select e_uuid from ensembles;' + else: + print("Unknown option",option,": should be one of",', '.join(self.job_types,'all')) + sql = None + + if sql: + result = self.db_do_sql(db, + sql) + if result: + lines = result.fetchall() + if len(lines) == 0: + print('No jobs of status',option,'found') + return + + jobs = list(itertools.chain(*lines)) + print('Stop jobs of status',option,': ',', '.join(jobs)) + + for job in jobs: + self.stop_job(opts,job) + + return + + def delete_all_of(self, + opts, + option, + db=None, + outdir=None, + stop=True): + db,outdir = self.default_db(opts,db=db,outdir=outdir) + """ + delete all jobs of a given type from the binary_c_ensemble_manager database. + + The stop parameter is passed to delete_job() + """ + if option in self.job_types: + # get a list of launched jobs + sql = f'select e_uuid from ensembles where status = "{option}";' + elif option == 'all': + # get a list of all jobs + sql = 'select e_uuid from ensembles;' + else: + print("Unknown option",option,": should be one of",', '.join(self.job_types,'all')) + sql = None + + if sql: + result = self.db_do_sql(db, + sql) + if result: + lines = result.fetchall() + if len(lines) == 0: + print('No jobs of status',option,'found') + return + + jobs = list(itertools.chain(*lines)) + print('Delete jobs of status',option,': ',', '.join(jobs)) + + for job in jobs: + self.delete_job(opts,job) + + return + + + def stop_job(self, + opts, + uuid, # uuid to stop + db=None, + outdir=None): + db,outdir = self.default_db(opts,db=db,outdir=outdir) + """ + Stop one job in the ensemble_manager database with given UUID. + """ + print("Stop job",uuid) + uuids = self.uuid_list(opts,db,outdir) + + if len(uuids) > 0: + if not uuid in uuids: + print("Job",uuid,"is not in the database, and you want to stop it, this is an error.") + return + else: + print("Database is empty: no jobs to stop") + return + + # check job is in state "launched" + if self.get_status(uuid=uuid,db=db,outdir=outdir) != 'launched': + print("Job",uuid,"cannot be stopped because its status is not of status 'launched'.") + return + + jobdir = os.path.join(outdir, + f'ensemble-{str(uuid)}') + + if os.path.isdir(jobdir): + # find the HPC job information + HPC_type = None + HPC_dir = None + for h in self.HPC_types: + d = os.path.join(jobdir,h) + if os.path.isdir(d): + HPC_type = h + HPC_dir = d + break + + if HPC_type and HPC_dir: + # get the job id + i = self.HPC_info_from_dir(opts,HPC_dir) + if i and i['jobid']: + print("Stop job with HPC ID",i['jobid']) + if HPC_type == 'slurm': + cmd = ["scancel", i['jobid']] + elif HPC_type == 'condor': + cmd = ["condor_rm", i['jobid']] + else: + cmd = None + + if cmd: + # run the command to stop the job + self.run_cmd(cmd,header=uuid) + # remove the jobid file so we know the job + # has been stopped + os.remove(os.path.join(HPC_dir,'jobid')) + + self.set_status('stopped', + uuid=uuid, + db=db, + outdir=outdir) + + def delete_job(self, + opts, + uuid, + db=None, + outdir=None, + stop=True): + """ + delete one job from the ensemble_manager database with + the given UUID. + + Note: by default we stop the job first, if you don't + want to do this, set stop=False + """ + db,outdir = self.default_db(opts,db=db,outdir=outdir) + uuids = self.uuid_list(opts,db=db,outdir=outdir) + if not uuid in uuids: + print("Job",uuid,"is not in the database and this is an error") + return + + if stop: + print("Stopping job",uuid,"prior to delete") + self.stop_job(opts,uuid,db=db,outdir=outdir) + + jobdir = os.path.join(outdir, + f'ensemble-{str(uuid)}') + print("Delete job",uuid,"from the database") + sql = 'delete from ensembles where e_uuid = "{uuid}"' + self.db_do_sql(db,sql) + + @staticmethod + def subfolders(folder): + """ + Return a list of subfolders in folder + """ + return [ f.path for f in os.scandir(folder) if f.is_dir() ] + + def update_database(self, + opts, + db=None, + done=None, + outdir=None, + did_something=False): + """ + Function to update the database depending on the disk + + If this function returns true, it should be called again. + """ + db,outdir = self.default_db(opts,db=db,outdir=outdir) + print(f"Update database at {outdir}") + + # call_again is set to True if we need to update again + # on exit + call_again = False + + # the database is out of date with respect to what is stored + # on disk : update it + _subfolders = self.subfolders(outdir) + + if done is None: + done = {} + + for subfolder in _subfolders: + action = None + print("update: Found subfolder",subfolder) + uuid = self.uuid_from_dir(subfolder) + status_dir = self.status_from_dir(opts, + directory=subfolder, + db=db, + outdir=outdir) + if uuid: + status_db = self.get_status(db=db, + outdir=outdir, + uuid=uuid) + else: + status_db = None + + jsonpath = os.path.join(subfolder, + self.dbchunkfile) + + print(" status : in database = ",status_db,"; in folder = ",status_dir,"; json chunk path = ",jsonpath) + + if status_db is None and status_dir is not None: + # job is not in the database, but we have the directory + # with a json backup chunk, so add + print(" Add directory to database from JSON chunk",uuid) + with open(jsonpath) as f: + db_dict = json.load(f) + if db_dict: + self.enter_db_dict(db,db_dict) + self.set_status(db=db,uuid=uuid,status="launched") + call_again = True + did_something=True + action = f'Add dir from JSON at {f}' + + elif status_dir and status_db != status_dir: + # job has changed status + print(" Update from",status_db,"to",status_dir) + self.set_status(db=db,uuid=uuid,status=status_dir) + did_something=True + action = f'{status_db} -> {status_dir}' + + # logging + if action: + if action in done: + done[action] += 1 + else: + done[action] = 1 + + print() + if did_something == False: + print("update did nothing : database is up to date") + call_again = False + else: + if call_again: + print("update did something : returning so we re-update") + else: + print("database has been updated:") + for k in done: + if done[k] == 1: + print(f" {k} (once)") + else: + print(f" {done[k]} x {k}") + + return call_again,did_something,done + + def status_from_dir(self, + opts, + directory, + db=None, + outdir=None): + """ + Given a directory, guess its status + """ + db,outdir = self.default_db(opts,db=db,outdir=outdir) + h = self.HPC_info_from_dir(opts,directory) + + jsonpath = os.path.join(directory, + self.dbchunkfile) + + if h['HPC type'] is None: + # job ran locally : look at its status directory + # to determine current status + d = os.path.join(directory, 'status') + if os.path.isdir(d): + status = {} + n = 0 + for file in os.listdir(d): + with open(os.path.join(d,file),"r",encoding=self.encoding) as f: + s = f.readlines()[0].strip() + + if not s in status: + status[s] = 0 + status[s] += 1 + n += 1 + if n > 0: + if 'finished' in status and status['finished'] == n: + # jobs are all finished + return 'finished' + else: + # jobs exist, but are not finished, but presumably launched + return 'launched' + else: + # no jobs : must be pending? + return 'pending' + print("We should never get here : failure to obtain status in status_from_dir().") + sys.exit() + else: + # HPC job that is running/has run on a cluster + # + # make a dict counting the status of all the files + status = {} + for file,filestatus in h['HPC status'].items(): + if not filestatus in status: + status[filestatus] = 0 + status[filestatus] += 1 + + # given a directory, determine its likely ensemble_manager status + # (assuming it's not in the database) + if os.path.isfile(os.path.join(directory, opts['miscellaneous']['ensemble file'])): + # this has probably finished, but get the HPC info to check + + if h['HPC njobs']>0 and status['finished'] == h['HPC njobs']: + # all are finished + return 'finished' + else: + # not finished, so either launched or stopped: + # if we stopped it, there's no jobid file + if not h['jobid']: + return 'stopped' + else: + return 'launched' + + else: + if h['HPC njobs']>0 and \ + 'finished' in status and \ + status['finished'] == h['HPC njobs']: + return 'finished broken' + else: + if not h['jobid']: + return 'stopped' + else: + return 'launched' + return None + + def HPC_info_from_dir(self, + opts, + directory): + ############################################################ + # + # given a directory, obtain its HPC information from disk + # + ############################################################ + h = { + 'directory' : directory, + 'jobid' : None, + 'ensemble data path' : None, + 'ensemble data size' : None, + 'HPC type' : None, + 'HPC directory' : None, + 'HPC joining' : None, + 'HPC status files' : [], + 'HPC status' : {}, + 'HPC njobs' : 0, + } + print("DIR",directory) + + # get the existence and size of the ensemble_output file + datafiles = ( + 'ensemble_output.json', + 'ensemble_output.json.bz2', + opts['miscellaneous']['ensemble file'] + ) + + for f in datafiles: + efile = os.path.join(directory, f) + if os.path.exists(efile): + h['ensemble data path'] = efile + h['ensemble data size'] = os.path.getsize(efile) + + # get the HPC type (e.g. slurm, condor) + for t in self.HPC_types: + d = os.path.join(directory,t) + if os.path.isdir(d): + h['HPC type'] = t + h['HPC directory'] = d + + if h['HPC type']: + # get the HPC information + if os.path.isfile(os.path.join(h['HPC directory'], + 'joining')): + h['HPC joining'] = True + else: + h['HPC joining'] = False + + # get the jobid + jobidfile = os.path.join(h['HPC directory'], 'jobid') + if os.path.isfile(jobidfile): + with open(jobidfile,"r",encoding=self.encoding) as f: + h['jobid'] = f.readlines()[0].strip() + + + # list of statusfiles + statusdir = os.path.join(h['HPC directory'], + 'status') + if os.path.isdir(statusdir): + h['HPC status files'] = [f for f in os.listdir(statusdir) if os.path.isfile(os.path.join(statusdir, f))] + + for statusfile in h['HPC status files']: + p = os.path.join(directory, + statusdir, + statusfile) + with open(p,'r',encoding=self.encoding) as f: + h['HPC status'][statusfile] = f.readlines()[0].strip() + h['HPC njobs'] += 1 + + return h + + + + def make_plots(self, + opts, + db=None, + outdir=None): + """ + Function to make ensemble plots when they are missing. + """ + db,outdir = self.default_db(opts,db=db,outdir=outdir) + + _subfolders = self.subfolders(outdir) + + if len(self.cmdline_args)>2 and self.cmdline_args[0]=='force': + force = True + else: + force = False + plot = False + for subfolder in _subfolders: + uuid = self.uuid_from_dir(subfolder) + status = self.get_status(db=db, + outdir=outdir, + uuid=uuid) + if status == 'finished': + pdffile = f"{subfolder}/ensemble.pdf" + if force or \ + not os.path.isfile(pdffile) \ + or os.path.getsize(pdffile) == 0: + + # search for the plotscript : first try the + # absolute filename, then in the scriptdir + f = None + f1 = opts['miscellaneous']['plotscript'] + if not os.path.isfile(f1): + f2 = os.path.join(opts['miscellaneous']['scriptdir'], + opts['miscellaneous']['plotscript']) + if not os.path.isfile(f2): + print("Error: plotscript cannot be found at either",f1,'or',f2) + sys.exit(1) + else: + f = f2 + else: + f = f1 + cmd = [ + f, + f"{subfolder}/{opts['miscellaneous']['ensemble file']}", + '--output_file', + pdffile + ] + plot = True + self.run_cmd(cmd,header=uuid) + + if plot: + print("Finished plot construction.") + else: + print("Finished plot construction (none were required).") + + ############################################################ + # + # UUID (universally-unique identifier) functions + # + ############################################################ + + @staticmethod + def ensemble_uuid(n=8): + """ + "good enough" n-char (defaults to 8) UUID for ensemble runs + """ + return str(uuid.uuid4())[:n] + + def uuid_list(self, + opts, + db=None, + outdir=None): + """ + return a list of UUIDs in the database + """ + db,outdir = self.default_db(opts,db=db,outdir=outdir) + sql = 'select e_uuid from ensembles;' + result = self.db_do_sql(db, + sql) + if result: + lines = result.fetchall() + uuids = list(itertools.chain(*lines)) + return uuids + else: + return [] + + @staticmethod + def uuid_from_dir(directory): + """ + Given a directory, remove the UUID from the end + "ensemble-UUID". + """ + x = str(pathlib.PurePath(directory).name) + m = re.match("ensemble-(.*)",x) + if m: + return m.group(1).strip() + else: + return None + + ############################################################ + # + # Miscellaneous functions + # + ############################################################ + @staticmethod + def is_number(x): + """ + return True if x is a number, otherwise False + """ + try: + float(x) + return True + except Exception: + try: + int(x) + return True + except Exception: + return False + + @staticmethod + def try_float(x): + """ + return a float if x can be converted to a float, + otherwise x + """ + if binary_c_ensemble_manager.is_number(x): + return float(x) + else: + return x + + @staticmethod + def try_number(x): + """ + return a float if x can be converted to a float, + an int version of x if x can be converted to an int, + otherwise x + """ + try: + y = float(x) + return y + except Exception: + try: + y = int(x) + return y + except Exception: + return x + + @staticmethod + def vars_same(x,y): + """ + See if x and y are equal, even once converted to + numbers. + """ + if x == 'False': + x = 0 + if y == 'False': + y = 0 + if x == 'True': + x = 1 + if y == 'True': + y = 1 + return binary_c_ensemble_manager.try_float(x) == binary_c_ensemble_manager.try_float(y) + + @staticmethod + def help(): + """ + Lists the binary_c_ensemble_manager help text. + """ + print(""" + ensemble_manager.py + + A manager for running binary_c stellar-population ensembles on + your high-performance computing (HPC) cluster. + + Usage: + + ensemble_manager.py <command> <inlist> + + where <commands> are: + + status [columns] + Shows a brief table describing the database UUID and status, + with (optional) columns also shown. + + longstatus + Shows a long table describing the database. + + launch + Launches all the jobs by looping over the variables in + binary_c_args, defined in the inlist file, with the + appropriate population settings in ensemble_args. + Jobs that have already run or been launched are not rerun. + + update + Updates the database. This is often required because jobs + finish and the database does not know, or if directories + are copied into the database directory. + + makeplots [force] + Calls the ensemble_plotter_threaded.py script to make a PDF + from each dataset in the file ensemble.pdf, if this file does + not exist (or is of size 0). You can add the extra argument "force" + to force the plot making even if the file does exist. + + stop <UUID|all> + Stop a job on the HPC cluster with the given UUID, or try + to stop all of them. + + delete <UUID|all> + Deletes database entries of the given UUID, or all of them. Note: + does NOT delete the files associated with the job. + + help + Shows this help. + + --- + + Environment variables + + BINARY_C + Points to the root binary_c directory. This must be set. + + ENSEMBLE_MANAGER_DIR + The directory in which the database, and folders containing + generated ensemble data, is to be stored. + Defaults to: + $HOME/data/populations-<binary_c_version>-<git_revision>-<project> + where the binary_c_version is given by binary_c-config (e.g. 2.2.1), + the git revision is like 5845:20220122:2b57e488e where the first number + is the commit number, the second the date and the third is the commit + MD5. The -<project> is set by self.runtime_vars['miscellaneous']['project'] + and is ignored if this variable is set to None. + + ENSEMBLE_MANAGER_SCRIPT + The script to be run to make ensemble data. + Defaults to $BINARY_C/src/python/ensemble.py + + ENSEMBLE_MANAGER_PLOTSCRIPT + The script to be run to plot an ensemble. + Defaults to $BINARY_C/src/python/ensemble_plotter_threaded.py + + ENSEMBLE_MANAGER_DRY_RUN + When set, launch commands do nothing but do report what they + would have done, i.e. a dry run. + + ENSEMBLE_MANAGER_INLIST_PATHS + By default, we look at the inlist's full path, and also in $BINARY_C/src/inlist and $BINARY_C/src/python, for the inlist. You can specify a custom set of paths in $ENSEMBLE_MANAGER_INLIST_PATHS (separated by colons in the normal manner). + + ENSEMBLE_MANAGER_WRONG_REPO_WARNING + If set, disable the warning that is given when the git repository revision does not match that of the currently-built binary_c. + --- + + The binary_c homepage is at + http://personal.ph.surrey.ac.uk/~ri0005/binary_c.html + + Documentation is at + http://personal.ph.surrey.ac.uk/~ri0005/doc/binary_c/binary_c.html + + The gitlab repository of binary_c and binary_c-python is at + https://gitlab.surrey.ac.uk/ri0005/binary_c + https://gitlab.surrey.ac.uk/ri0005/binary_c-python + https://gitlab.com/binary_c + + """) + + @staticmethod + def commands(): + """ + Returns a list of allowed commands in binary_c_ensenmble_manager. + """ + return ('status', 'longstatus', 'launch', 'update', 'stop', 'delete', 'help', 'makeplots') + + +############################################################ +# external functions +def is_number(x): + """ + return True if x is a number, otherwise False + """ + try: + float(x) + return True + except Exception: + try: + int(x) + return True + except Exception: + return False + +def quoted_argstring(args=None, + quoter='"'): + """return double-quoted string containing all the arguments to this run of Python. + + Options: + + args : list of arguments, defaults to double-quote, " + quoter : string used as double quotes, defaults to sys.argv + """ + if args is None: + args = sys.argv + + return ' '.join(f'{quoter}{arg}{quoter}' for arg in sys.argv)