diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 2618e57da5f27a1702bb40ac5b3d7ff434b40782..db2be2d2606cce73fd5545607a226b3b0191bcf4 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -972,7 +972,12 @@ class Population( return # Execute population evolution subroutines - self._evolve_population() + result = self._evolve_population() + + if result is False: + print("Error detected in _evolve_population() : stopping here") + + sys.exit() # make analytics information analytics_dict = self.make_analytics_dict() @@ -1021,6 +1026,8 @@ class Population( Tasks: - TODO: include options for different ways of generating a population here. (i.e. MC or source file) + + Returns True if the grid runs, False on error. """ ############################################################ @@ -1037,7 +1044,8 @@ class Population( in self.grid_options["_evolution_type_options"] ): if self.grid_options["evolution_type"] in ["grid", "custom_generator"]: - self._evolve_population_grid() + if self._evolve_population_grid() is False: + return False # elif self.grid_options["evolution_type"] == "mc": # # TODO: add MC option @@ -1108,9 +1116,9 @@ class Population( 0, ) - return + return True - def _system_queue_filler(self, job_queue, num_processes): + def _system_queue_filler(self, job_queue, processes, num_processes): """ Function that is responsible for keeping the queue filled. 
@@ -1143,8 +1151,11 @@ class Population( if self.grid_options["start_at"] > 0: print("Starting at model {} ".format(self.grid_options["start_at"])) + prev_process_check = None + # Continuously fill the queue while we are allowed to for system_number, system_dict in enumerate(generator): + # on stop, quit this loop if self.grid_options["stop_queue"]: break @@ -1182,6 +1193,15 @@ class Population( if self.grid_options["stop_queue"]: break else: + + # check children are running every 1s + _now = time.time() + if prev_process_check is None or _now - prev_process_check > 1: + prev_process_check = _now + if self._all_children_running(processes) is False: + self.grid_options['_job_crashed'] = True + return + try: job_queue.put((system_number, system_dict), block=True) except Exception as e: @@ -1226,6 +1246,8 @@ class Population( We then add any previous population We read out the information in the result queue and store them in the grid object + + Returns True if things go well, False on error. 
""" # Set process name @@ -1274,17 +1296,38 @@ class Population( ) # Set up the system_queue in the parent process + self.grid_options['_job_crashed'] = False self._system_queue_filler( - job_queue, num_processes=self.grid_options["num_processes"] + job_queue, processes, num_processes=self.grid_options["num_processes"] ) - # Join the processes - print("Do join of subprocesses ...") - for p in processes: - p.join() - print("Joined subprocesses.") + if self.grid_options['_job_crashed'] is True: + # job crashed while in system_queue_filler : kill children + # and return False + print("A child process crashed or was killed : I will not join incomplete data") + self._kill_child_processes(processes) + return False + + else: + # Join the processes after the queue filler has finished + print("Do join of subprocesses ...") + + while self.grid_options['_job_crashed'] is False and processes: + if self._all_children_running(processes) is False: + # job crashed: stop children and return False + self.grid_options['_job_crashed'] = True + else: + # join first process: it should finish work soon + p = processes.pop(0) + p.join() + + if self.grid_options['_job_crashed'] is True: + print("A child process crashed or was killed : I will not join incomplete data") + self._kill_child_processes(processes) + return False + else: + print("Joined all subprocesses.") - # todo: error codes # Handle the results by merging all the dictionaries. How that merging happens exactly is # described in the merge_dicts description. @@ -1373,6 +1416,9 @@ class Population( if self.grid_options["save_snapshots"] and self.grid_options["_killed"]: self.custom_options["save_snapshot"] = True + # return True because all seems well + return True + def _evolve_system_mp(self, system_number, full_system_dict): """ Function that the multiprocessing evolution method calls to evolve a system @@ -2460,30 +2506,7 @@ class Population( ) # log failing args? 
# NOTE(review): the three definitions below are methods of ``Population``.
# The collapsed SOURCE carries no usable indentation, so they are written at
# column 0; indent them under the class when applying.


def _all_children_running(self, processes):
    """
    Function to test if all child processes are running.

    Args:
        processes: iterable of process objects providing ``is_alive()`` and
            ``exitcode`` (presumably ``multiprocessing.Process`` instances -
            TODO confirm against the caller).

    Returns:
        False as soon as a process is found that is no longer alive and
        whose exit code is not 0; that process is reported through
        ``_log_failure``. True otherwise.
    """
    for p in processes:
        # A child that is not alive but exited with code 0 is not treated as
        # a failure; any other non-alive child is.
        if not p.is_alive() and p.exitcode != 0:
            print("Warning: process",p,"is no longer alive and hasn't returned good data.")
            self._log_failure(process=p, exitcode=p.exitcode)
            return False
    return True


def _kill_child_processes(self, processes, grace_period=1.0):
    """
    Function to kill all child processes.

    Every process that is still alive is asked to stop with ``terminate()``.
    The processes then get up to ``grace_period`` seconds (in total, not per
    process) to exit, and whatever is still alive afterwards is stopped with
    ``kill()``.

    Args:
        processes: iterable of process objects providing ``is_alive()``,
            ``terminate()``, ``join(timeout=...)`` and ``kill()``.
        grace_period: total number of seconds to wait between ``terminate()``
            and ``kill()``. 0 means no waiting.
    """
    terminated = []
    for p in processes:
        if p.is_alive():
            p.terminate()
            terminated.append(p)

    # Without this wait, kill() would follow terminate() immediately and the
    # children would never get the chance to act on the terminate request.
    # Only processes that were alive a moment ago are joined, so join() is
    # never called on a process that was not started.
    deadline = time.monotonic() + grace_period
    for p in terminated:
        p.join(timeout=max(0.0, deadline - time.monotonic()))

    for p in processes:
        if p.is_alive():
            p.kill()


def _log_failure(self, system_dict=None, process=None, exitcode=None, system_number=None):
    """
    Log failing or crashed system to file in log_failed_systems_dir

    Nothing is written unless ``grid_options['log_failed_systems']`` is set
    and ``grid_options['log_failed_systems_dir']`` is not None. Records are
    appended to ``process_<jobID>.txt`` inside that directory.

    Args:
        system_dict: arguments of the failing system. When given (and not
            empty), a header line followed by the result of
            ``self._return_argline(system_dict)`` is written.
        process: crashed process. When given, one line with the process and
            its exit code is written.
        exitcode: exit code reported together with ``process``.
        system_number: number of the failing system, used in the header line
            of a ``system_dict`` record. Omitted from the header when None.
    """
    if not self.grid_options["log_failed_systems"]:
        return
    log_dir = self.grid_options["log_failed_systems_dir"]
    if log_dir is None:
        return

    os.makedirs(log_dir, exist_ok=True)
    if not self.dir_ok(log_dir):
        return

    failed_systems_file = os.path.join(
        log_dir,
        "process_{}.txt".format(self.jobID()),
    )

    # No newline inside the timestamp: each message places its own line
    # breaks, so records never break mid-sentence or run into each other.
    timestamp = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")

    with self.open(failed_systems_file, "a", encoding="utf-8") as f:
        if system_dict:
            # system_number is optional so that callers which only pass
            # system_dict keep working.
            if system_number is None:
                label = "system"
            else:
                label = f"system {system_number}"
            f.write(
                f"{label} at {timestamp}\n"
                + self._return_argline(system_dict)
                + "\n"
            )
        if process is not None:
            print(f"logged crashed process to {failed_systems_file}")
            f.write(
                f"Process {process} crashed at {timestamp} with exit code {exitcode}.\n"
            )