From 55599b77c3ada0fa3c590d74c21e997760360712 Mon Sep 17 00:00:00 2001 From: Robert Izzard <r.izzard@surrey.ac.uk> Date: Sun, 14 Nov 2021 22:39:44 +0000 Subject: [PATCH] more HPC API improvements to merge functions that were repeated in condor and slurm fix some bugs in restarting from snapshots: the start_at was calculated incorrectly (needed to take into account modulo and previous start_at) --- binarycpython/utils/HPC.py | 191 ++++++++++--- binarycpython/utils/condor.py | 270 +------------------ binarycpython/utils/dataIO.py | 16 +- binarycpython/utils/grid.py | 21 +- binarycpython/utils/grid_options_defaults.py | 1 + binarycpython/utils/slurm.py | 53 ---- 6 files changed, 176 insertions(+), 376 deletions(-) diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py index 12b2fac92..465262323 100644 --- a/binarycpython/utils/HPC.py +++ b/binarycpython/utils/HPC.py @@ -23,7 +23,7 @@ class HPC(condor,slurm): def HPC_joinfiles(self): """ - Function to load in the joinlist to an array and return it. + Function to load in the joinlist to a list and return it. """ f = open(self.grid_options['joinlist'],'r',encoding='utf-8') list = f.read().splitlines() @@ -87,73 +87,95 @@ class HPC(condor,slurm): Function to return True if we're running an HPC (Slurm or Condor) job, False otherwise. """ if self.grid_options['slurm'] > 0 or self.grid_options['condor'] > 0: - return True + x = True else: - return False - + x = False + return x + def HPCjobtype(self): """ Function to return a string telling us the type of an HPC job, i.e. "slurm", "condor" or "None". """ if self.grid_options['slurm'] > 0: - return "slurm" + type = "slurm" elif self.grid_options['condor'] > 0: - return "condor" + type = "condor" else: - return "None" - + type = "None" + return type + def HPCjobID(self): """ - Function to return an HPC (Slurm or Condor) job id. Returns None if not an HPC job. + Function to return an HPC (Slurm or Condor) job id in the form x.y. Returns None if not an HPC job. """ if self.grid_options['slurm'] > 0: - return self.slurmID() + id = self.slurmID() elif self.grid_options['condor'] > 0: - return self.condorID() + id = self.condorID() else: # not an HPC job - return None - + id = None + return id + + def HPCjobIDtuple(self): + """ + Return the job ID as a tuple, (x,y), or (None,None) on failure + """ + id = self.HPCjobID() + if id: + t = tuple(id.split('.')) + else: + t = (None,None) + return t + def HPC_set_status(self,string): """ Set the appropriate HPC job (Condor or Slurm) status file to whatever is given in string. Arguments: string : the new contents of the status file + + Returns: + True if the status was set, False otherwise. + (As returned by either the appropriate Condor or Slurm function) """ if self.grid_options['slurm'] > 0: - return self.set_slurm_status(string) + status = self.set_slurm_status(string) elif self.grid_options['condor'] > 0: - return self.set_condor_status(string) + status = self.set_condor_status(string) else: - return None + status = None + return status def HPC_get_status(self): """ - Get the appropriate HPC job (Condor or Slurm) status for this job. + Get and return the appropriate HPC job (Condor or Slurm) status string for this job. 
""" if self.grid_options['slurm'] > 0: - return self.get_slurm_status() + status = self.get_slurm_status() elif self.grid_options['condor'] > 0: - return self.get_condor_status() + status = self.get_condor_status() else: - return None - + status = None + return status + def HPC_dirs(self): """ - Function to return a list of directories for this HPC job. + Function to return a list of directories required for this HPC job. """ if self.grid_options['slurm'] > 0: - return self.slurm_dirs() + dirs = self.slurm_dirs() elif self.grid_options['condor'] > 0: - return self.condor_dirs() + dirs = self.condor_dirs() else: - return [] - + dirs = [] + return dirs + def HPCgrid(self): """ - Function to call the appropriate HPC grid function (Slurm or Condor) + Function to call the appropriate HPC grid function + (e.g. Slurm or Condor) and return what it returns """ if self.grid_options['slurm'] > 0: return self.slurm_grid() @@ -167,22 +189,105 @@ class HPC(condor,slurm): Function to check HPC option requirements have been met. Returns a tuple: (True,"") if all is ok, (False,<warning string>) otherwise. """ if self.grid_options['slurm'] > 0: - return self.slurm_check_requirements() + t = self.slurm_check_requirements() elif self.grid_options['condor'] > 0: - return self.condor_check_requirements() + t = self.condor_check_requirements() else: - return (True,"") - - def HPC_restore(self): + t = (True,"") + return t + + def HPC_id_filename(self,dir): """ - Wrapper function to restore previous HPC snapshots prior to restarting a grid. + HPC jobs have a filename in their directory which specifies the job id. This function returns the contents of that file as a string, or None on failure. """ if self.grid_options['slurm'] > 0: - return self.slurm_restore() + filename = 'jobid' elif self.grid_options['condor'] > 0: - return self.condor_check_requirements() + filename = 'ClusterID' else: - return (True,"") + filename = None + return filename + + def HPC_id_from_dir(self,dir): + """ + Function to return the ID of an HPC run given its (already existing) directory. + """ + filename = self.HPC_id_filename(dir) + if not filename: + return None + file = os.path.join(dir,filename) + f = open(file,"r",encoding='utf-8') + if not f: + print("Error: could not open {file} to read the HPC jobid of the directory {dir}".format(file=file, + dir=dir)) + self.exit(code=1) + oldjobid = f.read().strip() + if not oldjobid: + print("Error: could not find jobid in {dir}".format(dir=dir)) + self.exit(code=1) + else: + f.close() + return oldjobid + + + + def HPC_restore(self): + """ + Set grid_options['restore_from_snapshot_file'] so that we restore data from existing + an HPC run if self.grid_options[type+'_restart_dir'], where type is "slurm" or "condor", + is provided, otherwise do nothing. This only works if grid_options[type] == 2, which is + the run-grid stage of the process. 
+ """ + type = self.HPCjobtype() + if type is None: + return + key = type + '_restart_dir' + if key not in self.grid_options: + return + # get restart directory + dir = self.grid_options[type + '_restart_dir'] + if dir is None: + return + # get HPC job index + index = self.HPCjobIDtuple()[1] + if index is None: + return + if self.grid_options[type] == 2: + old_id = self.HPC_id_from_dir(dir) + print("Restart from dir {dir} which was has (old) ID {old_id}, we are job index {index}".format( + dir=dir, + old_id=old_id, + index=index + )) + + # check status: if "finished", we don't have to do anything + file = os.path.join(dir, + 'status', + "{id}.{index}".format(id=old_id, + index=index)) + status = open(file,encoding='utf-8').read() + + if status == 'finished': + print("Status is finished, cannot restart.") + self.exit(code=0) + + file = os.path.join(dir, + 'snapshots', + "{id}.{index}.gz".format(id=old_id, + index=index)) + + if os.path.exists(file): + # have data from which we can restore, set it in + # the appropriate grid option + print("Restore this run from snapshot {file}".format(file=file)) + self.grid_options['restore_from_snapshot_file'] = file + else: + # no snapshot: so no need to restore, just exit + print("Expected snapshot at {file} but none was found".format(file=file)) + self.exit(code=0) + return + + def HPC_join_previous(self): """ @@ -227,11 +332,12 @@ class HPC(condor,slurm): Function to file the filename of this HPC job's file at path. """ if self.grid_options['slurm'] > 0: - return self.slurmpath(path) + p = self.slurmpath(path) elif self.grid_options['condor'] > 0: - return self.condorpath(path) + p = self.condorpath(path) else: - return None + p = None + return p def HPC_snapshot_filename(self): """ @@ -240,10 +346,11 @@ class HPC(condor,slurm): if self.grid_options['slurm'] > 0: file = os.path.join(self.grid_options['slurm_dir'], 'snapshots', - self.jobID() + '.gz') + self.HPCjobID() + '.gz') elif self.grid_options['condor'] > 0: file = os.path.join(self.grid_options['condor_dir'], 'snapshots', - self.jobID() + '.gz') + self.HPCjobID() + '.gz') else: - return None + file = None + return file diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py index c8bf626ad..737d9cce2 100644 --- a/binarycpython/utils/condor.py +++ b/binarycpython/utils/condor.py @@ -403,272 +403,4 @@ queue {njobs} # return True so we exit immediately return True - def condor_ClusterID_from_dir(self,dir): - """ - Return the Condor ClusterID from a condor directory, passed in - """ - file = os.path.join(dir,'ClusterID') - f = open(file,"r",encoding='utf-8') - if not f: - print("Error: could not open {} to read the Condor ClusterID of the directory {}".format(file,dir)) - sys.exit(code=1) - oldClusterID = f.read().strip() - f.close() - if not oldClusterID: - print("Error: could not find ClusterID in {}".format(self.grid_options['condor_restart_dir'])) - self.exit(code=1) - - return oldClusterID - - - - - ################################################### - # CONDOR functions - # - # subroutines to run CONDOR grids - ################################################### - - # def _condor_grid(self): - # """ - # Main function that manages the CONDOR setup. 
- - # Has three stages: - - # - setup - # - evolve - # - join - - # Which stage is used is determined by the value of grid_options['condor_command']: - - # <empty>: the function will know its the user that executed the script and - # it will set up the necessary condor stuff - - # 'evolve': evolve_population is called to evolve the population of stars - - # 'join': We will attempt to join the output - # """ - - # # TODO: Put in function - # condor_version = get_condor_version() - # if not condor_version: - # verbose_print( - # "CONDOR: Error: No installation of condor found", - # self.grid_options["verbosity"], - # 0, - # ) - # else: - # major_version = int(condor_version.split(".")[0]) - # minor_version = int(condor_version.split(".")[1]) - - # if (major_version == 8) and (minor_version > 4): - # verbose_print( - # "CONDOR: Found version {} which is new enough".format( - # condor_version - # ), - # self.grid_options["verbosity"], - # 0, - # ) - # elif major_version > 9: - # verbose_print( - # "CONDOR: Found version {} which is new enough".format( - # condor_version - # ), - # self.grid_options["verbosity"], - # 0, - # ) - # else: - # verbose_print( - # "CONDOR: Found version {} which is too old (we require 8.3/8.4+)".format( - # condor_version - # ), - # self.grid_options["verbosity"], - # 0, - # ) - - # verbose_print( - # "Running Condor grid. command={}".format( - # self.grid_options["condor_command"] - # ), - # self.grid_options["verbosity"], - # 1, - # ) - # if not self.grid_options["condor_command"]: - # # Setting up - # verbose_print( - # "CONDOR: Main controller script. Setting up", - # self.grid_options["verbosity"], - # 1, - # ) - - # # Set up working directories: - # verbose_print( - # "CONDOR: creating working directories", - # self.grid_options["verbosity"], - # 1, - # ) - # create_directories_hpc(self.grid_options["condor_dir"]) - - # # Create command - # current_workingdir = os.getcwd() - # python_details = get_python_details() - # scriptname = path_of_calling_script() - # # command = "".join([ - # # "{}".python_details['executable'], - # # "{}".scriptname, - # # "offset=$Process", - # # "modulo={}".format(self.grid_options['condor_njobs']), - # # "vb={}".format(self.grid_options['verbosity']) - - # # "results_hash_dumpfile=$self->{_grid_options}{condor_dir}/results/$ClusterID.$Process", - # # 'condor_ClusterID='.$ClusterID, - # # 'condor_Process='.$Process, - # # 'condor_jobname=binary_grid_'.$ClusterID.'.'.$Process, - # # "condor_njobs=$njobs", - # # "condor_dir=$self->{_grid_options}{condor_dir}", - # # ); - - # # Create directory with info for the condor script. By creating this directory we also check whether all the values are set correctly - # # TODO: create the condor script. - # condor_script_options = {} - # # condor_script_options['n'] = - # condor_script_options["njobs"] = self.grid_options["condor_njobs"] - # condor_script_options["dir"] = self.grid_options["condor_dir"] - # condor_script_options["memory"] = self.grid_options["condor_memory"] - # condor_script_options["working_dir"] = self.grid_options[ - # "condor_working_dir" - # ] - # condor_script_options["command"] = self.grid_options["command"] - # condor_script_options["streams"] = self.grid_options["streams"] - - # # TODO: condor works with running an executable. 
- - # # Create script contents - # condor_script_contents = "" - # condor_script_contents += """ - # ################################################# - # # - # # Condor script to run a binary_grid via python - # # - # ################################################# - # """ - # condor_script_contents += "Executable\t= {}".format(executable) - # condor_script_contents += "arguments\t= {}".format(arguments) - # condor_script_contents += "environment\t= {}".format(environment) - # condor_script_contents += "universe\t= {}".format( - # self.grid_options["condor_universe"] - # ) - # condor_script_contents += "\n" - # condor_script_contents += "output\t= {}/stdout/$id\n".format( - # self.grid_options["condor_dir"] - # ) - # condor_script_contents += "error\t={}/sterr/$id".format( - # self.grid_options["condor_dir"] - # ) - # condor_script_contents += "log\t={}\n".format( - # self.grid_options["condor_dir"] - # ) - # condor_script_contents += "initialdir\t={}\n".format(current_workingdir) - # condor_script_contents += "remote_initialdir\t={}\n".format( - # current_workingdir - # ) - # condor_script_contents += "\n" - # condor_script_contents += "steam_output\t={}".format(stream) - # condor_script_contents += "steam_error\t={}".format(stream) - # condor_script_contents += "+WantCheckpoint = False" - # condor_script_contents += "\n" - # condor_script_contents += "request_memory\t={}".format( - # self.grid_options["condor_memory"] - # ) - # condor_script_contents += "ImageSize\t={}".format( - # self.grid_options["condor_memory"] - # ) - # condor_script_contents += "\n" - - # if self.grid_options["condor_extra_settings"]: - # condor_script_contents += "# Extra settings by user:" - # condor_script_contents += "\n".join( - # [ - # "{}\t={}".format( - # key, self.grid_options["condor_extra_settings"][key] - # ) - # for key in self.grid_options["condor_extra_settings"] - # ] - # ) - - # condor_script_contents += "\n" - - # # request_memory = $_[0]{memory} - # # ImageSize = $_[0]{memory} - - # # Requirements = (1) \&\& (". 
- # # $self->{_grid_options}{condor_requirements}.")\n"; - - # # - # # file name: my_program.condor - # # Condor submit description file for my_program - # # Executable = my_program - # # Universe = vanilla - # # Error = logs/err.$(cluster) - # # Output = logs/out.$(cluster) - # # Log = logs/log.$(cluster) - - # # should_transfer_files = YES - # # when_to_transfer_output = ON_EXIT_OR_EVICT - # # transfer_input_files = files/in1,files/in2 - - # # Arguments = files/in1 files/in2 files/out1 - # # Queue - - # # Write script contents to file - # if self.grid_options["condor_postpone_join"]: - # condor_script_contents += "{} rungrid=0 results_hash_dumpfile={}/results/$ClusterID.all condor_command=join\n".format( - # command, self.grid_options["condor_dir"] - # ) - - # condor_script_filename = os.path.join( - # self.grid_options["condor_dir"], "condor_script" - # ) - # with open(condor_script_filename, "w") as condor_script_file: - # condor_script_file.write(condor_script_contents) - - # if self.grid_options["condor_postpone_sbatch"]: - # # Execute or postpone the real call to sbatch - # submit_command = "condor_submit {}".format(condor_script_filename) - # verbose_print( - # "running condor script {}".format(condor_script_filename), - # self.grid_options["verbosity"], - # 0, - # ) - # # subprocess.Popen(sbatch_command, close_fds=True) - # # subprocess.Popen(sbatch_command, creationflags=subprocess.DETACHED_PROCESS) - # verbose_print("Submitted scripts.", self.grid_options["verbosity"], 0) - # else: - # verbose_print( - # "Condor script is in {} but hasnt been executed".format( - # condor_script_filename - # ), - # self.grid_options["verbosity"], - # 0, - # ) - - # verbose_print("all done!", self.grid_options["verbosity"], 0) - # self.exit() - - # elif self.grid_options["condor_command"] == "evolve": - # # TODO: write this function - # # Part to evolve the population. - # # TODO: decide how many CPUs - # verbose_print( - # "CONDOR: Evolving population", self.grid_options["verbosity"], 1 - # ) - - # # - # self.evolve_population() - - # elif self.grid_options["condor_command"] == "join": - # # TODO: write this function - # # Joining the output. - # verbose_print("CONDOR: Joining results", self.grid_options["verbosity"], 1) - - # pass + diff --git a/binarycpython/utils/dataIO.py b/binarycpython/utils/dataIO.py index c37e3adf0..88396d84f 100644 --- a/binarycpython/utils/dataIO.py +++ b/binarycpython/utils/dataIO.py @@ -242,29 +242,35 @@ class dataIO(): Load a snapshot from file and set it in the preloaded_population placeholder. """ newpop = self.load_population_object(file) + + # unset the _killed flag, in case it was set + newpop.grid_options['_killed'] = False + + # set in preloaded_population for later merge self.preloaded_population = newpop + + # set the start position for new stars self.grid_options['start_at'] = newpop.grid_options['start_at'] - print("Loaded from snapshot at {file} : start at star {n}".format( + + print("Loaded from snapshot at {file} : {nstars} stars, start at star {nstart}".format( file=file, - n=self.grid_options['start_at'])) + nstars=0,#self.grid_options[''], + nstart=self.grid_options['start_at'])) return def save_snapshot(self,file=None): """ Save the population object to a snapshot file, automatically choosing the filename if none is given. """ - if file == None: file = self.snapshot_filename() try: n = self.grid_options['_count'] except: n = '?' 
- print("Saving snapshot containing {} stars to {}".format(n,file)) self.save_population_object(object=self, filename=file) - return def write_ensemble(self, output_file, data=None, sort_keys=True, indent=4, encoding='utf-8', ensure_ascii=False): diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 3c48ab8b9..5b6b1a6bb 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -809,7 +809,11 @@ class Population(analytics, if self.custom_options['save_snapshot']: # we must save a snapshot, not the population object - self.grid_options['start_at'] = self.grid_options["_count"] + # ... also save the new starting point: this has to take into + # account where we originally started, and that the modulo may + # not be == 1. + self.grid_options['start_at'] = self.grid_options['start_at'] + self.grid_options["_count"] * self.grid_options['modulo'] + # then save the snapshot self.save_snapshot() exitcode = 1 if self.was_killed() else 0 self.exit(code=exitcode) @@ -1116,12 +1120,18 @@ class Population(analytics, "results": keys_to_floats(self.preloaded_population.grid_results) } + for x in self._metadata_keylist(): try: combined_output_dict[x] = self.preloaded_population.grid_options[x] except Exception as e: print("Tried to set combined_output_dict key",x,"from preloaded_popuation, but this failed:",e) print("Pre-loaded data from {} stars".format(combined_output_dict["_count"])) + + # do not propagate _killed + #combined_output_dict['results']['_killed'] = False + #combined_output_dict['_killed'] = False + self.preloaded_population = None gc.collect() else: @@ -1363,7 +1373,7 @@ class Population(analytics, 0 # counter for the probability of the actual systems this tread ran ) number_of_systems_run = ( - 0 # counter for the actual amt of systems this thread ran + 0 # counter for the actual number of systems this thread ran ) zero_prob_stars_skipped = 0 total_time_calling_binary_c = 0 @@ -1381,7 +1391,8 @@ class Population(analytics, self.set_status('running') ############################################################ - # Go over the queue + # Run stellar systems in the queue + ############################################################ for system_number, system_dict in iter(job_queue.get, "STOP"): if False: @@ -2150,10 +2161,6 @@ class Population(analytics, Function to determine if the process was killed. Returns True if so, false otherwise. """ killed = self.grid_options['_killed'] - try: - killed = killed or self.grid_results['_killed'] - except: - pass try: killed = killed or self.grid_ensemble_results['metadata']['_killed'] except: diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index 8ca4abc37..910f8cb52 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -247,6 +247,7 @@ class grid_options_defaults(): "condor_initial_dir" : None, # directory from which condor is run, if None is the directory in which your script is run "condor_submit" : "condor_submit", # the condor_submit command "condor_getenv" : True, # if True condor takes the environment at submission and copies it to the jobs. You almost certainly want this. + "condor_restart_dir" : None, # restart Condor jobs from this directory ########################## # Unordered. Need to go through this. Copied from the perl implementation. 
########################## diff --git a/binarycpython/utils/slurm.py b/binarycpython/utils/slurm.py index 4b942d994..86f1abe55 100644 --- a/binarycpython/utils/slurm.py +++ b/binarycpython/utils/slurm.py @@ -352,56 +352,3 @@ echo {slurm_dir}/results/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID.gz >> {slurm_d # return True so we exit immediately return True - def slurm_jobid_from_dir(self,dir): - """ - Return the Slurm jobid from a slurm directory, passed in - """ - file = os.path.join(dir,'jobid') - f = open(file,"r",encoding='utf-8') - if not f: - print("Error: could not open {file} to read the Slurm jobid of the directory {dir}".format(file=file,dir=dir)) - sys.exit(code=1) - oldjobid = f.read().strip() - f.close() - if not oldjobid: - print("Error: could not find jobid in {slurm_restart_dir}".format(slurm_restart_dir=self.grid_options['slurm_restart_dir'])) - self.exit(code=1) - - return oldjobid - - def slurm_restore(self): - # restore location from existing Slurm run if provided, - # but only on slurm=2 runs - if self.grid_options['slurm'] == 2 and \ - self.grid_options['slurm_restart_dir'] and \ - self.grid_options['slurm_jobarrayindex'] != None: - oldjobid = self.slurm_jobid_from_dir(self.grid_options['slurm_restart_dir']) - - print("Restart from dir {dir} which was jobid {jobid}, we are jobarrayindex {index}".format( - dir=self.grid_options['slurm_restart_dir'], - jobid=oldjobid, - index=self.grid_options['slurm_jobarrayindex'] - )) - - # check status: if "finished", we don't have to do anything - file = os.path.join(self.grid_options['slurm_restart_dir'], - 'status', - "{id}.{index}".format(id=oldjobid, - index=self.grid_options['slurm_jobarrayindex'])) - status = open(file,encoding='utf-8').read() - if status == 'finished': - self.exit(code=0) - - file = os.path.join(self.grid_options['slurm_restart_dir'], - 'snapshots', - "{id}.{index}.gz".format(id=oldjobid, - index=self.grid_options['slurm_jobarrayindex'])) - - if os.path.exists(file): - # have data from which we can restore, set it in - # the appropriate grid option - self.grid_options['restore_from_snapshot_file'] = file - else: - # no snapshot: so no need to restore, just exit - sys.exit(code=0) - return -- GitLab
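
A worked illustration of the start_at fix described in the commit message: each HPC task strides through the grid, evolving every modulo-th system from its own offset, so when a snapshot is written the resume point must advance by _count * modulo from the job's previous start_at rather than being set to _count alone. The sketch below is illustrative only, not part of the package API (the standalone function name and signature are invented for the example); it uses the same option names (start_at, _count, modulo) that appear in the grid.py hunk above.

# Illustrative sketch of the corrected resume-point arithmetic; not code
# from the patch itself. A job that handles every `modulo`-th system
# advances `modulo` positions in the global grid for each system it
# actually evolves, so the new start point is the previous start_at plus
# _count * modulo.

def next_start_at(previous_start_at: int, count: int, modulo: int) -> int:
    """
    Return the grid index at which a restarted job should resume.

    previous_start_at : index in the global grid at which this job started
    count             : number of systems this job evolved before the snapshot
    modulo            : stride between systems handled by this job
    """
    return previous_start_at + count * modulo


if __name__ == "__main__":
    # A job striding through the grid with modulo=4, starting at index 2,
    # that evolved 10 systems before being snapshotted should resume at 42.
    assert next_start_at(previous_start_at=2, count=10, modulo=4) == 42

    # The previous behaviour (start_at = _count) would have resumed at 10,
    # re-running systems already done by this job and colliding with work
    # belonging to other array tasks.
    print(next_start_at(2, 10, 4))

This is also why HPC_restore only needs the snapshot file and the job's array index: the snapshot carries the corrected start_at, so the restarted task picks up its own stride without recomputing anything from the other tasks' output.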