From 46df2f0911a4ff0c39474397a269e01454aa7cf7 Mon Sep 17 00:00:00 2001 From: David Hendriks <davidhendriks93@gmail.com> Date: Sun, 10 Jan 2021 23:37:19 +0000 Subject: [PATCH] removed evolve_population_lin, restructured some stuff --- binarycpython/utils/grid.py | 229 +++++++++---------- binarycpython/utils/grid_options_defaults.py | 14 +- 2 files changed, 109 insertions(+), 134 deletions(-) diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index e21aed4e9..35ab71bd3 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -677,7 +677,8 @@ class Population: grid_options['slurm']: integer boolean whether to use a slurm_grid evolution grid_options['condor']: integer boolean whether to use a condor_grid evolution - If neither of the above is set, we continue without using HPC routines (that doesn't mean this cannot be run on a server with many cores) + If neither of the above is set, we continue without using HPC routines + (that doesn't mean this cannot be run on a server with many cores) """ # Check which type: @@ -705,6 +706,7 @@ class Population: - TODO: include options for different ways of generating a population here. (i.e. MC or source file) """ + # Reset some settings: population_id, results, ensemble_results etc self.grid_options["_population_id"] = uuid.uuid4().hex ## @@ -719,10 +721,10 @@ class Population: self.grid_options["evolution_type"] in self.grid_options["_evolution_type_options"] ): - if self.grid_options["evolution_type"] == "mp": - self._evolve_population_mp() - elif self.grid_options["evolution_type"] == "linear": - self._evolve_population_lin() + if self.grid_options["evolution_type"] == "grid": + self._evolve_population_grid() + # elif self.grid_options["evolution_type"] == "mc": + # # TODO: add MC option else: print( "Warning. you chose a wrong option for the grid evolution types.\ @@ -779,14 +781,110 @@ class Population: # Clean up code: remove files, unset values. 
self._cleanup() - def _process_run_population(self, ID): + def _evolve_population_grid(self): + """ + Function to evolve the population with multiprocessing approach. + Using pathos to be able to include class-owned functions. + + This function will create a pool with <self.grid_options["amt_cores"]> processes, and + perform an imap_unordered to run the different `threads`. + Before this was done by giving a generator as the iterable, and have the processes get a + certain chunksize each round. + Later on this seemed to be a bad decision, because it is difficult to pass information + back to the main controller, and because with each new batch of systems a new object instance was created. + + What I do now is I spawn these X amount of processes, and pass a range(self.grid_options["amt_cores"]) as iterable. + In that way, only once do they fetch a `job`, but that job is just an ID number. + With this ID number each thread/process loops over the whole generator, + but only runs the one <ID>'th system (if (localcounter+ID) % self.grid_options["amt_cores"]==0)' + + When they are finished, these jobs are instructed to return a set of information + (the result dict, TODO: describe what more) + + These resulting dictionaries are then merged and stored as object properties again. 
+ """ + + # TODO: make further use of a queue to handle jobs or at least + # get information on the process ids etc + # https://stackoverflow.com/questions/10190981/get-a-unique-id-for-worker-in-python-multiprocessing-pool + # https://stackoverflow.com/questions/8640367/python-manager-dict-in-multiprocessing/9536888 + # for mutating values through dicts + # https://python-forum.io/Thread-Dynamic-updating-of-a-nested-dictionary-in-multiprocessing-pool + # https://stackoverflow.com/questions/28740955/working-with-pathos-multiprocessing-tool-in-python-and + + # TODO: make good example of how to deal with a result_dict + # https://www.programcreek.com/python/example/58176/multiprocessing.Value + # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing + + # Create the pool + pool = Pool(processes=self.grid_options["amt_cores"]) + + # start the processes by giving them an ID value + result = list( + pool.imap_unordered( + self._process_run_population_grid, range(self.grid_options["amt_cores"]) + ) + ) + + # Handle clean termination of the whole multiprocessing (making sure there are no zombie + # processes (https://en.wikipedia.org/wiki/Zombie_process)) + pool.close() + pool.join() + + # Handle the results by merging all the dictionaries. How that merging happens exactly is + # described in the merge_dicts description. 
+ combined_output_dict = {} + for output_dict in result: + combined_output_dict = merge_dicts(combined_output_dict, output_dict) + + print(combined_output_dict) + + # Put the values back as object properties + self.grid_options["results"] = combined_output_dict["results"] + self.grid_options["_failed_count"] = combined_output_dict["_failed_count"] + self.grid_options["_failed_prob"] = combined_output_dict["_failed_prob"] + self.grid_options["_failed_systems_error_codes"] = list( + set(combined_output_dict["_failed_systems_error_codes"]) + ) + self.grid_options["_errors_exceeded"] = combined_output_dict["_errors_exceeded"] + self.grid_options["_errors_found"] = combined_output_dict["_errors_found"] + self.grid_options["_probtot"] = combined_output_dict["_probtot"] + self.grid_options["_count"] = combined_output_dict["_count"] + + def _evolve_system_mp(self, full_system_dict): + """ + Function that the multiprocessing evolution method calls to evolve a system + + this function is called by _process_run_population_grid + """ + + binary_cmdline_string = self._return_argline(full_system_dict) + + # Get results binary_c + out = _binary_c_bindings.run_system( + argstring=binary_cmdline_string, + custom_logging_func_memaddr=self.grid_options[ + "custom_logging_func_memaddr" + ], + store_memaddr=self.grid_options["_store_memaddr"], + population=1, # since this system is part of a population, we set this flag to prevent the store from being freed + ) + + # Check for errors + _ = self._check_binary_c_error(out, full_system_dict) + + # Have some user-defined function do stuff with the data. + if self.grid_options["parse_function"]: + self.grid_options["parse_function"](self, out) + + def _process_run_population_grid(self, ID): """ Function that loops over the whole generator, but only runs systems that fit to: if (localcounter+ID) % self.grid_options["amt_cores"] == 0 - That way with 4 processes, process 1 runs sytem 0, 4, 8... 
process 2 runs system 1, 5, 9..., etc + That way with e.g. 4 processes, process 1 runs system 0, 4, 8... process 2 runs system 1, 5, 9..., etc - This function is called by _evolve_population_mp + This function is called by _evolve_population_grid """ self.process_ID = ( @@ -877,121 +975,6 @@ class Population: return output_dict - def _evolve_population_mp(self): - """ - Function to evolve the population with multiprocessing approach. - Using pathos to be able to include class-owned functions. - - This function will create a pool with <self.grid_options["amt_cores"]> processes, and perform an imap_unordered to run the different `threads`. - Before this was done by giving a generator as the iterable, and have the processes get a certain chunksize each round. - Later on this seemed to be a bad decision, because it is difficult to pass information back to the main controller, and because with each new batch of systems a new object instance was created. - - What I do now is I spawn these X amount of processes, and pass a range(self.grid_options["amt_cores"]) as iterable. - In that way, only once do they fetch a `job`, but that job is just a ID number. - With this ID number each thread/process loops over the whole generator, - but only runs the one <ID>'th system (if (localcounter+ID) % self.grid_options["amt_cores"]==0)' - - When they are finished, these jobs are instructed to return a set of information (the result dict, TODO: describe what more) - - These resultation dictionaries are then merged and stored as object properties again. 
- """ - - # TODO: make further use of a queue to handle jobs or at least - # get information on the process ids etc - # https://stackoverflow.com/questions/10190981/get-a-unique-id-for-worker-in-python-multiprocessing-pool - # https://stackoverflow.com/questions/8640367/python-manager-dict-in-multiprocessing/9536888 - # for muting values through dicts - # https://python-forum.io/Thread-Dynamic-updating-of-a-nested-dictionary-in-multiprocessing-pool - # https://stackoverflow.com/questions/28740955/working-with-pathos-multiprocessing-tool-in-python-and - - # TODO: make good example of how to deal with a result_dict - # https://www.programcreek.com/python/example/58176/multiprocessing.Value - # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing - - # Create the pool - pool = Pool(processes=self.grid_options["amt_cores"]) - - # start the processes by giving them an ID value - result = list( - pool.imap_unordered( - self._process_run_population, range(self.grid_options["amt_cores"]) - ) - ) - - # Handle clean termination of the whole multiprocessing (making sure there are no zombie - # processes (https://en.wikipedia.org/wiki/Zombie_process)) - pool.close() - pool.join() - - # Handle the results by merging all the dictionaries. How that merging happens exactly is described in the merge_dicts description. 
- combined_output_dict = {} - for output_dict in result: - combined_output_dict = merge_dicts(combined_output_dict, output_dict) - - print(combined_output_dict) - - # Put the values back as object properties - self.grid_options["results"] = combined_output_dict["results"] - self.grid_options["_failed_count"] = combined_output_dict["_failed_count"] - self.grid_options["_failed_prob"] = combined_output_dict["_failed_prob"] - self.grid_options["_failed_systems_error_codes"] = list( - set(combined_output_dict["_failed_systems_error_codes"]) - ) - self.grid_options["_errors_exceeded"] = combined_output_dict["_errors_exceeded"] - self.grid_options["_errors_found"] = combined_output_dict["_errors_found"] - self.grid_options["_probtot"] = combined_output_dict["_probtot"] - self.grid_options["_count"] = combined_output_dict["_count"] - - def _evolve_population_lin(self): - """ - Function to evolve the population linearly (i.e. 1 core, no multiprocessing methods) - """ - - for i, system in enumerate(self.grid_options["_system_generator"](self)): - full_system_dict = self.bse_options.copy() - full_system_dict.update(system) - binary_cmdline_string = self._return_argline(full_system_dict) - out = _binary_c_bindings.run_system( - argstring=binary_cmdline_string, - custom_logging_func_memaddr=self.grid_options[ - "custom_logging_func_memaddr" - ], - store_memaddr=self.grid_options["_store_memaddr"], - population=1, - ) - self._print_info( - i + 1, self.grid_options["_total_starcount"], full_system_dict - ) - - if self.grid_options["parse_function"]: - self.grid_options["parse_function"](self, out) - - def _evolve_system_mp(self, full_system_dict): - """ - Function that the multiprocessing evolution method calls to evolve a system - - this function is called by _process_run_population - """ - - binary_cmdline_string = self._return_argline(full_system_dict) - - # Get - out = _binary_c_bindings.run_system( - argstring=binary_cmdline_string, - 
custom_logging_func_memaddr=self.grid_options[ - "custom_logging_func_memaddr" - ], - store_memaddr=self.grid_options["_store_memaddr"], - population=1, - ) - - # Check for errors - _ = self._check_binary_c_error(out, full_system_dict) - - # Have some user-defined function do stuff with the data. - if self.grid_options["parse_function"]: - self.grid_options["parse_function"](self, out) - # Single system def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any: """ diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index 5f156624a..36a1b5ded 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -72,18 +72,12 @@ grid_options_defaults_dict = { # Population evolution ########################## ## General - "evolution_type": "mp", # Flag for type of population evolution + "evolution_type": "grid", # Flag for type of population evolution "_evolution_type_options": [ - "mp", - "linear", - ], # available choices for type of population evolution + "grid", + ], # available choices for type of population evolution. # TODO: fill later with monte carlo, sourcefile "_system_generator": None, # value that holds the function that generates the system # (result of building the grid script) - "population_type": "grid", # - "_population_type_options": [ - "grid", - "source_file", - ], # Available choices for type of population generation. Unused for now. # TODO: fill later with monte carlo etc "source_file_filename": None, # filename for the source "_count": 0, # count of systems "_total_starcount": 0, # Total count of systems in this generator @@ -464,8 +458,6 @@ grid_options_descriptions = { "parse_function": "Function that the user can provide to handle the output the binary_c. This function has to take the arguments (self, output). 
Its best not to return anything in this function, and just store stuff in the grid_options['results'] dictionary, or just output results to a file", "condor": "Int flag whether to use a condor type population evolution.", # TODO: describe this in more detail "slurm": "Int flag whether to use a slurm type population evolution.", # TODO: describe this in more detail - "population_type": "variable storing what kind of population type should be evolved. See population_type_options for the options.", # TODO: make this functionality work properly - "_population_type_options": "List storing the population_type options.", "weight": "Weight factor for each system. The calculated probability is mulitplied by this. If the user wants each system to be repeated several times, then this variable should not be changed, rather change the _repeat variable instead, as that handles the reduction in probability per system. This is useful for systems that have a process with some random element in it.", # TODO: add more info here, regarding the evolution splitting. "repeat": "Factor of how many times a system should be repeated. Consider the evolution splitting binary_c argument for supernovae kick repeating.", # TODO: make sure this is used. "evolution_type": "Variable containing the type of evolution used of the grid. Multiprocessing or linear processing", -- GitLab