diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py
index 821450bf802599dfea1eb8d0d6fb3eb627ab09fe..81e87e933856c004312ecbe9d298238f8e177223 100644
--- a/binarycpython/utils/HPC.py
+++ b/binarycpython/utils/HPC.py
@@ -11,6 +11,7 @@
 import datetime
 import flufl.lock
 import glob
+import json
 import os
 import pathlib
 import time
@@ -102,7 +103,7 @@ class HPC(condor,slurm):
             print("Making joiningfile list range (0,{}) at {}".format(n,file))
             f.write(string)
             f.flush()
-            os.fsync(f)
+            os.fsync(f.fileno())
             x = True
             self.locked_close(f,lock)
             os.sync()
@@ -307,20 +308,31 @@ class HPC(condor,slurm):
             status = None
         return status

-    def HPC_get_status(self,dir=None):
+    def HPC_get_status(self,
+                       job_id=None,
+                       job_index=None,
+                       dir=None):
         """
-        Get and return the appropriate HPC job (Condor or Slurm) status string for this job.
+        Get and return the appropriate HPC job (Condor or Slurm) status string for this job (or, if given, the job at id.index).

         Args:
             dir : optional HPC run directory. If not set, the default (e.g. slurm_dir or condor_dir) is used.
+            job_id,job_index : the id and index of the job to be queried
         """
+        #print("HPC get status job_id={} job_index={} dir={}".format(
+        #    job_id,job_index,dir))
         if self.grid_options['slurm'] > 0:
-            status = self.get_slurm_status(dir=dir)
+            status = self.get_slurm_status(jobid=job_id,
+                                           jobarrayindex=job_index,
+                                           dir=dir)
         elif self.grid_options['condor'] > 0:
-            status = self.get_condor_status(dir=dir)
+            status = self.get_condor_status(ClusterID=job_id,
+                                            Process=job_index,
+                                            dir=dir)
         else:
             status = None
+        #print("return ",status)
         return status

     def HPC_dirs(self):
@@ -346,6 +358,9 @@ class HPC(condor,slurm):
         """
         id = self.HPC_jobID_tuple()[0]

+        # give some current status about the HPC run
+        self.HPC_dump_status("HPC grid before")
+
         if makejoiningfile and \
            self.HPC_job_task() == 2 and \
            id is not None:
@@ -358,6 +373,9 @@ class HPC(condor,slurm):
         else:
             x = None # should not happen

+        # give some current status about the HPC run
+        self.HPC_dump_status("HPC grid after")
+
         return x

     def HPC_check_requirements(self):
@@ -474,14 +492,25 @@ class HPC(condor,slurm):
             print("This job did not finish (status is {status}) : cannot join".format(status=status))
         else:
             # our job has finished
+            HPC_status = self.HPC_status()
+
+            #HPC_queue_stats = self.HPC_queue_stats()
+
+            if HPC_status['status']['finished'] != HPC_status['njobs']:
+                print("HPC_status reports {} finished jobs out of {}. We cannot join because not all the jobs are finished. Exiting.".format(
+                    HPC_status['status']['finished'],
+                    HPC_status['njobs']
+                ))
+                self.exit(1)
+
             joinfiles = self.HPC_load_joinfiles_list()
             joiningfile = self.HPC_path('joining')
             print("Joinfile list n={n} (should be {m}".format(
                 n=len(joinfiles),
                 m=self.HPC_njobs()
             ))
-            print("Joingingfile is at : ",joiningfile)
-
+            print("Joiningfile path : ",joiningfile)
+
             if len(joinfiles) != self.HPC_njobs():
                 print("Number of joinfiles != njobs : this is wrong, exiting.")
                 self.exit(1)
@@ -493,7 +522,7 @@ class HPC(condor,slurm):
             # touch joiningfile
             if self.grid_options['HPC_force_join'] == 0:
                 print("Making joiningfile at {}".format(joiningfile))
-                pathlib.Path(joiningfile).touch(exist_ok=False)
+                self.HPC_touch(joiningfile)
             try:
                 print("Calling HPC_join_from_files()")
                 self.HPC_join_from_files(self,joinfiles)
@@ -546,3 +575,88 @@ class HPC(condor,slurm):
         else:
             d = None
         return d
+
+    def HPC_touch(self,filename,string=None):
+        """
+        Function to touch the file at filename, put into it the job number
+        and (if given) the string passed in.
+ """ + try: + f = open(filename,'w',encoding='utf-8') + if f: + job = self.HPC_jobID() + jobtype = self.HPC_job_type() + if job: + s = str(job) + if jobtype: + s += ' ' + str(jobtype) + f.write(s + "\n") + if string: + f.write(string) + f.flush() + f.close() + self.NFS_flush_hack(filename) + except: + pass + + def HPC_status(self): + """ + Return a dict of useful information about the current status + of this HPC run. + """ + d = {} # returned + _id,_index = self.HPC_jobID_tuple() + d['job_id'] = _id + d['job_index'] = _index + if _id and _index: + n = self.HPC_njobs() + d['njobs'] = n + d['job_task'] = self.HPC_job_task() + d['job_type'] = self.HPC_job_type() + d['job_status'] = self.HPC_get_status() + d['dir'] = self.HPC_dir() + d['dirs'] = self.HPC_dirs() + + # get fellow jobs' status + d["status"] = {} + d["joblist"] = {} + + # default types + for x in ["running","starting","finishing","finished","killed"]: + d["status"][x] = 0 + d["joblist"][x] = [] + + for i in range(0,n): + s = self.HPC_get_status(job_id=_id, + job_index=i) + if s is None: + s = "unknown" + if not s in d["status"]: + d["status"][s] = 1 + else: + d["status"][s] += 1 + if not s in d["joblist"]: + d["joblist"][s] = [str(_id) + '.' + str(i)] + else: + d["joblist"][s] += [str(_id) + '.' + str(i)] + + return d + + + def HPC_dump_status(self,string=None): + d = self.HPC_status() + print("############################################################") + if not string: + string = "" + print("HPC job status " + string) + print(json.dumps(d,indent=4)) + print("############################################################") + + def HPC_queue_stats(self): + if self.grid_options['slurm'] > 0: + x = self.slurm_queue_stats() + elif self.grid_options['condor'] > 0: + x = self.condor_queue_stats() + else: + x = None + return x diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py index c711f944c4ea7e13db409f768313b3e1807b5360..de7d4983bd56f69591215507ba909fb1e536386a 100644 --- a/binarycpython/utils/condor.py +++ b/binarycpython/utils/condor.py @@ -9,6 +9,7 @@ import multiprocessing import os import pathlib import signal +import re import stat import subprocess import sys @@ -25,7 +26,7 @@ class condor(): Function to return a Condor job ID. The ClusterID and Process passed in are used if given, otherwise we default to the condor_ClusterID and condor_Process in grid_options. 
""" if ClusterID is None: - ClusterID= self.grid_options['condor_ClusterID'] + ClusterID = self.grid_options['condor_ClusterID'] if Process is None: Process = self.grid_options['condor_Process'] return "{ClusterID}.{Process}".format(ClusterID=ClusterID, @@ -86,13 +87,15 @@ class condor(): with open(idfile,"w",encoding='utf-8') as fClusterID: fClusterID.write("{ClusterID}\n".format(ClusterID=self.grid_options['condor_ClusterID'])) fClusterID.close() - + self.NFS_flush_hack(idfile) + # save condor status file = self.condor_status_file(dir=dir) if file: with open(file,'w',encoding='utf-8') as f: f.write(string) f.close() + self.NFS_flush_hack(file) return def get_condor_status(self, @@ -108,15 +111,22 @@ class condor(): Process = self.grid_options['condor_Process'] if ClusterID is None or Process is None : return None + #print("get_condor_status {}.{}".format(ClusterID,Process)) try: path = pathlib.Path(self.condor_status_file(dir=dir, ClusterID=ClusterID, Process=Process)) + #print("path={}".format(path)) + #print("size={}".format(path.stat().st_size)) if path: - return path.read_text().strip() + s = path.read_text().strip() + #print("contains {}".format(s)) + return s else: + #print("path empty") return "" except: + #print("read failed") return "" def condor_outfile(self,dir=None): @@ -423,3 +433,33 @@ queue {njobs} # return True so we exit immediately return True + + def condor_queue_stats(self): + """ + Return condor queue statistics for this job + """ + _id = self.grid_options['condor_ClusterID'] + if not _id: + return None + cmd = "{} {} 2>&1".format( + '/usr/bin/condor_q',#self.grid_options["condor_q"], + _id + ) + print("Q cmd",cmd) + result = subprocess.Popen(cmd, + shell=True, + stdout=subprocess.PIPE).stdout.read() + print("Q result ",result) + if not result: + return None + d = {} + for x in ["jobs","completed","removed","idle","running","held","suspended"]: + print("Q x ",x) + m = re.search("(\d+)\s+{}".format(x), + result) + print("Q m ",m) + if m: + d[x] = m.group(0) + + print("Q d ",d) + return d diff --git a/binarycpython/utils/dataIO.py b/binarycpython/utils/dataIO.py index 099b8ef98a8a2fbe38de36acd3203472f3b7810e..6d625b99b60b1b4c276b8391c0515e93f27e6b23 100644 --- a/binarycpython/utils/dataIO.py +++ b/binarycpython/utils/dataIO.py @@ -118,15 +118,18 @@ class dataIO(): object.grid_options['store_memaddr'] = _store_memaddr object.persistent_data_memory_dict = persistent_data_memory_dict + self.NFS_flush_hack(filename) + # touch 'saved' file - pathlib.Path(filename + '.saved').touch(exist_ok=True) - + saved = filename + '.saved' + self.HPC_touch(saved) return def load_population_object(self,filename): """ returns the Population object loaded from filename """ + self.NFS_flush_hack(filename) if filename is None: obj = None else: @@ -216,8 +219,30 @@ class dataIO(): The file should be saved using save_population_object() """ + + mtime = time.localtime(os.path.getmtime(filename)) + modtime = time.strftime("%a, %d %b %Y %H:%M:%S",mtime) + print("Load data from {filename} : size {size}, modtime {modtime}".format( + filename=filename, + size=os.path.getsize(filename), + modtime=modtime, + )) + newpop = self.load_population_object(filename) + try: + n = newpop.grid_options['total_count'] + except: + try: + n = newpop.grid_options['_count'] + except: + try: + n = newpop.grid_ensemble_results["metadata"]["_count"] + except: + n = -1 + print("Loaded data from {n} stars".format( + n=n + )) # merge with refpop try: @@ -475,17 +500,19 @@ class dataIO(): ID = self.process_ID if 
         if self.grid_options['status_dir']:
+            path = os.path.join(
+                self.grid_options["status_dir"],
+                format_statment.format(ID),
+            )
             with open(
-                os.path.join(
-                    self.grid_options["status_dir"],
-                    format_statment.format(ID),
-                ),
+                path,
                 "w",
                 encoding='utf-8'
             ) as f:
                 f.write(string)
                 f.close()
-
+            self.NFS_flush_hack(path)
+
         # custom logging functions for HPC jobs
         if self.HPC_job():
             self.HPC_set_status(string)
@@ -498,16 +525,10 @@ class dataIO():
         Closes and unlocks the file
         """
-        #print("locked_close:\nFile now = {}".format(file))
-        #print("Lock now = {}".format(lock))
         if file:
-            #print("Close file {}".format(file))
             file.close()
         if lock:
-            #print("Unlock {}".format(lock))
             lock.unlock()
-        #print("File now = {}".format(file))
-        #print("Lock now = {}".format(lock))
         if file:
             self.NFS_flush_hack(file.name)
@@ -653,5 +674,7 @@ class dataIO():
         Use opendir()/closedir() to flush NFS access to a file.
         Note: this may or may not work!
         """
+        os.sync()
         dir = os.path.dirname(filename)
         os.scandir(dir)
+
diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py
index d2ebfea798b7492d7ea96ba0d8784b3042ce718a..c604de20ff881adce7eff926f331e447b125ceb8 100644
--- a/binarycpython/utils/grid.py
+++ b/binarycpython/utils/grid.py
@@ -400,7 +400,7 @@ class Population(analytics,
                         parameter,
                         type(value), type(old_value)
                     ),
                     self.grid_options["verbosity"],
-                    1,
+                    3,
                 )
                 try:
                     if isinstance(old_value, bool):
@@ -442,7 +442,7 @@ class Population(analytics,
                         parameter,
                         value
                     ),
                     self.grid_options["verbosity"],
-                    1,
+                    3,
                 )
                 cmdline_dict[parameter] = value
@@ -833,6 +833,9 @@ class Population(analytics,
         # make analytics information
         analytics_dict = self.make_analytics_dict()

+        if self.HPC_job():
+            self.HPC_dump_status("HPC grid after analytics")
+
         if self.custom_options['save_snapshot']:
             # we must save a snapshot, not the population object
             # ... also save the new starting point: this has to take into
@@ -847,10 +850,10 @@ class Population(analytics,
         # Save object to a pickle file
         elif self.grid_options['save_population_object']:
             self.save_population_object()
-
+
         # if we're running an HPC grid, exit here
         # unless we're joining
-        if self.HPC_grid() and \
+        if self.HPC_job() and \
            self.grid_options['evolution_type'] != 'join':
             self.exit()
@@ -2285,7 +2288,7 @@ class Population(analytics,
             3,
         )

-    def now(self,style=None,specifier=None):
+    def now(self,now=None,style=None,specifier=None):
         """
         convenience function to return a string of the current time,
         using the format "%m/%d/%Y %H:%M:%S"
@@ -2296,7 +2299,8 @@ class Population(analytics,
         specifier: if set, uses this as a specifier rather than whatever
                    is set by default or in the style variable
         """
-        now = datetime.datetime.now()
+        if not now:
+            now = datetime.datetime.now()
         if not specifier:
             if style == 'nospace':
                 # special case
diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py
index 33dee5f0feba7fc1115905724a918caf858e5116..cb903637e577bf3a8d08c491f80288aaab9b2d4d 100644
--- a/binarycpython/utils/grid_options_defaults.py
+++ b/binarycpython/utils/grid_options_defaults.py
@@ -252,6 +252,7 @@ class grid_options_defaults():
         "condor_date" : shutil.which("date"), # bash location for Condor
         "condor_initial_dir" : None, # directory from which condor is run, if None is the directory in which your script is run
         "condor_submit" : shutil.which("condor_submit"), # the condor_submit command
+        "condor_q" : shutil.which("condor_q"), # the condor_q command
         "condor_getenv" : True, # if True condor takes the environment at submission and copies it to the jobs. You almost certainly want this.
         "condor_batchname" : "binary_c-condor", # Condor batchname option
         "condor_kill_sig" : "SIGINT", # signal Condor should use to stop a process : note that grid.py expects this to be "SIGINT"
@@ -331,6 +332,7 @@ class grid_options_defaults():
         "condor_initial_dir" : "String. Directory from which condor scripts are run. If set to the default, None, this is the directory from which your script is run.",
         "condor_submit" : "String. The Condor_submit command, usually \"/usr/bin/condor_submit\" but will depend on your HTCondor installation.",
+        "condor_q" : "String. The Condor_q command, usually \"/usr/bin/condor_q\" but will depend on your HTCondor installation.",
         "condor_getenv" : "Boolean. If True, the default, condor takes the environment at submission and copies it to the jobs. You almost certainly want this to be True.",
         "condor_batchname" : "String. Condor batchname option: this is what appears in condor_q. Defaults to \"binary_c-condor\"",
         "condor_kill_sig" : "String. Signal Condor should use to stop a process. Note that grid.py expects this to be \"SIGINT\" which is the default.",
diff --git a/binarycpython/utils/slurm.py b/binarycpython/utils/slurm.py
index 8f20683937aaa790bf3af8b725fa70ca018cebe8..5a6a152016032a75e997c7a21f625cb4c20d832e 100644
--- a/binarycpython/utils/slurm.py
+++ b/binarycpython/utils/slurm.py
@@ -86,6 +86,7 @@ class slurm():
         with open(idfile,"w",encoding='utf-8') as fjobid:
             fjobid.write("{jobid}\n".format(jobid=self.grid_options['slurm_jobid']))
             fjobid.close()
+            self.NFS_flush_hack(idfile)

         # save slurm status
         file = self.slurm_status_file(dir=dir)
@@ -93,6 +94,7 @@ class slurm():
         with open(file,'w',encoding='utf-8') as f:
             f.write(string)
             f.close()
+            self.NFS_flush_hack(file)
         return

     def get_slurm_status(self,
@@ -109,10 +111,12 @@ class slurm():
         if jobid is None or jobarrayindex is None :
             return None
         try:
+
             path = pathlib.Path(self.slurm_status_file(dir=dir,
                                                        jobid=jobid,
                                                        jobarrayindex=jobarrayindex))
             if path:
+                self.NFS_flush_hack(path)
                 return path.read_text().strip()
             else:
                 return ""
@@ -365,3 +369,6 @@ echo \"running\" > {slurm_dir}/status/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID

         # return True so we exit immediately
         return True
+
+    def slurm_queue_stats(self):
+        """
+        Slurm queue statistics are not yet implemented: always returns None.
+        """
+        return None
diff --git a/binarycpython/utils/spacing_functions.py b/binarycpython/utils/spacing_functions.py
index 4b7fe64652be002b511025d3b47dc70b0b7ae8d0..d18f252534d591ece733497a88a4008029784428 100644
--- a/binarycpython/utils/spacing_functions.py
+++ b/binarycpython/utils/spacing_functions.py
@@ -257,8 +257,11 @@ class spacing_functions():
             # _const_dt
             bse_stripped = bse_options.copy()
             for x in ['multiplicity']:
-                del bse_stripped[x]
-
+                try:
+                    del bse_stripped[x]
+                except:
+                    pass
+
             # make a JSON string of the options (this can be
             # used to check the cache)
             bse_options_json = json.dumps(bse_stripped,
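
A minimal usage sketch (not part of the patch itself) of the status helpers introduced above, assuming a Population configured for a Slurm array run. HPC_job(), HPC_status(), HPC_dump_status() and HPC_queue_stats() are the methods added or used in this patch; the Population import and the set() call follow the existing binary_c-python API, and the grid option shown (slurm=1) is normally set by the Slurm launcher rather than by hand.

# Hypothetical sketch: querying HPC run status from a user script.
from binarycpython.utils.grid import Population

pop = Population()
pop.set(slurm=1)   # assumption: usually set for you by the Slurm job scripts

if pop.HPC_job():
    # dict of per-state job counts ("running", "finished", ...) and job lists
    status = pop.HPC_status()
    print("{} of {} jobs finished".format(
        status.get("status", {}).get("finished", 0),
        status.get("njobs")
    ))

    # pretty-printed JSON dump of the same information
    pop.HPC_dump_status("manual check")

    # queue statistics: parsed condor_q output for Condor, None for Slurm
    print(pop.HPC_queue_stats())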