diff --git a/binarycpython/utils/population_class.py b/binarycpython/utils/population_class.py index fb3179bd02bc522f9b8d7ca6a964943c371f2456..88409e0394934cc8fd9ee337d6a27f9e2ecf544c 100644 --- a/binarycpython/utils/population_class.py +++ b/binarycpython/utils/population_class.py @@ -18,17 +18,9 @@ import sys import time import uuid import copy -import datetime -import functools -import gc import json import multiprocessing import os -import queue -import signal - -import psutil -import setproctitle from binarycpython.utils.functions import ( check_if_in_shell, @@ -36,7 +28,6 @@ from binarycpython.utils.functions import ( get_ANSI_colours, get_defaults, mem_use, - timedelta, now, hostnames, calculate_total_mass_system, @@ -101,7 +92,9 @@ from binarycpython.utils.population_extensions.miscellaneous_functions import ( from binarycpython.utils.population_extensions.evolution_functions import ( evolution_functions, ) - +from binarycpython.utils.population_extensions.failing_systems_functions import ( + failing_systems_functions, +) from binarycpython.utils.ensemble import new_grid_ensemble_results @@ -139,6 +132,8 @@ class Population( argument_handling, termination_functions, miscellaneous_functions, + evolution_functions, + failing_systems_functions, ): """ Population Object. Contains all the necessary functions to set up, run and process a @@ -174,6 +169,8 @@ class Population( ensemble.__init__(self) termination_functions.__init__(self) miscellaneous_functions.__init__(self) + evolution_functions.__init__(self) + failing_systems_functions.__init__(self) # caches self.caches = {} @@ -464,7 +461,7 @@ class Population( return # Execute population evolution subroutines - result = self._evolve_population() + result = self._evolve_population_wrapper() if result is False: print("Error detected in _evolve_population() : stopping here") @@ -507,811 +504,6 @@ class Population( return analytics_dict - def _system_queue_filler(self, job_queue, processes, num_processes): - """ - Function that is responsible for keeping the queue filled. - - This will generate the systems until it is full, and then keeps trying to fill it. - Will have to play with the size of this. - - This function is called as part of the parent process. 
- - TODO: clean and implement methods to have monte-carlo things check the thresholds and flush etc - """ - - stream_logger = self._get_stream_logger() - if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: - stream_logger.debug(f"setting up the system_queue_filler now") - - # Setup of the generator - # Check again if we use custom generator or not: - # TODO: put this elsewhere - if self.grid_options["evolution_type"] == "custom_generator": - generator = self.grid_options["custom_generator"] - else: - self._generate_grid_code(dry_run=False) - - self._load_grid_function() - - generator = self.grid_options["_system_generator"]( - self, print_results=False - ) - - # start_at can be an expression : we should eval it - # prior to running the loop - self.grid_options["start_at"] = eval(str(self.grid_options["start_at"])) - if self.grid_options["start_at"] > 0: - print("Starting at model {} ".format(self.grid_options["start_at"])) - - prev_process_check = None - - # Continuously fill the queue while we are allowed to - for system_number, system_dict in enumerate(generator): - # on stop, quit this loop - if self.grid_options["stop_queue"]: - break - - # skip systems before start_at - elif system_number < self.grid_options["start_at"]: - self.verbose_print( - "skip system {n} because < start_at = {start}".format( - n=system_number, start=self.grid_options["start_at"] - ), - self.grid_options["verbosity"], - 3, - ) - continue - - # apply modulo - if not ( - (system_number - self.grid_options["start_at"]) - % self.grid_options["modulo"] - == 0 - ): - self.verbose_print( - "skip system {n} because modulo {mod} == {donemod}".format( - n=system_number, - mod=self.grid_options["modulo"], - donemod=(system_number - self.grid_options["start_at"]) - % self.grid_options["modulo"], - ), - self.grid_options["verbosity"], - 3, - ) - - continue - - # Either break the queue or put system into queue - if self.grid_options["stop_queue"]: - break - - # check children are running every 1s # TODO: allow frequency change? - _now = time.time() - if prev_process_check is None or _now - prev_process_check > 1: - prev_process_check = _now - if self._all_children_running(processes) is False: - self.grid_options["_job_crashed"] = True - return - - try: - job_queue.put((system_number, system_dict), block=True) - except Exception as e: - # error on queueing : stop the queue - self.grid_options["stop_queue"] = True - - # Print some info - self.verbose_print( - "Queue produced system {}".format(system_number), - self.grid_options["verbosity"], - 3, - ) - - self.grid_options["_queue_done"] = True - - # Send closing signal to workers. When they receive this they will terminate - if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: - stream_logger.debug(f"Signalling processes to stop") # DEBUG - - if True: # not self.grid_options['stop_queue']: - for _ in range(num_processes): - job_queue.put("STOP") - - def _evolve_population_grid(self): - """ - Function that handles running the population using multiprocessing. - - First we set up the multiprocessing manager and the job and result queue. - - Then we spawn <self.grid_options["num_processes"]> number of process instances, - and signal them to start. - - While the processes are waiting for their instructions, we start the queue filler, - which goes over the grid code and puts all the tasks in a queue until its full. - - The processes take these jobs, evolve the and store results. 
- - When all the systems have been put in the queue we pass a STOP signal - that will make the processes wrap up. - - We then add any previous population - - We read out the information in the result queue and store them in the grid object - - Returns True if things go well, False on error. - """ - - # Set process name - setproctitle.setproctitle("binarycpython parent process") - - # if max_queue_size is zero, calculate automatically - # to be double the number of processes - you don't want to - # make the queue too large because when it's killed you - # want to end quickly - if self.grid_options["max_queue_size"] == 0: - self.grid_options["max_queue_size"] = 2 * self.grid_options["num_processes"] - - # Set up the manager object that can share info between processes - manager = multiprocessing.Manager() - job_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"]) - result_queue = manager.Queue(maxsize=self.grid_options["num_processes"]) - - # data to be sent to signal handlers - signal_data = { - "where": "_evolve_population_grid", - "queue": job_queue, - } - - # Create process instances to run the stars - processes = [] - for ID in range(self.grid_options["num_processes"]): - processes.append( - multiprocessing.Process( - target=self._process_queue, - args=(job_queue, result_queue, ID), - ) - ) - - # Activate the processes - for p in processes: - p.start() - - # activate signal handlers - # * the child processes ignore these signals - # * the parent will be in _system_queue_filler when these are caught - signal.signal( - signal.SIGTERM, functools.partial(self._parent_signal_handler, signal_data) - ) - signal.signal( - signal.SIGINT, functools.partial(self._parent_signal_handler, signal_data) - ) - - # Set up the system_queue in the parent process - self.grid_options["_job_crashed"] = False - self._system_queue_filler( - job_queue, processes, num_processes=self.grid_options["num_processes"] - ) - - if self.grid_options["_job_crashed"] is True: - # job crashed while in system_queue_filler : kill children - # and return False - print( - "A child process crashed or was killed : I will not join incomplete data" - ) - self._kill_child_processes(processes) - return False - - else: - # Join the processes after the queue filler has finished - print("Do join of subprocesses ...") - - while self.grid_options["_job_crashed"] is False and processes: - if self._all_children_running(processes) is False: - # job crashed: stop children and return False - self.grid_options["_job_crashed"] = True - else: - # join first process: it should finish work soon - p = processes.pop(0) - p.join() - - if self.grid_options["_job_crashed"] is True: - print( - "A child process crashed or was killed : I will not join incomplete data" - ) - self._kill_child_processes(processes) - return False - else: - print("Joined all subprocesses.") - - # Handle the results by merging all the dictionaries. How that merging happens exactly is - # described in the merge_dicts description. 
- # - # If there is a preloaded_population, we add this first, - # then we add the populations run just now - - # 1) - # use preloaded population's data as a basis - # for our combined_output_dict - if self.preloaded_population: - combined_output_dict = { - "ensemble_results": keys_to_floats( - self.preloaded_population.grid_ensemble_results - ), - "results": keys_to_floats(self.preloaded_population.grid_results), - } - - for x in self._metadata_keylist(): - try: - combined_output_dict[x] = self.preloaded_population.grid_options[x] - except Exception as e: - print( - "Tried to set combined_output_dict key", - x, - "from preloaded_popuation, but this failed:", - e, - ) - print( - "Pre-loaded data from {} stars".format(combined_output_dict["_count"]) - ) - - # do not propagate _killed - # combined_output_dict['results']['_killed'] = False - # combined_output_dict['_killed'] = False - - self.preloaded_population = None - gc.collect() - else: - # new empty combined output - combined_output_dict = OrderedDict() - combined_output_dict["ensemble_results"] = OrderedDict() - combined_output_dict["results"] = OrderedDict() - - # 2) - # combine the dicts that were output from our - # subprocesses - sentinel = object() - for output_dict in iter(result_queue.get, sentinel): - if output_dict: - # don't let Xinit be added - try: - del combined_output_dict["ensemble_results"]["ensemble"]["Xinit"] - except: - pass - - # merge dicts - combined_output_dict = merge_dicts( - combined_output_dict, keys_to_floats(output_dict) - ) - if result_queue.empty(): - break - - # Extra ensemble result manipulation: - if "ensemble_results" in combined_output_dict: - combined_output_dict["ensemble_results"][ - "ensemble" - ] = format_ensemble_results( - combined_output_dict["ensemble_results"].get("ensemble", {}) - ) - gc.collect() - - # Put the values back as object properties - self.grid_results = combined_output_dict["results"] - - ################################# - # Put Ensemble results - self.grid_ensemble_results = combined_output_dict[ - "ensemble_results" - ] # Ensemble results are also passed as output from that dictionary - - # Add metadata - self.add_ensemble_metadata(combined_output_dict) - - # if we were killed, save snapshot - if self.grid_options["save_snapshots"] and self.grid_options["_killed"]: - self.custom_options["save_snapshot"] = True - - # return True because all seems well - return True - - def _evolve_system_mp(self, system_number, full_system_dict): - """ - Function that the multiprocessing evolution method calls to evolve a system - - this function is called by _process_queue - """ - - binary_cmdline_string = self._return_argline(full_system_dict) - - persistent_data_memaddr = -1 - if self.bse_options.get("ensemble", 0) == 1: - persistent_data_memaddr = self.persistent_data_memory_dict[self.process_ID] - # print("thread {}: persistent_data_memaddr: ".format(self.process_ID), persistent_data_memaddr) - - # vb2 logging - if self.grid_options["verbosity"] >= 2: - self.vb2print(full_system_dict, binary_cmdline_string) - - # Get results binary_c - # print("running: {}".format(binary_cmdline_string)) - out = _binary_c_bindings.run_system( - argstring=binary_cmdline_string, - custom_logging_func_memaddr=self.grid_options[ - "custom_logging_func_memaddr" - ], - store_memaddr=self.grid_options["_store_memaddr"], - population=1, # since this system is part of a population, we set this flag to prevent the store from being freed - persistent_data_memaddr=persistent_data_memaddr, - ) - - # Check for errors 
- _ = self._check_binary_c_error(system_number, out, full_system_dict) - - # Have some user-defined function do stuff with the data. - if self.grid_options["parse_function"]: - self.custom_options["parameter_dict"] = full_system_dict - self.grid_options["parse_function"](self, out) - - return - - def _process_queue(self, job_queue, result_queue, ID): - """ - Worker process that gets items from the job_queue and runs those systems. - It keeps track of several things like failed systems, total time spent on systems etc. - - Input: - job_queue: Queue object containing system dicts - result_queue: Queue where the resulting analytic dictionaries will be put in - ID: id of the worker process - - """ - - # ignore SIGINT and SIGTERM : these are - # handled by our parent process (hence in - # _evolve_population_grid) - signal.signal( - signal.SIGTERM, - functools.partial(self._child_signal_handler, {"where": "_process_queue"}), - ) - signal.signal( - signal.SIGINT, - functools.partial(self._child_signal_handler, {"where": "_process_queue"}), - ) - - # set start timer - start_process_time = datetime.datetime.now() - - # set the process ID - self.process_ID = ID - - # Set up stream logger for the worker processes - stream_logger = self._get_stream_logger() - if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: - stream_logger.debug(f"Setting up processor: process-{self.process_ID}") - - # Set the process names - name_proc = "binarycpython population process {}".format(ID) - setproctitle.setproctitle(name_proc) - - # Set to starting up - self.set_status("starting") - - # lets try out making stores for all the grids: - self.grid_options["_store_memaddr"] = _binary_c_bindings.return_store_memaddr() - - self.verbose_print( - "Process {} started at {}.\tUsing store memaddr {}".format( - ID, - now(), - self.grid_options["_store_memaddr"], - ), - self.grid_options["verbosity"], - 3, - ) - - # Set the ensemble memory address - if self.bse_options.get("ensemble", 0) == 1: - # set persistent data memory address if necessary. - persistent_data_memaddr = ( - _binary_c_bindings.return_persistent_data_memaddr() - ) - - self.persistent_data_memory_dict = { - self.process_ID: persistent_data_memaddr - } - - self.verbose_print( - "\tUsing persistent_data memaddr: {}".format(persistent_data_memaddr), - self.grid_options["verbosity"], - 3, - ) - - # Set up local variables - localcounter = ( - 0 # global counter for the whole loop. 
(need to be ticked every loop) - ) - probability_of_systems_run = ( - 0 # counter for the probability of the actual systems this tread ran - ) - number_of_systems_run = ( - 0 # counter for the actual number of systems this thread ran - ) - zero_prob_stars_skipped = 0 - total_time_calling_binary_c = 0 - total_mass_run = 0 - total_probability_weighted_mass_run = 0 - - # variables for the statu bar prints - start_grid_time = time.time() - next_log_time = ( - self.shared_memory["prev_log_time"][0] + self.grid_options["log_dt"] - ) - next_mem_update_time = start_grid_time + self.grid_options["log_dt"] - - # Set status to running - self.set_status("running") - - ############################################################ - # Run stellar systems in the queue - ############################################################ - for system_number, system_dict in iter(job_queue.get, "STOP"): - - if False: - print( - "Child: Job Queue system_number = {}, dict={}, n={} check {}".format( - system_number, - system_dict, - number_of_systems_run, - self.grid_options["stop_queue"], - ) - ) - sys.stdout.flush() - - # Combine that with the other settings - full_system_dict = self.bse_options.copy() - full_system_dict.update(system_dict) - - # In the first system, explicitly check all the keys that are passed to see if - # they match the keys known to binary_c. - # Won't do that every system cause that is a bit of a waste of computing time. - if number_of_systems_run == 0: - # Check if keys match known binary_c arguments - self._check_full_system_dict_keys() - - ###################### - # Print status of runs - # save the current time (used often) - time_now = time.time() - - # update memory use stats every log_dt seconds (not every time, this is likely a bit expensive) - if time_now > next_mem_update_time: - m = mem_use() - self.shared_memory["memory_use_per_thread"][ID] = m - next_mem_update_time = time_now + self.grid_options["log_dt"] - if m > self.shared_memory["max_memory_use_per_thread"][ID]: - self.shared_memory["max_memory_use_per_thread"][ID] = m - - # calculate the next logging time - next_log_time = ( - self.shared_memory["prev_log_time"][0] + self.grid_options["log_dt"] - ) - - # Check if we need to log info again - # TODO: Check if we can put this functionality elsewhere - if time_now > next_log_time: - # we have exceeded the next log time : output and update timers - # Lock the threads. TODO: Do we need to release this? 
- lock = multiprocessing.Lock() - - # Do the printing itself - self.vb1print(ID, time_now, system_number, system_dict) - - # Set some values for next time - next_log_time = time_now + self.grid_options["log_dt"] - - # print("PREV ",self.shared_memory["prev_log_time"]) - # print("N LOG STATS",self.shared_memory["n_saved_log_stats"].value) - - # shift the arrays - self.shared_memory["prev_log_time"][ - -(self.grid_options["n_logging_stats"] - 1) : - ] = self.shared_memory["prev_log_time"][ - : (self.grid_options["n_logging_stats"] - 1) - ] - self.shared_memory["prev_log_system_number"][ - -(self.grid_options["n_logging_stats"] - 1) : - ] = self.shared_memory["prev_log_system_number"][ - : (self.grid_options["n_logging_stats"] - 1) - ] - - # set the current time and system number - self.shared_memory["prev_log_time"][0] = time_now - self.shared_memory["prev_log_system_number"][0] = system_number - - # increase the number of stats - self.shared_memory["n_saved_log_stats"].value = min( - self.shared_memory["n_saved_log_stats"].value + 1, - self.grid_options["n_logging_stats"], - ) - - # print("FIRST (0) ",self.shared_memory["prev_log_time"][0]) - # print("LAST (",self.shared_memory["n_saved_log_stats"].value-1,")",self.shared_memory["prev_log_time"][self.shared_memory["n_saved_log_stats"].value-1]) - - ############### - # Log current system info - - # In some cases, the whole run crashes. To be able to figure out which system - # that was on, we log each current system to a file (each thread has one). - # Each new system overrides the previous - if self.grid_options["log_args"]: - argfile = os.path.join( - self.grid_options["log_args_dir"], - "process_{}.txt".format(self.jobID()), - ) - with self.open( - argfile, - "w", - encoding="utf-8", - ) as f: - binary_c_cmdline_string = self._return_argline(full_system_dict) - f.write(binary_c_cmdline_string) - f.close() - - ############## - # Running the system - start_runtime_binary_c = time.time() - - # If we want to actually evolve the systems - if self.grid_options["_actually_evolve_system"]: - run_system = True - - # Check option to ignore 0 probability systems - if not self.grid_options["run_zero_probability_system"]: - if full_system_dict.get("probability", 1) == 0: - run_system = False - zero_prob_stars_skipped += 1 - - if run_system: - # Evolve the system - self._evolve_system_mp(system_number, full_system_dict) - - end_runtime_binary_c = time.time() - - # keep track of total binary_c call time - total_time_calling_binary_c += end_runtime_binary_c - start_runtime_binary_c - - ############ - # Logging runtime - - # Debug line: logging all the lines - if self.grid_options["log_runtime_systems"] == 1: - with self.open( - os.path.join( - self.grid_options["tmp_dir"], - "runtime_systems", - "process_{}.txt".format(self.process_ID), - ), - "a+", - encoding="utf-8", - ) as f: - binary_cmdline_string = self._return_argline(full_system_dict) - f.write( - "{} {} '{}'\n".format( - start_runtime_binary_c, - end_runtime_binary_c - start_runtime_binary_c, - binary_cmdline_string, - ) - ) - f.close() - - #################### - # Tallying system information - - # Keep track of systems: - probability_of_systems_run += full_system_dict.get("probability", 1) - number_of_systems_run += 1 - localcounter += 1 - - # Tally up some numbers - total_mass_system = calculate_total_mass_system(full_system_dict) - total_mass_run += total_mass_system - total_probability_weighted_mass_run += ( - total_mass_system * full_system_dict.get("probability", 1) - ) - - if 
self.grid_options["stop_queue"]: - print("Child: Stop queue at system {n}".format(n=number_of_systems_run)) - break - - if self.grid_options["stop_queue"]: - # any remaining jobs should be ignored - try: - while True: - job_queue.get_nowait() - except queue.Empty: - pass - - # Set status to finishing - self.set_status("finishing") - if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: - stream_logger.debug(f"Process-{self.process_ID} is finishing.") - - ########################### - # Handle ensemble outut - ensemble_json = self._process_handle_ensemble_output(ID=ID) - - ########################## - # Clean up and return - self.verbose_print( - "process {} free memory and return ".format(ID), - self.grid_options["verbosity"], - 1, - ) - # free store memory: - _binary_c_bindings.free_store_memaddr(self.grid_options["_store_memaddr"]) - - # Return a set of results and errors - output_dict = { - "results": self.grid_results, - "ensemble_results": ensemble_json, - "_failed_count": self.grid_options["_failed_count"], - "_failed_prob": self.grid_options["_failed_prob"], - "_failed_systems_error_codes": self.grid_options[ - "_failed_systems_error_codes" - ], - "_errors_exceeded": self.grid_options["_errors_exceeded"], - "_errors_found": self.grid_options["_errors_found"], - "_probtot": probability_of_systems_run, - "_count": number_of_systems_run, - "_total_mass_run": total_mass_run, - "_total_probability_weighted_mass_run": total_probability_weighted_mass_run, - "_zero_prob_stars_skipped": zero_prob_stars_skipped, - "_killed": self.grid_options["_killed"], - } - - end_process_time = datetime.datetime.now() - - killed = self.was_killed() - - # thread end message - colour = "cyan on black" - self.verbose_print( - self._boxed( - "{colour}Process {ID} finished:\ngenerator started at {start}\ngenerator finished at {end}\ntotal: {timesecs}\nof which {binary_c_secs} with binary_c\nRan {nsystems} systems\nwith a total probability of {psystems:g}\n{failcolour}This thread had {nfail} failing systems{colour}\n{failcolour}with a total failed probability of {pfail}{colour}\n{zerocolour}Skipped a total of {nzero} zero-probability systems{zeroreset}\n{failednotice}".format( - colour=self.ANSI_colours[colour], - ID=ID, - start=start_process_time.isoformat(), - end=end_process_time.isoformat(), - timesecs=timedelta( - (end_process_time - start_process_time).total_seconds() - ), - binary_c_secs=timedelta(total_time_calling_binary_c), - nsystems=number_of_systems_run, - psystems=probability_of_systems_run, - failcolour=self.ANSI_colours["red"] - if self.grid_options["_failed_count"] > 0 - else "", - failreset=self.ANSI_colours[colour] - if self.grid_options["_failed_count"] > 0 - else "", - nfail=self.grid_options["_failed_count"], - pfail=self.grid_options["_failed_prob"], - nzero=zero_prob_stars_skipped, - zerocolour=self.ANSI_colours["yellow"] - if zero_prob_stars_skipped > 0 - else "", - zeroreset=self.ANSI_colours[colour] - if zero_prob_stars_skipped > 0 - else "", - failednotice=">>> Process was killed <<<\n" if killed else "", - ), - colour=colour, - ), - self.grid_options["verbosity"], - 1, - ) - - # Write summary - summary_dict = { - "population_id": self.grid_options["_population_id"], - "process_id": self.process_ID, - "start_process_time": start_process_time.timestamp(), - "end_process_time": end_process_time.timestamp(), - "total_time_calling_binary_c": total_time_calling_binary_c, - "number_of_systems_run": number_of_systems_run, - "probability_of_systems_run": 
probability_of_systems_run, - "failed_systems": self.grid_options["_failed_count"], - "failed_probability": self.grid_options["_failed_prob"], - "failed_system_error_codes": self.grid_options[ - "_failed_systems_error_codes" - ], - "zero_prob_stars_skipped": zero_prob_stars_skipped, - } - with self.open( - os.path.join( - self.grid_options["tmp_dir"], - "process_summary", - "process_{}.json".format(self.process_ID), - ), - "w", - encoding="utf-8", - ) as f: - json.dump(summary_dict, f, indent=4, ensure_ascii=False) - - # Set status to finished - if self.was_killed(): - self.set_status("killed") - else: - self.set_status("finished") - - self.verbose_print( - "process {} queue put output_dict ".format(ID), - self.grid_options["verbosity"], - 1, - ) - - result_queue.put(output_dict) - - if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: - stream_logger.debug(f"Process-{self.process_ID} is finished.") - - self.verbose_print( - "process {} return ".format(ID), - self.grid_options["verbosity"], - 1, - ) - return - - # Single system - def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any: - """ - Function to run a single system, based on the settings in the grid_options - - The output of the run gets returned, unless a parse function is given to this function. - - Args: - clean_up_custom_logging_files: whether the clean up all the custom_logging files. - - returns: - either returns the raw binary_c output, or whatever the parse_function does - """ - - ### Custom logging code: - self._set_custom_logging() - - # Check if there are actually arguments passed: - if self.bse_options: - - # Get argument line and - argline = self._return_argline(self.bse_options) - - self.verbose_print( - "Running {}".format(argline), self.grid_options["verbosity"], 1 - ) - - # Run system - out = _binary_c_bindings.run_system( - argstring=argline, - custom_logging_func_memaddr=self.grid_options[ - "custom_logging_func_memaddr" - ], - store_memaddr=self.grid_options["_store_memaddr"], - population=0, - ) - - # Clean up custom logging - if clean_up_custom_logging_files: - self._clean_up_custom_logging(evol_type="single") - - # Parse output and return the result - if self.grid_options["parse_function"]: - return self.grid_options["parse_function"](self, out) - - # Otherwise just return the raw output - return out - - else: - msg = "No actual evolution options passed to the evolve call. Aborting" - raise ValueError(msg) - ############################################################ def _setup(self): """ @@ -1450,137 +642,30 @@ class Population( """ self.grid_options["_count"] += 1 - def _check_binary_c_error(self, system_number, binary_c_output, system_dict): + ##### + # + def _get_generator(self): """ - Function to check whether binary_c throws an error and handle accordingly. + Function to get the generator. 
Handles the choice of evolution method """ - if binary_c_output: - if (binary_c_output.splitlines()[0].startswith("SYSTEM_ERROR")) or ( - binary_c_output.splitlines()[-1].startswith("SYSTEM_ERROR") - ): - self.verbose_print( - "FAILING SYSTEM FOUND", - self.grid_options["verbosity"], - 0, - ) + ### + # Get generator for different evolution methods - # Keep track of the amount of failed systems and their error codes - self.grid_options["_failed_prob"] += system_dict.get("probability", 1) - self.grid_options["_failed_count"] += 1 - self.grid_options["_errors_found"] = True - - try: - error_code = int( - binary_c_output.splitlines()[0] - .split("with error code")[-1] - .split(":")[0] - .strip() - ) - self.verbose_print( - f"Have error code {error_code}", - self.grid_options["verbosity"], - 0, - ) - except: - self.verbose_print( - "Failed to extract error code", - self.grid_options["verbosity"], - 0, - ) - pass - - # Try catching the error code and keep track of the unique ones. - try: - error_code = int( - binary_c_output.splitlines()[0] - .split("with error code")[-1] - .split(":")[0] - .strip() - ) - - if ( - not error_code - in self.grid_options["_failed_systems_error_codes"] - ): - print(f"Caught errr code {error_code}") - self.grid_options["_failed_systems_error_codes"].append( - error_code - ) - except ValueError: - error_code = None - self.verbose_print( - "Failed to extract the error-code", - self.grid_options["verbosity"], - 1, - ) + # grid type + if self.grid_options["evolution_type"] == "grid": + generator = self._grid_sampling_get_generator() - # log failing args? - self._log_failure(system_dict=system_dict) + # user-provided custom generator + if self.grid_options["evolution_type"] == "custom_generator": + generator = self._custom_generator_sampling_get_generator() - # Check if we have exceeded the number of errors - print( - f"Check failed count {self.grid_options['_failed_count']} vs max {self.grid_options['failed_systems_threshold']}" - ) - if ( - self.grid_options["_failed_count"] - > self.grid_options["failed_systems_threshold"] - ): - - # stop evolving systems - self.grid_options["stop_queue"] - - # warn the user the first time we exceed failed_systems_threshold - if not self.grid_options["_errors_exceeded"]: - self.verbose_print( - self._boxed( - "Process {} exceeded the maximum ({}) number of failing systems. Stopped logging them to files now".format( - self.process_ID, - self.grid_options["failed_systems_threshold"], - ) - ), - self.grid_options["verbosity"], - 1, - ) - self.grid_options["_errors_exceeded"] = True + # Source file + elif self.grid_options["evolution_type"] == "source_file": + generator = self._source_file_sampling_get_generator() - else: - self.verbose_print( - "binary_c output nothing - this is strange. 
If there is ensemble output being generated then this is fine.", - self.grid_options["verbosity"], - 3, - ) + # Monte-carlo + elif self.grid_options["evolution_type"] == "monte_carlo": + generator = self._monte_carlo_sampling_get_generator() - def _log_failure(self, system_dict=None, process=None, exitcode=None): - """ - Log failing or crashed system to file in log_failed_systems_dir - """ - if ( - self.grid_options["log_failed_systems"] == True - and self.grid_options["log_failed_systems_dir"] != None - ): - path = os.path.join(self.grid_options["log_failed_systems_dir"]) - os.makedirs(path, exist_ok=True) - if self.dir_ok(path): - failed_systems_file = os.path.join( - self.grid_options["log_failed_systems_dir"], - "process_{}.txt".format(self.jobID()), - ) - with self.open( - failed_systems_file, "a", encoding="utf-8" # append - ) as f: - now = datetime.datetime.now() - now = now.strftime("%d/%m/%Y %H:%M:%S\n") - if system_dict: - binary_c_cmdline_string = ( - f"system {system_number} at {now} " - + self._return_argline(system_dict) - + "\n" - ) - f.write(binary_c_cmdline_string) - if process: - print(f"logged crashed process to {failed_systems_file}") - f.write( - f"Process {process} crashed at {now} with exit code {exitcode}." - ) - return + return generator diff --git a/binarycpython/utils/population_extensions/custom_generator_sampling.py b/binarycpython/utils/population_extensions/custom_generator_sampling.py index 0cb0627e6ca9f015659ee54d766c6c00b345eea7..f1766462c602f07b4ed436c5cfd08b76c648fcf7 100644 --- a/binarycpython/utils/population_extensions/custom_generator_sampling.py +++ b/binarycpython/utils/population_extensions/custom_generator_sampling.py @@ -9,7 +9,7 @@ from collections.abc import Iterable # drop `.abc` with Python 2.7 or lower class custom_generator_sampling: """ - Extension for the Population class containing the code for source-file sampling functions + Extension for the Population class containing the code for custom_generator sampling functions """ def __init__(self, **kwargs): @@ -19,6 +19,15 @@ class custom_generator_sampling: return + def _custom_generator_sampling_get_generator(self): + """ + Function to get the generator for the _custom_generator_sampling sampling method. Called by _get_generator and used in the actual evolution loop. 
+        """
+
+        generator = self.grid_options["custom_generator"]
+
+        return generator
+
     def _custom_generator_sampling_setup(self):
         """
         Function to prepare the class for sampling via a custom generator
diff --git a/binarycpython/utils/population_extensions/evolution_functions.py b/binarycpython/utils/population_extensions/evolution_functions.py
index 9891ffe7e6752782248330abb3d5968122972cca..2ff956f075cd7626a219948737789dad0fd68b72 100644
--- a/binarycpython/utils/population_extensions/evolution_functions.py
+++ b/binarycpython/utils/population_extensions/evolution_functions.py
@@ -3,6 +3,35 @@ The class extension for the population object that contains the evolution functi
 """
 # pylint: disable=E1101
 
+import os
+import gc
+import sys
+import time
+import json
+import queue
+import signal
+import datetime
+import functools
+import multiprocessing
+from typing import Any
+
+from collections import (
+    OrderedDict,
+)
+
+from binarycpython.utils.dicts import keys_to_floats, merge_dicts
+from binarycpython.utils.functions import (
+    timedelta,
+    now,
+    calculate_total_mass_system,
+    mem_use,
+)
+from binarycpython.utils.ensemble import (
+    format_ensemble_results,
+)
+from binarycpython import _binary_c_bindings
+
+import setproctitle
 
 class evolution_functions:
 
@@ -12,12 +41,15 @@ class evolution_functions:
 
     def __init__(self, **kwargs):
         """
-        Init function for the spacing_functions class
+        Init function for the evolution_functions class
         """
 
         return
 
-    def _evolve_population(self):
+    ###################
+    # Evolution functions
+
+    def _evolve_population_wrapper(self):
         """
         Function to evolve populations. This handles the setting up, evolving and cleaning up of a
         population of stars.
@@ -43,7 +75,7 @@ class evolution_functions:
                 "custom_generator",
                 "monte_carlo",
             ]:
-            if self._evolve_population_grid() is False:
+            if self._evolve_population_core() is False:
                 return False
         else:
             print(
@@ -52,6 +84,7 @@ class evolution_functions:
                     self.grid_options["_evolution_type_options"]
                 )
             )
+            raise ValueError("unknown evolution_type: cannot evolve population")
 
         self.set_time("end")
 ############################################################
@@ -115,3 +148,850 @@ class evolution_functions:
         )
 
         return True
+
+    def _evolve_population_core(self):
+        """
+        Function that handles running the population using multiprocessing.
+
+        First we set up the multiprocessing manager and the job and result queue.
+
+        Then we spawn <self.grid_options["num_processes"]> number of process instances,
+        and signal them to start.
+
+        While the processes are waiting for their instructions, we start the queue filler,
+        which goes over the grid code and puts all the tasks in a queue until it is full.
+
+        The processes take these jobs, evolve them and store the results.
+
+        When all the systems have been put in the queue we pass a STOP signal
+        that will make the processes wrap up.
+
+        We then merge in any preloaded population data.
+
+        We read out the information in the result queue and store it in the grid object.
+
+        Returns True if things go well, False on error.
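+
+        A minimal usage sketch (an assumption for illustration: a configured
+        Population instance; this method is normally reached via evolve()
+        rather than being called directly):
+
+            pop = Population()
+            pop.set(num_processes=4)  # plus whatever bse/grid options you need
+            analytics = pop.evolve()  # eventually calls _evolve_population_core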
+ """ + + # Set process name + setproctitle.setproctitle("binarycpython parent process") + + # if max_queue_size is zero, calculate automatically + # to be double the number of processes - you don't want to + # make the queue too large because when it's killed you + # want to end quickly + if self.grid_options["max_queue_size"] == 0: + self.grid_options["max_queue_size"] = 2 * self.grid_options["num_processes"] + + ############ + # Set up the manager object and queues + manager = multiprocessing.Manager() + job_queue = manager.Queue(maxsize=self.grid_options["max_queue_size"]) + result_queue = manager.Queue(maxsize=self.grid_options["num_processes"]) + + ############ + # data to be sent to signal handlers + signal_data = { + "where": "_evolve_population_core", + "queue": job_queue, + } + + ############ + # Create process instances to run the stars + processes = [] + for ID in range(self.grid_options["num_processes"]): + processes.append( + multiprocessing.Process( + target=self._process_queue, + args=(job_queue, result_queue, ID), + ) + ) + + ############ + # Activate the processes + for p in processes: + p.start() + + ############ + # activate signal handlers + # * the child processes ignore these signals + # * the parent will be in _system_queue_handler when these are caught + signal.signal( + signal.SIGTERM, functools.partial(self._parent_signal_handler, signal_data) + ) + signal.signal( + signal.SIGINT, functools.partial(self._parent_signal_handler, signal_data) + ) + + # Set up the system_queue in the parent process + self.grid_options["_job_crashed"] = False + self._system_queue_handler( + job_queue, processes, num_processes=self.grid_options["num_processes"] + ) + + ############ + # Handle killing of processes or join and clean up + if self.grid_options["_job_crashed"] is True: + # job crashed while in system_queue_filler : kill children + # and return False + print( + "A child process crashed or was killed : I will not join incomplete data" + ) + self._kill_child_processes(processes) + return False + + else: + # Join the processes after the queue filler has finished + print("Do join of subprocesses ...") + + while self.grid_options["_job_crashed"] is False and processes: + if self._all_children_running(processes) is False: + # job crashed: stop children and return False + self.grid_options["_job_crashed"] = True + else: + # join first process: it should finish work soon + p = processes.pop(0) + p.join() + + if self.grid_options["_job_crashed"] is True: + print( + "A child process crashed or was killed : I will not join incomplete data" + ) + self._kill_child_processes(processes) + return False + else: + print("Joined all subprocesses.") + + ############ + # Handle the results by merging all the dictionaries. How that merging happens exactly is + # described in the merge_dicts description. 
+ # + # If there is a preloaded_population, we add this first, + # then we add the populations run just now + + # 1) + # use preloaded population's data as a basis + # for our combined_output_dict + if self.preloaded_population: + combined_output_dict = { + "ensemble_results": keys_to_floats( + self.preloaded_population.grid_ensemble_results + ), + "results": keys_to_floats(self.preloaded_population.grid_results), + } + + for x in self._metadata_keylist(): + try: + combined_output_dict[x] = self.preloaded_population.grid_options[x] + except Exception as e: + print( + "Tried to set combined_output_dict key", + x, + "from preloaded_popuation, but this failed:", + e, + ) + print( + "Pre-loaded data from {} stars".format(combined_output_dict["_count"]) + ) + + # do not propagate _killed + # combined_output_dict['results']['_killed'] = False + # combined_output_dict['_killed'] = False + + self.preloaded_population = None + gc.collect() + else: + # new empty combined output + combined_output_dict = OrderedDict() + combined_output_dict["ensemble_results"] = OrderedDict() + combined_output_dict["results"] = OrderedDict() + + # 2) + # combine the dicts that were output from our + # subprocesses + sentinel = object() + for output_dict in iter(result_queue.get, sentinel): + if output_dict: + # don't let Xinit be added + try: + del combined_output_dict["ensemble_results"]["ensemble"]["Xinit"] + except: + pass + + # merge dicts + combined_output_dict = merge_dicts( + combined_output_dict, keys_to_floats(output_dict) + ) + if result_queue.empty(): + break + + # Extra ensemble result manipulation: + if "ensemble_results" in combined_output_dict: + combined_output_dict["ensemble_results"][ + "ensemble" + ] = format_ensemble_results( + combined_output_dict["ensemble_results"].get("ensemble", {}) + ) + gc.collect() + + # Put the values back as object properties + self.grid_results = combined_output_dict["results"] + + ################################# + # Put Ensemble results + self.grid_ensemble_results = combined_output_dict[ + "ensemble_results" + ] # Ensemble results are also passed as output from that dictionary + + # Add metadata + self.add_ensemble_metadata(combined_output_dict) + + # if we were killed, save snapshot + if self.grid_options["save_snapshots"] and self.grid_options["_killed"]: + self.custom_options["save_snapshot"] = True + + # return True because all seems well + return True + + def _evolve_system_mp(self, system_number, full_system_dict): + """ + Function that the multiprocessing evolution method calls to evolve a system + + this function is called by _process_queue + """ + + binary_cmdline_string = self._return_argline(full_system_dict) + + persistent_data_memaddr = -1 + if self.bse_options.get("ensemble", 0) == 1: + persistent_data_memaddr = self.persistent_data_memory_dict[self.process_ID] + # print("thread {}: persistent_data_memaddr: ".format(self.process_ID), persistent_data_memaddr) + + # vb2 logging + if self.grid_options["verbosity"] >= 2: + self.vb2print(full_system_dict, binary_cmdline_string) + + # Get results binary_c + # print("running: {}".format(binary_cmdline_string)) + out = _binary_c_bindings.run_system( + argstring=binary_cmdline_string, + custom_logging_func_memaddr=self.grid_options[ + "custom_logging_func_memaddr" + ], + store_memaddr=self.grid_options["_store_memaddr"], + population=1, # since this system is part of a population, we set this flag to prevent the store from being freed + persistent_data_memaddr=persistent_data_memaddr, + ) + + # Check for errors 
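+        # _check_binary_c_error scans the first and last lines of the binary_c
+        # output for a SYSTEM_ERROR marker, updates the failed-count and
+        # failed-probability tallies, and logs the failing argline
+        # (see failing_systems_functions._check_binary_c_error)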
+ _ = self._check_binary_c_error(system_number, out, full_system_dict) + + # Have some user-defined function do stuff with the data. + if self.grid_options["parse_function"]: + self.custom_options["parameter_dict"] = full_system_dict + self.grid_options["parse_function"](self, out) + + # TODO: check if the parse function returns something and return it again + + return + + def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any: + """ + Function to run a single system, based on the settings in the grid_options + + The output of the run gets returned, unless a parse function is given to this function. + + Args: + clean_up_custom_logging_files: whether the clean up all the custom_logging files. + + returns: + either returns the raw binary_c output, or whatever the parse_function does + """ + + ###### + # Set custom logging functionality + self._set_custom_logging() + + ###### + # Handle single system evolution + + # Check if there are actually arguments passed: + if self.bse_options: + # Get argument line + argline = self._return_argline(self.bse_options) + + self.verbose_print( + "Running {}".format(argline), self.grid_options["verbosity"], 1 + ) + + # Run system + out = _binary_c_bindings.run_system( + argstring=argline, + custom_logging_func_memaddr=self.grid_options[ + "custom_logging_func_memaddr" + ], + store_memaddr=self.grid_options["_store_memaddr"], + population=0, + ) + + # Clean up custom logging + if clean_up_custom_logging_files: + self._clean_up_custom_logging(evol_type="single") + + # Parse output and return the result + if self.grid_options["parse_function"]: + return self.grid_options["parse_function"](self, out) + + # Otherwise just return the raw output + return out + + # Raise error if no evolution options are passed + msg = "No actual evolution options passed to the evolve call. Aborting" + raise ValueError(msg) + + ################### + # queue and worker functions + + def _system_queue_handler(self, job_queue, processes, num_processes): + """ + Function that is responsible for keeping the queue filled. + + This will generate the systems until it is full, and then keeps trying to fill it. + Will have to play with the size of this. + + This function is called as part of the parent process. + """ + + ####### + # Set up logging + stream_logger = self._get_stream_logger() + if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL: + stream_logger.debug(f"setting up the system_queue_filler now") + + ####### + # Start up the generator + generator = self._get_generator() + + ####### + # Start and handle start_at value + + # start_at can be an expression : we should eval it + # prior to running the loop + self.grid_options["start_at"] = eval(str(self.grid_options["start_at"])) + if self.grid_options["start_at"] > 0: + print("Starting at model {} ".format(self.grid_options["start_at"])) + + prev_process_check = None + + ####### + # Continuously fill the queue while we are allowed to. + # The loop terminates when: + # - the generator is exhausted + # - a signal to stop the queue is passed + # - TODO: mass threshold met when monte_carlo sampling + # - TODO: custom threshold (e.g. 
type of system)
+        for system_number, system_dict in enumerate(generator):
+            # on stop, quit this loop
+            if self.grid_options["stop_queue"]:
+                break
+
+            ########
+            # Handle start_at and modulo
+
+            # skip systems before start_at
+            elif system_number < self.grid_options["start_at"]:
+                self.verbose_print(
+                    "skip system {n} because < start_at = {start}".format(
+                        n=system_number, start=self.grid_options["start_at"]
+                    ),
+                    self.grid_options["verbosity"],
+                    3,
+                )
+                continue
+
+            # apply modulo
+            if not (
+                (system_number - self.grid_options["start_at"])
+                % self.grid_options["modulo"]
+                == 0
+            ):
+                self.verbose_print(
+                    "skip system {n} because modulo {mod} == {donemod}".format(
+                        n=system_number,
+                        mod=self.grid_options["modulo"],
+                        donemod=(system_number - self.grid_options["start_at"])
+                        % self.grid_options["modulo"],
+                    ),
+                    self.grid_options["verbosity"],
+                    3,
+                )
+
+                continue
+
+            ######
+            # check children are running every 1s
+            # TODO: allow frequency change?
+            _now = time.time()
+            if prev_process_check is None or _now - prev_process_check > 1:
+                prev_process_check = _now
+                if self._all_children_running(processes) is False:
+                    self.grid_options["_job_crashed"] = True
+                    return
+
+            ######
+            # Handle monte-carlo threshold based on evolved mass
+            if self.grid_options["evolution_type"] == "monte_carlo":
+                # TODO: process this in a way that the systems that are running will still finish
+                # Check based on mass threshold
+                monte_carlo_threshold_reached = (
+                    self._monte_carlo_sampling_check_mass_threshold(system_dict)
+                )
+
+                # Check based on custom threshold, which uses the result_queue
+                # TODO: process the result_queue and pass this on. How we do this (get only one, or get until the result queue is empty again) is still open
+
+            ########
+            # Put system in the job queue
+            try:
+                job_queue.put((system_number, system_dict), block=True)
+            except Exception as e:
+                # error on queueing : stop the queue
+                self.grid_options["stop_queue"] = True
+
+            # Print some info
+            self.verbose_print(
+                "Queue produced system {}".format(system_number),
+                self.grid_options["verbosity"],
+                3,
+            )
+
+        self.grid_options["_queue_done"] = True
+
+        #######
+        # Send closing signal to workers. When they receive this they will terminate
+        if True:  # not self.grid_options['stop_queue']:
+            for _ in range(num_processes):
+                job_queue.put("STOP")
+
+        if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL:
+            stream_logger.debug(f"Signalling processes to stop")  # DEBUG
+
+    def _process_queue(self, job_queue, result_queue, ID):
+        """
+        Worker process that gets items from the job_queue and runs those systems.
+        It keeps track of several things like failed systems, total time spent on systems etc.
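+        Each worker consumes systems until it reads the "STOP" sentinel from
+        the job_queue, then puts its accumulated output_dict on the
+        result_queue and exits.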
+
+        Input:
+            job_queue: Queue object containing system dicts
+            result_queue: Queue where the resulting analytic dictionaries will be put in
+            ID: id of the worker process
+        """
+
+        ############
+        # Handle signals
+
+        # ignore SIGINT and SIGTERM : these are
+        # handled by our parent process (hence in
+        # _evolve_population_core)
+        signal.signal(
+            signal.SIGTERM,
+            functools.partial(self._child_signal_handler, {"where": "_process_queue"}),
+        )
+        signal.signal(
+            signal.SIGINT,
+            functools.partial(self._child_signal_handler, {"where": "_process_queue"}),
+        )
+
+        #########
+        # Start up the process and set local variables and memory
+
+        # Set to starting up
+        self.set_status("starting")
+
+        # set start timer
+        start_process_time = datetime.datetime.now()
+
+        # set the process ID
+        self.process_ID = ID
+
+        # Set the process names
+        name_proc = "binarycpython population process {}".format(ID)
+        setproctitle.setproctitle(name_proc)
+
+        # Set up local variables
+        localcounter = (
+            0  # global counter for the whole loop. (needs to be ticked every loop)
+        )
+        probability_of_systems_run = (
+            0  # counter for the probability of the actual systems this thread ran
+        )
+        number_of_systems_run = (
+            0  # counter for the actual number of systems this thread ran
+        )
+        zero_prob_stars_skipped = 0
+        total_time_calling_binary_c = 0
+        total_mass_run = 0
+        total_probability_weighted_mass_run = 0
+
+        # variables for the status bar prints
+        start_grid_time = time.time()
+        next_log_time = (
+            self.shared_memory["prev_log_time"][0] + self.grid_options["log_dt"]
+        )
+        next_mem_update_time = start_grid_time + self.grid_options["log_dt"]
+
+        # Load store memory address. TODO: this might be prohibitive if we run with MINT
+        self.grid_options["_store_memaddr"] = _binary_c_bindings.return_store_memaddr()
+
+        # Set the ensemble memory address
+        if self.bse_options.get("ensemble", 0) == 1:
+            # set persistent data memory address if necessary.
+            persistent_data_memaddr = (
+                _binary_c_bindings.return_persistent_data_memaddr()
+            )
+
+            self.persistent_data_memory_dict = {
+                self.process_ID: persistent_data_memaddr
+            }
+
+            self.verbose_print(
+                "\tUsing persistent_data memaddr: {}".format(persistent_data_memaddr),
+                self.grid_options["verbosity"],
+                3,
+            )
+
+        #########
+        # Log the starting of the process
+
+        # Set up stream logger for the worker processes
+        stream_logger = self._get_stream_logger()
+        if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL:
+            stream_logger.debug(f"Setting up processor: process-{self.process_ID}")
+
+        self.verbose_print(
+            "Process {} started at {}.\tUsing store memaddr {}".format(
+                ID,
+                now(),
+                self.grid_options["_store_memaddr"],
+            ),
+            self.grid_options["verbosity"],
+            3,
+        )
+
+        # Set status to running
+        self.set_status("running")
+
+        ############################################################
+        # Run stellar systems in the queue
+        ############################################################
+        for system_number, system_dict in iter(job_queue.get, "STOP"):
+
+            if False:
+                print(
+                    "Child: Job Queue system_number = {}, dict={}, n={} check {}".format(
+                        system_number,
+                        system_dict,
+                        number_of_systems_run,
+                        self.grid_options["stop_queue"],
+                    )
+                )
+                sys.stdout.flush()
+
+            #########
+            # Create full system dict and perform check
+            full_system_dict = self.bse_options.copy()
+            full_system_dict.update(system_dict)
+
+            # Check if all keys are known to binary_c.
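+            # Doing this check only for the first system each worker runs
+            # saves compute time compared to validating every system.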
+ if number_of_systems_run == 0: + # Check if keys match known binary_c arguments + self._check_full_system_dict_keys() + + ###################### + # Handle logging of progress + + # save the current time (used often) + time_now = time.time() + + # update memory use stats every log_dt seconds (not every time, this is likely a bit expensive) + if time_now > next_mem_update_time: + m = mem_use() + self.shared_memory["memory_use_per_thread"][ID] = m + next_mem_update_time = time_now + self.grid_options["log_dt"] + if m > self.shared_memory["max_memory_use_per_thread"][ID]: + self.shared_memory["max_memory_use_per_thread"][ID] = m + + # calculate the next logging time + next_log_time = ( + self.shared_memory["prev_log_time"][0] + self.grid_options["log_dt"] + ) + + # Check if we need to log info again + # TODO: Check if we can put this functionality elsewhere + if time_now > next_log_time: + # we have exceeded the next log time : output and update timers + # Lock the threads. TODO: Do we need to release this? + lock = multiprocessing.Lock() + + # Do the printing itself + self.vb1print(ID, time_now, system_number, system_dict) + + # Set some values for next time + next_log_time = time_now + self.grid_options["log_dt"] + + # print("PREV ",self.shared_memory["prev_log_time"]) + # print("N LOG STATS",self.shared_memory["n_saved_log_stats"].value) + + # shift the arrays + self.shared_memory["prev_log_time"][ + -(self.grid_options["n_logging_stats"] - 1) : + ] = self.shared_memory["prev_log_time"][ + : (self.grid_options["n_logging_stats"] - 1) + ] + self.shared_memory["prev_log_system_number"][ + -(self.grid_options["n_logging_stats"] - 1) : + ] = self.shared_memory["prev_log_system_number"][ + : (self.grid_options["n_logging_stats"] - 1) + ] + + # set the current time and system number + self.shared_memory["prev_log_time"][0] = time_now + self.shared_memory["prev_log_system_number"][0] = system_number + + # increase the number of stats + self.shared_memory["n_saved_log_stats"].value = min( + self.shared_memory["n_saved_log_stats"].value + 1, + self.grid_options["n_logging_stats"], + ) + + # print("FIRST (0) ",self.shared_memory["prev_log_time"][0]) + # print("LAST (",self.shared_memory["n_saved_log_stats"].value-1,")",self.shared_memory["prev_log_time"][self.shared_memory["n_saved_log_stats"].value-1]) + + ############### + # Log current system info + + # In some cases, the whole run crashes. To be able to figure out which system + # that was on, we log each current system to a file (each thread has one). + # Each new system overrides the previous + if self.grid_options["log_args"]: + argfile = os.path.join( + self.grid_options["log_args_dir"], + "process_{}.txt".format(self.jobID()), + ) + with self.open( + argfile, + "w", + encoding="utf-8", + ) as f: + binary_c_cmdline_string = self._return_argline(full_system_dict) + f.write(binary_c_cmdline_string) + f.close() + + ############## + # Running the system + start_runtime_binary_c = time.time() + + # If we want to actually evolve the systems + if self.grid_options["_actually_evolve_system"]: + run_system = True + + # Check option to ignore 0 probability systems + if not self.grid_options["run_zero_probability_system"]: + if full_system_dict.get("probability", 1) == 0: + run_system = False + zero_prob_stars_skipped += 1 + + if run_system: + # Evolve the system. 
TODO: if the system returns something (via the parse_function) then put it back into the result queue
+                    self._evolve_system_mp(system_number, full_system_dict)
+
+            end_runtime_binary_c = time.time()
+
+            # keep track of total binary_c call time
+            total_time_calling_binary_c += end_runtime_binary_c - start_runtime_binary_c
+
+            ############
+            # Logging runtime
+
+            # Debug line: logging all the lines
+            if self.grid_options["log_runtime_systems"] == 1:
+                with self.open(
+                    os.path.join(
+                        self.grid_options["tmp_dir"],
+                        "runtime_systems",
+                        "process_{}.txt".format(self.process_ID),
+                    ),
+                    "a+",
+                    encoding="utf-8",
+                ) as f:
+                    binary_cmdline_string = self._return_argline(full_system_dict)
+                    f.write(
+                        "{} {} '{}'\n".format(
+                            start_runtime_binary_c,
+                            end_runtime_binary_c - start_runtime_binary_c,
+                            binary_cmdline_string,
+                        )
+                    )
+                    f.close()
+
+            ####################
+            # Tallying system information
+
+            # Keep track of systems:
+            probability_of_systems_run += full_system_dict.get("probability", 1)
+            number_of_systems_run += 1
+            localcounter += 1
+
+            # Tally up some numbers
+            total_mass_system = calculate_total_mass_system(full_system_dict)
+            total_mass_run += total_mass_system
+            total_probability_weighted_mass_run += (
+                total_mass_system * full_system_dict.get("probability", 1)
+            )
+
+            if self.grid_options["stop_queue"]:
+                print("Child: Stop queue at system {n}".format(n=number_of_systems_run))
+                break
+
+        ####################
+        # Handle stopping of queue
+
+        if self.grid_options["stop_queue"]:
+            # any remaining jobs should be ignored
+            try:
+                while True:
+                    job_queue.get_nowait()
+            except queue.Empty:
+                pass
+
+        ##########################
+        # Clean up and return
+
+        # Set status to finishing
+        self.set_status("finishing")
+        if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL:
+            stream_logger.debug(f"Process-{self.process_ID} is finishing.")
+
+        # free store memory:
+        _binary_c_bindings.free_store_memaddr(self.grid_options["_store_memaddr"])
+
+        self.verbose_print(
+            "process {} free memory and return ".format(ID),
+            self.grid_options["verbosity"],
+            1,
+        )
+
+        # Handle ensemble output
+        ensemble_json = self._process_handle_ensemble_output(ID=ID)
+
+        # Return a set of results and errors
+        output_dict = {
+            "results": self.grid_results,
+            "ensemble_results": ensemble_json,
+            "_failed_count": self.grid_options["_failed_count"],
+            "_failed_prob": self.grid_options["_failed_prob"],
+            "_failed_systems_error_codes": self.grid_options[
+                "_failed_systems_error_codes"
+            ],
+            "_errors_exceeded": self.grid_options["_errors_exceeded"],
+            "_errors_found": self.grid_options["_errors_found"],
+            "_probtot": probability_of_systems_run,
+            "_count": number_of_systems_run,
+            "_total_mass_run": total_mass_run,
+            "_total_probability_weighted_mass_run": total_probability_weighted_mass_run,
+            "_zero_prob_stars_skipped": zero_prob_stars_skipped,
+            "_killed": self.grid_options["_killed"],
+        }
+
+        end_process_time = datetime.datetime.now()
+
+        killed = self.was_killed()
+
+        # thread end message
+        colour = "cyan on black"
+        self.verbose_print(
+            self._boxed(
+                "{colour}Process {ID} finished:\ngenerator started at {start}\ngenerator finished at {end}\ntotal: {timesecs}\nof which {binary_c_secs} with binary_c\nRan {nsystems} systems\nwith a total probability of {psystems:g}\n{failcolour}This thread had {nfail} failing systems{colour}\n{failcolour}with a total failed probability of {pfail}{colour}\n{zerocolour}Skipped a total of {nzero} zero-probability systems{zeroreset}\n{failednotice}".format(
+        # thread end message
+        colour = "cyan on black"
+        self.verbose_print(
+            self._boxed(
+                "{colour}Process {ID} finished:\ngenerator started at {start}\ngenerator finished at {end}\ntotal: {timesecs}\nof which {binary_c_secs} with binary_c\nRan {nsystems} systems\nwith a total probability of {psystems:g}\n{failcolour}This thread had {nfail} failing systems{failreset}\n{failcolour}with a total failed probability of {pfail}{failreset}\n{zerocolour}Skipped a total of {nzero} zero-probability systems{zeroreset}\n{failednotice}".format(
+                    colour=self.ANSI_colours[colour],
+                    ID=ID,
+                    start=start_process_time.isoformat(),
+                    end=end_process_time.isoformat(),
+                    timesecs=timedelta(
+                        (end_process_time - start_process_time).total_seconds()
+                    ),
+                    binary_c_secs=timedelta(total_time_calling_binary_c),
+                    nsystems=number_of_systems_run,
+                    psystems=probability_of_systems_run,
+                    failcolour=self.ANSI_colours["red"]
+                    if self.grid_options["_failed_count"] > 0
+                    else "",
+                    failreset=self.ANSI_colours[colour]
+                    if self.grid_options["_failed_count"] > 0
+                    else "",
+                    nfail=self.grid_options["_failed_count"],
+                    pfail=self.grid_options["_failed_prob"],
+                    nzero=zero_prob_stars_skipped,
+                    zerocolour=self.ANSI_colours["yellow"]
+                    if zero_prob_stars_skipped > 0
+                    else "",
+                    zeroreset=self.ANSI_colours[colour]
+                    if zero_prob_stars_skipped > 0
+                    else "",
+                    failednotice=">>> Process was killed <<<\n" if killed else "",
+                ),
+                colour=colour,
+            ),
+            self.grid_options["verbosity"],
+            1,
+        )
+
+        # Write summary
+        summary_dict = {
+            "population_id": self.grid_options["_population_id"],
+            "process_id": self.process_ID,
+            "start_process_time": start_process_time.timestamp(),
+            "end_process_time": end_process_time.timestamp(),
+            "total_time_calling_binary_c": total_time_calling_binary_c,
+            "number_of_systems_run": number_of_systems_run,
+            "probability_of_systems_run": probability_of_systems_run,
+            "failed_systems": self.grid_options["_failed_count"],
+            "failed_probability": self.grid_options["_failed_prob"],
+            "failed_system_error_codes": self.grid_options[
+                "_failed_systems_error_codes"
+            ],
+            "zero_prob_stars_skipped": zero_prob_stars_skipped,
+        }
+        with self.open(
+            os.path.join(
+                self.grid_options["tmp_dir"],
+                "process_summary",
+                "process_{}.json".format(self.process_ID),
+            ),
+            "w",
+            encoding="utf-8",
+        ) as f:
+            json.dump(summary_dict, f, indent=4, ensure_ascii=False)
+
+        # Set status to finished
+        if self.was_killed():
+            self.set_status("killed")
+        else:
+            self.set_status("finished")
+
+        self.verbose_print(
+            "process {} queue put output_dict ".format(ID),
+            self.grid_options["verbosity"],
+            1,
+        )
+
+        result_queue.put(output_dict)
+
+        if self.grid_options["verbosity"] >= self._LOGGER_VERBOSITY_LEVEL:
+            stream_logger.debug(f"Process-{self.process_ID} is finished.")
+
+        self.verbose_print(
+            "process {} return ".format(ID),
+            self.grid_options["verbosity"],
+            1,
+        )
+        return
diff --git a/binarycpython/utils/population_extensions/failing_systems_functions.py b/binarycpython/utils/population_extensions/failing_systems_functions.py
new file mode 100644
index 0000000000000000000000000000000000000000..773777f62e6fadd0c6a00a098ff7b818abe4dc43
--- /dev/null
+++ b/binarycpython/utils/population_extensions/failing_systems_functions.py
@@ -0,0 +1,157 @@
+"""
+Main script to provide the failing systems functions class extension
+"""
+
+# pylint: disable=E1101
+
+import os
+import datetime
+
+
+class failing_systems_functions:
+    """
+    Extension for the Population class containing the code for failing systems functionality
+    """
+
+    def __init__(self, **kwargs):
+        """
+        Init function for the failing_systems_functions class
+        """
+
+        return
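+    # Failed and crashed systems are logged to one file per process,
+    # <log_failed_systems_dir>/process_<jobID>.txt: each entry carries a
+    # timestamp plus either the binary_c argument line of the failing system
+    # or a note identifying the crashed process and its exit code.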
+    def _log_failure(self, system_dict=None, system_number=None, process=None, exitcode=None):
+        """
+        Log a failing or crashed system to a file in log_failed_systems_dir
+        """
+
+        if (
+            self.grid_options["log_failed_systems"]
+            and self.grid_options["log_failed_systems_dir"] is not None
+        ):
+            path = self.grid_options["log_failed_systems_dir"]
+            os.makedirs(path, exist_ok=True)
+            if self.dir_ok(path):
+                failed_systems_file = os.path.join(
+                    self.grid_options["log_failed_systems_dir"],
+                    "process_{}.txt".format(self.jobID()),
+                )
+                with self.open(
+                    failed_systems_file, "a", encoding="utf-8"  # append
+                ) as f:
+                    now = datetime.datetime.now()
+                    now = now.strftime("%d/%m/%Y %H:%M:%S")
+                    if system_dict:
+                        binary_c_cmdline_string = (
+                            f"system {system_number} at {now} "
+                            + self._return_argline(system_dict)
+                            + "\n"
+                        )
+                        f.write(binary_c_cmdline_string)
+                    if process:
+                        print(f"logged crashed process to {failed_systems_file}")
+                        f.write(
+                            f"Process {process} crashed at {now} with exit code {exitcode}.\n"
+                        )
+        return
+
+    def _check_binary_c_error(self, system_number, binary_c_output, system_dict):
+        """
+        Function to check whether binary_c threw an error, and handle the system accordingly.
+        """
+
+        if binary_c_output:
+            lines = binary_c_output.splitlines()
+            if lines[0].startswith("SYSTEM_ERROR") or lines[-1].startswith(
+                "SYSTEM_ERROR"
+            ):
+                self.verbose_print(
+                    "FAILING SYSTEM FOUND",
+                    self.grid_options["verbosity"],
+                    0,
+                )
+
+                # Keep track of the number of failed systems and their probabilities
+                self.grid_options["_failed_prob"] += system_dict.get("probability", 1)
+                self.grid_options["_failed_count"] += 1
+                self.grid_options["_errors_found"] = True
+
+                # Try to extract the error code and keep track of the unique ones
+                try:
+                    error_code = int(
+                        lines[0]
+                        .split("with error code")[-1]
+                        .split(":")[0]
+                        .strip()
+                    )
+                    self.verbose_print(
+                        f"Have error code {error_code}",
+                        self.grid_options["verbosity"],
+                        0,
+                    )
+                    if error_code not in self.grid_options["_failed_systems_error_codes"]:
+                        self.verbose_print(
+                            f"Caught new error code {error_code}",
+                            self.grid_options["verbosity"],
+                            1,
+                        )
+                        self.grid_options["_failed_systems_error_codes"].append(
+                            error_code
+                        )
+                except ValueError:
+                    error_code = None
+                    self.verbose_print(
+                        "Failed to extract the error code",
+                        self.grid_options["verbosity"],
+                        1,
+                    )
+
+                # log failing args?
+                self._log_failure(system_dict=system_dict, system_number=system_number)
+
+                # Check if we have exceeded the allowed number of failing systems
+                if (
+                    self.grid_options["_failed_count"]
+                    > self.grid_options["failed_systems_threshold"]
+                ):
+
+                    # stop evolving systems
+                    self.grid_options["stop_queue"] = True
+
+                    # warn the user the first time we exceed failed_systems_threshold
+                    if not self.grid_options["_errors_exceeded"]:
+                        self.verbose_print(
+                            self._boxed(
+                                "Process {} exceeded the maximum number ({}) of failing systems. Stopped logging them to files now.".format(
+                                    self.process_ID,
+                                    self.grid_options["failed_systems_threshold"],
+                                )
+                            ),
+                            self.grid_options["verbosity"],
+                            1,
+                        )
+                        self.grid_options["_errors_exceeded"] = True
+
+        else:
+            self.verbose_print(
+                "binary_c produced no output - this is strange. If ensemble output is being generated then this is fine.",
+                self.grid_options["verbosity"],
+                3,
+            )
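A minimal standalone sketch of the error-code extraction performed in
_check_binary_c_error above. The SYSTEM_ERROR line format is inferred from the
parsing code itself; the sample strings are illustrative, not captured
binary_c output:

    # Mirrors the split-based parsing in _check_binary_c_error: take the text
    # after "with error code", cut at the first colon and convert to int.
    def extract_error_code(line):
        try:
            return int(line.split("with error code")[-1].split(":")[0].strip())
        except ValueError:
            # no (or a malformed) error code on this line
            return None

    assert extract_error_code("SYSTEM_ERROR ... with error code 46: stop") == 46
    assert extract_error_code("SYSTEM_ERROR without a code") is None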
diff --git a/binarycpython/utils/population_extensions/grid_options_defaults.py b/binarycpython/utils/population_extensions/grid_options_defaults.py
index 54619d23a05d7841ea300306ff33d01da61e9329..1611ff4f870f58fa5754f7048542e0269634972c 100644
--- a/binarycpython/utils/population_extensions/grid_options_defaults.py
+++ b/binarycpython/utils/population_extensions/grid_options_defaults.py
@@ -188,7 +188,8 @@ class grid_options_defaults:
         "_killed": False,
         "_queue_done": False,
         ## Monte carlo type evolution
-        # TODO: make MC options
+        "monte_carlo_mass_threshold": -1,
+        "_monte_carlo_current_total_mass_evolved": 0,
         ## Evolution from source file
         # TODO: make run from sourcefile options.
         ########################################
diff --git a/binarycpython/utils/population_extensions/grid_sampling.py b/binarycpython/utils/population_extensions/grid_sampling.py
index 408d8902043407c8fbbabb78ada23c2dccf1cab0..2a5600e4c80115af83458a1a012c3dc572bb09f4 100644
--- a/binarycpython/utils/population_extensions/grid_sampling.py
+++ b/binarycpython/utils/population_extensions/grid_sampling.py
@@ -29,6 +29,20 @@ class grid_sampling:
 
         return
 
+    def _grid_sampling_get_generator(self):
+        """
+        Function to get the generator for the grid_sampling sampling method. Called by _get_generator and used in the actual evolution loop.
+        """
+
+        # write the grid code to file
+        self._generate_grid_code(dry_run=False)
+
+        # load the generator function from that file
+        self._load_grid_function()
+        generator = self.grid_options["_system_generator"](self, print_results=False)
+
+        return generator
+
     def _grid_sampling_setup(self):
         """
         Setup function for the grid sampling sampling type
diff --git a/binarycpython/utils/population_extensions/monte_carlo_sampling.py b/binarycpython/utils/population_extensions/monte_carlo_sampling.py
index adb7e73d98b620ef91aef34604af89b9575645a1..0237209faaaaf5821d9519930ac6f3befb749e41 100644
--- a/binarycpython/utils/population_extensions/monte_carlo_sampling.py
+++ b/binarycpython/utils/population_extensions/monte_carlo_sampling.py
@@ -1,8 +1,11 @@
 """
-Main script to provide the Monte-Carlo sampling class extensions 
+Main script to provide the Monte-Carlo sampling class extensions
 """
 
 # pylint: disable=E1101
+from binarycpython.utils.functions import (
+    calculate_total_mass_system,
+)
 
 
 class monte_carlo_sampling:
@@ -17,12 +20,14 @@ class monte_carlo_sampling:
 
         return
 
-    def create_CDF(self):
+    def _create_CDF(self):
         """
         Function to create a CDF from a given (normalized) distribution function
         """
 
-    def write_monte_carlo_generator(self):
+        raise NotImplementedError("This functionality is not available yet")
+
+    def _write_monte_carlo_generator(self):
         """
         Function that generates the monte-carlo grid file that gets imported
         """
@@ -36,14 +41,56 @@ class monte_carlo_sampling:
     # - (optionally) a latin hypercube implementation
     # - function to write the CDF sampling functions for each grid_variable
 
-    def load_monte_carlo_generator(self):
+        raise NotImplementedError("This functionality is not available yet")
+
+    def _load_monte_carlo_generator(self):
         """
         Function to load the monte_carlo grid
         """
 
+        raise NotImplementedError("This functionality is not available yet")
+
     def _monte_carlo_sampling_setup(self):
         """
         Function to prepare the class for a monte-carlo sampling simulation
         """
 
         raise NotImplementedError("This functionality is not available yet")
+
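+    # NOTE (illustrative sketch, not the implementation): once written, the
+    # CDF machinery above would typically perform inverse-CDF sampling of a
+    # tabulated, normalised distribution pdf on abscissae x, e.g.
+    #
+    #     cdf = numpy.cumsum(pdf)
+    #     cdf /= cdf[-1]
+    #     sample = numpy.interp(numpy.random.random(), cdf, x)
+    #
+    # i.e. draw a uniform deviate and invert the CDF by interpolation.
+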
+    def _monte_carlo_sampling_get_generator(self):
+        """
+        Function to get the generator for the monte_carlo sampling method. Called by _get_generator and used in the actual evolution loop.
+        """
+
+        # Write monte_carlo generator
+        self._write_monte_carlo_generator()
+
+        # Load generator
+        self._load_monte_carlo_generator()
+        generator = self.grid_options["_system_generator"]
+
+        return generator
+
+    def _monte_carlo_sampling_check_mass_threshold(self, system_dict):
+        """
+        Function to check whether the total mass evolved exceeds the Monte-Carlo mass threshold, and signal that sampling should stop if so
+        """
+
+        monte_carlo_threshold_reached = False
+
+        # Only check if monte_carlo_mass_threshold is positive (default: -1, i.e. no threshold)
+        if self.grid_options["monte_carlo_mass_threshold"] > 0:
+
+            # Add the total mass of the current system to the total mass evolved
+            self.grid_options[
+                "_monte_carlo_current_total_mass_evolved"
+            ] += calculate_total_mass_system(system_dict)
+
+            # Check whether that exceeds the threshold
+            if (
+                self.grid_options["_monte_carlo_current_total_mass_evolved"]
+                > self.grid_options["monte_carlo_mass_threshold"]
+            ):
+                monte_carlo_threshold_reached = True
+
+        return monte_carlo_threshold_reached
diff --git a/binarycpython/utils/population_extensions/source_file_sampling.py b/binarycpython/utils/population_extensions/source_file_sampling.py
index 0d05e70e673c647236398a1c8efbd66f5d524e88..53e19e3a0ad3a7aa4ab6e3cceeba436e5aba4337 100644
--- a/binarycpython/utils/population_extensions/source_file_sampling.py
+++ b/binarycpython/utils/population_extensions/source_file_sampling.py
@@ -114,3 +114,15 @@ class source_file_sampling:
         )
 
         raise ValueError("This functionality is not available yet")
+
+    def _source_file_sampling_get_generator(self):
+        """
+        Function to get the generator for the source_file sampling method. Called by _get_generator and used in the actual evolution loop.
+        """
+
+        # load the source file and set up the system generator
+        self._load_source_file()
+
+        generator = self.grid_options["_system_generator"]
+
+        return generator
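A minimal sketch of how the Monte-Carlo mass threshold is intended to be used.
The driver loop, the evolve() placeholder and the generator variable are
hypothetical; only monte_carlo_mass_threshold and
_monte_carlo_sampling_check_mass_threshold come from the code above:

    # Hypothetical driver: evolve sampled systems until the accumulated mass
    # evolved exceeds the configured threshold (in solar masses).
    pop = Population()
    pop.set(monte_carlo_mass_threshold=1e5)

    # e.g. a generator obtained via _monte_carlo_sampling_get_generator
    for system_dict in generator:
        # accumulates the system's total mass and compares it to the threshold
        if pop._monte_carlo_sampling_check_mass_threshold(system_dict):
            break  # threshold reached: stop sampling new systems
        evolve(system_dict)  # placeholder for the actual system evolution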