From a8ef8e1f7db2787027b3c81672528eed749e8ee1 Mon Sep 17 00:00:00 2001 From: David Hendriks <davidhendriks93@gmail.com> Date: Mon, 28 Dec 2020 23:09:02 +0000 Subject: [PATCH] restructured the multiprocessing so that the processes just use a %==0 kind of method --- binarycpython/utils/grid.py | 203 +++++++++++++------ binarycpython/utils/grid_options_defaults.py | 3 +- 2 files changed, 148 insertions(+), 58 deletions(-) diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index ae16650c9..7ea1b0418 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -668,10 +668,89 @@ class Population: # Clean up code: remove files, unset values. self._cleanup() + def _process_run_population(self, ID): + """ + Function that loops over the whole generator, but only runs systems that fit to: if (localcounter+ID) % self.grid_options["amt_cores"] == 0 + + That way with 4 processes, process 1 runs system 0, 4, 8... process 2 runs system 1, 5, 9..., etc + + This function is called by _evolve_population_mp + """ + + # apparently we have to re-load this for every process, otherwise NameErrors arise (seems like a bug but I'm not sure) + spec = importlib.util.spec_from_file_location( + "binary_c_python_grid", + os.path.join(self.grid_options["gridcode_filename"]), + ) + grid_file = importlib.util.module_from_spec(spec) + spec.loader.exec_module(grid_file) + generator = grid_file.grid_code + + self.grid_options["system_generator"] = generator + + # Set up generator + generator = self.grid_options["system_generator"](self) + + # Set up local variables + running = True + localcounter = 0 + + print("Process {} started".format(ID)) + + # Go over the generator + while running: + try: + # Get the system + system = next(generator) + + # Check if the ID is the correct one for this process + if (localcounter+ID) % self.grid_options["amt_cores"] == 0: + # Combine that with the other settings + full_system_dict = self.bse_options.copy() + 
full_system_dict.update(system) + + # self._print_info( + # i + 1, self.grid_options["total_starcount"], full_system_dict + # ) + + # + print("Process {} is handling system {}".format(ID, localcounter)) + + # Evolve the system + self._evolve_system_mp(full_system_dict) + + except StopIteration: + print("Process {}: generator done".format(ID)) + running = False + + localcounter += 1 + + # Return a set of results and errors + output_dict = { + "results": self.grid_options["results"], + "total_errors": self.grid_options['failed_count'], + "total_probability_errors": self.grid_options['failed_prob'] + } + + return output_dict + def _evolve_population_mp(self): """ Function to evolve the population with multiprocessing approach. Using pathos to be able to include class-owned functions. + + This function will create a pool with <self.grid_options["amt_cores"]> processes, and perform an imap_unordered to run the different `threads`. + Before this was done by giving a generator as the iterable, and have the processes get a certain chunksize each round. + Later on this seemed to be a bad decision, because it is difficult to pass information back to the main controller, and because with each new batch of systems a new object instance was created. + + What I do now is I spawn these X amount of processes, and pass a range(self.grid_options["amt_cores"]) as iterable. + In that way, only once do they fetch a `job`, but that job is just an ID number. + With this ID number each thread/process loops over the whole generator, + but only runs the one <ID>'th system (if (localcounter+ID) % self.grid_options["amt_cores"]==0) + + When they are finished, these jobs are instructed to return a set of information (the result dict, TODO: describe what more) + + These resulting dictionaries are then merged and stored as object properties again. 
""" # TODO: make further use of a queue to handle jobs or at least @@ -685,43 +764,68 @@ class Population: # TODO: make good example of how to deal with a result_dict # https://www.programcreek.com/python/example/58176/multiprocessing.Value # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing - manager = pathos_multiprocess.Manager() - self.grid_options["result_dict"] = manager.dict() - prob_counter = manager.Value('i', 0) - count_counter = manager.Value('i', 0) - counter_lock = manager.Lock() - error_exceeded_counter = manager.Value('i', 0) - - self.grid_options['error_exceeded'] = error_exceeded_counter - self.grid_options['failed_prob'] = prob_counter - self.grid_options['failed_count'] = count_counter - self.custom_options['counter_lock'] = counter_lock - - # Create pool - pool = Pool(processes=self.grid_options["amt_cores"]) - # Execute - # TODO: calculate the chunksize value based on: total starcount and cores used. - _ = list( - pool.imap_unordered( - self.evolve_system_mp, self._yield_system_mp(), chunksize=20 - ) - ) + pathos_multiprocess.freeze_support() # needed for Windows + + # Create the pool + pool = Pool(processes=self.grid_options["amt_cores"]) + + # start the processes by giving them an ID value + result = list(pool.imap_unordered(self._process_run_population, range(self.grid_options["amt_cores"]))) # Handle clean termination of the whole multiprocessing (making sure there are no zombie # processes (https://en.wikipedia.org/wiki/Zombie_process)) pool.close() pool.join() - print("Total shit: {}".format(self.grid_options['failed_prob'].value)) - print("Total amt: {}".format(self.grid_options['failed_count'].value)) + # Handle the results + print(result) + + combined_output_dict = {} + for output_dict in result: + combined_output_dict = merge_dicts(combined_output_dict, output_dict) + + print(combined_output_dict) + + + # manager = pathos_multiprocess.Manager() + # self.grid_options["result_dict"] = manager.dict() 
+ # prob_counter = manager.Value('i', 0) + # count_counter = manager.Value('i', 0) + # counter_lock = manager.Lock() + # error_exceeded_counter = manager.Value('i', 0) + + # self.grid_options['error_exceeded'] = error_exceeded_counter + # self.grid_options['failed_prob'] = prob_counter + # self.grid_options['failed_count'] = count_counter + # self.custom_options['counter_lock'] = counter_lock + + # # Create pool + # pool = Pool(processes=self.grid_options["amt_cores"]) + + # # Execute + # # TODO: calculate the chunksize value based on: total starcount and cores used. + + # _ = list( + # pool.imap_unordered( + # self.evolve_system_mp, self._yield_system_mp(), chunksize=20 + # ) + # ) + + # # Handle clean termination of the whole multiprocessing (making sure there are no zombie + # # processes (https://en.wikipedia.org/wiki/Zombie_process)) + # pool.close() + # pool.join() + + # print("Total shit: {}".format(self.grid_options['failed_prob'].value)) + # print("Total amt: {}".format(self.grid_options['failed_count'].value)) - # if results: - # print(results) - # self.grid_options['error'] = 1 - # self.grid_options['failed_count'] += len(results) - # self.grid_options['failed_prob'] += sum(results) + # # if results: + # # print(results) + # # self.grid_options['error'] = 1 + # # self.grid_options['failed_count'] += len(results) + # # self.grid_options['failed_prob'] += sum(results) def _evolve_population_lin(self): """ @@ -748,16 +852,18 @@ class Population: self.grid_options["parse_function"](self, out) - def evolve_system_mp(self, full_system_dict): + def _evolve_system_mp(self, full_system_dict): """ Function that the multiprocessing evolution method calls to evolve a system + + this function is called by _process_run_population """ binary_cmdline_string = self._return_argline(full_system_dict) - if not self.grid_options['error_exceeded'].value==1: - print("NO MORE SYSTEMS") - + # Check whether the amount of errors has been exceeded + if not 
self.grid_options['error_exceeded']==1: + # Get out = _binary_c_bindings.run_system( argstring=binary_cmdline_string, custom_logging_func_memaddr=self.grid_options[ @@ -769,30 +875,14 @@ class Population: err = self._check_binary_c_error(out, full_system_dict) + # TODO: fix that this goes good. if err: return err if self.grid_options["parse_function"]: self.grid_options["parse_function"](self, out) else: + # Handle this better print("NO MORE SYSTEMS") - def _yield_system_mp(self): - """ - Function that the multiprocessing evolution method calls to yield systems - """ - - for i, system in enumerate(self.grid_options["system_generator"](self)): - full_system_dict = self.bse_options.copy() - full_system_dict.update(system) - - # binary_cmdline_string = self._return_argline(full_system_dict) - - self._print_info( - i + 1, self.grid_options["total_starcount"], full_system_dict - ) - yield full_system_dict - - print("generator done") - # Single system def evolve_single(self, clean_up_custom_logging_files=True): """ @@ -2261,15 +2351,14 @@ class Population: if binary_c_output.startswith("SYSTEM_ERROR"): print("FAILING SYSTEM FAILING SYSTEM") - with self.custom_options['counter_lock']: - self.grid_options['failed_prob'].set(self.grid_options['failed_prob'].value + system_dict['probability']) - self.grid_options['failed_count'].set(self.grid_options['failed_count'].value + 1) + self.grid_options['failed_prob'] += system_dict['probability'] + self.grid_options['failed_count'] += 1 - if self.grid_options['failed_count'].value > 20: - print("{}".format(self.grid_options['failed_count'].value)) - print("stopping logging to file") + if self.grid_options['failed_count'] > 20: + print("failed_count: {}".format(self.grid_options['failed_count'])) + print("stopping logging to file") - self.grid_options['error_exceeded'].set(1) + self.grid_options['error_exceeded'] = 1 # Set values diff --git a/binarycpython/utils/grid_options_defaults.py 
b/binarycpython/utils/grid_options_defaults.py index 8c6b81d21..d2e7f252f 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -77,10 +77,11 @@ grid_options_defaults_dict = { "probtot": 0, # total probability "weight": 1.0, # weighting for the probability "repeat": 1.0, # number of times to repeat each system (probability is adjusted to be 1/repeat) - "results_per_worker": {}, # dict which can store info per worker. meh. doesnt work properly + "results": {}, # dict to store the results. Every process fills this on its own and then it will be joined later "start_time_evolution": 0, # Start time of the grid "end_time_evolution": 0, # end time of the grid "error": 0, # error? + "error_exceeded": 0, # Flag whether the amount of errors has exceeded the limit "failed_count": 0, # amt of failed systems "failed_prob": 0, # Summed probability of failed systems "failed_systems_error_codes": [], -- GitLab