diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py index ad91a04e12adb1a2267669573bd6e67db6a3b5fb..821450bf802599dfea1eb8d0d6fb3eb627ab09fe 100644 --- a/binarycpython/utils/HPC.py +++ b/binarycpython/utils/HPC.py @@ -71,54 +71,93 @@ class HPC(condor,slurm): prefix = os.path.join(dir,'results') file = os.path.join(prefix,id + '.all') + # make the output before checking anything, we do + # this to remove any asynchronicity + lines = [] + for i in range(0,n): + lines += [ os.path.join(prefix,"{id}.{i}.gz\n".format( + id=id, + i=i + )) ] + string = "".join(lines) + # check the joiningfile doesn't exist if not overwrite and os.path.isfile(file): # file already exists - print("Cannot make joiningfile at {file} because it already exists.".format(file=file)) + print("Cannot make joiningfile at {file} because it already exists, instead I am waiting for it to be unlocked.".format(file=file)) + self.wait_for_unlock(file) + joinfiles = self.HPC_load_joinfiles_list(joinlist=file) + print("Unlocked and got {} should be {}".format(len(joinfiles), + self.HPC_njobs())) # perhaps exit here? (e.g. for debugging) if error_on_overwrite: self.exit(code=1) x = False else: # open the file, but locked so we have first unique access - (f,lock) = self.locked_open(file, - mode="w", - encoding="utf-8") + (f,lock) = self.locked_open_for_write(file) # write to it if we are first to obtain unique access if lock and f: - for i in range(0,n): - f.write(os.path.join(prefix, - "{i}.gz\n".format(i=i))) + print("Making joiningfile list range (0,{}) at {}".format(n,file)) + f.write(string) + f.flush() + os.fsync(f) x = True + self.locked_close(f,lock) + os.sync() + self.NFS_flush_hack(file) + + print("Checking joiningfile {} length (size = {})".format( + file, + os.path.getsize(file) + )) + joinfiles = self.HPC_load_joinfiles_list(joinlist=file) + print("Got {} should be {}".format(len(joinfiles), + self.HPC_njobs())) + else: x = False - self.locked_close(f,lock) + print("Joiningfile failed to get lock: waiting for it to be unlocked") + self.wait_for_unlock(file) return x - def HPC_joinfiles(self,joinlist=None): + def HPC_joinlist(self,joinlist=None): """ - Function to load in the joinlist to a list and return it. + Function to return the default HPC joinlist file. + """ + if joinlist is None: + joinlist = self.grid_options['joinlist'] + return joinlist + + def HPC_load_joinfiles_list(self,joinlist=None): + """ + Function to load in the list of files we should join, and return it. If grid_options['HPC_rebuild_joinlist'] is True, we rebuild it. """ prefix = os.path.join(self.HPC_dir(), 'results') + if self.grid_options['HPC_rebuild_joinlist'] == 1: # we should rebuild the joinlist from the # files we find at the prefix directory + print("Rebuild joinlist from existing files") list = glob.glob(str(prefix) + '/*.gz') return list - - if joinlist is None: - joinlist = self.grid_options['joinlist'] + + joinlist = self.HPC_joinlist(joinlist=joinlist) try: + self.wait_for_unlock(joinlist) f = open(joinlist,'r',encoding='utf-8') list = f.read().splitlines() f.close() - if self.grid_options['HPC_prepend_dir_to_joinlist'] is True: - list = [os.path.join(prefix,file) for file in list] + if False: + print("HPC_load_joinfiles_list read joinlist {} -> gave file list of length {} with contents {}".format( + joinlist, + len(list), + list)) except Exception as e: print("Failed to open joinlist at {list} : {e}".format(list=joinlist,e=e)) self.exit(code=1) @@ -139,7 +178,7 @@ class HPC(condor,slurm): Check the joinfiles to make sure they all exist and their .saved equivalents also exist """ - print("HPC check if we can join at {}",self.now()) + print("HPC check if we can join at {}".format(self.now())) if self.grid_options['HPC_force_join'] == 0 and \ os.path.exists(joiningfile): if vb: @@ -307,11 +346,9 @@ class HPC(condor,slurm): """ id = self.HPC_jobID_tuple()[0] - if makejoiningfile and \ self.HPC_job_task() == 2 and \ id is not None: - print("Making joiningfile from HPC_grid().") self.HPC_make_joiningfile() if self.grid_options['slurm'] > 0: @@ -319,7 +356,7 @@ class HPC(condor,slurm): elif self.grid_options['condor'] > 0: x = self.condor_grid() else: - x =None # should not happen + x = None # should not happen return x @@ -437,17 +474,25 @@ class HPC(condor,slurm): print("This job did not finish (status is {status}) : cannot join".format(status=status)) else: # our job has finished - joinfiles = self.HPC_joinfiles() + joinfiles = self.HPC_load_joinfiles_list() joiningfile = self.HPC_path('joining') - print("Joinfiles : ",joinfiles) - print("Joingingfiles : ",joiningfile) + print("Joinfile list n={n} (should be {m}".format( + n=len(joinfiles), + m=self.HPC_njobs() + )) + print("Joingingfile is at : ",joiningfile) + + if len(joinfiles) != self.HPC_njobs(): + print("Number of joinfiles != njobs : this is wrong, exiting.") + self.exit(1) + if self.HPC_can_join(joinfiles,joiningfile,vb=True): # join object files print("We can join") try: # touch joiningfile if self.grid_options['HPC_force_join'] == 0: - print("Making joiningfile at {}",joiningfile) + print("Making joiningfile at {}".format(joiningfile)) pathlib.Path(joiningfile).touch(exist_ok=False) try: print("Calling HPC_join_from_files()") diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py index 3cb71852c59385f50ea7586fcba7ade6816ed825..c711f944c4ea7e13db409f768313b3e1807b5360 100644 --- a/binarycpython/utils/condor.py +++ b/binarycpython/utils/condor.py @@ -186,11 +186,15 @@ class condor(): if self.grid_options['condor'] == 2: # run a grid of stars only, leaving the results # in the appropriate outfile + # + # return False to actually run the stars + self.grid_options['evolution_type'] = 'grid' return False elif self.grid_options['condor'] == 3: - # joining : set the evolution type to "join" and return - # False to continue + # joining : set the evolution type to "join" + # + # return False to continue self.grid_options['evolution_type'] = 'join' return False diff --git a/binarycpython/utils/dataIO.py b/binarycpython/utils/dataIO.py index a609960fcc53527ae1258c44a13e468554dc317b..099b8ef98a8a2fbe38de36acd3203472f3b7810e 100644 --- a/binarycpython/utils/dataIO.py +++ b/binarycpython/utils/dataIO.py @@ -12,6 +12,7 @@ import json import msgpack import os import pathlib +import time from typing import Union, Any from binarycpython.utils.ensemble import ( @@ -493,40 +494,59 @@ class dataIO(): file, lock): """ - Partner function to locked_open() + Partner function to locked_open_for_write() Closes and unlocks the file """ - print("locked_close:\nFile now = {}".format(file)) - print("Lock now = {}".format(lock)) + #print("locked_close:\nFile now = {}".format(file)) + #print("Lock now = {}".format(lock)) if file: - print("Close file {}".format(file)) + #print("Close file {}".format(file)) file.close() if lock: - print("Unlock {}".format(lock)) + #print("Unlock {}".format(lock)) lock.unlock() - print("File now = {}".format(file)) - print("Lock now = {}".format(lock)) - - def locked_open(self, - filename, - mode="r", - encoding="utf-8", - lock_suffix='.lock', - lock_timeout=5, - lock_lifetime=60, - fatal_open_errors=True, - **kwargs): + #print("File now = {}".format(file)) + #print("Lock now = {}".format(lock)) + if file: + self.NFS_flush_hack(file.name) + + def wait_for_unlock(self, + filename, + lock_suffix='.lock'): + """ + Companion to locked_open_for_write that waits for a filename + to a) exist and b) be unlocked. + + This should work because the lock file is created before the file + is created. + """ + while not os.path.isfile(filename): + time.sleep(0.25) + while os.path.isfile(filename + lock_suffix): + time.sleep(0.25) + + def locked_open_for_write(self, + filename, + encoding="utf-8", + lock_suffix='.lock', + lock_timeout=5, + lock_lifetime=60, + exists_ok=False, + fatal_open_errors=True, + vb=False, + **kwargs): """ - Wrapper for Python's open(filename) + Wrapper for Python's open(filename) which opens a file at + filename for writing (mode "w") and locks it. - We check whether the file's lockfile already exists, in which - case just return (None,None), and if we cannot obtain a - lock on the file we also return (None,None). + We check whether the file's lockfile already exists, in which + case just return (None,None), and if we cannot obtain a + lock on the file we also return (None,None). - If the file does not exist, we keep trying to lock until it does. + If the file does not exist, we keep trying to lock until it does. - To do the locking, we use flufl.lock which is NFS safe. + To do the locking, we use flufl.lock which is NFS safe. Args: lock_lifetime: (passed to flufl.lock.Lock()) default 60 seconds. @@ -534,68 +554,104 @@ class dataIO(): lock_timeout: (passed to flufl.lock.Lock()) default 5 seconds. This should be non-zero. fatal_open_errors: if open() fails and fatal_open_errors is True, exit. - + exists_ok: if False and the file at filename exists, return (None,None) (default False) + vb: verbose logging if True, defaults to False + Returns: (file_object, lock_object) tuple. If the file was not opened, returns (None,None). """ + if exists_ok is False and os.path.isfile(filename): + if vb: + print("File at {} already exists: cannot write to it".format(filename)) + return (None,None) + # set the lockfile path: this should be the same # for all processes, so it's just the original file # plus the lock_suffix lockfilename = filename + lock_suffix - print("lockfile={}".format(lockfilename)) + if vb: + print("lockfile={}".format(lockfilename)) while True: # if the file exists, just return if os.path.isfile(lockfilename): - print("lockfile at {} already exists (corresponding to file at {})".format( - lockfilename, - filename)) + if vb: + print("lockfile at {} already exists (corresponding to file at {})".format( + lockfilename, + filename)) return (None,None) # make the lock object by opening the lockfile lock = flufl.lock.Lock(lockfilename, default_timeout=lock_timeout) - print("post-lock: {}",format(lock)) + if vb: + print("post-lock: {}".format(lock)) if lock: # we have the lockfile, so set the lifetime and try to lock it lock.lifetime = datetime.timedelta(seconds=lock_lifetime) try: - print("try to lock",format(lock)) + if vb: + print("try to lock {}".format(lock)) lock.lock() - print("locked {}".format(lock)) + if vb: + if lock.is_locked(): + print("locked {}".format(lock)) + else: + print("failed to lock {}".format(lock)) except: pass # if we acquired the lock, try to open the file if lock.is_locked: - print("{} is locked by {} to {}", - filename,lock,lockfilename) - + if vb: + print("{} is locked by {} to {}".format( + filename,lock,lockfilename)) + + if exists_ok is False and os.path.isfile(filename): + if vb: + print("File at {} already exists (2): cannot write to it, unlocking and returning (None,None)".format(filename)) + lock.unlock() + return (None,None) + # All is apparently ok: file is locked try: - print("Try to open file at {}",filename) + if vb: + print("Try to open file at {}".format(filename)) f = open(filename, - mode=mode, + mode="w", encoding=encoding, **kwargs) - print("Return locked file {}, {}".format(f,lock)) + if vb: + print("Return locked file {}, {}".format(f,lock)) return (f,lock) # error on open should be fatal except Exception as e: - print("Error in locked_open() : {}",e) + if vb: + print("Error in locked_open_for_write() : {}".format(e)) if fatal_open_errors: - print("fatal EXIT") + if vb: + print("fatal exit on open") self.exit(1) else: - print("unlock {}".format(lock)) + if vb: + print("unlock {}".format(lock)) lock.unlock() - print("unlocked {} return None,None".format(lock)) + if vb: + print("unlocked {} return None,None".format(lock)) return (None,None) # failed to lock this time, keep trying # (we shouldn't lock up the CPU because the timeout is non-zero) continue + + def NFS_flush_hack(self,filename): + """ + Use opendir()/closedir() to flush NFS access to a file. + Note: this may or may not work! + """ + dir = os.path.dirname(filename) + os.scandir(dir) diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index d76584d7a696a23cd7ec2f51e84b367826852b5d..f42357b68908c0b733160a601089beea6721a80b 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -611,8 +611,8 @@ class Population(analytics, # open locked settings file, then output if we get the lock - (f,lock) = self.locked_open(settings_fullname, - mode="w") + (f,lock) = self.locked_open_for_write(settings_fullname) + if lock and f: self.verbose_print( "Writing settings to {}".format(settings_fullname), @@ -1113,10 +1113,10 @@ class Population(analytics, num_processes=self.grid_options["num_processes"]) # Join the processes - print("Do join...") + print("Do join of subprocesses ...") for p in processes: p.join() - print("Joined.") + print("Joined subprocesses.") # todo: error codes diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index a5a83a4004cf3e853974285e3e57c7f2b4f23a45..1b3327cc1e77067f5eb1a9972aea05e38ebd0193 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -195,7 +195,7 @@ class grid_options_defaults(): # HPC variables ######################################## "HPC_force_join" : 0, # if True, and the HPC variable ("slurm" or "condor") is 3, skip checking our own job and force the join - "HPC_rebuild_joinlist": 1, # if True, ignore the joinlist we would usually use and rebuild it automatically + "HPC_rebuild_joinlist": 0, # if True, ignore the joinlist we would usually use and rebuild it automatically ######################################## # Slurm stuff diff --git a/setup.py b/setup.py index 15fd6b9deb432cf724166f420237457afd5b1193..9ed029c681bd5cacabf8ed0114217530b83c02ae 100644 --- a/setup.py +++ b/setup.py @@ -262,6 +262,7 @@ setup( ], install_requires=[ "astropy", + "cachetools", "colorama", "compress_pickle", "datasize",