From 7b982fe7794e474af4a80b109cac813ca92d968b Mon Sep 17 00:00:00 2001 From: Izzard <ri0005@orca.eps.surrey.ac.uk> Date: Fri, 19 Nov 2021 13:34:26 +0000 Subject: [PATCH] add custon self.locked_open function to lock and uniquely open files --- binarycpython/utils/HPC.py | 31 ++++--------- binarycpython/utils/dataIO.py | 85 +++++++++++++++++++++++++++++++++++ binarycpython/utils/grid.py | 30 +++++++------ 3 files changed, 111 insertions(+), 35 deletions(-) diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py index 33343107a..c335811bb 100644 --- a/binarycpython/utils/HPC.py +++ b/binarycpython/utils/HPC.py @@ -80,33 +80,20 @@ class HPC(condor,slurm): self.exit(code=1) x = False else: - # lock the joiningfile - lock = flufl.lock.Lock(file) + # open the file, but locked so we have first unique access + (f,lock) = self.locked_open(file, + mode="w", + encoding="utf-8") - # writing this file should not take > 1 hour - lock.lifetime = datetime.timedelta(seconds=3600) - - if os.path.isfile(file): - print("Cannot make joiningfile at {file} because it already exists.".format(file=file)) - x = False - else: - if lock.is_locked: - # write the joiningfile - print("Making joiningfile at {file} with range 0 to {n}".format( - file=file, - n=n - )) - with open(file,"w",encoding="utf-8") as f: + # write to it if we are first to obtain unique access + x = False + if lock: + if f: for i in range(0,n): f.write(os.path.join(prefix, "{i}.gz\n".format(i=i))) x = True - else: - print("Could not lock joiningfile at {file}".format(file=file)) - x = False - - # unlock the file - lock.unlock() + lock.unlock() return x def HPC_joinfiles(self,joinlist=None): diff --git a/binarycpython/utils/dataIO.py b/binarycpython/utils/dataIO.py index 6bf7f703f..9dea08a09 100644 --- a/binarycpython/utils/dataIO.py +++ b/binarycpython/utils/dataIO.py @@ -6,6 +6,7 @@ import bz2 import compress_pickle import copy import datetime +import flufl.lock import gzip import json import msgpack @@ -487,3 +488,87 @@ class dataIO(): # custom logging functions for HPC jobs if self.HPC_job(): self.HPC_set_status(string) + + def locked_open(self, + file, + mode="r", + encoding="utf-8", + lock_suffix='.lock', + lock_timeout=5, + lock_lifetime=60, + fatal_open_errors=True, + **kwargs): + """ + Wrapper for Python's open(file) + + We check whether the file exists, in which case just return + (None,None), and if we cannot get the lock we return + (None,None). + + If the file does not exist, we keep trying to lock until it does. + + To do the locking, we use flufl.lock which is NFS safe. + + Args: + lock_lifetime: (passed to flufl.lock.Lock()) default 60 seconds. + It should take less than this time to write the file. + lock_timeout: (passed to flufl.lock.Lock()) default 5 seconds. + This should be non-zero. + fatal_open_errors: if open() fails and fatal_open_errors is True, exit. + + Returns: + (file_object, lock_object) tuple. + If the file was not opened, returns (None,None). + """ + + # set the lockfile path: this should be the same + # for all processes, so it's just the original file + # plus the lock_suffix + lockfile = file + lock_suffix + + while True: + # if the file exists, just return + if os.path.isfile(lockfile): + return (None,None) + + # make the lock object by opening the lockfile + lock = flufl.lock.Lock(lockfile, + default_timeout=lock_timeout) + + if lock: + # we have the lockfile, so set the lifetime and try to lock it + lock.lifetime = datetime.timedelta(seconds=lock_lifetime) + try: + lock.lock() + except: + pass + + # if we acquired the lock, try to open the file + if lock.is_locked: + + # check it hasn't been opened in the meantime + if os.path.isfile(file): + lock.unlock() + return (None,None) + + # All is apparently ok: file is locked and does not + # exist so open the file + try: + f = open(file, + mode=mode, + encoding=encoding, + **kwargs) + return (f,lock) + + # error on open should be fatal + except Exception as e: + print("Error in locked_open() : {}",e) + if fatal_open_errors: + self.exit(1) + else: + lock.unlock() + return (None,None) + + # failed to lock this time, keep trying + # (we shouldn't lock up the CPU because the timeout is non-zero) + continue diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 75dc6439d..0eeb544eb 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -598,21 +598,25 @@ class Population(analytics, self.custom_options["data_dir"], settings_name ) - self.verbose_print( - "Writing settings to {}".format(settings_fullname), - self.grid_options["verbosity"], - 1, - ) - # if not outfile.endswith('json'): - with open(settings_fullname, "w") as file: - json.dump( - all_info_cleaned, - file, - indent=indent, - default=binaryc_json_serializer, - ensure_ascii=ensure_ascii + # open locked settings file, then output if we get the lock + (f,lock) = self.locked_open(settings_fullname, + mode="w") + if lock: + self.verbose_print( + "Writing settings to {}".format(settings_fullname), + self.grid_options["verbosity"], + 1, ) + if f: + json.dump( + all_info_cleaned, + f, + indent=indent, + default=binaryc_json_serializer, + ensure_ascii=ensure_ascii + ) + lock.unlock() return settings_fullname else: msg = "Exporting all info without passing a value for `outfile` requires custom_options['data_dir'] to be present. That is not the cause. Either set the `data_dir` or pass a value for `outfile` " -- GitLab