diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 5ba426fe010373f604a554b896b43169ec6e2ffe..303334b0617154664a1d7ffd1adf8d7d2826e448 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -832,10 +832,6 @@ class Population: 0, ) - - def profile_worker(self, num): - cProfile.runctx('import os;self._process_run_population_grid(num)', globals(), locals(), 'profile-%d.out' %num) - def _evolve_population_grid(self): """ Function to evolve the population with multiprocessing approach. @@ -962,10 +958,21 @@ class Population: # set start timer start_process_time = datetime.datetime.now() + # self.process_ID = ( ID # Store the ID as a object property again, lets see if that works. ) + # Set to starting up + with open( + os.path.join( + self.grid_options["tmp_dir"], "process_status", + "process_{}.txt".format(self.process_ID), + ), + "w", + ) as f: + f.write("STARTING") + # lets try out making stores for all the grids: self.grid_options["_store_memaddr"] = _binary_c_bindings.return_store_memaddr() @@ -1028,6 +1035,18 @@ class Population: while running: # round_number_mod = (localcounter+1)%self.grid_options["amt_cores"] + if localcounter == 0: + + # Set status to running + with open( + os.path.join( + self.grid_options["tmp_dir"], "process_status", + "process_{}.txt".format(self.process_ID), + ), + "w", + ) as f: + f.write("RUNNING") + try: # Get the system system = next(generator) @@ -1041,34 +1060,11 @@ class Population: full_system_dict = self.bse_options.copy() full_system_dict.update(system) - # self._print_info( - # i + 1, self.grid_options["_total_starcount"], full_system_dict - # ) - - # - verbose_print( - "Process {} is handling system {}".format(ID, localcounter), - self.grid_options["verbosity"], - 2, - ) - - # In some cases, the whole run crashes. To be able to figure out which system - # that was on, we log each current system to a file (each thread has one). 
- # Each new system overrides the previous - with open( - os.path.join( - self.grid_options["tmp_dir"], - "thread_{}_current_system.txt".format(self.process_ID), - ), - "w", - ) as f: - binary_cmdline_string = self._return_argline(full_system_dict) - f.write(binary_cmdline_string) # In the first system, explicitly check all the keys that are passed to see if # they match the keys known to binary_c. # Won't do that every system cause that is a bit of a waste of computing time. - if localcounter == 0: + if number_of_systems_run==0: # TODO: Put this someplace else and wrap in a functioncall for key in full_system_dict.keys(): if not key in self.available_keys: @@ -1088,7 +1084,31 @@ class Population: key ) raise ValueError(msg) - + + # self._print_info( + # i + 1, self.grid_options["_total_starcount"], full_system_dict + # ) + + # + verbose_print( + "Process {} is handling system {}".format(ID, localcounter), + self.grid_options["verbosity"], + 2, + ) + + # In some cases, the whole run crashes. To be able to figure out which system + # that was on, we log each current system to a file (each thread has one). 
+ # Each new system overrides the previous + with open( + os.path.join( + self.grid_options["tmp_dir"], "current_system", + "process_{}.txt".format(self.process_ID), + ), + "w", + ) as f: + binary_cmdline_string = self._return_argline(full_system_dict) + f.write(binary_cmdline_string) + start_runtime_binary_c = time.time() # Evolve the system @@ -1105,8 +1125,8 @@ class Population: if self.grid_options["log_runtime_systems"] == 1: with open( os.path.join( - self.grid_options["tmp_dir"], - "thread_{}_runtime_systems.txt".format(self.process_ID), + self.grid_options["tmp_dir"], "runtime_systems", + "process_{}.txt".format(self.process_ID), ), "a+", ) as f: @@ -1114,7 +1134,7 @@ class Population: full_system_dict ) f.write( - "{} {}\n".format( + "{} {} '{}'\n".format(start_runtime_binary_c, end_runtime_binary_c - start_runtime_binary_c, binary_cmdline_string, ) @@ -1132,6 +1152,7 @@ class Population: except StopIteration: running = False + # Rotate the round number mod. The idea here is to prevent a thread from always getting the same sampled period of whatever. This just rotates everyone if (localcounter + 1) % self.grid_options["amt_cores"] == 0: round_number_mod += 1 @@ -1140,6 +1161,16 @@ class Population: # Has to be here because this one is used for the (localcounter+ID) % (self..) localcounter += 1 + # Set status to finishing + with open( + os.path.join( + self.grid_options["tmp_dir"], "process_status", + "process_{}.txt".format(self.process_ID), + ), + "w", + ) as f: + f.write("FINISHING") + # Handle ensemble output: is ensemble==1, then either directly write that data to a file, or combine everything into 1 file. 
ensemble_json = {} # Make sure it exists already if self.bse_options.get("ensemble", 0) == 1: @@ -1232,6 +1263,36 @@ class Population: 0, ) + # Write summary + summary_dict = { + 'population_id': self.grid_options["_population_id"], + 'process_id': self.process_ID, + 'start_process_time': start_process_time.timestamp(), + 'end_process_time': end_process_time.timestamp(), + 'total_time_calling_binary_c': total_time_calling_binary_c, + 'number_of_systems_run': number_of_systems_run, + 'probability_of_systems_run': probability_of_systems_run, + 'failed_systems': self.grid_options["_failed_count"], + 'failed_probability': self.grid_options["_failed_prob"], + 'failed_system_error_codes': self.grid_options[ + "_failed_systems_error_codes" + ], + } + with open( + os.path.join(self.grid_options["tmp_dir"], "process_summary", "process_{}.json".format(self.process_ID)), 'w' + ) as f: + f.write(json.dumps(summary_dict, indent=4)) + + # Set status to finished + with open( + os.path.join( + self.grid_options["tmp_dir"], "process_status", + "process_{}.txt".format(self.process_ID), + ), + "w", + ) as f: + f.write("FINISHED") + return output_dict # Single system @@ -1292,6 +1353,14 @@ class Population: function """ + # Make sure the subdirs of the tmp dir exist: + os.makedirs(os.path.join(self.grid_options["tmp_dir"], "failed_systems"), exist_ok=True) + os.makedirs(os.path.join(self.grid_options["tmp_dir"], "current_system"), exist_ok=True) + os.makedirs(os.path.join(self.grid_options["tmp_dir"], "process_status"), exist_ok=True) + os.makedirs(os.path.join(self.grid_options["tmp_dir"], "process_summary"), exist_ok=True) + os.makedirs(os.path.join(self.grid_options["tmp_dir"], "runtime_systems"), exist_ok=True) + + # Check for parse function if not self.grid_options["parse_function"]: print("Warning: No parse function set. 
Make sure you intended to do this.") @@ -2900,9 +2969,9 @@ class Population: argstring = self._return_argline(system_dict) with open( os.path.join( - self.grid_options["tmp_dir"], - "failed_systems_{}_process_{}.txt".format( - self.grid_options["_population_id"], self.process_ID + self.grid_options["tmp_dir"], "failed_systems", + "process_{}.txt".format( + self.process_ID ), ), "a+",