diff --git a/binarycpython/utils/population_extensions/evolution_functions.py b/binarycpython/utils/population_extensions/evolution_functions.py index 2ff956f075cd7626a219948737789dad0fd68b72..52e2a5a20f29f4ad91d2d32f393f92d7ce9cd54f 100644 --- a/binarycpython/utils/population_extensions/evolution_functions.py +++ b/binarycpython/utils/population_extensions/evolution_functions.py @@ -187,7 +187,8 @@ class evolution_functions: # Set up the manager object and queues manager = multiprocessing.Manager() job_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"]) - result_queue = manager.Queue(maxsize=self.grid_options["num_processes"]) + result_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"]) + final_result_queue = manager.Queue(maxsize=self.grid_options["num_processes"]) ############ # data to be sent to signal handlers @@ -203,7 +204,7 @@ class evolution_functions: processes.append( multiprocessing.Process( target=self._process_queue, - args=(job_queue, result_queue, ID), + args=(job_queue, result_queue, final_result_queue, ID), ) ) @@ -226,7 +227,10 @@ class evolution_functions: # Set up the system_queue in the parent process self.grid_options["_job_crashed"] = False self._system_queue_handler( - job_queue, processes, num_processes=self.grid_options["num_processes"] + job_queue, + result_queue, + processes, + num_processes=self.grid_options["num_processes"], ) ############ @@ -310,7 +314,7 @@ class evolution_functions: # combine the dicts that were output from our # subprocesses sentinel = object() - for output_dict in iter(result_queue.get, sentinel): + for output_dict in iter(final_result_queue.get, sentinel): if output_dict: # don't let Xinit be added try: @@ -322,7 +326,7 @@ class evolution_functions: combined_output_dict = merge_dicts( combined_output_dict, keys_to_floats(output_dict) ) - if result_queue.empty(): + if final_result_queue.empty(): break # Extra ensemble result manipulation: @@ -389,11 +393,9 @@ class evolution_functions: # Have some user-defined function do stuff with the data. if self.grid_options["parse_function"]: self.custom_options["parameter_dict"] = full_system_dict - self.grid_options["parse_function"](self, out) + system_result = self.grid_options["parse_function"](self, out) - # TODO: check if the parse function returns something and return it again - - return + return system_result def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any: """ @@ -452,7 +454,7 @@ class evolution_functions: ################### # queue and worker functions - def _system_queue_handler(self, job_queue, processes, num_processes): + def _system_queue_handler(self, job_queue, result_queue, processes, num_processes): """ Function that is responsible for keeping the queue filled. @@ -541,14 +543,19 @@ class evolution_functions: ###### # Handle monte-carlo threshold based on evolved mass elif self.grid_options["evolution_type"] == "monte_carlo": - # TODO: process this in a way that the systems that are running will still finish # Check based on mass threshold - monte_carlo_threshold_reached = ( - self._monte_carlo_sampling_check_mass_threshold(system_dict) - ) + self._monte_carlo_sampling_check_mass_threshold(system_dict) # Check based on custom threshold, which uses the result_queue # TODO: process the result_queue and pass this. HOw we do this (get only one, or get until the result queue is empty again) + self._monte_carlo_sampling_check_custom_threshold(result_queue) + + ###### + # Check if evolution threshold is reached. + # this can be set in the _monte_carlo_sampling_check_mass_threshold function or via _monte_carlo_sampling_check_custom_threshold + if self.grid_options["_monte_carlo_threshold_reached"]: + # TODO: Write a correct signalling of stopping the queue while + print("_monte_carlo_threshold_reached True. WHAT TO DO NOW??") ######## # Put system in the job queue @@ -565,6 +572,7 @@ class evolution_functions: 3, ) + # Signal queue is done self.grid_options["_queue_done"] = True ####### @@ -576,14 +584,15 @@ class evolution_functions: if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: stream_logger.debug(f"Signalling processes to stop") # DEBUG - def _process_queue(self, job_queue, result_queue, ID): + def _process_queue(self, job_queue, result_queue, final_result_queue, ID): """ Worker process that gets items from the job_queue and runs those systems. It keeps track of several things like failed systems, total time spent on systems etc. Input: job_queue: Queue object containing system dicts - result_queue: Queue where the resulting analytic dictionaries will be put in + result_queue: Queue object where some results passed via the parse_function can be placed in to process in the monte-carlo sampling + final_result_queue: Queue where the resulting analytic dictionaries will be put in ID: id of the worker process """ @@ -802,8 +811,16 @@ class evolution_functions: zero_prob_stars_skipped += 1 if run_system: - # Evolve the system. TODO: if the system returns something (via the parse_function) then put it back into the result queue - self._evolve_system_mp(system_number, full_system_dict) + # Evolve the system + system_result = self._evolve_system_mp( + system_number, full_system_dict + ) + + # If we're doing monte-carlo sampling with a custom + if system_result is not None: + # TODO: fix that this is only handle if user-defined function is given + # result_queue.put(system_result) + raise NotImplementedError("Functionality not implemented") end_runtime_binary_c = time.time() @@ -984,7 +1001,7 @@ class evolution_functions: 1, ) - result_queue.put(output_dict) + final_result_queue.put(output_dict) if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: stream_logger.debug(f"Process-{self.process_ID} is finished.") diff --git a/binarycpython/utils/population_extensions/grid_options_defaults.py b/binarycpython/utils/population_extensions/grid_options_defaults.py index 1611ff4f870f58fa5754f7048542e0269634972c..7670f6565e78c1739aacd67f15f1f43732893b7d 100644 --- a/binarycpython/utils/population_extensions/grid_options_defaults.py +++ b/binarycpython/utils/population_extensions/grid_options_defaults.py @@ -190,6 +190,8 @@ class grid_options_defaults: ## Monte carlo type evolution "monte_carlo_mass_threshold": -1, "_monte_carlo_current_total_mass_evolved": 0, + "_monte_carlo_threshold_reached": False, + "_monte_carlo_custom_threshold_function": None, ## Evolution from source file # TODO: make run from sourcefile options. ######################################## diff --git a/binarycpython/utils/population_extensions/monte_carlo_sampling.py b/binarycpython/utils/population_extensions/monte_carlo_sampling.py index 0237209faaaaf5821d9519930ac6f3befb749e41..3b03e6e6315d6b9285d45f831ca0b3fcbe9a1aad 100644 --- a/binarycpython/utils/population_extensions/monte_carlo_sampling.py +++ b/binarycpython/utils/population_extensions/monte_carlo_sampling.py @@ -76,8 +76,6 @@ class monte_carlo_sampling: Function to handle checking the total mass evolved and signal to stop """ - monte_carlo_threshold_reached = False - # Only if the monte_carlo_mass_threshold is positive (default = -1) if self.grid_options["monte_carlo_mass_threshold"] > 0: @@ -91,6 +89,12 @@ class monte_carlo_sampling: self.grid_options["_monte_carlo_current_total_mass_evolved"] > self.grid_options["monte_carlo_mass_threshold"] ): - monte_carlo_threshold_reached = True + self.grid_options["_monte_carlo_threshold_reached"] = True + + def _monte_carlo_sampling_check_custom_threshold(self, result_queue): + """ + Function to handle checking the content of the result queue and look for a + TODO: put in the user-defined custom threshold function + """ - return monte_carlo_threshold_reached + raise NotImplementedError("Functionality in not implemented")