From 219a6a95efd3d66adfb3cf79ab95b5992867b6e0 Mon Sep 17 00:00:00 2001 From: Robert Izzard <r.izzard@surrey.ac.uk> Date: Wed, 10 Nov 2021 19:12:05 +0000 Subject: [PATCH] now killing the child or parent process kills them both Slurm restarts work, just take your old (unfinished) Slurm dir and set it with slurm_restart_dir=<whatever> --- binarycpython/utils/functions.py | 1 + binarycpython/utils/grid.py | 613 +++++++++++++------ binarycpython/utils/grid_options_defaults.py | 7 +- 3 files changed, 444 insertions(+), 177 deletions(-) diff --git a/binarycpython/utils/functions.py b/binarycpython/utils/functions.py index 51058652f..9aea0de81 100644 --- a/binarycpython/utils/functions.py +++ b/binarycpython/utils/functions.py @@ -113,6 +113,7 @@ def get_ANSI_colours(): "magenta": Fore.MAGENTA, "white": Fore.WHITE, "black": Fore.BLACK, + "bold": Style.BRIGHT } background_colours = { diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 240f9f78b..e8412d0f8 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -41,6 +41,7 @@ import platform import pprint # for debugging only import psutil import py_rinterpolate +import queue import re import resource import setproctitle @@ -50,6 +51,7 @@ import strip_ansi import subprocess import sys import time +import traceback import uuid @@ -165,8 +167,7 @@ class Population: # Custom options self.custom_options = { - 'stop_queue' : False, - 'save_snapshot' : False + 'save_snapshot' : False, } # grid code generation @@ -234,12 +235,29 @@ class Population: def jobID(self): # job ID if self.grid_options['slurm'] > 0: - jobID = "{}.{}".format(self.grid_options['jobid'], + jobID = "{}.{}".format(self.grid_options['slurm_jobid'], self.grid_options['slurm_jobarrayindex']) else: jobID = "{}".format(self.process_ID) return jobID + def exit(self,code=None,message=True,flush=True,stacktrace=False): + # wrapper for sys.exit() to return the correct exit code + + # if we've been killed, set exit code to 1 + if self.grid_options['exit_code']==0 and self.grid_options['_killed']: + self.grid_options['exit_code'] = 1 + # but override with code passed in + if code: + self.grid_options['exit_code'] = code + if message: + print("exit from binary_c-python Population with code {}".format(self.grid_options['exit_code'])) + if flush: + sys.stdout.flush() + if stacktrace: + traceback.print_stack() + sys.exit(self.grid_options['exit_code']) + ################################################### # Argument functions ################################################### @@ -283,7 +301,7 @@ class Population: verbose_print( "adding: {}={} to BSE_options".format(key, kwargs[key]), self.grid_options["verbosity"], - 1, + 2, ) self.bse_options[key] = kwargs[key] @@ -360,58 +378,64 @@ class Population: cmdline_dict = {} for cmdline_arg in cmdline_args: split = cmdline_arg.split("=") - parameter = split[0] - value = split[1] - old_value_found = False - - # Find an old value - if parameter in self.grid_options: - old_value = self.grid_options[parameter] - old_value_found = True - - elif parameter in self.defaults: - old_value = self.defaults[parameter] - old_value_found = True - - elif parameter in self.custom_options: - old_value = self.custom_options[parameter] - old_value_found = True - - # (attempt to) convert - if old_value_found: - if old_value != None: - try: - verbose_print( - "Converting type of {} from {} to {}".format( - parameter, type(value), type(old_value) - ), - self.grid_options["verbosity"], - 2, - ) - value = type(old_value)(value) - verbose_print("Success!", self.grid_options["verbosity"], 2) - except ValueError: + if len(split)==2: - # might be able to eval the parameter, e.g. - # an expression like "2-1" can eval to "1" - # which would be valid - try: - evaled = eval(value) - value = type(old_value)(evaled) - verbose_print("Success! (evaled)", self.grid_options["verbosity"], 2) + parameter = split[0] + value = split[1] + old_value_found = False - except ValueError: + # Find an old value + if parameter in self.grid_options: + old_value = self.grid_options[parameter] + old_value_found = True + + elif parameter in self.defaults: + old_value = self.defaults[parameter] + old_value_found = True + + elif parameter in self.custom_options: + old_value = self.custom_options[parameter] + old_value_found = True + + # (attempt to) convert + if old_value_found: + if old_value != None: + try: verbose_print( - "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( - parameter, value, type(old_value), old_value + "Converting type of {} from {} to {}".format( + parameter, type(value), type(old_value) ), self.grid_options["verbosity"], - 0, + 2, ) + value = type(old_value)(value) + verbose_print("Success!", self.grid_options["verbosity"], 2) + + except ValueError: - # Add to dict - cmdline_dict[parameter] = value + # might be able to eval the parameter, e.g. + # an expression like "2-1" can eval to "1" + # which would be valid + try: + evaled = eval(value) + value = type(old_value)(evaled) + verbose_print("Success! (evaled)", self.grid_options["verbosity"], 2) + + except ValueError: + verbose_print( + "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( + parameter, value, type(old_value), old_value + ), + self.grid_options["verbosity"], + 0, + ) + # Add to dict + cmdline_dict[parameter] = value + + else: + print("Error: I do not know how to process",cmdline_arg," : cmdline args should be in the format x=y, yours appears not to be.") + self.exit(1) # unpack the dictionary into the setting function that handles where the values are set self.set(**cmdline_dict) @@ -420,7 +444,9 @@ class Population: def set_status(self, string, format_statment="process_{}.txt", - ID=None): + ID=None, + slurm=True, + condor=True): """ function to set the status string in its appropriate file """ @@ -440,7 +466,7 @@ class Population: f.close() # custom logging functions - if self.grid_options['slurm'] >= 1: + if slurm and self.grid_options['slurm'] >= 1: self.set_slurm_status(string) # if self.grid_options['condor']==1: # self.set_condor_status(string) @@ -658,7 +684,8 @@ class Population: # check parameters if False and dphasevol != -1.0 and gridtype == 'discrete': print("Error making grid: you have set the phasevol to be not -1 and gridtype to discrete, but a discrete grid has no phasevol calculation. You should only set the gridtype to discrete and not set the phasevol in this case.") - sys.exit() + + self.exit(code=1) # Add grid_variable grid_variable = { @@ -703,7 +730,7 @@ class Population: verbose_print( "Added grid variable: {}".format(json.dumps(grid_variable, indent=4)), self.grid_options["verbosity"], - 1, + 2, ) ################################################### @@ -806,7 +833,10 @@ class Population: Function to return how much CPU time we've used """ dt = self.time_elapsed() - ncpus = self.grid_options['num_processes'] + try: + ncpus = self.grid_options['num_processes'] + except: + ncpus = 1 return dt * ncpus def export_all_info( @@ -1113,7 +1143,7 @@ class Population: if self.grid_options['tmp_dir'] is None \ or not os.path.isdir(self.grid_options['tmp_dir']): print("grid_options['tmp_dir'] is not set or it is not a directory : this should point to a temporary directory location, preferably local to your CPUs") - sys.exit(1) + self.exit(code=1) # set default directory locations @@ -1122,7 +1152,7 @@ class Population: if self.grid_options['slurm'] > 0 and \ self.grid_options['slurm_dir'] is None: print("You have set slurm=",self.grid_options['slurm'],"but not set slurm_dir. Please set it and try again.") - sys.exit(1) + self.exit(code=1) # default status_dir to be tmp_dir/status # NOTE: binary_c-python uses its own status_dir, which is not @@ -1156,7 +1186,7 @@ class Population: path = self.grid_options[dir] if path != None and dir_ok(path) == False: print("Directory {dir} currently set to {path} cannot be written to. Please check that this directory is correct and you have write access.".format(dir=dir,path=path)) - sys.exit(1) + self.exit(code=1) # Make sure the subdirs of the tmp dir exist subdirs = ['failed_systems','current_system','process_summary','runtime_systems','snapshots'] @@ -1165,8 +1195,47 @@ class Population: os.makedirs(path,exist_ok=True) if dir_ok(path) == False: print("Sub-Directory {subdir} (in tmp_dir) currently set to {path} cannot be written to. Please check that this directory is correct and you have write access.".format(subdir=subdir,path=path)) - sys.exit(1) + self.exit(code=1) + + # load from existing Slurm run if provided, but only on slurm=1 runs + # (slurm=2 is to join data, which shouldn't preload) + if self.grid_options['slurm'] == 1 and \ + self.grid_options['slurm_restart_dir'] and \ + self.grid_options['slurm_jobarrayindex'] != None: + f = open(os.path.join(self.grid_options['slurm_restart_dir'],'jobid')) + oldjobid = f.read().strip() + f.close() + if not oldjobid: + print("Error: could not find jobid in {}".format(self.grid_options['slurm_restart_dir'])) + self.exit(code=1) + + print("Restart from dir {} which was jobid {}, we are jobarrayindex {}".format( + self.grid_options['slurm_restart_dir'], + oldjobid, + self.grid_options['slurm_jobarrayindex'] + )) + + # check status: if "finished", we don't have to do anything + file = os.path.join(self.grid_options['slurm_restart_dir'], + 'status', + "{}.{}".format(oldjobid, + self.grid_options['slurm_jobarrayindex'])) + status = open(file).read() + if status == 'finished': + self.exit(code=0) + + file = os.path.join(self.grid_options['slurm_restart_dir'], + 'snapshots', + "{}.{}.gz".format(oldjobid, + self.grid_options['slurm_jobarrayindex'])) + if os.path.exists(file): + # have data from which we can restore, set it in + # the appropriate grid option + self.grid_options['restore_from_snapshot_file'] = file + else: + # no need to restore + sys.exit(code=0) return def clean(self) -> None: @@ -1194,6 +1263,10 @@ class Population: Returns an dictionary containing the analytics of the run """ +# signal.signal(signal.SIGTERM, +# functools.partial(self._parent_signal_handler,{'where':'evolve'})) +# signal.signal(signal.SIGINT, +# functools.partial(self._parent_signal_handler,{'where':'evolve'})) # Just to make sure we don't have stuff from a previous run hanging around self._pre_run_setup() @@ -1211,7 +1284,7 @@ class Population: self.slurm_grid() # and then exit print("Slurm jobs launched : exiting") - sys.exit() + self.exit(code=0) else: # Execute population evolution subroutines self._evolve_population() @@ -1257,7 +1330,8 @@ class Population: # we must save a snapshot, not the population object self.grid_options['start_at'] = self.grid_options["_count"] self.save_snapshot() - sys.exit() + code = 1 if self.was_killed() else 0 + self.exit(code=code) # Save object to a pickle file elif self.grid_options['save_population_object']: @@ -1267,7 +1341,7 @@ class Population: # unless we're joining if self.grid_options["slurm"] >= 1 and \ self.grid_options['evolution_type'] != 'join': - sys.exit() + self.exit() ## # Clean up code: remove files, unset values, unload interpolators etc. This is placed in the general evolve function, @@ -1296,27 +1370,37 @@ class Population: # special cases if self.grid_options['evolution_type'] == 'join': - joinfiles = self.joinfiles() - joiningfile = self.slurmpath('joining') - if self.can_join(joinfiles,joiningfile): - # join object files - try: - pathlib.Path(joiningfile).touch(exist_ok=False) - print("can join : all tasks are finished") - try: - self.join_from_files(self,joinfiles) - except Exception as e: - print("Join gave exception",e) - # disable analytics calculations : use the - # values we just loaded - self.grid_options['do_analytics'] = False - return - except: - pass + + # check that our job has finished + status = self.get_slurm_status() + + if status != "finished": + # job did not finish : save a snapshot + print("This job did not finish (status is {status}) : cannot join".format(status=status)) + self.exit(code=1) else: - print("cannot join : other tasks are not yet finished\n") - print("Finished this job : exiting") - sys.exit() + # our job has finished + joinfiles = self.joinfiles() + joiningfile = self.slurmpath('joining') + if self.can_join(joinfiles,joiningfile): + # join object files + try: + pathlib.Path(joiningfile).touch(exist_ok=False) + print("can join : all tasks are finished") + try: + self.join_from_files(self,joinfiles) + except Exception as e: + print("Join gave exception",e) + # disable analytics calculations : use the + # values we just loaded + self.grid_options['do_analytics'] = False + return + except: + pass + else: + print("cannot join : other tasks are not yet finished\n") + print("Finished this job : exiting") + self.exit(code=1) ############################################################ # Evolve systems @@ -1337,8 +1421,6 @@ class Population: ) ) - - # finished! self.grid_options["_end_time_evolution"] = time.time() @@ -1355,6 +1437,13 @@ class Population: totaldtsecs=timedelta(dtsecs * self.grid_options["num_processes"]), memuse=sum(self.shared_memory["max_memory_use_per_thread"]), ) + + + # add warning about a grid that was killed + if self.was_killed(): + string2 += "\n>>> Grid was killed <<<" + self.set_status("killed") + verbose_print(self._boxed(string1, string2), self.grid_options["verbosity"], 0) if self.grid_options["_errors_found"]: @@ -1388,7 +1477,7 @@ class Population: ) else: verbose_print( - "There were no errors found in this run.", + "No failed systems were found in this run.", self.grid_options["verbosity"], 0, ) @@ -1407,12 +1496,14 @@ class Population: return stream_logger - def _system_queue_filler(self, job_queue, num_cores): + def _system_queue_filler(self, job_queue, num_processes): """ Function that is responsible for keeping the queue filled. This will generate the systems until it is full, and then keeps trying to fill it. Will have to play with the size of this. + + This function is called in the parent process. """ stream_logger = self._get_stream_logger() @@ -1429,25 +1520,20 @@ class Population: self._load_grid_function() generator = self.grid_options["_system_generator"]( - self, print_results=False + self, + print_results=False ) - # Continuously fill the queue - signal.signal(signal.SIGTERM, - functools.partial(self._signal_handler)) - signal.signal(signal.SIGINT, - functools.partial(self._signal_handler)) - # start_at can be an expression : we should eval it # prior to running the loop self.grid_options['start_at'] = eval(str(self.grid_options['start_at'])) if self.grid_options['start_at'] > 0: print("Starting at model {} ".format(self.grid_options['start_at'])) + # Continuously fill the queue while we are allowed to for system_number, system_dict in enumerate(generator): - - if self.custom_options['stop_queue']: - print("QUEUE DETECTED STOP") + if self.grid_options['stop_queue']: + break # skip systems before start_at elif system_number < self.grid_options["start_at"]: @@ -1473,21 +1559,31 @@ class Population: continue # Put job in queue - job_queue.put((system_number, system_dict)) + if self.grid_options['stop_queue']: + break + else: + try: + job_queue.put((system_number, system_dict),block=True) + except Exception as e: + # error on queueing : stop the queue + self.grid_options['stop_queue'] = True - # Print some info - verbose_print( - "Queue produced system {}".format(system_number), - self.grid_options["verbosity"], - 3, - ) + # Print some info + verbose_print( + "Queue produced system {}".format(system_number), + self.grid_options["verbosity"], + 3, + ) + + self.grid_options['_queue_done'] = True # Send closing signal to workers. When they receive this they will terminate if self.grid_options["verbosity"] >= _LOGGER_VERBOSITY_LEVEL: stream_logger.debug(f"Signalling processes to stop") # DEBUG - for _ in range(num_cores): - job_queue.put("STOP") + if True:#not self.grid_options['stop_queue']: + for _ in range(num_processes): + job_queue.put("STOP") def _evolve_population_grid(self): """ @@ -1511,16 +1607,29 @@ class Population: We read out the information in the result queue and store them in the grid object """ + # Set process name setproctitle.setproctitle("binarycpython parent process") + # if max_queue_size is zero, calculate automatically + # to be double the number of processes - you don't want to + # make the queue too large because when it's killed you + # want to end quickly + if self.grid_options["max_queue_size"] == 0: + self.grid_options["max_queue_size"] = 2 * self.grid_options["num_processes"] + # Set up the manager object that can share info between processes manager = multiprocessing.Manager() job_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"]) - result_queue = manager.Queue(maxsize=self.grid_options["num_processes"]) - # Create process instances + # data to be sent to signal handlers + signal_data = { + 'where' : '_evolve_population_grid', + 'queue' : job_queue, + } + + # Create process instances to run the stars processes = [] for ID in range(self.grid_options["num_processes"]): processes.append( @@ -1534,12 +1643,23 @@ class Population: for p in processes: p.start() - # Set up the system_queue - self._system_queue_filler(job_queue, num_cores=self.grid_options["num_processes"]) + # activate signal handlers + # * the child processes ignore these signals + # * the parent will be in _system_queue_filler when these are caught + signal.signal(signal.SIGTERM, + functools.partial(self._parent_signal_handler,signal_data)) + signal.signal(signal.SIGINT, + functools.partial(self._parent_signal_handler,signal_data)) + + # Set up the system_queue in the parent process + self._system_queue_filler(job_queue, + num_processes=self.grid_options["num_processes"]) # Join the processes + print("Do join") for p in processes: p.join() + print("Joined") keylist = ["_failed_count", "_failed_prob", @@ -1549,13 +1669,16 @@ class Population: "_count", "_total_mass_run", "_total_probability_weighted_mass_run", - "_zero_prob_stars_skipped"] + "_zero_prob_stars_skipped", + "_killed"] # todo: error codes # Handle the results by merging all the dictionaries. How that merging happens exactly is # described in the merge_dicts description. if self.preloaded_population: + # use preloaded population's data as a basis + # for our combined_output_dict combined_output_dict = { "ensemble_results" : keys_to_floats(self.preloaded_population.grid_ensemble_results), "results": keys_to_floats(self.preloaded_population.grid_results) @@ -1565,30 +1688,34 @@ class Population: try: combined_output_dict[x] = self.preloaded_population.grid_options[x] except Exception as e: - print("OOPS",e,x) + print("Tried to set combined_output_dict key",x,"from preloaded_popuation, but this failed:",e) print("Pre-loaded data from {} stars".format(combined_output_dict["_count"])) else: + # new empty combined output combined_output_dict = OrderedDict() + combined_output_dict['ensemble_results'] = OrderedDict() + combined_output_dict['results'] = OrderedDict() sentinel = object() for output_dict in iter(result_queue.get, sentinel): - - # don't let Xinit be added - if "ensemble_results" in combined_output_dict and \ - "ensemble" in combined_output_dict["ensemble_results"] and \ - "Xinit" in combined_output_dict["ensemble_results"]["ensemble"]: - del combined_output_dict["ensemble_results"]["ensemble"]["Xinit"] - - # merge dicts - combined_output_dict = merge_dicts(combined_output_dict, - keys_to_floats(output_dict)) + if output_dict: + # don't let Xinit be added + if "ensemble_results" in combined_output_dict and \ + "ensemble" in combined_output_dict["ensemble_results"] and \ + "Xinit" in combined_output_dict["ensemble_results"]["ensemble"]: + del combined_output_dict["ensemble_results"]["ensemble"]["Xinit"] + + # merge dicts + combined_output_dict = merge_dicts(combined_output_dict, + keys_to_floats(output_dict)) if result_queue.empty(): break # Extra ensemble result manipulation: - combined_output_dict["ensemble_results"]["ensemble"] = format_ensemble_results( - combined_output_dict["ensemble_results"].get("ensemble", {}) - ) + if 'ensemble_results' in combined_output_dict: + combined_output_dict["ensemble_results"]["ensemble"] = format_ensemble_results( + combined_output_dict["ensemble_results"].get("ensemble", {}) + ) gc.collect() # Put the values back as object properties @@ -1614,6 +1741,7 @@ class Population: ], ) self.grid_ensemble_results["metadata"]["factored_in_probability_weighted_mass"] = True + self.grid_ensemble_results["metadata"]["_killed"] = self.grid_options["_killed"] # Add settings of the populations all_info = self.return_all_info( @@ -1632,6 +1760,11 @@ class Population: self.grid_options[x] = combined_output_dict[x] self.grid_options["_failed_systems_error_codes"] = list(set(combined_output_dict["_failed_systems_error_codes"])) + # if we were killed, save snapshot + if self.grid_options['save_snapshots'] and self.grid_options['_killed']: + self.custom_options['save_snapshot'] = True + + return def _evolve_system_mp(self, full_system_dict): """ @@ -1671,10 +1804,20 @@ class Population: self.custom_options["parameter_dict"] = full_system_dict self.grid_options["parse_function"](self, out) - def _signal_handler(self,signum,frame): + return + + def _parent_signal_handler(self,signal_data,signum,frame): """ - Signal handling function. + Signal handling function for the parent process. """ + + if 'queue' in signal_data: + q = signal_data['queue'] + else: + q = None + + # this function is called by both queues when they + # catch a signal sigstring = signal.Signals(signum).name if sigstring in self.signal_count: @@ -1682,24 +1825,61 @@ class Population: else: self.signal_count[sigstring] = 1 + if self.signal_count[sigstring] > 3: + print("caught > 3 times : exit") + self.exit(code=2) + # tell the user what has happened - print("Signal {} caught by process {} count {}".format(sigstring, - self.jobID(), - self.signal_count[sigstring])) + print("Parent signal {} caught (count {}) handler set in {} [ keys {} ]".format( + sigstring, + self.signal_count[sigstring], + signal_data['where'], + ','.join(signal_data.keys()) + )) + + # set status files + self.set_status("signal {sig}".format(sig=sigstring)) if signum == signal.SIGINT: - self.custom_options['stop_queue'] = True + # caught SIGINT: e.g. CTRL-C or Slurm's manager + # shutting us down + print("Parent set stop_queue to True") + self.grid_options['stop_queue'] = True self.custom_options['save_snapshot'] = True - if self.signal_count[sigstring] > 3: - print("caught > 3 times : exit") - sys.exit() + self.grid_options['_killed'] = True return - else: # what to do? return + + def _child_signal_handler(self,signal_data,signum,frame): + sigstring = signal.Signals(signum).name + + if sigstring in self.signal_count: + self.signal_count[sigstring] += 1 + else: + self.signal_count[sigstring] = 1 + if self.signal_count[sigstring] > 3: + print("caught > 3 times : exit") + self.exit(code=2) + + print("Child signal {} caught (count {}) handler set in {} [ keys {} ]".format( + sigstring, + self.signal_count[sigstring], + signal_data['where'], + ','.join(signal_data.keys()) + )) + + if signum == signal.SIGINT: + self.grid_options['stop_queue'] = True + self.grid_options['_killed'] = True + + # propagate signal to parent + os.kill(self.grid_options["_main_pid"],signum) + + def _process_run_population_grid(self, job_queue, result_queue, ID): """ Worker process that gets items from the job_queue and runs those systems. @@ -1712,6 +1892,14 @@ class Population: """ + # ignore SIGINT and SIGTERM : these are + # handled by our parent process (hence in + # _evolve_population_grid) + signal.signal(signal.SIGTERM, + functools.partial(self._child_signal_handler,{'where':'_process_run_population_grid'})) + signal.signal(signal.SIGINT, + functools.partial(self._child_signal_handler,{'where':'_process_run_population_grid'})) + # set start timer start_process_time = datetime.datetime.now() @@ -1719,12 +1907,6 @@ class Population: self.process_ID = ( ID ) print("Set process ID",self.process_ID) - # set handler to catch SIGINT and SIGTERM and exit gracefully - signal.signal(signal.SIGTERM, - functools.partial(self._signal_handler)) - signal.signal(signal.SIGINT, - functools.partial(self._signal_handler)) - stream_logger = self._get_stream_logger() if self.grid_options["verbosity"] >= _LOGGER_VERBOSITY_LEVEL: stream_logger.debug(f"Setting up processor: process-{self.process_ID}") @@ -1798,6 +1980,14 @@ class Population: # Go over the queue for system_number, system_dict in iter(job_queue.get, "STOP"): + if False: + print("Child: Job Queue system_number = {}, dict={}, n={} check {}".format( + system_number, + system_dict, + number_of_systems_run, + self.grid_options['stop_queue'])) + sys.stdout.flush() + # Combine that with the other settings full_system_dict = self.bse_options.copy() full_system_dict.update(system_dict) @@ -1972,10 +2162,23 @@ class Population: total_mass_system * full_system_dict.get("probability", 1) ) - if self.custom_options['stop_queue']: - print("Stop queue at system {n}".format(n=number_of_systems_run)) + if self.grid_options['stop_queue']: + print("Child: Stop queue at system {n}".format(n=number_of_systems_run)) break + + if self.grid_options['stop_queue']: + print("Child: FLUSH JOB QUEUE") + # any remaining jobs should be ignored + try: + while True: + job_queue.get_nowait() + except queue.Empty: + pass + print("Child: FLUSHED JOB QUEUE") + + print("Child : Q finished",flush=True) + # Set status to finishing self.set_status("finishing") @@ -2070,15 +2273,18 @@ class Population: "_total_mass_run": total_mass_run, "_total_probability_weighted_mass_run": total_probability_weighted_mass_run, "_zero_prob_stars_skipped": zero_prob_stars_skipped, + "_killed" : self.grid_options["_killed"], } end_process_time = datetime.datetime.now() + killed = self.was_killed() + # thread end message colour = "cyan on black" verbose_print( self._boxed( - "{colour}Process {ID} finished:\ngenerator started at {start}\ngenerator finished at {end}\ntotal: {timesecs}\nof which {binary_c_secs} with binary_c\nRan {nsystems} systems\nwith a total probability of {psystems:g}\n{failcolour}This thread had {nfail} failing systems{colour}\n{failcolour}with a total failed probability of {pfail}{colour}\n{zerocolour}Skipped a total of {nzero} zero-probability systems{zeroreset}\n".format( + "{colour}Process {ID} finished:\ngenerator started at {start}\ngenerator finished at {end}\ntotal: {timesecs}\nof which {binary_c_secs} with binary_c\nRan {nsystems} systems\nwith a total probability of {psystems:g}\n{failcolour}This thread had {nfail} failing systems{colour}\n{failcolour}with a total failed probability of {pfail}{colour}\n{zerocolour}Skipped a total of {nzero} zero-probability systems{zeroreset}\n{failednotice}".format( colour=self.ANSI_colours[colour], ID=ID, start=start_process_time.isoformat(), @@ -2104,6 +2310,9 @@ class Population: zeroreset=self.ANSI_colours[colour] if zero_prob_stars_skipped > 0 else "", + failednotice=">>> Process was killed <<<\n" + if killed + else "" ), colour=colour, ), @@ -2139,7 +2348,10 @@ class Population: f.close() # Set status to finished - self.set_status("finished") + if self.was_killed(): + self.set_status("killed") + else: + self.set_status("finished") verbose_print( "process {} queue put output_dict ".format(ID), @@ -2302,7 +2514,8 @@ class Population: verbose_print( self._boxed( - "Total starcount for this run is {starcount}".format( + "Dry run", + "Total starcount is {starcount}".format( starcount=self.grid_options["_total_starcount"] ), "Total probability is {probtot:g}".format( @@ -2313,7 +2526,7 @@ class Population: 0, ) if self.grid_options["exit_after_dry_run"]: - sys.exit() + self.exit(code=0) ####################### # Reset values and prepare the grid function @@ -2358,7 +2571,7 @@ class Population: self._dry_run_source_file() print( - "Total starcount for this run will be: {}".format( + "Total starcount will be: {}".format( self.grid_options["_total_starcount"] ) ) @@ -3498,7 +3711,7 @@ class Population: # ) # verbose_print("all done!", self.grid_options["verbosity"], 0) - # sys.exit() + # self.exit() # elif self.grid_options["condor_command"] == "evolve": # # TODO: write this function @@ -3563,7 +3776,7 @@ class Population: print( "Unable to determine file type from ensemble filename {} : it should be .json or .msgpack." ).format(output_file) - sys.exit() + self.exit(code=1) elif file_type == "JSON": # JSON output if compression == "gzip": @@ -5004,6 +5217,15 @@ eccentricity3=0 """ Set the slurm status corresponing to the self object, which should have slurm_jobid and slurm_jobarrayindex set. """ + # save slurm jobid to file + idfile = os.path.join(self.grid_options["slurm_dir"], + "jobid") + if not os.path.exists(idfile): + with open(idfile,"w") as fjobid: + fjobid.write("{jobid}\n".format(jobid=self.grid_options['slurm_jobid'])) + fjobid.close() + + # save slurm status file = self.slurm_status_file() if file: with open(file,'w') as f: @@ -5021,13 +5243,13 @@ eccentricity3=0 if jobarrayindex is None: jobarrayindex = self.grid_options['slurm_jobarrayindex'] - if jobid is None or jobid or jobarrayindex is None : + if jobid is None or jobarrayindex is None : return None path = pathlib.Path(self.slurm_status_file(jobid=jobid, jobarrayindex=jobarrayindex)) if path: - return path.read_text() + return path.read_text().strip() else: return None @@ -5064,7 +5286,7 @@ eccentricity3=0 parents=True) except: print("Tried to make the directory {dir} but it already exists. When you launch a set of binary_c jobs on Slurm, you need to set your slurm_dir to be a fresh directory with no contents.".format(dir=dir)) - sys.exit() + self.exit(code=1) # check that they have been made and exist: we need this # because on network mounts (NFS) there's often a delay between the mkdir @@ -5117,7 +5339,7 @@ eccentricity3=0 print("WARNING: you want to use {} MB of RAM : this is unlikely to be correct. If you believe it is, set slurm_warn_max_memory to something very large (it is currently {} MB)\n".format( self.grid_options['slurm_memory'], self.grid_options['slurm_warn_max_memory'])) - sys.exit() + self.exit(code=1) # set slurm_array slurm_array = self.grid_options['slurm_array'] or "1-{njobs}\%{njobs}".format(njobs=self.grid_options['slurm_njobs']) @@ -5127,13 +5349,12 @@ eccentricity3=0 # get job array index jobarrayindex = self.grid_options['slurm_jobarrayindex'] - if jobarrayindex is None: jobarrayindex = '$SLURM_ARRAY_TASK_ID' if self.grid_options['slurm_njobs'] == 0: print("binary_c-python Slurm : You must set grid_option slurm_njobs to be non-zero") - sys.exit() + self.exit(code=1) # build the grid command grid_command = [ @@ -5204,21 +5425,23 @@ eccentricity3=0 jobid=jobid, jobarrayindex=jobarrayindex, joinfile=joinfile), - "\n# run grid of stars", - "\n{grid_command} evolution_type=grid save_population_object={slurm_dir}/results/{jobid}.{jobarrayindex}.gz\n".format(slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid, - jobarrayindex=jobarrayindex, - grid_command=grid_command), - "\n# set status to \"finished\"\necho \"finished\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n\n".format(slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid, - jobarrayindex=jobarrayindex) + "\n# run grid of stars and, if this returns 0, set status to finished\n", + "{grid_command} evolution_type=grid slurm_jobid={jobid} slurm_jobarrayindex={jobarrayindex} save_population_object={slurm_dir}/results/{jobid}.{jobarrayindex}.gz && echo -n \"finished\" > {slurm_dir}/status/{jobid}.{jobarrayindex} && \\\n".format( + slurm_dir=self.grid_options['slurm_dir'], + jobid=jobid, + jobarrayindex=jobarrayindex, + grid_command=grid_command), ] if not self.grid_options['slurm_postpone_join']: - lines.append("\n# check if we can join\n{grid_command} slurm=2 evolution_type=join joinlist={joinfile}\n\n".format( - grid_command=grid_command, - joinfile=joinfile - )) + lines += [ + "echo && echo \"Checking if we can join...\" && echo && \\\n", + "{grid_command} slurm=2 evolution_type=join joinlist={joinfile} slurm_jobid={jobid} slurm_jobarrayindex={jobarrayindex}\n\n".format( + grid_command=grid_command, + joinfile=joinfile, + jobid=jobid, + jobarrayindex=jobarrayindex + )] # write to script, close it and make it executable by # all (so the slurm user can pick it up) @@ -5419,6 +5642,8 @@ eccentricity3=0 for key in ["_probtot"]: refpop.grid_options[key] += newpop.grid_options[key] + refpop.grid_options['_killed'] |= newpop.grid_options['_killed'] + return def merge_populations_from_file(self,refpop,filename): @@ -5467,7 +5692,7 @@ eccentricity3=0 def can_join(self,joinfiles,joiningfile,vb=False): # check the joinfiles to make sure they all exist # and their .saved equivalents also exist - + vb= True if os.path.exists(joiningfile): if vb: print("cannot join: joiningfile exists at {}".format(joiningfile)) @@ -5537,14 +5762,34 @@ eccentricity3=0 try: self.grid_ensemble_results["metadata"]['duration'] = self.time_elapsed() + except Exception as e: + print("Failure to calculate time elapsed:",e) + pass + try: self.grid_ensemble_results["metadata"]['CPU_time'] = self.CPU_time() except Exception as e: - print("Failure to calculate time elapsed and/or CPU time consumed") + print("Failure to calculate CPU time consumed:",e) pass - return + + def snapshot_filename(self): + """ + Automatically choose the snapshot filename. + """ + if self.grid_options['slurm'] > 0: + file = os.path.join(self.grid_options['slurm_dir'], + 'snapshots', + self.jobID() + '.gz') + else: + file = os.path.join(self.grid_options['tmp_dir'], + 'snapshot.gz') + return file + def load_snapshot(self,file): + """ + Load a snapshot from file and set it in the preloaded_population placeholder. + """ newpop = self.load_population_object(file) self.preloaded_population = newpop self.grid_options['start_at'] = newpop.grid_options['start_at'] @@ -5553,17 +5798,33 @@ eccentricity3=0 n=self.grid_options['start_at'])) return - def save_snapshot(self): + def save_snapshot(self,file=None): + """ + Save the population object to a snapshot file, automatically choosing the filename if none is given. + """ - if self.grid_options['slurm'] > 0: - file = os.path.join(self.grid_options['slurm_dir'], - 'snapshots', - self.jobID() + '.gz') - else: - file = os.path.join(self.grid_options['tmp_dir'], - 'snapshot.gz') + if file == None: + file = self.snapshot_filename() + try: + n = self.grid_options['_count'] + except: + n = '?' + print("Saving snapshot containing {} stars to {}".format(n,file)) self.save_population_object(object=self, filename=file) return + + def was_killed(self): + # determine if the process was killed + killed = self.grid_options['_killed'] + try: + killed = killed or self.grid_results['_killed'] + except: + pass + try: + killed = killed or self.grid_ensemble_results['metadata']['_killed'] + except: + pass + return killed diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index a564ba651..9d28ad9c0 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -44,7 +44,7 @@ grid_options_defaults_dict = { "_commandline_input": "", "log_runtime_systems": 0, # whether to log the runtime of the systems (1 file per thread. stored in the tmp_dir) "_actually_evolve_system": True, # Whether to actually evolve the systems of just act as if. for testing. used in _process_run_population_grid - "max_queue_size": 1000, # Maximum size of the system call queue. + "max_queue_size": 0, # Maximum size of the system call queue. Set to 0 for this to be calculated automatically "run_zero_probability_system": True, # Whether to run the zero probability systems "_zero_prob_stars_skipped": 0, "ensemble_factor_in_probability_weighted_mass": False, # Whether to multiply the ensemble results by 1/probability_weighted_mass @@ -150,6 +150,10 @@ grid_options_defaults_dict = { 'save_snapshots' : True, # if True, save snapshots on SIGINT 'restore_from_snapshot_file' : None, # file to restore from 'restore_from_snapshot_dir' : None, # dir to restore from + 'exit_code' : 0, # return code + 'stop_queue' : False, + '_killed' : False, + '_queue_done' : False, ## Monte carlo type evolution # TODO: make MC options ## Evolution from source file @@ -196,6 +200,7 @@ grid_options_defaults_dict = { "slurm_array": None, # override for --array, useful for rerunning jobs "slurm_extra_settings": {}, # Dictionary of extra settings for Slurm to put in its launch script. "slurm_sbatch": "sbatch", # sbatch command + "slurm_restart_dir" : None, # restart Slurm jobs from this directory ######################################## # Condor stuff ######################################## -- GitLab