""" Binary_c-python's HPC functions These functions form a single API through which you can access HPC resources. Generally, you should call an HPC function rather than the Slurm or Condor interface directly. The HPC function then decides which interface to use, so that all the other modules can use a single API rather than have to choose to use the Slurm or Condor API. """ import os import pathlib from binarycpython.utils.slurm import slurm from binarycpython.utils.condor import condor class HPC(condor,slurm): def __init__(self, **kwargs): # don't do anything: we just inherit from this class return def HPC_joinfiles(self,joinlist=None): """ Function to load in the joinlist to a list and return it. If grid_options['HPC_rebuild_joinlist'] is True, we rebuild it. """ prefix = os.path.join(self.HPC_dir(), 'results') if self.grid_options['HPC_rebuild_joinlist'] == 1: # we should rebuild the joinlist from the # files we find at the prefix directory list = glob.glob(str(prefix) + '/*.gz') return list if joinlist is None: joinlist = self.grid_options['joinlist'] try: f = open(joinlist,'r',encoding='utf-8') list = f.read().splitlines() f.close() if self.grid_options['HPC_prepend_dir_to_joinlist'] = True: list = [os.path.join(prefix,file) for file in list] except: print("Failed to open joinlist at {list}".format(list=joinlist)) self.exit(code=1) return list def HPC_join_from_files(self,newobj,joinfiles): """ Merge the results from the list joinfiles into newobj. """ for file in joinfiles: print("Join data in",file) self.merge_populations_from_file(newobj, file) return newobj def HPC_can_join(self,joinfiles,joiningfile,vb=False): """ Check the joinfiles to make sure they all exist and their .saved equivalents also exist """ if os.path.exists(joiningfile): if vb: print("cannot join: joiningfile exists at {}".format(joiningfile)) return False elif vb: print("joiningfile (at {}) does not exist".format(joiningfile)) for file in joinfiles: if vb: print("check for {}".format(file)) if os.path.exists(file) == False: if vb: print("cannot join: {} does not exist".format(file)) return False savedfile = file + '.saved' if vb: print("check for {}".format(savedfile)) if os.path.exists(savedfile) == False: if vb: print("cannot join: {} does not exist".format(savedfile)) return False # found both files if vb: print("found {} and {}".format(file,savedfile)) # check for joiningfile again if os.path.exists(joiningfile): if vb: print("cannot join: joiningfile exists at {}".format(joiningfile)) return False elif vb: print("joiningfile does not exist") if vb: print("returning True from HPC_can_join()") return True def HPCjob(self): """ Function to return True if we're running an HPC (Slurm or Condor) job, False otherwise. """ if self.grid_options['slurm'] > 0 or \ self.grid_options['condor'] > 0: x = True else: x = False return x def HPCjobtype(self): """ Function to return a string telling us the type of an HPC job, i.e. "slurm", "condor" or "None". """ if self.grid_options['slurm'] > 0: type = "slurm" elif self.grid_options['condor'] > 0: type = "condor" else: type = "None" return type def HPCjobID(self): """ Function to return an HPC (Slurm or Condor) job id in the form x.y. Returns None if not an HPC job. """ if self.grid_options['slurm'] > 0: id = self.slurmID() elif self.grid_options['condor'] > 0: id = self.condorID() else: # not an HPC job id = None return id def HPCjobIDtuple(self): """ Return the job ID as a tuple, (x,y), or (None,None) on failure """ id = self.HPCjobID() if id: t = tuple(id.split('.')) else: t = (None,None) return t def HPC_set_status(self,string): """ Set the appropriate HPC job (Condor or Slurm) status file to whatever is given in string. Arguments: string : the new contents of the status file Returns: True if the status was set, False otherwise. (As returned by either the appropriate Condor or Slurm function) """ if self.grid_options['slurm'] > 0: status = self.set_slurm_status(string) elif self.grid_options['condor'] > 0: status = self.set_condor_status(string) else: status = None return status def HPC_get_status(self,dir=None): """ Get and return the appropriate HPC job (Condor or Slurm) status string for this job. Args: dir : optional HPC run directory. If not set, the default (e.g. slurm_dir or condor_dir) is used. """ if self.grid_options['slurm'] > 0: status = self.get_slurm_status(dir=dir) elif self.grid_options['condor'] > 0: status = self.get_condor_status(dir=dir) else: status = None return status def HPC_dirs(self): """ Function to return a list of directories required for this HPC job. """ if self.grid_options['slurm'] > 0: dirs = self.slurm_dirs() elif self.grid_options['condor'] > 0: dirs = self.condor_dirs() else: dirs = [] return dirs def HPCgrid(self): """ Function to call the appropriate HPC grid function (e.g. Slurm or Condor) and return what it returns """ if self.grid_options['slurm'] > 0: return self.slurm_grid() elif self.grid_options['condor'] > 0: return self.condor_grid() else: return None # should not happen def HPC_check_requirements(self): """ Function to check HPC option requirements have been met. Returns a tuple: (True,"") if all is ok, (False,<warning string>) otherwise. """ if self.grid_options['slurm'] > 0: t = self.slurm_check_requirements() elif self.grid_options['condor'] > 0: t = self.condor_check_requirements() else: t = (True,"") return t def HPC_id_filename(self,dir): """ HPC jobs have a filename in their directory which specifies the job id. This function returns the contents of that file as a string, or None on failure. """ if self.grid_options['slurm'] > 0: filename = 'jobid' elif self.grid_options['condor'] > 0: filename = 'ClusterID' else: filename = None return filename def HPC_id_from_dir(self,dir): """ Function to return the ID of an HPC run given its (already existing) directory. """ filename = self.HPC_id_filename(dir) if not filename: return None file = os.path.join(dir,filename) f = open(file,"r",encoding='utf-8') if not f: print("Error: could not open {file} to read the HPC jobid of the directory {dir}".format(file=file, dir=dir)) self.exit(code=1) oldjobid = f.read().strip() if not oldjobid: print("Error: could not find jobid in {dir}".format(dir=dir)) self.exit(code=1) else: f.close() return oldjobid def HPC_restore(self): """ Set grid_options['restore_from_snapshot_file'] so that we restore data from existing an HPC run if self.grid_options[type+'_restart_dir'], where type is "slurm" or "condor", is provided, otherwise do nothing. This only works if grid_options[type] == 2, which is the run-grid stage of the process. """ type = self.HPCjobtype() if type is None: return key = type + '_restart_dir' if key not in self.grid_options: return # get restart directory dir = self.grid_options[type + '_restart_dir'] if dir is None: return # get HPC job index index = self.HPCjobIDtuple()[1] if index is None: return if self.grid_options[type] == 2: old_id = self.HPC_id_from_dir(dir) print("Restart from dir {dir} which was has (old) ID {old_id}, we are job index {index}".format( dir=dir, old_id=old_id, index=index )) # check status: if "finished", we don't have to do anything status = self.HPC_get_status(dir=dir) #file = os.path.join(dir, # 'status', # "{id}.{index}".format(id=old_id, # index=index)) #status = open(file,encoding='utf-8').read() if status == 'finished': print("Status is finished, cannot and do not need to restart.") self.exit(code=0) file = os.path.join(dir, 'snapshots', "{id}.{index}.gz".format(id=old_id, index=index)) if os.path.exists(file): # have data from which we can restore, set it in # the appropriate grid option print("Restore this run from snapshot {file}".format(file=file)) self.grid_options['restore_from_snapshot_file'] = file else: # no snapshot: so no need to restore, just exit print("Expected snapshot at {file} but none was found".format(file=file)) self.exit(code=0) return def HPC_join_previous(self): """ Function to join previously generated datasets. """ # check that our job has finished status = self.HPC_get_status() print("Job status",status) if self.grid_options['HPC_force_join'] == 0 and \ status != "finished": # job did not finish : save a snapshot print("This job did not finish (status is {status}) : cannot join".format(status=status)) else: # our job has finished joinfiles = self.HPC_joinfiles() joiningfile = self.HPCpath('joining') print("Joinfiles: ",joinfiles) print("Joingingfiles: ",joiningfile) if self.HPC_can_join(joinfiles,joiningfile,vb=True): # join object files print("We can join") try: # touch joiningfile pathlib.Path(joiningfile).touch(exist_ok=False) try: self.HPC_join_from_files(self,joinfiles) except Exception as e: print("Join gave exception",e) # disable analytics calculations : use the # values we just loaded self.grid_options['do_analytics'] = False return except: pass else: print("cannot join : other tasks are not yet finished\n") print("Finished this job : exiting") self.exit(code=1) def HPCpath(self,path): """ Function to file the filename of this HPC job's file at path. """ if self.grid_options['slurm'] > 0: p = self.slurmpath(path) elif self.grid_options['condor'] > 0: p = self.condorpath(path) else: p = None return p def HPC_snapshot_filename(self): """ Function to return an HPC job's snapshot filename. """ if self.HPCjob(): file = os.path.join(self.HPC_dir, 'snapshots', self.HPCjobID() + '.gz') else: file = None return file def HPC_dir(self): """ Function to return an HPC job's directory. """ if self.grid_options['slurm'] > 0: d = self.grid_options['slurm_dir'] elif self.grid_options['condor'] > 0: d = self.grid_options['condor_dir'] else: d = None return d