From 661edaaa9c6bde6e003008ec4fd437ee6a0832ce Mon Sep 17 00:00:00 2001 From: Robert Izzard <r.izzard@surrey.ac.uk> Date: Tue, 16 Nov 2021 19:00:53 +0000 Subject: [PATCH] add option to force join --- binarycpython/utils/HPC.py | 54 +++++++++-------- binarycpython/utils/condor.py | 63 +++++++++++-------- binarycpython/utils/grid_options_defaults.py | 5 ++ binarycpython/utils/slurm.py | 64 ++++++++++++-------- 4 files changed, 111 insertions(+), 75 deletions(-) diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py index 465262323..57053bd23 100644 --- a/binarycpython/utils/HPC.py +++ b/binarycpython/utils/HPC.py @@ -91,7 +91,7 @@ class HPC(condor,slurm): else: x = False return x - + def HPCjobtype(self): """ Function to return a string telling us the type of an HPC job, i.e. @@ -104,7 +104,7 @@ class HPC(condor,slurm): else: type = "None" return type - + def HPCjobID(self): """ Function to return an HPC (Slurm or Condor) job id in the form x.y. Returns None if not an HPC job. @@ -117,7 +117,7 @@ class HPC(condor,slurm): # not an HPC job id = None return id - + def HPCjobIDtuple(self): """ Return the job ID as a tuple, (x,y), or (None,None) on failure @@ -128,7 +128,7 @@ class HPC(condor,slurm): else: t = (None,None) return t - + def HPC_set_status(self,string): """ Set the appropriate HPC job (Condor or Slurm) status file to whatever is given in string. @@ -148,18 +148,22 @@ class HPC(condor,slurm): status = None return status - def HPC_get_status(self): + def HPC_get_status(self,dir=None): """ Get and return the appropriate HPC job (Condor or Slurm) status string for this job. + + Args: + dir : optional HPC run directory. If not set, the default (e.g. slurm_dir or condor_dir) + is used. """ if self.grid_options['slurm'] > 0: - status = self.get_slurm_status() + status = self.get_slurm_status(dir=dir) elif self.grid_options['condor'] > 0: - status = self.get_condor_status() + status = self.get_condor_status(dir=dir) else: status = None return status - + def HPC_dirs(self): """ Function to return a list of directories required for this HPC job. @@ -171,10 +175,10 @@ class HPC(condor,slurm): else: dirs = [] return dirs - + def HPCgrid(self): """ - Function to call the appropriate HPC grid function + Function to call the appropriate HPC grid function (e.g. Slurm or Condor) and return what it returns """ if self.grid_options['slurm'] > 0: @@ -195,7 +199,7 @@ class HPC(condor,slurm): else: t = (True,"") return t - + def HPC_id_filename(self,dir): """ HPC jobs have a filename in their directory which specifies the job id. This function returns the contents of that file as a string, or None on failure. @@ -207,7 +211,7 @@ class HPC(condor,slurm): else: filename = None return filename - + def HPC_id_from_dir(self,dir): """ Function to return the ID of an HPC run given its (already existing) directory. @@ -229,13 +233,13 @@ class HPC(condor,slurm): f.close() return oldjobid - - + + def HPC_restore(self): """ Set grid_options['restore_from_snapshot_file'] so that we restore data from existing an HPC run if self.grid_options[type+'_restart_dir'], where type is "slurm" or "condor", - is provided, otherwise do nothing. This only works if grid_options[type] == 2, which is + is provided, otherwise do nothing. This only works if grid_options[type] == 2, which is the run-grid stage of the process. """ type = self.HPCjobtype() @@ -261,21 +265,22 @@ class HPC(condor,slurm): )) # check status: if "finished", we don't have to do anything - file = os.path.join(dir, - 'status', - "{id}.{index}".format(id=old_id, - index=index)) - status = open(file,encoding='utf-8').read() - + status = self.HPC_get_status(dir=dir) + #file = os.path.join(dir, + # 'status', + # "{id}.{index}".format(id=old_id, + # index=index)) + #status = open(file,encoding='utf-8').read() + if status == 'finished': - print("Status is finished, cannot restart.") + print("Status is finished, cannot and do not need to restart.") self.exit(code=0) file = os.path.join(dir, 'snapshots', "{id}.{index}.gz".format(id=old_id, index=index)) - + if os.path.exists(file): # have data from which we can restore, set it in # the appropriate grid option @@ -287,7 +292,6 @@ class HPC(condor,slurm): self.exit(code=0) return - def HPC_join_previous(self): """ @@ -297,7 +301,7 @@ class HPC(condor,slurm): status = self.HPC_get_status() print("Job status",status) - if status != "finished": + if self.grid_options['HPC_force_join'] != 0 or status != "finished": # job did not finish : save a snapshot print("This job did not finish (status is {status}) : cannot join".format(status=status)) else: diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py index 34a0effcd..4f1d6eb5b 100644 --- a/binarycpython/utils/condor.py +++ b/binarycpython/utils/condor.py @@ -31,19 +31,22 @@ class condor(): return "{ClusterID}.{Process}".format(ClusterID=ClusterID, Process=Process) - def condorpath(self,path): + def condorpath(self,path,dir=None): """ Function to return the full condor directory path. """ - return os.path.abspath(os.path.join(self.grid_options['condor_dir'],path)) + if dir is None: + dir = self.grid_options['condor_dir'] + return os.path.abspath(os.path.join(dir,path)) def condor_status_file(self, ClusterID=None, - Process=None): + Process=None, + dir=None): """ Return the condor status file corresponding to the ClusterID and Process, which default to grid_options condor_ClusterID and condor_Process, respectively. """ - return os.path.join(self.condorpath('status'), + return os.path.join(self.condorpath('status',dir=dir), self.condorID(ClusterID,Process)) def condor_check_requirements(self): @@ -66,12 +69,18 @@ class condor(): """ return ['condor_dir'] - def set_condor_status(self,string): + def set_condor_status(self,string,dir=None): """ Set the condor status corresponing to the self object, which should have condor_ClusterID and condor_Process set. + + Args: + string : the status string to be set + dir : the directory in which the status directory is held. If not set, this defaults to the HPC directory (e.g. slurm_dir or condor_dir). """ # save condor ClusterID to file - idfile = os.path.join(self.grid_options["condor_dir"], + if dir is None: + dir = self.grid_options["condor_dir"] + idfile = os.path.join(dir, "ClusterID") if not os.path.exists(idfile): with open(idfile,"w",encoding='utf-8') as fClusterID: @@ -79,7 +88,7 @@ class condor(): fClusterID.close() # save condor status - file = self.condor_status_file() + file = self.condor_status_file(dir=dir) if file: with open(file,'w',encoding='utf-8') as f: f.write(string) @@ -87,8 +96,9 @@ class condor(): return def get_condor_status(self, - ClusterID=None, - Process=None): + ClusterID=None, + Process=None, + dir=None): """ Get and return the condor status corresponing to the self object, or ClusterID.Process if they are passed in. If no status is found, returns an empty string.. """ @@ -96,12 +106,11 @@ class condor(): ClusterID = self.grid_options['condor_ClusterID'] if Process is None: Process = self.grid_options['condor_Process'] - if ClusterID is None or Process is None : return None - try: - path = pathlib.Path(self.condor_status_file(ClusterID=ClusterID, + path = pathlib.Path(self.condor_status_file(dir=dir, + ClusterID=ClusterID, Process=Process)) if path: return path.read_text().strip() @@ -110,38 +119,42 @@ class condor(): except: return "" - def condor_outfile(self): + def condor_outfile(self,dir=None): """ return a standard filename for the condor chunk files """ file = "{id}.gz".format( id = self.condorID() ) - return os.path.abspath(os.path.join(self.grid_options['condor_dir'], + if dir is None: + dir = self.grid_options['condor_dir'] + return os.path.abspath(os.path.join(dir, 'results', file)) - def make_condor_dirs(self): + def make_condor_dirs(self,dir=None): # make the condor directories - if not self.grid_options['condor_dir']: - print("You must set self.grid_options['condor_dir'] to a directory which we can use to set up binary_c-python's Condor files. This should be unique to your set of grids.") + if dir is None: + dir = self.grid_options['condor_dir'] + if not dir: + print("You must set self.grid_options['condor_dir'] (or pass dir=whatever to make_condor_dirs()) to a directory which we can use to set up binary_c-python's Condor files. This should be unique to your set of grids.") os.exit() # make a list of directories, these contain the various condor # output, status files, etc. dirs = [] - for dir in ['stdout','stderr','log','results','status','snapshots']: - dirs.append(self.condorpath(dir)) + for d in ['stdout','stderr','log','results','status','snapshots']: + dirs.append(self.condorpath(d,dir=dir)) # make the directories: we do not allow these to already exist # as the condor directory should be a fresh location for each set of jobs - for dir in dirs: + for d in dirs: try: - pathlib.Path(self.condorpath(dir)).mkdir(exist_ok=False, + pathlib.Path(self.condorpath(d,dir=dir)).mkdir(exist_ok=False, parents=True) except: - print("Tried to make the directory {dir} but it already exists. When you launch a set of binary_c jobs on Condor, you need to set your condor_dir to be a fresh directory with no contents.".format(dir=dir)) + print("Tried to make the directory {d} but it already exists. When you launch a set of binary_c jobs on Condor, you need to set your condor_dir to be a fresh directory with no contents.".format(d=d)) self.exit(code=1) # check that they have been made and exist: we need this @@ -155,11 +168,11 @@ class condor(): count += 1 if count > count_warn: print("Warning: Have been waiting about {} seconds for Condor directories to be made, there seems to be significant delay...".format(count)) - for dir in dirs: - if os.path.isdir(dir) is False: + for d in dirs: + if os.path.isdir(d) is False: fail = True time.sleep(1) - break # break the "for dir in dirs:" + break def condor_grid(self): """ diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index a87ecc261..2f459719f 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -187,6 +187,11 @@ class grid_options_defaults(): # _lock => undef, # _evcode_pids => [], # }; + ######################################## + # HPC variables + ######################################## + "HPC_force_join" : 0, # if True, and the HPC variable ("slurm" or "condor") is 3, skip checking our own job and force the join + ######################################## # Slurm stuff ######################################## diff --git a/binarycpython/utils/slurm.py b/binarycpython/utils/slurm.py index 7fb4c5a45..cd03e4166 100644 --- a/binarycpython/utils/slurm.py +++ b/binarycpython/utils/slurm.py @@ -30,20 +30,24 @@ class slurm(): return "{jobid}.{jobarrayindex}".format(jobid=jobid, jobarrayindex=jobarrayindex) - def slurmpath(self,path): + def slurmpath(self,path,dir=None): """ Function to return the full slurm directory path. """ - return os.path.abspath(os.path.join(self.grid_options['slurm_dir'],path)) + if dir is None: + dir = self.grid_options['slurm_dir'] + return os.path.abspath(os.path.join(dir,path)) def slurm_status_file(self, jobid=None, - jobarrayindex=None): + jobarrayindex=None, + dir=None): """ Return the slurm status file corresponding to the jobid and jobarrayindex, which default to grid_options slurm_jobid and slurm_jobarrayindex, respectively. """ - return os.path.join(self.slurmpath('status'), - self.slurmID(jobid,jobarrayindex)) + return os.path.join(self.slurmpath('status',dir=dir), + self.slurmID(jobid=jobid, + jobarrayindex=jobarrayindex)) def slurm_check_requirements(self): """ @@ -66,20 +70,25 @@ class slurm(): """ return ['slurm_dir'] - def set_slurm_status(self,string): + def set_slurm_status(self,string,dir=None): """ Set the slurm status corresponing to the self object, which should have slurm_jobid and slurm_jobarrayindex set. + + Args: + string : the status string to be set + dir : the directory in which the status directory is held. If not set, this defaults to the HPC directory (e.g. slurm_dir or condor_dir). """ # save slurm jobid to file - idfile = os.path.join(self.grid_options["slurm_dir"], - "jobid") + if dir is None: + dir = self.grid_options["slurm_dir"] + idfile = os.path.join(dir,"jobid") if not os.path.exists(idfile): with open(idfile,"w",encoding='utf-8') as fjobid: fjobid.write("{jobid}\n".format(jobid=self.grid_options['slurm_jobid'])) fjobid.close() # save slurm status - file = self.slurm_status_file() + file = self.slurm_status_file(dir=dir) if file: with open(file,'w',encoding='utf-8') as f: f.write(string) @@ -88,7 +97,8 @@ class slurm(): def get_slurm_status(self, jobid=None, - jobarrayindex=None): + jobarrayindex=None, + dir=None): """ Get and return the slurm status string corresponing to the self object, or jobid.jobarrayindex if they are passed in. If no status is found, returns an empty string. """ @@ -96,12 +106,11 @@ class slurm(): jobid = self.grid_options['slurm_jobid'] if jobarrayindex is None: jobarrayindex = self.grid_options['slurm_jobarrayindex'] - if jobid is None or jobarrayindex is None : return None - try: - path = pathlib.Path(self.slurm_status_file(jobid=jobid, + path = pathlib.Path(self.slurm_status_file(dir=dir, + jobid=jobid, jobarrayindex=jobarrayindex)) if path: return path.read_text().strip() @@ -110,22 +119,27 @@ class slurm(): except: return "" - def slurm_outfile(self): + def slurm_outfile(self,dir=None): """ return a standard filename for the slurm chunk files """ file = "{id}.gz".format( id = self.slurmID(), ) - return os.path.abspath(os.path.join(self.grid_options['slurm_dir'], + if dir is None: + dir = self.grid_options['slurm_dir'] + return os.path.abspath(os.path.join(dir, 'results', file)) - def make_slurm_dirs(self): + def make_slurm_dirs(self,dir=None): # make the slurm directories - if not self.grid_options['slurm_dir']: - print("You must set self.grid_options['slurm_dir'] to a directory which we can use to set up binary_c-python's Slurm files. This should be unique to your set of grids.") + if dir is None: + dir = self.grid_options['slurm_dir'] + + if not dir: + print("You must set self.grid_options['slurm_dir'] (or pass dir=whatever to make_slurm_dirs()) to a directory which we can use to set up binary_c-python's Slurm files. This should be unique to your set of grids.") os.exit() # make a list of directories, these contain the various slurm @@ -136,12 +150,12 @@ class slurm(): # make the directories: we do not allow these to already exist # as the slurm directory should be a fresh location for each set of jobs - for dir in dirs: + for d in dirs: try: - pathlib.Path(self.slurmpath(dir)).mkdir(exist_ok=False, - parents=True) + pathlib.Path(self.slurmpath(d,dir=dir)).mkdir(exist_ok=False, + parents=True) except: - print("Tried to make the directory {dir} but it already exists. When you launch a set of binary_c jobs on Slurm, you need to set your slurm_dir to be a fresh directory with no contents.".format(dir=dir)) + print("Tried to make the directory {d} but it already exists. When you launch a set of binary_c jobs on Slurm, you need to set your slurm_dir to be a fresh directory with no contents.".format(d=d)) self.exit(code=1) # check that they have been made and exist: we need this @@ -155,11 +169,11 @@ class slurm(): count += 1 if count > count_warn: print("Warning: Have been waiting about {count} seconds for Slurm directories to be made, there seems to be significant delay...".format(count=count)) - for dir in dirs: - if os.path.isdir(dir) is False: + for d in dirs: + if os.path.isdir(d) is False: fail = True time.sleep(1) - break # break the "for dir in dirs:" + break def slurm_grid(self): """ -- GitLab