diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py index 5740c813611da98b26672776f16d136b2041fe49..12b2fac92d8ba4c9b627a2565dfb424f63ac81c4 100644 --- a/binarycpython/utils/HPC.py +++ b/binarycpython/utils/HPC.py @@ -1,17 +1,27 @@ """ Binary_c-python's HPC functions - These are common to both the slurm and condor interfaces. + These functions form a single API through which you can access HPC resources. + + Generally, you should call an HPC function rather than the Slurm or Condor interface + directly. The HPC function then decides which interface to use, so that all the + other modules can use a single API rather than have to choose to use the Slurm or + Condor API. """ + import os +import pathlib + +from binarycpython.utils.slurm import slurm +from binarycpython.utils.condor import condor -class HPC(): +class HPC(condor,slurm): def __init__(self, **kwargs): # don't do anything: we just inherit from this class return - def joinfiles(self): + def HPC_joinfiles(self): """ Function to load in the joinlist to an array and return it. """ @@ -20,7 +30,7 @@ class HPC(): f.close() return list - def join_from_files(self,newobj,joinfiles): + def HPC_join_from_files(self,newobj,joinfiles): """ Merge the results from the list joinfiles into newobj. """ @@ -30,18 +40,17 @@ class HPC(): file) return newobj - def can_join(self,joinfiles,joiningfile,vb=False): + def HPC_can_join(self,joinfiles,joiningfile,vb=False): """ Check the joinfiles to make sure they all exist and their .saved equivalents also exist """ - vb = False # for debugging set this to True if os.path.exists(joiningfile): if vb: print("cannot join: joiningfile exists at {}".format(joiningfile)) return False elif vb: - print("joiningfile does not exist") + print("joiningfile (at {}) does not exist".format(joiningfile)) for file in joinfiles: if vb: print("check for {}".format(file)) @@ -70,5 +79,171 @@ class HPC(): print("joiningfile does not exist") if vb: - print("returning True from can_join()") + print("returning True from HPC_can_join()") return True + + def HPCjob(self): + """ + Function to return True if we're running an HPC (Slurm or Condor) job, False otherwise. + """ + if self.grid_options['slurm'] > 0 or self.grid_options['condor'] > 0: + return True + else: + return False + + def HPCjobtype(self): + """ + Function to return a string telling us the type of an HPC job, i.e. + "slurm", "condor" or "None". + """ + if self.grid_options['slurm'] > 0: + return "slurm" + elif self.grid_options['condor'] > 0: + return "condor" + else: + return "None" + + def HPCjobID(self): + """ + Function to return an HPC (Slurm or Condor) job id. Returns None if not an HPC job. + """ + if self.grid_options['slurm'] > 0: + return self.slurmID() + elif self.grid_options['condor'] > 0: + return self.condorID() + else: + # not an HPC job + return None + + def HPC_set_status(self,string): + """ + Set the appropriate HPC job (Condor or Slurm) status file to whatever is given in string. + + Arguments: + string : the new contents of the status file + """ + if self.grid_options['slurm'] > 0: + return self.set_slurm_status(string) + elif self.grid_options['condor'] > 0: + return self.set_condor_status(string) + else: + return None + + def HPC_get_status(self): + """ + Get the appropriate HPC job (Condor or Slurm) status for this job. 
+ """ + if self.grid_options['slurm'] > 0: + return self.get_slurm_status() + elif self.grid_options['condor'] > 0: + return self.get_condor_status() + else: + return None + + def HPC_dirs(self): + """ + Function to return a list of directories for this HPC job. + """ + if self.grid_options['slurm'] > 0: + return self.slurm_dirs() + elif self.grid_options['condor'] > 0: + return self.condor_dirs() + else: + return [] + + def HPCgrid(self): + """ + Function to call the appropriate HPC grid function (Slurm or Condor) + """ + if self.grid_options['slurm'] > 0: + return self.slurm_grid() + elif self.grid_options['condor'] > 0: + return self.condor_grid() + else: + return None # should not happen + + def HPC_check_requirements(self): + """ + Function to check HPC option requirements have been met. Returns a tuple: (True,"") if all is ok, (False,<warning string>) otherwise. + """ + if self.grid_options['slurm'] > 0: + return self.slurm_check_requirements() + elif self.grid_options['condor'] > 0: + return self.condor_check_requirements() + else: + return (True,"") + + def HPC_restore(self): + """ + Wrapper function to restore previous HPC snapshots prior to restarting a grid. + """ + if self.grid_options['slurm'] > 0: + return self.slurm_restore() + elif self.grid_options['condor'] > 0: + return self.condor_check_requirements() + else: + return (True,"") + + def HPC_join_previous(self): + """ + Function to join previously generated datasets. + """ + # check that our job has finished + status = self.HPC_get_status() + print("Job status",status) + + if status != "finished": + # job did not finish : save a snapshot + print("This job did not finish (status is {status}) : cannot join".format(status=status)) + else: + # our job has finished + joinfiles = self.HPC_joinfiles() + joiningfile = self.HPCpath('joining') + print("Joinfiles: ",joinfiles) + print("Joingingfiles: ",joiningfile) + if self.HPC_can_join(joinfiles,joiningfile): + # join object files + print("We can join") + try: + # touch joiningfile + pathlib.Path(joiningfile).touch(exist_ok=False) + try: + self.HPC_join_from_files(self,joinfiles) + except Exception as e: + print("Join gave exception",e) + # disable analytics calculations : use the + # values we just loaded + self.grid_options['do_analytics'] = False + return + except: + pass + else: + print("cannot join : other tasks are not yet finished\n") + print("Finished this job : exiting") + self.exit(code=1) + + def HPCpath(self,path): + """ + Function to file the filename of this HPC job's file at path. + """ + if self.grid_options['slurm'] > 0: + return self.slurmpath(path) + elif self.grid_options['condor'] > 0: + return self.condorpath(path) + else: + return None + + def HPC_snapshot_filename(self): + """ + Function to return an HPC job's snapshot filename. 
+ """ + if self.grid_options['slurm'] > 0: + file = os.path.join(self.grid_options['slurm_dir'], + 'snapshots', + self.jobID() + '.gz') + elif self.grid_options['condor'] > 0: + file = os.path.join(self.grid_options['condor_dir'], + 'snapshots', + self.jobID() + '.gz') + else: + return None diff --git a/binarycpython/utils/analytics.py b/binarycpython/utils/analytics.py index 43c557fd918aa676cf512a78723282c9b4efb062..b12f19f8c227f10af7eaf9cc575df5832788c6e4 100644 --- a/binarycpython/utils/analytics.py +++ b/binarycpython/utils/analytics.py @@ -70,7 +70,7 @@ class analytics(): for x in ["_start_time_evolution","_end_time_evolution"]: if not self.grid_options[x]: self.grid_options[x] = time.time() - print("{} missing : {}",x,self.grid_options[x]) + print("{} missing : {}".format(x,self.grid_options[x])) if force or "_time_elapsed" not in self.grid_options: self.grid_options["_time_elapsed"] = self.grid_options["_end_time_evolution"] - self.grid_options["_start_time_evolution"] diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py index 6a9618f8367de10fe7b1addf60b652bafec1e06b..cd4c2d69bd8888a4420bd4bd27e8e402db888cbe 100644 --- a/binarycpython/utils/condor.py +++ b/binarycpython/utils/condor.py @@ -1,13 +1,415 @@ """ Binary_c-python's condor functions """ -from binarycpython.utils.HPC import HPC -class condor(HPC): +class condor(): def __init__(self, **kwargs): # don't do anything: we just inherit from this class return + + def condorID(self,clusterID=None,ProcID=None): + """ + Function to return a Condor job ID. The ClusterID and ProcID passed in are used if given, otherwise we default to the condor_ClusterID and condor_ProcID in grid_options. + """ + if ClusterID is None: + ClusterID= self.grid_options['condor_ClusterID'] + if ProcID is None: + ProcID = self.grid_options['condor_ProcID'] + return "{ClusterID}.{ProcID}".format(ClusterID=ClusterID, + ProcID=ProcID) + + def condorpath(self,path): + """ + Function to return the full condor directory path. + """ + return os.path.abspath(os.path.join(self.grid_options['condor_dir'],path)) + + def condor_status_file(self, + ClusterID=None, + ProcID=None): + """ + Return the condor status file corresponding to the ClusterID and ProcID, which default to grid_options condor_ClusterID and condor_ProcID, respectively. + """ + return os.path.join(self.condorpath('status'), + self.condorID(ClusterID,ProcID)) + + def condor_check_requirements(self): + """ + Function to check whether the condor parameters in grid_options have been set appropriately. + """ + if self.grid_options['condor'] > 0 and \ + self.grid_options['condor_dir'] is None: + return (False, + "You have set condor={condor}",self.grid_options['condor'],"but not set condor_dir (which is {condor_dir}). Please set it and try again.".format( + condor=self.grid_options['condor'], + condor_dir=self.grid_options['condor_dir'] + )) + else: + return (True,"") + + def condor_dirs(self): + """ + Directories associated specifically with this condor job. + """ + return ['condor_dir'] + + def set_condor_status(self,string): + """ + Set the condor status corresponing to the self object, which should have condor_ClusterID and condor_ProcID set. 
+ """ + # save condor ClusterID to file + idfile = os.path.join(self.grid_options["condor_dir"], + "ClusterID") + if not os.path.exists(idfile): + with open(idfile,"w",encoding='utf-8') as fClusterID: + fClusterID.write("{ClusterID}\n".format(ClusterID=self.grid_options['condor_ClusterID'])) + fClusterID.close() + + # save condor status + file = self.condor_status_file() + if file: + with open(file,'w',encoding='utf-8') as f: + f.write(string) + f.close() + return + + def get_condor_status(self, + ClusterID=None, + ProcID=None): + """ + Get and return the condor status corresponing to the self object, or ClusterID.ProcID if they are passed in. If no status is found, returns an empty string.. + """ + if ClusterID is None: + ClusterID = self.grid_options['condor_ClusterID'] + if ProcID is None: + ProcID = self.grid_options['condor_ProcID'] + + if ClusterID is None or ProcID is None : + return None + + try: + path = pathlib.Path(self.condor_status_file(ClusterID=ClusterID, + ProcID=ProcID)) + if path: + return path.read_text().strip() + else: + return "" + except: + return "" + + def condor_outfile(self): + """ + return a standard filename for the condor chunk files + """ + file = "{id}.gz".format( + id = self.condorID() + ) + return os.path.abspath(os.path.join(self.grid_options['condor_dir'], + 'results', + file)) + + def make_condor_dirs(self): + + # make the condor directories + if not self.grid_options['condor_dir']: + print("You must set self.grid_options['condor_dir'] to a directory which we can use to set up binary_c-python's Condor files. This should be unique to your set of grids.") + os.exit() + + # make a list of directories, these contain the various condor + # output, status files, etc. + dirs = [] + for dir in ['stdout','stderr','log','results','status','snapshots']: + dirs.append(self.condorpath(dir)) + + # make the directories: we do not allow these to already exist + # as the condor directory should be a fresh location for each set of jobs + for dir in dirs: + try: + pathlib.Path(self.condorpath(dir)).mkdir(exist_ok=False, + parents=True) + except: + print("Tried to make the directory {dir} but it already exists. When you launch a set of binary_c jobs on Condor, you need to set your condor_dir to be a fresh directory with no contents.".format(dir=dir)) + self.exit(code=1) + + # check that they have been made and exist: we need this + # because on network mounts (NFS) there's often a delay between the mkdir + # above and the actual directory being made. This shouldn't be too long... + fail = True + count = 0 + count_warn = 10 + while fail is True: + fail = False + count += 1 + if count > count_warn: + print("Warning: Have been waiting about {} seconds for Condor directories to be made, there seems to be significant delay...".format(count)) + for dir in dirs: + if os.path.isdir(dir) is False: + fail = True + time.sleep(1) + break # break the "for dir in dirs:" + + def condor_grid(self): + """ + function to be called when running grids when grid_options['condor']>=1 + + if grid_options['condor']==1, we set up the condor script and launch the jobs, then exit. 
+ if grid_options['condor']==2, we are being called from the jobs to run the grids + """ + + # if condor=1, we should have no evolution type, we just + # set up a load of condor scripts and get them evolving + # in condor array + if self.grid_options['condor'] == 1: + self.grid_options['evolution_type'] = None + + if self.grid_options['evolution_type'] == 'grid': + # run a grid of stars only, leaving the results + # in a file + + # set output file + self.grid_options['save_population_object'] = condor_outfile() + + return self.evolve() + elif self.grid_options['evolution_type'] == 'join': + # should not happen! + return + else: + # setup and launch condor jobs + self.make_condor_dirs() + + # check we're not using too much RAM + if datasize.DataSize(self.grid_options['condor_memory']) > datasize.DataSize(self.grid_options['condor_warn_max_memory']): + print("WARNING: you want to use {} MB of RAM : this is unlikely to be correct. If you believe it is, set condor_warn_max_memory to something very large (it is currently {} MB)\n".format( + self.grid_options['condor_memory'], + self.grid_options['condor_warn_max_memory'])) + self.exit(code=1) + + # set up condor_array + if not self.grid_options['condor_array_max_jobs']: + self.grid_options['condor_array_max_jobs'] = self.grid_options['condor_njobs'] + condor_array = self.grid_options['condor_array'] or "1-{njobs}\%{max_jobs}".format( + njobs=self.grid_options['condor_njobs'], + max_jobs=self.grid_options['condor_array_max_jobs']) + + # get job id (might be passed in) + ClusterID = self.grid_options['condor_ClusterID'] if self.grid_options['condor_ClusterID'] != "" else '$CONDOR_ARRAY_JOB_ID' + + # get job array index + ProcID = self.grid_options['condor_ProcID'] + if ProcID is None: + ProcID = '$CONDOR_ARRAY_TASK_ID' + + if self.grid_options['condor_njobs'] == 0: + print("binary_c-python Condor : You must set grid_option condor_njobs to be non-zero") + self.exit(code=1) + + # build the grid command + grid_command = [ + os.path.join("/usr","bin","env"), + sys.executable, + str(lib_programname.get_path_executed_script()), + ] + sys.argv[1:] + [ + 'condor=2', + 'start_at=' + str(ProcID) + '-1', # do we need the -1? 
+ 'modulo=' + str(self.grid_options['condor_njobs']), + 'condor_njobs=' + str(self.grid_options['condor_njobs']), + 'condor_dir=' + self.grid_options['condor_dir'], + 'verbosity=' + str(self.grid_options['verbosity']), + 'num_cores=' + str(self.grid_options['num_processes']) + ] + + grid_command = ' '.join(grid_command) + + # make condor script paths + submit_script_path = self.condorpath('condor_submit_script') + job_script_path = self.condorpath('condor_job_script') + + # open the files + try: + submit_script = open(submit_script_path,'w',encoding='utf-8') + except IOError: + print("Could not open Condor script at {path} for writing: please check you have set {condor_dir} correctly (it is currently {condor_dir} and can write to this directory.".format( + path=submit_script_path, + condor_dir = self.grid_options['condor_dir'])) + try: + job_script = open(job_script_path,'w',encoding='utf-8') + except IOError: + print("Could not open Condor script at {path} for writing: please check you have set {condor_dir} correctly (it is currently {condor_dir} and can write to this directory.".format( + path=job_script_path, + condor_dir = self.grid_options['condor_dir'])) + + + + ############################################################ + # The condor job script calls your binary_c-pthyon script + ############################################################ + condor_job_script = """#!{bash} +# first two arguments are ClusterID and ProcID +ClusterID = $1 +shift +ProcID = $1 +shift + +# Set binary_c startup conditions +export BINARY_C_PYTHON_ORIGINAL_CMD_LINE={cmdline} +export BINARY_C_PYTHON_ORIGINAL_WD=`{pwd}` +export BINARY_C_PYTHON_ORIGINAL_SUBMISSION_TIME=`{date}` + +{grid_command} + +# set status to \"running\" +echo \"running\" > {condor_dir}/status/{id} + +# make list of files which is checked for joining +echo {condor_dir}/results/{id}.gz >> {condor_dir}/results/$ClusterID.all + +# run grid of stars and, if this returns 0, set status to finished +{grid_command} evolution_type=grid condor_ClusterID=$ClusterID condor_ProcID=$ProcID save_population_object={condor_dir}/results/$ProcID.gz && echo -n \"finished\" > {condor_dir}/status/{id} && echo """.format( + bash=self.grid_options['condor_bash'], + date=self.grid_options['condor_date'], + pwd=self.grid_options['condor_pwd'], + cmdline=repr(self.grid_options['command_line']), + grid_command=grid_command, + condor_dir=self.grid_options['condor_dir'], + id=self.condorID() +) + + if not self.grid_options['condor_postpone_join']: + joinfile = "{condor_dir}/results/{ClusterID}.all".format( + condor_dir=self.grid_options['condor_dir'], + ClusterID=ClusterID + ) + condor_job_script += """&& echo \"Checking if we can join...\" && echo && {grid_command} condor=2 evolution_type=join joinlist={joinfile} condor_ClusterID=$ClusterID condor_ProcID=$ProcID + """.format( + bash = self.grid_options['condor_bash'], + grid_command=grid_command, + joinfile=joinfile, + ) + + ############################################################ + # The Condor submit script is sent to condor_submit + # In here we know $(Cluster) and $(Item) which identify + # each job + ############################################################ + condor_submit_script = """ +executable = {env} +arguments = {bash} {scriptfile} $(Cluster) $(Item) {arguments} +universe = {universe} +output = {outfile} +error = {errfile} +log = {logfile} +initial_dir = {cwd} +stream_output = {stream_output} +stream_error = {stream_error} +request_memory = {request_memory} +request_cpus = {request_cpus} 
+should_transfer_files = {should_transfer_files} +when_to_transfer_output = {when_to_transfer_output} +requirements = {requirements} +{extra_settings} +queue {njobs} + """.format( + usr_bin_env = self.grid_options['condor_usr_bin_env'], + bash = self.grid_options['condor_bash'], + scriptfile = scriptfile, + arguments = ' '.join(arguments_list), + universe = self.grid_options['condor_universe'], + outfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'], + 'stdout', + '$(Cluster).$(Item)')), + errfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'], + 'stderr', + '$(Cluster).$(Item)')), + logfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'], + 'log', + '$(Cluster).$(Item)')), + initial_dir = os.getcwd(), + stream_output = self.grid_options['condor_stream_output'], + stream_error = self.grid_options['condor_stream_error'], + request_memory = self.grid_options['stream_memory'], + request_cpus = self.grid_options['num_processes'], + should_transfer_files = self.grid_options['condor_should_transfer_files'], + when_to_transfer_output = self.grid_options['when_to_transfer_output'], + requirements = self.grid_options['condor_requirements'], + extra_settings = self.grid_options['condor_extra_settings'], + njobs = self.grid_options['condor_njobs'], + ) + + for key in self.grid_options['condor_extra_settings']: + condor_submit_script += "{key} = {value}\n".format( + key=key, + value=self.grid_options['condor_extra_settings'][key] + ) + + # write the scripts, close them and make them executable by + # all (so the condor user can pick it up) + for file,contents,path in [(submit_script,condor_submit_script), + (job_script,condor_job_script)]: + path = file.name + file.writelines(contents) + file.close() + os.chmod(path, + stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC | \ + stat.S_IRGRP | stat.S_IXGRP | \ + stat.S_IROTH | stat.S_IXOTH) + + if not self.grid_options['condor_postpone_sbatch']: + # call sbatch to launch the condor jobs + cmd = [self.grid_options['condor_sbatch'], submit_script_path] + pipes = subprocess.Popen(cmd, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + std_out, std_err = pipes.communicate() + if pipes.returncode != 0: + # an error happened! 
+ err_msg = "{red}{err}\nReturn Code: {code}{reset}".format(err=std_err.strip(), + code=pipes.returncode, + red=self.ANSI_colours["red"], + reset=self.ANSI_colours["reset"],) + raise Exception(err_msg) + + elif len(std_err): + print("{red}{err}{reset}".format(red=self.ANSI_colours["red"], + reset=self.ANSI_colours["reset"], + err=std_err.strip().decode('utf-8'))) + + print("{yellow}{out}{reset}".format(yellow=self.ANSI_colours["yellow"], + reset=self.ANSI_colours["reset"], + out=std_out.strip().decode('utf-8'))) + else: + # just say we would have (use this for testing) + print("Condor script is at {path} but has not been launched".format(path=submit_script_path)) + + + # some messages to the user, then return + if self.grid_options['condor_postpone_sbatch'] == 1: + print("Condor script written, to {path}, but launching the jobs with sbatch was postponed.".format(path=submit_script_path)) + else: + print("Condor jobs launched.") + print("All done in condor_grid().") + return + + def condor_ClusterID_from_dir(self,dir): + """ + Return the Condor ClusterID from a condor directory, passed in + """ + file = os.path.join(dir,'ClusterID') + f = open(file,"r",encoding='utf-8') + if not f: + print("Error: could not open {} to read the Condor ClusterID of the directory {}".format(file,dir)) + sys.exit(code=1) + oldClusterID = f.read().strip() + f.close() + if not oldClusterID: + print("Error: could not find ClusterID in {}".format(self.grid_options['condor_restart_dir'])) + self.exit(code=1) + + return oldClusterID + + + + ################################################### # CONDOR functions # @@ -101,16 +503,16 @@ class condor(HPC): # # command = "".join([ # # "{}".python_details['executable'], # # "{}".scriptname, - # # "offset=$jobarrayindex", + # # "offset=$ProcID", # # "modulo={}".format(self.grid_options['condor_njobs']), # # "vb={}".format(self.grid_options['verbosity']) - # # "results_hash_dumpfile=$self->{_grid_options}{slurm_dir}/results/$jobid.$jobarrayindex", - # # 'slurm_jobid='.$jobid, - # # 'slurm_jobarrayindex='.$jobarrayindex, - # # 'slurm_jobname=binary_grid_'.$jobid.'.'.$jobarrayindex, - # # "slurm_njobs=$njobs", - # # "slurm_dir=$self->{_grid_options}{slurm_dir}", + # # "results_hash_dumpfile=$self->{_grid_options}{condor_dir}/results/$ClusterID.$ProcID", + # # 'condor_ClusterID='.$ClusterID, + # # 'condor_ProcID='.$ProcID, + # # 'condor_jobname=binary_grid_'.$ClusterID.'.'.$ProcID, + # # "condor_njobs=$njobs", + # # "condor_dir=$self->{_grid_options}{condor_dir}", # # ); # # Create directory with info for the condor script. 
By creating this directory we also check whether all the values are set correctly @@ -171,8 +573,8 @@ class condor(HPC): # condor_script_contents += "\n" # if self.grid_options["condor_extra_settings"]: - # slurm_script_contents += "# Extra settings by user:" - # slurm_script_contents += "\n".join( + # condor_script_contents += "# Extra settings by user:" + # condor_script_contents += "\n".join( # [ # "{}\t={}".format( # key, self.grid_options["condor_extra_settings"][key] @@ -199,7 +601,7 @@ class condor(HPC): # # Log = logs/log.$(cluster) # # should_transfer_files = YES - # # when_to_transfer_output = ON_EXIT + # # when_to_transfer_output = ON_EXIT_OR_EVICT # # transfer_input_files = files/in1,files/in2 # # Arguments = files/in1 files/in2 files/out1 @@ -207,7 +609,7 @@ class condor(HPC): # # Write script contents to file # if self.grid_options["condor_postpone_join"]: - # condor_script_contents += "{} rungrid=0 results_hash_dumpfile={}/results/$jobid.all condor_command=join\n".format( + # condor_script_contents += "{} rungrid=0 results_hash_dumpfile={}/results/$ClusterID.all condor_command=join\n".format( # command, self.grid_options["condor_dir"] # ) diff --git a/binarycpython/utils/dataIO.py b/binarycpython/utils/dataIO.py index 39ca476e5118b057d76e8db5a147cf4e12569644..c37e3adf0edc1694ec89d9a8eb006d13491bae12 100644 --- a/binarycpython/utils/dataIO.py +++ b/binarycpython/utils/dataIO.py @@ -230,10 +230,8 @@ class dataIO(): """ Automatically choose the snapshot filename. """ - if self.grid_options['slurm'] > 0: - file = os.path.join(self.grid_options['slurm_dir'], - 'snapshots', - self.jobID() + '.gz') + if self.HPCjob(): + return self.HPC_snapshot_filename() else: file = os.path.join(self.grid_options['tmp_dir'], 'snapshot.gz') @@ -460,11 +458,9 @@ class dataIO(): def set_status(self, string, format_statment="process_{}.txt", - ID=None, - slurm=True, - condor=True): + ID=None): """ - function to set the status string in its appropriate file + Function to set the status string in its appropriate file """ if ID is None: @@ -482,8 +478,6 @@ class dataIO(): f.write(string) f.close() - # custom logging functions - if slurm and self.grid_options['slurm'] >= 1: - self.set_slurm_status(string) -# if self.grid_options['condor']==1: -# self.set_condor_status(string) + # custom logging functions for HPC jobs + if self.HPCjob(): + self.HPC_set_status(string) diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 7562c11c322aec5c70be789bd5683469de97e6da..bba6eefec5d23d4634a4ec2ecbb0456bb77a9701 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -15,7 +15,6 @@ Tasks: - TODO: consider spreading the functions over more files. - TODO: type the private functions - TODO: fix the correct object types for the default values of the bse_options - - TODO: uncomment and implement the HPC functionality - TODO: think of a clean and nice way to unload and remove the custom_logging_info library from memory (and from disk) - TODO: think of a nice way to remove the loaded grid_code/ generator from memory. 
""" @@ -83,25 +82,23 @@ from binarycpython.utils.dicts import ( from binarycpython import _binary_c_bindings from binarycpython.utils.analytics import analytics -from binarycpython.utils.condor import condor from binarycpython.utils.dataIO import dataIO from binarycpython.utils.grid_logging import grid_logging from binarycpython.utils.grid_options_defaults import grid_options_defaults from binarycpython.utils.gridcode import gridcode +from binarycpython.utils.HPC import HPC from binarycpython.utils.metadata import metadata from binarycpython.utils.Moe_di_Stefano_2017 import Moe_di_Stefano_2017 -from binarycpython.utils.slurm import slurm from binarycpython.utils.version import version class Population(analytics, - condor, dataIO, grid_logging, grid_options_defaults, gridcode, + HPC, metadata, Moe_di_Stefano_2017, - slurm, version): """ Population Object. Contains all the necessary functions to set up, run and process a @@ -215,11 +212,10 @@ class Population(analytics, Function to return the job ID number of this process Normal processes return their process ID (PID) - Slurm processes return <jobid>.<jobarrayindex> + HPC processes return whatever HPCjobID() gives. """ - if self.grid_options['slurm'] > 0: - jobID = "{}.{}".format(self.grid_options['slurm_jobid'], - self.grid_options['slurm_jobarrayindex']) + if self.HPCjob(): + jobID = self.HPCjobID() else: jobID = "{}".format(self.process_ID) return jobID @@ -688,7 +684,6 @@ class Population(analytics, # set previous logging time _t = time.time() - self.shared_memory["prev_log_time"] = multiprocessing.Array( "d", [_t] * self.grid_options["n_logging_stats"] ) @@ -700,12 +695,15 @@ class Population(analytics, # arrays to store memory and max memory use per-thread mem = 1.0 * mem_use() - self.shared_memory["memory_use_per_thread"] = multiprocessing.Array( - "d", [mem] * self.grid_options["num_processes"] - ) - self.shared_memory["max_memory_use_per_thread"] = multiprocessing.Array( - "d", [mem] * self.grid_options["num_processes"] - ) + for x in ["","max_"]: + self.shared_memory[x + "memory_use_per_thread"] = multiprocessing.Array( + "d", [mem] * self.grid_options["num_processes"] + ) + + + ############################################################ + # set and check default directory locations + ############################################################ # check tmp_dir exists if self.grid_options['tmp_dir'] is None \ @@ -713,45 +711,32 @@ class Population(analytics, print("grid_options['tmp_dir'] is not set or it is not a directory : this should point to a temporary directory location, preferably local to your CPUs") self.exit(code=1) - - # set default directory locations - - # check slurm_dir is set should this be required - if self.grid_options['slurm'] > 0 and \ - self.grid_options['slurm_dir'] is None: - print("You have set slurm=",self.grid_options['slurm'],"but not set slurm_dir. 
Please set it and try again.") + # check any HPC requirements are met + if self.HPCjob() and not self.HPC_check_requirements()[0]: + print(self.HPC_check_requirements()[1]) self.exit(code=1) - # default status_dir to be tmp_dir/status + # default status_dir and cache_dir to be in tmp_dir + # # NOTE: binary_c-python uses its own status_dir, which is not - # the same dir as Slurm uses (so tmp_dir can be local - # to a Slurm job, while slurm_dir is common to all jobs) - if self.grid_options['status_dir'] is None: - self.grid_options['status_dir'] = os.path.join(self.grid_options['tmp_dir'], - 'status') - - # default cache_dir based on tmp_dir - if self.grid_options['cache_dir'] is None: - self.grid_options['cache_dir'] = os.path.join(self.grid_options['tmp_dir'], - 'cache') + # the same dir as HPC jobs use (so tmp_dir can be local + # to an HPC job, while the HPC status dir is common to + # all jobs) + for x in ['status','cache']: + if self.grid_options[x + '_dir'] is None: + self.grid_options[x + '_dir'] = os.path.join(self.grid_options['tmp_dir'],x) # make list of directories we want to use - dirs = ['tmp_dir','status_dir','cache_dir'] + dirs = ['tmp_dir','status_dir','cache_dir'] + self.HPC_dirs() - if self.grid_options['slurm']>0: - dirs += ['slurm_dir'] - if self.grid_options['condor']>0: - dirs += ['condor_dir'] - # try to make directories if they don't exist for dir in dirs: + # try to make directories if they don't exist path = self.grid_options[dir] if path != None: os.makedirs(path,exist_ok=True) - # check directories exist and can be written to - for dir in dirs: - path = self.grid_options[dir] + # check directories exist and can be written to if path != None and self.dir_ok(path) == False: print("Directory {dir} currently set to {path} cannot be written to. Please check that this directory is correct and you have write access.".format(dir=dir,path=path)) self.exit(code=1) @@ -765,40 +750,9 @@ class Population(analytics, print("Sub-Directory {subdir} (in tmp_dir) currently set to {path} cannot be written to. 
Please check that this directory is correct and you have write access.".format(subdir=subdir,path=path)) self.exit(code=1) - # load from existing Slurm run if provided, but only on slurm=1 runs - # (slurm=2 is to join data, which shouldn't preload) - if self.grid_options['slurm'] == 1 and \ - self.grid_options['slurm_restart_dir'] and \ - self.grid_options['slurm_jobarrayindex'] != None: - oldjobid = self.slurm_jobid_from_dir(self.grid_options['slurm_restart_dir']) - - print("Restart from dir {} which was jobid {}, we are jobarrayindex {}".format( - self.grid_options['slurm_restart_dir'], - oldjobid, - self.grid_options['slurm_jobarrayindex'] - )) - - # check status: if "finished", we don't have to do anything - file = os.path.join(self.grid_options['slurm_restart_dir'], - 'status', - "{}.{}".format(oldjobid, - self.grid_options['slurm_jobarrayindex'])) - status = open(file,encoding='utf-8').read() - if status == 'finished': - self.exit(code=0) - - file = os.path.join(self.grid_options['slurm_restart_dir'], - 'snapshots', - "{}.{}.gz".format(oldjobid, - self.grid_options['slurm_jobarrayindex'])) + # restore from existing HPC files + self.HPC_restore() - if os.path.exists(file): - # have data from which we can restore, set it in - # the appropriate grid option - self.grid_options['restore_from_snapshot_file'] = file - else: - # no need to restore - sys.exit(code=0) return def clean(self) -> None: @@ -817,37 +771,26 @@ class Population(analytics, Entry point function of the whole object. From here, based on the settings, we set up a grid and (probably) evolve the population. - There are no direct arguments to this function, rather it is based on the grid_options settings: - grid_options['slurm']: integer Boolean whether to use a slurm_grid evolution - grid_options['condor']: integer Boolean whether to use a condor_grid evolution + There are no direct arguments to this function, the grid_options + contain all the relevant settings. - If neither of the above is set, we continue without using HPC routines - (that doesn't mean this cannot be run on a server with many cores) - - Returns an dictionary containing the analytics of the run + Returns: + a dictionary containing the analytics of the run. 
""" # Just to make sure we don't have stuff from a previous run hanging around self._pre_run_setup() - if self.grid_options["slurm"]>=1 or self.grid_options["condor"]>=1: + if self.HPCjob(): + # run HPC grid: if this returns True, then exit immediately self.grid_options["symlink_latest_gridcode"] = False + if self.HPCgrid(): + self.exit(code=0) - if self.grid_options["condor"] >= 1: - # Execute condor subroutines - # self._condor_grid() - raise ValueError("Condor evolution not available at this moment") - - elif self.grid_options["slurm"] == 1: - # Slurm setup grid then exit - self.slurm_grid() - self.exit(code=0) - - elif self.grid_options['evolution_type'] == 'join': + if self.grid_options['evolution_type'] == 'join': # join previously calculated data and return immediately - self.join_previous() + self.HPC_join_previous() return - else: # Execute population evolution subroutines self._evolve_population() @@ -866,9 +809,9 @@ class Population(analytics, elif self.grid_options['save_population_object']: self.save_population_object() - # if we're running a slurm grid, exit here + # if we're running an HPC grid, exit here # unless we're joining - if self.grid_options["slurm"] >= 1 and \ + if self.HPCgrid() and \ self.grid_options['evolution_type'] != 'join': self.exit() @@ -1291,7 +1234,7 @@ class Population(analytics, self.set_status("signal {sig}".format(sig=sigstring)) if signum == signal.SIGINT: - # caught SIGINT: e.g. CTRL-C or Slurm's manager + # caught SIGINT: e.g. CTRL-C or HPC job manager # shutting us down print("Parent set stop_queue to True") self.grid_options['stop_queue'] = True @@ -1326,8 +1269,7 @@ class Population(analytics, ','.join(signal_data.keys()) )) - # SIGINT should stop the queue: this is - # what Slurm sends to end a process + # SIGINT should stop the queue nicely if signum == signal.SIGINT: self.grid_options['stop_queue'] = True self.grid_options['_killed'] = True @@ -2292,37 +2234,3 @@ class Population(analytics, self.grid_options["verbosity"], 3, ) - - def join_previous(self): - """ - Function to join previously generated datasets. - """ - # check that our job has finished - status = self.get_slurm_status() - - if status != "finished": - # job did not finish : save a snapshot - print("This job did not finish (status is {status}) : cannot join".format(status=status)) - else: - # our job has finished - joinfiles = self.joinfiles() - joiningfile = self.slurmpath('joining') - if self.can_join(joinfiles,joiningfile): - # join object files - try: - pathlib.Path(joiningfile).touch(exist_ok=False) - print("can join : all tasks are finished") - try: - self.join_from_files(self,joinfiles) - except Exception as e: - print("Join gave exception",e) - # disable analytics calculations : use the - # values we just loaded - self.grid_options['do_analytics'] = False - return - except: - pass - else: - print("cannot join : other tasks are not yet finished\n") - print("Finished this job : exiting") - self.exit(code=1) diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index 323b387d45d82e6d571e8ccbe4082cf971c64f19..b7b102474d0e8762f515a90c1f16c170e6ea973c 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -207,17 +207,20 @@ class grid_options_defaults(): "slurm_extra_settings": {}, # Dictionary of extra settings for Slurm to put in its launch script. 
"slurm_sbatch": "sbatch", # sbatch command "slurm_restart_dir" : None, # restart Slurm jobs from this directory + "slurm_bash" : "/bin/bash", # bash location for Slurm + "slurm_pwd" : "/usr/bin/pwd", # pwd command location for Slurm + "slurm_date" : "/usr/bin/date", # bash location for Slurm + ######################################## # Condor stuff ######################################## "condor": 0, # 1 to use condor, 0 otherwise - "condor_command": "", # condor command e.g. "evolve", "join" "condor_dir": "", # working directory containing e.g. scripts, output, logs (e.g. should be NFS available to all) "condor_njobs": "", # number of scripts/jobs that CONDOR will run in total - "condor_jobid": "", # condor job id + "condor_ClusterID": None, # condor cluster id, equivalent to Slurm's jobid + "condor_ProcID": None, # condor process id, equivalent to Slurm's jobarrayindex "condor_postpone_join": 0, # if 1, data is not joined, e.g. if you want to do it off the condor grid (e.g. with more RAM) # "condor_join_machine": None, # if defined then this is the machine on which the join command should be launched (must be sshable and not postponed) - "condor_join_pwd": "", # directory the join should be in (defaults to $ENV{PWD} if undef) "condor_memory": 1024, # in MB, the memory use (ImageSize) of the job "condor_universe": "vanilla", # usually vanilla universe "condor_extra_settings": {}, # Place to put extra configuration for the CONDOR submit file. The key and value of the dict will become the key and value of the line in te slurm batch file. Will be put in after all the other settings (and before the command). Take care not to overwrite something without really meaning to do so. @@ -227,19 +230,18 @@ class grid_options_defaults(): 'condor_checkpoint_interval':0, # checkpoint interval (seconds) 'condor_checkpoint_stamp_times':0, # if 1 then files are given timestamped names # (warning: lots of files!), otherwise just store the lates - 'condor_streams':0, # stream stderr/stdout by default (warning: might cause heavy network load) + 'condor_stream_output':True, # stream stdout + 'condor_stream_error':True, # stream stderr + 'condor_should_transfer_files' : 'IF_NEEDED', + 'when_to_transfer_output' : 'ON_EXIT_OR_EVICT', 'condor_save_joined_file':0, # if 1 then results/joined contains the results # (useful for debugging, otherwise a lot of work) - 'condor_requirements':'', # used? - # # resubmit options : if the status of a condor script is - # # either 'finished','submitted','running' or 'crashed', - # # decide whether to resubmit it. - # # NB Normally the status is empty, e.g. on the first run. - # # These are for restarting runs. - # condor_resubmit_finished:0, - 'condor_resubmit_submitted':0, - 'condor_resubmit_running':0, - 'condor_resubmit_crashed':0, + 'condor_requirements':'', # job requirements + 'condor_usr_bin_env' : '/usr/bin/env', # /usr/bin/env location + 'condor_bash' : '/bin/bash', # bash executable location + "condor_pwd" : "/usr/bin/pwd", # pwd command location for Condor + "condor_date" : "/usr/bin/date", # bash location for Condor + ########################## # Unordered. Need to go through this. Copied from the perl implementation. 
########################## diff --git a/binarycpython/utils/gridcode.py b/binarycpython/utils/gridcode.py index 6ffacb040c32d184baa447003c0d72b637178aec..1ec3fe9f895f9a9e54d9cf1a225c6d7e377130fc 100644 --- a/binarycpython/utils/gridcode.py +++ b/binarycpython/utils/gridcode.py @@ -16,7 +16,7 @@ class gridcode(): # don't do anything: we just inherit from this class return - ################################################### + ################################################### # Grid code functions # # Function below are used to run populations with @@ -26,13 +26,12 @@ class gridcode(): """ Returns a filename for the gridcode. """ - if self.grid_options['slurm'] > 0: + if self.HPCjob(): filename = os.path.join( self.grid_options["tmp_dir"], - "binary_c_grid_{population_id}.{jobid}.{jobarrayindex}.py".format( + "binary_c_grid_{population_id}.{jobid}.py".format( population_id=self.grid_options["_population_id"], - jobid=self.grid_options['slurm_jobid'], - jobarrayindex=self.grid_options['slurm_jobarrayindex'], + jobid=self.jobID() ) ) else: @@ -615,8 +614,7 @@ class gridcode(): file.write(self.code_string) # perhaps create symlink - if self.grid_options['slurm']==0 and \ - self.grid_options["symlink_latest_gridcode"]: + if not self.HPCjob() and self.grid_options["symlink_latest_gridcode"]: global _count symlink = os.path.join( self.grid_options["tmp_dir"], "binary_c_grid-latest" + str(_count) diff --git a/binarycpython/utils/slurm.py b/binarycpython/utils/slurm.py index c46b7e09299550c471ba2d3e610ae39167c35545..c4114f2e38ecec4bfa53f5b224cfc45bf0ecad1e 100644 --- a/binarycpython/utils/slurm.py +++ b/binarycpython/utils/slurm.py @@ -13,14 +13,23 @@ import subprocess import sys import time -from binarycpython.utils.HPC import HPC - -class slurm(HPC): +class slurm(): def __init__(self, **kwargs): # don't do anything: we just inherit from this class return + def slurmID(self,jobid=None,jobarrayindex=None): + """ + Function to return a Slurm job ID. The jobid and jobarrayindex passed in are used if given, otherwise we default to the jobid and jobarrayindex in grid_options. + """ + if jobid is None: + jobid = self.grid_options['slurm_jobid'] + if jobarrayindex is None: + jobarrayindex = self.grid_options['slurm_jobarrayindex'] + return "{jobid}.{jobarrayindex}".format(jobid=jobid, + jobarrayindex=jobarrayindex) + def slurmpath(self,path): """ Function to return the full slurm directory path. @@ -31,17 +40,31 @@ class slurm(HPC): jobid=None, jobarrayindex=None): """ - Return the slurm status file corresponding to the jobid and jobarrayindex, which default to grid_options slurm_jobid and slurm_jobarrayindex, respectively. + Return the slurm status file corresponding to the jobid and jobarrayindex, which default to grid_options slurm_jobid and slurm_jobarrayindex, respectively. """ - if jobid is None: - jobid= self.grid_options['slurm_jobid'] - if jobarrayindex is None: - jobarrayindex = self.grid_options['slurm_jobarrayindex'] - if jobid and jobarrayindex: - return os.path.join(self.slurmpath('status'), - self.grid_options['slurm_jobid'] + '.' + self.grid_options['slurm_jobarrayindex']) + return os.path.join(self.slurmpath('status'), + self.slurmID(jobid,jobarrayindex)) + + def slurm_check_requirements(self): + """ + Function to check whether the slurm parameters in grid_options have been set appropriately. 
+ """ + if self.grid_options['slurm'] > 0 and \ + self.grid_options['slurm_dir'] is None: + return (False, + "You have set slurm={slurm}",self.grid_options['slurm'],"but not set slurm_dir (which is {slurm_dir}). Please set it and try again.".format( + slurm=self.grid_options['slurm'], + slurm_dir=self.grid_options['slurm_dir'] + )) else: - return None + return (True,"") + + + def slurm_dirs(self): + """ + Directories associated specifically with this slurm job. + """ + return ['slurm_dir'] def set_slurm_status(self,string): """ @@ -67,7 +90,7 @@ class slurm(HPC): jobid=None, jobarrayindex=None): """ - Get and return the slurm status corresponing to the self object, or jobid.jobarrayindex if they are passed in. If no status is found, returns None. + Get and return the slurm status string corresponing to the self object, or jobid.jobarrayindex if they are passed in. If no status is found, returns an empty string. """ if jobid is None: jobid = self.grid_options['slurm_jobid'] @@ -77,20 +100,22 @@ class slurm(HPC): if jobid is None or jobarrayindex is None : return None - path = pathlib.Path(self.slurm_status_file(jobid=jobid, - jobarrayindex=jobarrayindex)) - if path: - return path.read_text().strip() - else: - return None + try: + path = pathlib.Path(self.slurm_status_file(jobid=jobid, + jobarrayindex=jobarrayindex)) + if path: + return path.read_text().strip() + else: + return "" + except: + return "" def slurm_outfile(self): """ return a standard filename for the slurm chunk files """ - file = "{jobid}.{jobarrayindex}.gz".format( - jobid = self.grid_options['slurm_jobid'], - jobarrayindex = self.grid_options['slurm_jobarrayindex'] + file = "{id}.gz".format( + id = self.slurmID(), ) return os.path.abspath(os.path.join(self.grid_options['slurm_dir'], 'results', @@ -129,7 +154,7 @@ class slurm(HPC): fail = False count += 1 if count > count_warn: - print("Warning: Have been waiting about {} seconds for Slurm directories to be made, there seems to be significant delay...".format(count)) + print("Warning: Have been waiting about {count} seconds for Slurm directories to be made, there seems to be significant delay...".format(count=count)) for dir in dirs: if os.path.isdir(dir) is False: fail = True @@ -140,42 +165,40 @@ class slurm(HPC): """ function to be called when running grids when grid_options['slurm']>=1 - if grid_options['slurm']==1, we set up the slurm script and launch the jobs, then exit. - if grid_options['slurm']==2, we are being called from the jobs to run the grids - """ + if grid_options['slurm']==1, we set up the slurm script and launch the jobs, then return True to exit. + if grid_options['slurm']==2, we run the stars, which means we return False to continue. + if grid_options['slurm']==3, we are being called from the jobs to run the grids, return False to continue. - # if slurm=1, we should have no evolution type, we just - # set up a load of slurm scripts and get them evolving - # in slurm array - if self.grid_options['slurm'] == 1: - self.grid_options['evolution_type'] = None + """ - if self.grid_options['evolution_type'] == 'grid': + if self.grid_options['slurm'] == 2: # run a grid of stars only, leaving the results - # in a file - - # set output file - self.grid_options['save_population_object'] = slurm_outfile() - - return self.evolve() - elif self.grid_options['evolution_type'] == 'join': - # should not happen! 
- return - else: + # in the appropriate outfile + return False + elif self.grid_options['slurm'] == 3: + # joining : set the evolution type to "join" and return + # False to continue + self.grid_options['evolution_type'] = 'join' + return False + elif self.grid_options['slurm'] == 1: + # if slurm=1, we should have no evolution type, we + # set up the Slurm scripts and get them evolving + # in slurm array + self.grid_options['evolution_type'] = None # setup and launch slurm jobs self.make_slurm_dirs() # check we're not using too much RAM if datasize.DataSize(self.grid_options['slurm_memory']) > datasize.DataSize(self.grid_options['slurm_warn_max_memory']): - print("WARNING: you want to use {} MB of RAM : this is unlikely to be correct. If you believe it is, set slurm_warn_max_memory to something very large (it is currently {} MB)\n".format( - self.grid_options['slurm_memory'], - self.grid_options['slurm_warn_max_memory'])) + print("WARNING: you want to use {slurm_memory} MB of RAM : this is unlikely to be correct. If you believe it is, set slurm_warn_max_memory to something very large (it is currently {slurm_warn_max_memory} MB)\n".format( + slurm_memory=self.grid_options['slurm_memory'], + slurm_warn_max_memory=self.grid_options['slurm_warn_max_memory'])) self.exit(code=1) # set up slurm_array if not self.grid_options['slurm_array_max_jobs']: self.grid_options['slurm_array_max_jobs'] = self.grid_options['slurm_njobs'] - slurm_array = self.grid_options['slurm_array'] or "1-{njobs}\%{max_jobs}".format( + slurm_array = self.grid_options['slurm_array'] or "1-{njobs}%{max_jobs}".format( njobs=self.grid_options['slurm_njobs'], max_jobs=self.grid_options['slurm_array_max_jobs']) @@ -197,7 +220,6 @@ class slurm(HPC): sys.executable, str(lib_programname.get_path_executed_script()), ] + sys.argv[1:] + [ - 'slurm=2', 'start_at=' + str(jobarrayindex) + '-1', # do we need the -1? 
'modulo=' + str(self.grid_options['slurm_njobs']), 'slurm_njobs=' + str(self.grid_options['slurm_njobs']), @@ -216,75 +238,73 @@ class slurm(HPC): print("Could not open Slurm script at {path} for writing: please check you have set {slurm_dir} correctly (it is currently {slurm_dir} and can write to this directory.".format(path=scriptpath, slurm_dir = self.grid_options['slurm_dir'])) - # the joinfile contains the list of chunk files to be joined - joinfile = "{slurm_dir}/results/{jobid}.all".format( - slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid - ) - lines = [ - "#!/bin/bash\n", - "# Slurm file for binary_grid2 and slurm\n", - "#SBATCH --error={slurm_dir}/stderr/\%A.\%a\n".format(slurm_dir=self.grid_options['slurm_dir']), - "#SBATCH --output={slurm_dir}/stdout/\%A.\%a\n".format(slurm_dir=self.grid_options['slurm_dir']), - "#SBATCH --job-name={slurm_jobname}\n".format(slurm_jobname=self.grid_options['slurm_jobname']), - "#SBATCH --partition={slurm_partition}\n".format(slurm_partition=self.grid_options['slurm_partition']), - "#SBATCH --time={slurm_time}\n".format(slurm_time=self.grid_options['slurm_time']), - "#SBATCH --mem={slurm_memory}\n".format(slurm_memory=self.grid_options['slurm_memory']), - "#SBATCH --ntasks={slurm_ntasks}\n".format(slurm_ntasks=self.grid_options['slurm_ntasks']), - "#SBATCH --array={slurm_array}\n".format(slurm_array=slurm_array), - "#SBATCH --cpus-per-task={ncpus}\n".format(ncpus=self.grid_options['num_processes']) - ] + + slurmscript = """#!{bash} +# Slurm launch script created by binary_c-python + +# Slurm options +#SBATCH --error={slurm_dir}/stderr/%A.%a +#SBATCH --output={slurm_dir}/stdout/%A.%a +#SBATCH --job-name={slurm_jobname} +#SBATCH --partition={slurm_partition} +#SBATCH --time={slurm_time} +#SBATCH --mem={slurm_memory} +#SBATCH --ntasks={slurm_ntasks} +#SBATCH --array={slurm_array} +#SBATCH --cpus-per-task={ncpus} +""".format( + bash=self.grid_options['slurm_bash'], + slurm_dir=self.grid_options['slurm_dir'], + slurm_jobname=self.grid_options['slurm_jobname'], + slurm_partition=self.grid_options['slurm_partition'], + slurm_time=self.grid_options['slurm_time'], + slurm_ntasks=self.grid_options['slurm_ntasks'], + slurm_memory=self.grid_options['slurm_memory'], + slurm_array=slurm_array, + ncpus=self.grid_options['num_processes'] + ) for key in self.grid_options['slurm_extra_settings']: - lines += [ "#SBATCH --{key} = {value}\n".format( + slurmscript += "#SBATCH --{key} = {value}\n".format( key=key, value=self.grid_options['slurm_extra_settings'][key] - )] + ) - # save original command line, working directory, time - lines += [ - "\nexport BINARY_C_PYTHON_ORIGINAL_CMD_LINE={cmdline}".format(cmdline=repr(self.grid_options['command_line'])), - "\nexport BINARY_C_PYTHON_ORIGINAL_WD=`pwd`", - "\nexport BINARY_C_PYTHON_ORIGINAL_SUBMISSION_TIME=`date`", - ] + slurmscript += """ - lines += [ - "\n# set status to \"running\"\n", - "echo \"running\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n".format(slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid, - jobarrayindex=jobarrayindex), - "\n# make list of files\n", - "\necho {slurm_dir}/results/{jobid}.{jobarrayindex}.gz >> {slurm_dir}/results/{jobid}.all\n".format(slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid, - jobarrayindex=jobarrayindex, - joinfile=joinfile), - "\n# run grid of stars and, if this returns 0, set status to finished\n", - - # note: the next line ends in && - "{grid_command} evolution_type=grid slurm_jobid={jobid} slurm_jobarrayindex={jobarrayindex} 
save_population_object={slurm_dir}/results/{jobid}.{jobarrayindex}.gz && echo -n \"finished\" > {slurm_dir}/status/{jobid}.{jobarrayindex} && \\\n".format( - slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid, - jobarrayindex=jobarrayindex, - grid_command=grid_command), - ] +export BINARY_C_PYTHON_ORIGINAL_CMD_LINE={cmdline} +export BINARY_C_PYTHON_ORIGINAL_WD=`{pwd}` +export BINARY_C_PYTHON_ORIGINAL_SUBMISSION_TIME=`{date}` + +# set status to \"running\" +echo \"running\" > {slurm_dir}/status/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID + +# make list of files which is checked for joining +echo {slurm_dir}/results/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID.gz >> {slurm_dir}/results/$SLURM_ARRAY_JOB_ID.all + +# run grid of stars and, if this returns 0, set status to finished +{grid_command} slurm=2 evolution_type=grid slurm_jobid=$SLURM_ARRAY_JOB_ID slurm_jobarrayindex=$SLURM_ARRAY_TASK_ID save_population_object={slurm_dir}/results/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID.gz && echo -n \"finished\" > {slurm_dir}/status/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID && echo """.format( + slurm_dir=self.grid_options['slurm_dir'], + grid_command=grid_command, + cmdline=repr(self.grid_options['command_line']), + date=self.grid_options['slurm_date'], + pwd=self.grid_options['slurm_pwd'], + ) if not self.grid_options['slurm_postpone_join']: - lines += [ - # the following line also ends in && so that if one fails, the rest - # also fail - "echo && echo \"Checking if we can join...\" && echo && \\\n", - "{grid_command} slurm=2 evolution_type=join joinlist={joinfile} slurm_jobid={jobid} slurm_jobarrayindex={jobarrayindex}\n\n".format( - grid_command=grid_command, - joinfile=joinfile, - jobid=jobid, - jobarrayindex=jobarrayindex - )] + slurmscript += """&& echo \"Checking if we can join...\" && echo && {grid_command} slurm=3 evolution_type=join joinlist={slurm_dir}/results/$SLURM_ARRAY_JOB_ID.all slurm_jobid=$SLURM_ARRAY_JOB_ID slurm_jobarrayindex=$SLURM_ARRAY_TASK_ID + """.format( + slurm_dir=self.grid_options['slurm_dir'], + grid_command=grid_command, + ) + else: + slurmscript += "\n" # write to script, close it and make it executable by # all (so the slurm user can pick it up) - script.writelines(lines) + script.write(slurmscript) script.close() os.chmod(scriptpath, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC | \ @@ -325,7 +345,9 @@ class slurm(HPC): else: print("Slurm jobs launched") print("All done in slurm_grid().") - return + + # return True so we exit immediately + return True def slurm_jobid_from_dir(self,dir): """ @@ -334,12 +356,49 @@ class slurm(HPC): file = os.path.join(dir,'jobid') f = open(file,"r",encoding='utf-8') if not f: - print("Error: could not open {} to read the Slurm jobid of the directory {}".format(file,dir)) + print("Error: could not open {file} to read the Slurm jobid of the directory {dir}".format(file=file,dir=dir)) sys.exit(code=1) oldjobid = f.read().strip() f.close() if not oldjobid: - print("Error: could not find jobid in {}".format(self.grid_options['slurm_restart_dir'])) + print("Error: could not find jobid in {slurm_restart_dir}".format(slurm_restart_dir=self.grid_options['slurm_restart_dir'])) self.exit(code=1) return oldjobid + + def slurm_restore(self): + # restore location from existing Slurm run if provided, + # but only on slurm=2 runs + if self.grid_options['slurm'] == 2 and \ + self.grid_options['slurm_restart_dir'] and \ + self.grid_options['slurm_jobarrayindex'] != None: + oldjobid = self.slurm_jobid_from_dir(self.grid_options['slurm_restart_dir']) + + 
print("Restart from dir {dir} which was jobid {jobid}, we are jobarrayindex {index}".format( + dir=self.grid_options['slurm_restart_dir'], + jobid=oldjobid, + index=self.grid_options['slurm_jobarrayindex'] + )) + + # check status: if "finished", we don't have to do anything + file = os.path.join(self.grid_options['slurm_restart_dir'], + 'status', + "{id}.{index}".format(id=oldjobid, + index=self.grid_options['slurm_jobarrayindex'])) + status = open(file,encoding='utf-8').read() + if status == 'finished': + self.exit(code=0) + + file = os.path.join(self.grid_options['slurm_restart_dir'], + 'snapshots', + "{id}.{index}.gz".format(id=oldjobid, + index=self.grid_options['slurm_jobarrayindex'])) + + if os.path.exists(file): + # have data from which we can restore, set it in + # the appropriate grid option + self.grid_options['restore_from_snapshot_file'] = file + else: + # no snapshot: so no need to restore, just exit + sys.exit(code=0) + return