Commit 7b982fe7 authored by Izzard

add custom self.locked_open function to lock and uniquely open files

parent 91113467
@@ -80,33 +80,20 @@ class HPC(condor,slurm):
self.exit(code=1)
x = False
else:
# lock the joiningfile
lock = flufl.lock.Lock(file)
# open the file, but locked so we have first unique access
(f,lock) = self.locked_open(file,
mode="w",
encoding="utf-8")
# writing this file should not take > 1 hour
lock.lifetime = datetime.timedelta(seconds=3600)
if os.path.isfile(file):
print("Cannot make joiningfile at {file} because it already exists.".format(file=file))
x = False
else:
if lock.is_locked:
# write the joiningfile
print("Making joiningfile at {file} with range 0 to {n}".format(
file=file,
n=n
))
with open(file,"w",encoding="utf-8") as f:
# write to it if we are first to obtain unique access
x = False
if lock:
if f:
for i in range(0,n):
f.write(os.path.join(prefix,
"{i}.gz\n".format(i=i)))
x = True
else:
print("Could not lock joiningfile at {file}".format(file=file))
x = False
# unlock the file
lock.unlock()
lock.unlock()
return x
def HPC_joinfiles(self,joinlist=None):
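The net effect of this first hunk is that the flufl.lock bookkeeping moves out of the join-file writer and into self.locked_open. A minimal sketch of the resulting write path, reconstructed from the lines above (the indentation, and the explicit close before unlocking, are assumptions the diff view does not preserve):

    # sketch: open the joiningfile with unique access via locked_open
    (f, lock) = self.locked_open(file, mode="w", encoding="utf-8")

    # write to it only if we were first to obtain unique access
    x = False
    if lock:
        if f:
            print("Making joiningfile at {file} with range 0 to {n}".format(
                file=file, n=n))
            for i in range(0, n):
                f.write(os.path.join(prefix, "{i}.gz\n".format(i=i)))
            x = True
            f.close()  # assumed: the hunk shows no explicit close
        lock.unlock()
    return x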
@@ -6,6 +6,7 @@ import bz2
import compress_pickle
import copy
import datetime
import flufl.lock
import gzip
import json
import msgpack
@@ -487,3 +488,87 @@ class dataIO():
# custom logging functions for HPC jobs
if self.HPC_job():
self.HPC_set_status(string)
def locked_open(self,
file,
mode="r",
encoding="utf-8",
lock_suffix='.lock',
lock_timeout=5,
lock_lifetime=60,
fatal_open_errors=True,
**kwargs):
"""
Wrapper for Python's open(file)
We check whether the file exists, in which case just return
(None,None), and if we cannot get the lock we return
(None,None).
If the file does not exist, we keep trying to lock until it does.
To do the locking, we use flufl.lock which is NFS safe.
Args:
lock_lifetime: (passed to flufl.lock.Lock()) default 60 seconds.
It should take less than this time to write the file.
lock_timeout: (passed to flufl.lock.Lock()) default 5 seconds.
This should be non-zero.
fatal_open_errors: if open() fails and fatal_open_errors is True, exit.
Returns:
(file_object, lock_object) tuple.
If the file was not opened, returns (None,None).
"""
# set the lockfile path: this should be the same
# for all processes, so it's just the original file
# plus the lock_suffix
lockfile = file + lock_suffix
while True:
# if the file exists, just return
if os.path.isfile(lockfile):
return (None,None)
# make the lock object by opening the lockfile
lock = flufl.lock.Lock(lockfile,
default_timeout=lock_timeout)
if lock:
                # we have a lock object: set its lifetime and try to acquire it
lock.lifetime = datetime.timedelta(seconds=lock_lifetime)
try:
lock.lock()
                except Exception:
                    # could not acquire the lock (e.g. it timed out):
                    # fall through and re-check below
                    pass
# if we acquired the lock, try to open the file
if lock.is_locked:
                    # check the file hasn't been created in the meantime
if os.path.isfile(file):
lock.unlock()
return (None,None)
# All is apparently ok: file is locked and does not
# exist so open the file
try:
f = open(file,
mode=mode,
encoding=encoding,
**kwargs)
return (f,lock)
# error on open should be fatal
except Exception as e:
print("Error in locked_open() : {}",e)
if fatal_open_errors:
self.exit(1)
else:
lock.unlock()
return (None,None)
# failed to lock this time, keep trying
# (we shouldn't lock up the CPU because the timeout is non-zero)
continue
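As used elsewhere in this commit, the contract is that both returned objects are None unless we obtained first, unique access. A minimal usage sketch (the path is hypothetical, and we assume the caller closes the file before releasing the lock):

    # hypothetical caller, e.g. from within another dataIO method
    (f, lock) = self.locked_open("/tmp/example.txt", mode="w")
    if lock and f:
        f.write("written by exactly one process\n")
        f.close()      # close the file first...
        lock.unlock()  # ...then release the lock for other processes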
@@ -598,21 +598,25 @@ class Population(analytics,
self.custom_options["data_dir"], settings_name
)
self.verbose_print(
"Writing settings to {}".format(settings_fullname),
self.grid_options["verbosity"],
1,
)
# if not outfile.endswith('json'):
with open(settings_fullname, "w") as file:
json.dump(
all_info_cleaned,
file,
indent=indent,
default=binaryc_json_serializer,
ensure_ascii=ensure_ascii
# open locked settings file, then output if we get the lock
(f,lock) = self.locked_open(settings_fullname,
mode="w")
if lock:
self.verbose_print(
"Writing settings to {}".format(settings_fullname),
self.grid_options["verbosity"],
1,
)
if f:
json.dump(
all_info_cleaned,
f,
indent=indent,
default=binaryc_json_serializer,
ensure_ascii=ensure_ascii
)
lock.unlock()
return settings_fullname
else:
msg = "Exporting all info without passing a value for `outfile` requires custom_options['data_dir'] to be present. That is not the cause. Either set the `data_dir` or pass a value for `outfile` "
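One design note on the hunk above: lock.unlock() is only reached if json.dump() returns, so an exception while serialising would leave the lock held until its lifetime (default 60 seconds) expires. A more defensive variant, sketched with the same names, would release the lock in a finally block:

    (f, lock) = self.locked_open(settings_fullname, mode="w")
    if lock:
        try:
            if f:
                json.dump(all_info_cleaned,
                          f,
                          indent=indent,
                          default=binaryc_json_serializer,
                          ensure_ascii=ensure_ascii)
                f.close()  # assumed: close before releasing the lock
        finally:
            lock.unlock()
    return settings_fullname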