Commit bda9386d authored by David Hendriks

Working on the Monte Carlo custom sampling; needs much more work though.

parent 7ab9ae1f
@@ -187,7 +187,8 @@ class evolution_functions:
         # Set up the manager object and queues
         manager = multiprocessing.Manager()
         job_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"])
-        result_queue = manager.Queue(maxsize=self.grid_options["num_processes"])
+        result_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"])
+        final_result_queue = manager.Queue(maxsize=self.grid_options["num_processes"])

         ############
         # data to be sent to signal handlers
@@ -203,7 +204,7 @@ class evolution_functions:
             processes.append(
                 multiprocessing.Process(
                     target=self._process_queue,
-                    args=(job_queue, result_queue, ID),
+                    args=(job_queue, result_queue, final_result_queue, ID),
                 )
             )
@@ -226,7 +227,10 @@ class evolution_functions:
         # Set up the system_queue in the parent process
         self.grid_options["_job_crashed"] = False
         self._system_queue_handler(
-            job_queue, processes, num_processes=self.grid_options["num_processes"]
+            job_queue,
+            result_queue,
+            processes,
+            num_processes=self.grid_options["num_processes"],
         )

         ############
@@ -310,7 +314,7 @@ class evolution_functions:
         # combine the dicts that were output from our
         # subprocesses
         sentinel = object()
-        for output_dict in iter(result_queue.get, sentinel):
+        for output_dict in iter(final_result_queue.get, sentinel):
             if output_dict:
                 # don't let Xinit be added
                 try:
@@ -322,7 +326,7 @@ class evolution_functions:
                 combined_output_dict = merge_dicts(
                     combined_output_dict, keys_to_floats(output_dict)
                 )
-            if result_queue.empty():
+            if final_result_queue.empty():
                 break

         # Extra ensemble result manipulation:
@@ -389,11 +393,9 @@ class evolution_functions:
         # Have some user-defined function do stuff with the data.
         if self.grid_options["parse_function"]:
             self.custom_options["parameter_dict"] = full_system_dict
-            self.grid_options["parse_function"](self, out)
+            system_result = self.grid_options["parse_function"](self, out)

-        # TODO: check if the parse function returns something and return it again
-        return
+        return system_result

     def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any:
         """
@@ -452,7 +454,7 @@ class evolution_functions:
     ###################
     # queue and worker functions
-    def _system_queue_handler(self, job_queue, processes, num_processes):
+    def _system_queue_handler(self, job_queue, result_queue, processes, num_processes):
         """
         Function that is responsible for keeping the queue filled.
@@ -541,14 +543,19 @@ class evolution_functions:
             ######
             # Handle monte-carlo threshold based on evolved mass
             elif self.grid_options["evolution_type"] == "monte_carlo":
-                # TODO: process this in a way that the systems that are running will still finish
                 # Check based on mass threshold
-                monte_carlo_threshold_reached = (
-                    self._monte_carlo_sampling_check_mass_threshold(system_dict)
-                )
+                self._monte_carlo_sampling_check_mass_threshold(system_dict)

                 # Check based on custom threshold, which uses the result_queue
                 # TODO: process the result_queue and pass this. How do we do this (get only one, or get until the result queue is empty again)?
+                self._monte_carlo_sampling_check_custom_threshold(result_queue)
+
+                ######
+                # Check if the evolution threshold is reached.
+                # This can be set in the _monte_carlo_sampling_check_mass_threshold function or via _monte_carlo_sampling_check_custom_threshold
+                if self.grid_options["_monte_carlo_threshold_reached"]:
+                    # TODO: write correct signalling to stop the queue while letting running systems finish
+                    print("_monte_carlo_threshold_reached True. WHAT TO DO NOW??")

             ########
             # Put system in the job queue
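Editor's note: the TODO and print above leave open how to signal the queue-filling loop to stop once _monte_carlo_threshold_reached is set. Below is a minimal, self-contained sketch of one possible scheme, not the committed implementation: the producer simply stops submitting new systems and then sends the usual one-sentinel-per-worker shutdown, so systems that are already queued or running still finish, which is what the removed TODO asks for. All names in the sketch (worker, fill_queue, the "mass" field) are illustrative, not binary_c-python API.

# Minimal sketch of producer-side stop signalling; illustrative only, not the package's code.
import multiprocessing

def worker(job_queue, final_result_queue):
    # Consume systems until the sentinel (None) arrives; already-queued systems still finish.
    total_mass = 0.0
    for system in iter(job_queue.get, None):
        total_mass += system["mass"]  # stand-in for actually evolving the system
    final_result_queue.put(total_mass)

def fill_queue(job_queue, systems, mass_threshold):
    queued_mass = 0.0
    for system in systems:
        job_queue.put(system)
        queued_mass += system["mass"]
        if 0 < mass_threshold < queued_mass:
            break  # threshold reached: stop queueing new systems, do not kill workers

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    job_queue = manager.Queue()
    final_result_queue = manager.Queue()

    workers = [
        multiprocessing.Process(target=worker, args=(job_queue, final_result_queue))
        for _ in range(2)
    ]
    for process in workers:
        process.start()

    systems = ({"mass": 0.5 + 0.01 * i} for i in range(10_000))
    fill_queue(job_queue, systems, mass_threshold=100.0)

    for _ in workers:
        job_queue.put(None)  # one sentinel per worker so each exits cleanly
    for process in workers:
        process.join()

    print(sum(final_result_queue.get() for _ in workers))

The same effect could be achieved inside _system_queue_handler by breaking out of the system-generator loop as soon as the flag is True, rather than terminating worker processes.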
@@ -565,6 +572,7 @@ class evolution_functions:
                     3,
                 )

+        # Signal queue is done
         self.grid_options["_queue_done"] = True

         #######
@@ -576,14 +584,15 @@ class evolution_functions:
         if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL:
             stream_logger.debug(f"Signalling processes to stop")  # DEBUG

-    def _process_queue(self, job_queue, result_queue, ID):
+    def _process_queue(self, job_queue, result_queue, final_result_queue, ID):
         """
         Worker process that gets items from the job_queue and runs those systems.
         It keeps track of several things like failed systems, total time spent on systems etc.

         Input:
             job_queue: Queue object containing system dicts
-            result_queue: Queue where the resulting analytic dictionaries will be put in
+            result_queue: Queue where results passed back via the parse_function are placed, to be processed by the monte-carlo sampling
+            final_result_queue: Queue where the resulting analytic dictionaries will be put in
             ID: id of the worker process
         """
@@ -802,8 +811,16 @@ class evolution_functions:
                         zero_prob_stars_skipped += 1

                 if run_system:
-                    # Evolve the system. TODO: if the system returns something (via the parse_function) then put it back into the result queue
-                    self._evolve_system_mp(system_number, full_system_dict)
+                    # Evolve the system
+                    system_result = self._evolve_system_mp(
+                        system_number, full_system_dict
+                    )
+
+                    # If we're doing monte-carlo sampling with a custom threshold
+                    if system_result is not None:
+                        # TODO: only handle this when a user-defined function is given
+                        # result_queue.put(system_result)
+                        raise NotImplementedError("Functionality not implemented")

                 end_runtime_binary_c = time.time()
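Editor's note: the NotImplementedError above marks where a system_result returned by the parse_function should eventually be handed to result_queue. A small hypothetical helper (maybe_forward_result is an invented name, not part of the package) showing the guarded hand-off the TODO describes: only forward when a user-defined custom threshold function is actually configured.

def maybe_forward_result(system_result, result_queue, custom_threshold_function):
    """Hypothetical helper: forward a parse_function return value to the monte-carlo
    result_queue only when a user-defined custom threshold function is configured,
    so runs without one keep the old behaviour."""
    if system_result is None or custom_threshold_function is None:
        return False
    result_queue.put(system_result)
    return True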
@@ -984,7 +1001,7 @@ class evolution_functions:
                 1,
             )

-        result_queue.put(output_dict)
+        final_result_queue.put(output_dict)

         if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL:
             stream_logger.debug(f"Process-{self.process_ID} is finished.")
@@ -190,6 +190,8 @@ class grid_options_defaults:
         ## Monte carlo type evolution
         "monte_carlo_mass_threshold": -1,
         "_monte_carlo_current_total_mass_evolved": 0,
+        "_monte_carlo_threshold_reached": False,
+        "_monte_carlo_custom_threshold_function": None,

         ## Evolution from source file
         # TODO: make run from sourcefile options.
         ########################################
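Editor's note: for orientation, a purely hypothetical sketch of how the two new options could be used together. The commit does not yet define how _monte_carlo_custom_threshold_function is called, so the callable signature below (a function over collected parse_function results returning True to stop) and the enough_results name are assumptions.

# Hypothetical usage only; the hook's real calling convention is not fixed by this commit.
def enough_results(collected_results, n_target=10_000):
    # Stop the monte-carlo sampling once enough systems of interest have been gathered.
    return len(collected_results) >= n_target

monte_carlo_options = {
    "monte_carlo_mass_threshold": 1e6,  # built-in check: stop after roughly 1e6 Msun sampled
    "_monte_carlo_custom_threshold_function": enough_results,  # new hook added in this commit
    "_monte_carlo_threshold_reached": False,  # internal flag set by either check
}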
@@ -76,8 +76,6 @@ class monte_carlo_sampling:
         Function to handle checking the total mass evolved and signal to stop
         """

-        monte_carlo_threshold_reached = False
-
         # Only if the monte_carlo_mass_threshold is positive (default = -1)
         if self.grid_options["monte_carlo_mass_threshold"] > 0:
@@ -91,6 +89,12 @@ class monte_carlo_sampling:
                 self.grid_options["_monte_carlo_current_total_mass_evolved"]
                 > self.grid_options["monte_carlo_mass_threshold"]
             ):
-                monte_carlo_threshold_reached = True
-
-        return monte_carlo_threshold_reached
+                self.grid_options["_monte_carlo_threshold_reached"] = True
+
+    def _monte_carlo_sampling_check_custom_threshold(self, result_queue):
+        """
+        Function to handle checking the content of the result queue and look for a user-defined custom threshold being reached.
+
+        TODO: put in the user-defined custom threshold function
+        """
+
+        raise NotImplementedError("Functionality is not implemented")
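Editor's note: since the new method only raises for now, here is a minimal sketch of what _monte_carlo_sampling_check_custom_threshold could eventually do, under the assumption that the user-supplied function accepts the list of results drained from result_queue; check_custom_threshold and collected_results are illustrative names, not the committed code.

# Minimal sketch: drain the result_queue, hand the results to the user-supplied
# _monte_carlo_custom_threshold_function, and set the shared flag when it says to stop.
import queue

def check_custom_threshold(grid_options, result_queue, collected_results):
    custom_function = grid_options["_monte_carlo_custom_threshold_function"]
    if custom_function is None:
        return  # nothing to check against

    # Drain the queue without blocking so the queue-filling loop is not stalled.
    while True:
        try:
            collected_results.append(result_queue.get_nowait())
        except queue.Empty:
            break

    if custom_function(collected_results):
        grid_options["_monte_carlo_threshold_reached"] = True

Draining until the queue is empty, rather than getting a single item, is one answer to the "get only one, or get until the result queue is empty again" question in the TODO above: each call stays cheap and never blocks the queue-filling loop.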