""" Module containing the Population grid class object. Here all the functionality of a Population object is defined. Useful for the user to understand the functionality, but copying functionality isn't recommended except if you know what you are doing Tasks: - TODO: add functionality to 'on-init' set arguments - TODO: add functionality to return the initial_abundance_hash - TODO: add functionality to return the isotope_hash - TODO: add functionality to return the isotope_list - TODO: add functionality to return the nuclear_mass_hash - TODO: add functionality to return the nuclear_mass_list - TODO: add functionality to return the source_list - TODO: add functionality to return the ensemble_list - TODO: consider spreading the functions over more files. - TODO: type the private functions - TODO: fix the correct object types for the default values of the bse_options - TODO: uncomment and implement the HPC functionality - TODO: think of a clean and nice way to unload and remove the custom_logging_info library from memory (and from disk) - TODO: think of a nice way to remove the loaded grid_code/ generator from memory. - TODO: Create a designated dict for results """ import os import sys import copy import json import time import uuid import logging import datetime import argparse import importlib.util import multiprocessing from typing import Union, Any from pathos.helpers import mp as pathos_multiprocess from pathos.pools import _ProcessPool as Pool from binarycpython.utils.grid_options_defaults import ( grid_options_defaults_dict, moe_distefano_default_options, ) from binarycpython.utils.custom_logging_functions import ( autogen_C_logging_code, binary_c_log_code, create_and_load_logging_function, ) from binarycpython.utils.functions import ( get_defaults, remove_file, filter_arg_dict, get_help_all, return_binary_c_version_info, binaryc_json_serializer, verbose_print, merge_dicts, update_dicts, extract_ensemble_json_from_string, get_moe_distefano_dataset, ) # from binarycpython.utils.hpc_functions import ( # get_condor_version, # get_slurm_version, # create_directories_hpc, # path_of_calling_script, # get_python_details, # ) from binarycpython.utils.distribution_functions import ( Moecache, LOG_LN_CONVERTER, ) from binarycpython import _binary_c_bindings class Population: """ Population Object. Contains all the necessary functions to set up, run and process a population of systems """ def __init__(self): """ Initialisation function of the population class """ # Different sections of options # get binary_c defaults and create a cleaned up dict # Setting stuff will check against the defaults to see if the input is correct. self.defaults = get_defaults() self.cleaned_up_defaults = self._cleanup_defaults() self.available_keys = list(self.defaults.keys()) self.special_params = [ el for el in list(self.defaults.keys()) if el.endswith("%d") ] # make the input dictionary self.bse_options = {} # bse_options is just empty. # Grid options self.grid_options = copy.deepcopy(grid_options_defaults_dict) # Custom options self.custom_options = {} # Argline dict self.argline_dict = {} # Set main process id self.grid_options["_main_pid"] = os.getpid() # Set some memory dicts self.persistent_data_memory_dict = {} # self.process_ID = 0 # Create location to store results. Users should write to this dictionary. self.grid_results = {} # Create location where ensemble results are written to self.grid_ensemble_results = {} ################################################### # Argument functions ################################################### # General flow of generating the arguments for the binary_c call: # - user provides parameter and value via set (or manually but that is risky) # - The parameter names of these input get compared to the parameter names in the self.defaults; # with this, we know that its a valid parameter to give to binary_c. # - For a single system, the bse_options will be written as a arg line # - For a population the bse_options will get copied to a temp_bse_options dict and updated with # all the parameters generated by the grid # I will NOT create the argument line by fully writing ALL the defaults and overriding user # input, that seems not necessary because by using the get_defaults() function we already # know for sure which parameter names are valid for the binary_c version # And because binary_c uses internal defaults, its not necessary to explicitly pass them. # I do however suggest everyone to export the binary_c defaults to a file, so that you know # exactly which values were the defaults. def set(self, **kwargs) -> None: """ Function to set the values of the population. This is the preferred method to set values of functions, as it provides checks on the input. the bse_options will get populated with all the those that have a key that is present in the self.defaults the grid_options will get updated with all the those that have a key that is present in the self.grid_options If neither of above is met; the key and the value get stored in a custom_options dict. Args: via kwargs all the arguments are either set to binary_c parameters, grid_options or custom_options (see above) """ # Select the params that end with %d # Go over all the input for key in kwargs: # Filter out keys for the bse_options if key in self.defaults.keys(): verbose_print( "adding: {}={} to BSE_options".format(key, kwargs[key]), self.grid_options["verbosity"], 1, ) self.bse_options[key] = kwargs[key] # Extra check to check if the key fits one of parameter names that end with %d elif any( [ True if (key.startswith(param[:-2]) and len(param[:-2]) < len(key)) else False for param in self.special_params ] ): verbose_print( "adding: {}={} to BSE_options by catching the %d".format( key, kwargs[key] ), self.grid_options["verbosity"], 1, ) self.bse_options[key] = kwargs[key] # Filter out keys for the grid_options elif key in self.grid_options.keys(): verbose_print( "adding: {}={} to grid_options".format(key, kwargs[key]), self.grid_options["verbosity"], 1, ) self.grid_options[key] = kwargs[key] # The of the keys go into a custom_options dict else: print( "!! Key doesnt match previously known parameter: \ adding: {}={} to custom_options".format( key, kwargs[key] ) ) self.custom_options[key] = kwargs[key] def parse_cmdline(self) -> None: """ Function to handle settings values via the command line. Best to be called after all the .set(..) lines, and just before the .evolve() is called If you input any known parameter (i.e. contained in grid_options, defaults/bse_options or custom_options), this function will attempt to convert the input from string (because everything is string) to the type of the value that option had before. The values of the bse_options are initially all strings, but after user input they can change to ints. The value of any new parameter (which will go to custom_options) will be a string. Tasks: - TODO: remove the need for --cmdline """ parser = argparse.ArgumentParser() parser.add_argument( "--cmdline", help='Setting values via the commandline. Input like --cmdline "metallicity=0.02"', ) args = parser.parse_args() # How its set up now is that as input you need to give --cmdline "metallicity=0.002" # Its checked if this exists and handled accordingly. if args.cmdline: verbose_print( "Found cmdline args. Parsing them now", self.grid_options["verbosity"], 1, ) # Grab the input and split them up, while accepting only non-empty entries cmdline_args = args.cmdline self.grid_options["_commandline_input"] = cmdline_args split_args = [ cmdline_arg for cmdline_arg in cmdline_args.split(" ") if not cmdline_arg == "" ] # Make dict and fill it cmdline_dict = {} for cmdline_arg in split_args: split = cmdline_arg.split("=") parameter = split[0] value = split[1] old_value_found = False # Find an old value if parameter in self.grid_options: old_value = self.grid_options[parameter] old_value_found = True elif parameter in self.defaults: old_value = self.defaults[parameter] old_value_found = True elif parameter in self.custom_options: old_value = self.custom_options[parameter] old_value_found = True # (attempt to) convert if old_value_found: try: verbose_print( "Converting type of {} from {} to {}".format( parameter, type(value), type(old_value) ), self.grid_options["verbosity"], 1, ) value = type(old_value)(value) verbose_print("Success!", self.grid_options["verbosity"], 1) except ValueError: verbose_print( "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( parameter, value, type(old_value), old_value ), self.grid_options["verbosity"], 0, ) # Add to dict cmdline_dict[parameter] = value # unpack the dictionary into the setting function that handles where the values are set self.set(**cmdline_dict) def _return_argline(self, parameter_dict=None): """ Function to create the string for the arg line from a parameter dict """ if not parameter_dict: parameter_dict = self.bse_options argline = "binary_c " for param_name in sorted(parameter_dict): argline += "{} {} ".format(param_name, parameter_dict[param_name]) argline = argline.strip() return argline def last_grid_variable(self): """ Functon that returns the last grid variable (i.e. the one with the highest grid_variable_number) """ number = len(self.grid_options["_grid_variables"]) for grid_variable in self.grid_options["_grid_variables"]: if ( self.grid_options["_grid_variables"][grid_variable][ "grid_variable_number" ] == number - 1 ): return grid_variable def add_grid_variable( self, name: str, longname: str, valuerange: Union[list, str], resolution: str, spacingfunc: str, probdist: str, dphasevol: Union[str, int], parameter_name: str, gridtype: str = "edge", branchpoint: int = 0, precode: Union[str, None] = None, condition: Union[str, None] = None, ) -> None: """ Function to add grid variables to the grid_options. TODO: Fix this complex function. The execution of the grid generation will be through a nested forloop. Each of the grid variables will get create a deeper for loop. The real function that generates the numbers will get written to a new file in the TMP_DIR, and then loaded imported and evaluated. beware that if you insert some destructive piece of code, it will be executed anyway. Use at own risk. Args: name: name of parameter. This is evaluated as a parameter and you can use it throughout the rest of the function example: name = 'lnm1' longname: Long name of parameter example: longname = 'Primary mass' range: Range of values to take. Does not get used really, the spacingfunction is used to get the values from example: range = [math.log(m_min), math.log(m_max)] resolution: Resolution of the sampled range (amount of samples). TODO: check if this is used anywhere example: resolution = resolution["M_1"] spacingfunction: Function determining how the range is sampled. You can either use a real function, or a string representation of a function call. Will get written to a file and then evaluated. example: spacingfunction = "const(math.log(m_min), math.log(m_max), {})".format( resolution['M_1'] ) precode: Extra room for some code. This code will be evaluated within the loop of the sampling function (i.e. a value for lnm1 is chosen already) example: precode = 'M_1=math.exp(lnm1);' probdist: FUnction determining the probability that gets asigned to the sampled parameter example: probdist = 'Kroupa2001(M_1)*M_1' dphasevol: part of the parameter space that the total probability is calculated with. Put to -1 if you want to ignore any dphasevol calculations and set the value to 1 example: dphasevol = 'dlnm1' condition: condition that has to be met in order for the grid generation to continue example: condition = 'self.grid_options['binary']==1' gridtype: Method on how the value range is sampled. Can be either 'edge' (steps starting at the lower edge of the value range) or 'center' (steps starting at lower edge + 0.5 * stepsize). """ # TODO: Add check for the gridtype input value # TODO: add functionality for branchpoint # Add grid_variable grid_variable = { "name": name, "longname": longname, "valuerange": valuerange, "resolution": resolution, "spacingfunc": spacingfunc, "precode": precode, "probdist": probdist, "dphasevol": dphasevol, "parameter_name": parameter_name, "condition": condition, "gridtype": gridtype, "branchpoint": branchpoint, "grid_variable_number": len(self.grid_options["_grid_variables"]), } # Load it into the grid_options self.grid_options["_grid_variables"][grid_variable["name"]] = grid_variable verbose_print( "Added grid variable: {}".format(json.dumps(grid_variable, indent=4)), self.grid_options["verbosity"], 1, ) ################################################### # Return functions ################################################### def return_population_settings(self) -> dict: """ Function that returns all the options that have been set. Can be combined with json to make a nice file. Returns: dictionary containing "bse_options", "grid_options", "custom_options" """ options = { "bse_options": self.bse_options, "grid_options": self.grid_options, "custom_options": self.custom_options, } return options def _return_binary_c_version_info(self, parsed=False): """ Function that returns the version information of binary_c """ version_info = return_binary_c_version_info(parsed=parsed) return version_info def _return_binary_c_defaults(self): """ Function that returns the defaults of the binary_c version that is used. """ return self.defaults def return_all_info( self, include_population_settings: bool = True, include_binary_c_defaults: bool = True, include_binary_c_version_info: bool = True, include_binary_c_help_all: bool = True, ) -> dict: """ Function that returns all the information about the population and binary_c Args: include_population_settings: whether to include the population_settings (see function return_population_settings) include_binary_c_defaults: whether to include a dict containing the binary_c parameters and their default values include_binary_c_version_info: whether to include a dict containing all the binary_c version info (see return_binary_c_version_info) include_binary_c_help_all: whether to include a dict containing all the information about the binary_c parameters (see get_help_all) Return: dictionary containing all, or part of, the above dictionaries """ # all_info = {} # if include_population_settings: population_settings = self.return_population_settings() all_info["population_settings"] = population_settings # if include_binary_c_defaults: binary_c_defaults = self._return_binary_c_defaults() all_info["binary_c_defaults"] = binary_c_defaults if include_binary_c_version_info: binary_c_version_info = return_binary_c_version_info(parsed=True) all_info["binary_c_version_info"] = binary_c_version_info if include_binary_c_help_all: binary_c_help_all_info = get_help_all(print_help=False) all_info["binary_c_help_all"] = binary_c_help_all_info return all_info def export_all_info( self, use_datadir: bool = True, outfile: Union[str, None] = None, include_population_settings: bool = True, include_binary_c_defaults: bool = True, include_binary_c_version_info: bool = True, include_binary_c_help_all: bool = True, ) -> Union[str, None]: """ Function that exports the all_info to a json file Tasks: - TODO: if any of the values in the dicts here is of a not-serializable form, then we need to change that to a string or something so, use a recursive function that goes over the all_info dict and finds those that fit - TODO: Fix to write things to the directory. which options do which etc - TODO: theres flawed logic here. rewrite this part pls - TODO: consider actually just removing the whole 'output to file' part and let the user do this. Args: include_population_settings: whether to include the population_settings (see function return_population_settings) include_binary_c_defaults: whether to include a dict containing the binary_c parameters and their default values include_binary_c_version_info: whether to include a dict containing all the binary_c version info (see return_binary_c_version_info) include_binary_c_help_all: whether to include a dict containing all the information about the binary_c parameters (see get_help_all) use_datadir: boolean whether to use the custom_options['data_dir'] to write the file to. If the custom_options["base_filename"] is set, the output file will be called <custom_options["base_filename"]>_settings.json. Otherwise a file called simulation_<date+time>_settings.json will be created outfile: if use_datadir is false, a custom filename will be used """ all_info = self.return_all_info( include_population_settings=include_population_settings, include_binary_c_defaults=include_binary_c_defaults, include_binary_c_version_info=include_binary_c_version_info, include_binary_c_help_all=include_binary_c_help_all, ) # Copy dict all_info_cleaned = copy.deepcopy(all_info) if use_datadir: if not self.custom_options.get("base_filename", None): base_name = "simulation_{}".format( datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d_%H%M%S") ) else: base_name = os.path.splitext(self.custom_options["base_filename"])[0] settings_name = base_name + "_settings.json" # Check directory, make if necessary os.makedirs(self.custom_options["data_dir"], exist_ok=True) settings_fullname = os.path.join( self.custom_options["data_dir"], settings_name ) verbose_print( "Writing settings to {}".format(settings_fullname), self.grid_options["verbosity"], 1, ) # if not outfile.endswith('json'): with open(settings_fullname, "w") as file: file.write( json.dumps( all_info_cleaned, indent=4, default=binaryc_json_serializer, ) ) return settings_fullname else: verbose_print( "Writing settings to {}".format(outfile), self.grid_options["verbosity"], 1, ) if not outfile.endswith("json"): verbose_print( "Error: outfile ({}) must end with .json".format(outfile), self.grid_options["verbosity"], 0, ) raise ValueError with open(outfile, "w") as file: file.write( json.dumps( all_info_cleaned, indent=4, default=binaryc_json_serializer ) ) return outfile def _set_custom_logging(self): """ Function/routine to set all the custom logging so that the function memory pointer is known to the grid. """ # C_logging_code gets priority of C_autogen_code verbose_print( "Creating and loading custom logging functionality", self.grid_options["verbosity"], 1, ) if self.grid_options["C_logging_code"]: # Generate entire shared lib code around logging lines custom_logging_code = binary_c_log_code( self.grid_options["C_logging_code"], verbose=self.grid_options["verbosity"], ) # Load memory adress ( self.grid_options["custom_logging_func_memaddr"], self.grid_options["_custom_logging_shared_library_file"], ) = create_and_load_logging_function( custom_logging_code, verbose=self.grid_options["verbosity"], custom_tmp_dir=self.grid_options["tmp_dir"], ) elif self.grid_options["C_auto_logging"]: # Generate real logging code logging_line = autogen_C_logging_code( self.grid_options["C_auto_logging"], verbose=self.grid_options["verbosity"], ) # Generate entire shared lib code around logging lines custom_logging_code = binary_c_log_code( logging_line, verbose=self.grid_options["verbosity"] ) # Load memory adress ( self.grid_options["custom_logging_func_memaddr"], self.grid_options["_custom_logging_shared_library_file"], ) = create_and_load_logging_function( custom_logging_code, verbose=self.grid_options["verbosity"], custom_tmp_dir=self.grid_options["tmp_dir"], ) ################################################### # Ensemble functions ################################################### # Now they are stored in the _process_run_population thing. # Needed less code since they all ################################################### # Evolution functions ################################################### def _pre_run_cleanup(self): """ Function to clean up some stuff in the grid before a run (like results, ensemble results etc) """ # empty results self.grid_options["results"] = {} def evolve(self) -> None: """ Entrypoint function of the whole object. From here, based on the settings, we set up a SLURM or CONDOR grid, or if no setting is given we go straight to evolving the population There are no direct arguments to this function, rather it is based on the grid_options settings: grid_options['slurm']: integer boolean whether to use a slurm_grid evolution grid_options['condor']: integer boolean whether to use a condor_grid evolution If neither of the above is set, we continue without using HPC routines (that doesn't mean this cannot be run on a server with many cores) Returns an dictionary containing the analytics of the run TODO: change the way this is done. Slurm & CONDOR should probably do this different """ # Just to make sure we don't have stuff from a previous run hanging around self._pre_run_cleanup() # Check which type: if self.grid_options["slurm"] == 1: # Execute slurm subroutines self._slurm_grid() elif self.grid_options["condor"] == 1: # Execute condor subroutines self._condor_grid() else: # Execute population evolution subroutines self.evolve_population() # Put all interesting stuff in a variable and output that afterwards, as analytics of the run. analytics_dict = { "population_name": self.grid_options["_population_id"], "evolution_type": self.grid_options["evolution_type"], "failed_count": self.grid_options["_failed_count"], "failed_prob": self.grid_options["_failed_prob"], "failed_systems_error_codes": self.grid_options[ "_failed_systems_error_codes" ].copy(), "errors_exceeded": self.grid_options["_errors_exceeded"], "errors_found": self.grid_options["_errors_found"], "total_probability": self.grid_options["_probtot"], "total_count": self.grid_options["_count"], "start_timestamp": self.grid_options["_start_time_evolution"], "end_timestamp": self.grid_options["_end_time_evolution"], "total_mass_run": self.grid_options["_total_mass_run"], "total_probability_weighted_mass_run": self.grid_options[ "_total_probability_weighted_mass_run" ], } # print(Moecache) print(Moecache.keys()) ## # Clean up code: remove files, unset values. This is placed in the general evolve function, # because that makes for easier control self._cleanup() return analytics_dict def evolve_population(self): """ Function to evolve populations. This handles the setting up, evolving and cleaning up of a population of stars. Choices here are: to evolve a population via multiprocessing or linearly on 1 core. NOT IMPLEMENTED YET to evolve a population via a variable grid, a source file or MC Tasks: - TODO: include options for different ways of generating a population here. (i.e. MC or source file) """ # Reset some settings: population_id, results, ensemble_results etc self.grid_options["_population_id"] = uuid.uuid4().hex ## # Prepare code/initialise grid. # set custom logging, set up store_memaddr, build grid code. dry run grid code. self._setup() ## # Evolve systems: via grid_options one can choose to do this linearly, or # multiprocessing method. if ( self.grid_options["evolution_type"] in self.grid_options["_evolution_type_options"] ): if self.grid_options["evolution_type"] == "grid": self._evolve_population_grid() # elif self.grid_options["evolution_type"] == "mc": # # TODO: add MC option else: print( "Warning. you chose a wrong option for the grid evolution types.\ Please choose from the following: {}.".format( self.grid_options["_evolution_type_options"] ) ) # self.grid_options["_end_time_evolution"] = time.time() # Log and print some information verbose_print( "Population-{} finished! The total probability was: {}. It took a total of {}s to run {} systems on {} cores".format( self.grid_options["_population_id"], self.grid_options["_probtot"], self.grid_options["_end_time_evolution"] - self.grid_options["_start_time_evolution"], self.grid_options["_total_starcount"], self.grid_options["amt_cores"], ), self.grid_options["verbosity"], 0, ) if self.grid_options["_errors_found"]: # Some information afterwards verbose_print( "During the run {} failed systems were found, with a total probability of {} and with the following unique error codes: {} ".format( self.grid_options["_failed_count"], self.grid_options["_failed_prob"], self.grid_options["_failed_systems_error_codes"], ), self.grid_options["verbosity"], 0, ) # Some information afterwards verbose_print( "The full argline commands for {} these systems have been written to {}".format( "ALL" if not self.grid_options["_errors_exceeded"] else "SOME (only the first ones, as there were too many to log all of them)", os.path.join( self.grid_options["tmp_dir"], "failed_systems_{}_X.txt".format( self.grid_options["_population_id"] ), ), ), self.grid_options["verbosity"], 0, ) else: verbose_print( "There were no errors found in this run.", self.grid_options["verbosity"], 0, ) def get_stream_logger(self, level=logging.DEBUG): """Return logger with configured StreamHandler.""" stream_logger = logging.getLogger("stream_logger") stream_logger.handlers = [] stream_logger.setLevel(level) sh = logging.StreamHandler() sh.setLevel(level) fmt = "[%(asctime)s %(levelname)-8s %(processName)s] --- %(message)s" formatter = logging.Formatter(fmt) sh.setFormatter(formatter) stream_logger.addHandler(sh) return stream_logger def system_queue_filler(self, job_queue, amt_cores): """ Function that is responsible for keeping the queue filled. This will generate the systems until it is full, and then keeps trying to fill it. Will have to play with the size of this. """ stream_logger = self.get_stream_logger() stream_logger.debug(f"setting up the system_queue_filler now") # Setup of the generator self._generate_grid_code(dry_run=False) self._load_grid_function() generator = self.grid_options["_system_generator"](self, print_results=False) # TODO: build in method to handle with the HPC. # Continously fill the queue for system_number, system_dict in enumerate(generator): # stream_logger.debug(f"producing: {system_number}") # DEBUG job_queue.put((system_number, system_dict)) # Print current size # print("Current size: {}".format(save_que.qsize())) # Send closing signal to workers. When they receive this they will terminate stream_logger.debug(f"Signaling stop to processes") # DEBUG for _ in range(amt_cores): job_queue.put("STOP") def _evolve_population_grid(self): """ Function to evolve the population with multiprocessing approach. Using pathos to be able to include class-owned functions. This function will create a pool with <self.grid_options["amt_cores"]> processes, and perform an imap_unordered to run the different `threads`. Before this was done by giving a generator as the iterable, and have the processes get a certain chunksize each round. Later on this seemed to be a bad decision, because it is difficult to pass information back to the main controller, and because with each new batch of systems a new object instance was created. What I do now is I spawn these X amount of processes, and pass a range(self.grid_options["amt_cores"]) as iterable. In that way, only once do they fetch a `job`, but that job is just a ID number. With this ID number each thread/process loops over the whole generator, but only runs the one <ID>'th system (if (localcounter+ID) % self.grid_options["amt_cores"]==0)' When they are finished, these jobs are instructed to return a set of information (the result dict, TODO: describe what more) These resultation dictionaries are then merged and stored as object properties again. """ # TODO: make further use of a queue to handle jobs or at least # get information on the process ids etc # https://stackoverflow.com/questions/10190981/get-a-unique-id-for-worker-in-python-multiprocessing-pool # https://stackoverflow.com/questions/8640367/python-manager-dict-in-multiprocessing/9536888 # for muting values through dicts # https://python-forum.io/Thread-Dynamic-updating-of-a-nested-dictionary-in-multiprocessing-pool # https://stackoverflow.com/questions/28740955/working-with-pathos-multiprocessing-tool-in-python-and # TODO: make good example of how to deal with a result_dict # https://www.programcreek.com/python/example/58176/multiprocessing.Value # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing # Set up the manager object that can share info between processes # pathos_multiprocess manager = multiprocessing.Manager() job_queue = manager.Queue(maxsize=self.grid_options['max_queue_size']) result_queue = manager.Queue(maxsize=self.grid_options["amt_cores"]) # Create process instances processes = [] for ID in range(self.grid_options["amt_cores"]): processes.append( multiprocessing.Process( target=self._process_run_population_grid, args=(job_queue, result_queue, ID), ) ) # Activate the processes for p in processes: p.start() # Set up the system_queue self.system_queue_filler(job_queue, amt_cores=self.grid_options["amt_cores"]) # Join the processes for p in processes: p.join() # Handle the results by merging all the dictionaries. How that merging happens exactly is # described in the merge_dicts description. combined_output_dict = {} sentinel = object() for output_dict in iter(result_queue.get, sentinel): print(output_dict) combined_output_dict = merge_dicts(combined_output_dict, output_dict) if result_queue.empty(): break # Put the values back as object properties self.grid_results = combined_output_dict["results"] self.grid_ensemble_results = combined_output_dict[ "ensemble_results" ] # Ensemble results are also passed as output from that dictionary # Add some metadata self.grid_ensemble_results["population_id"] = self.grid_options[ "_population_id" ] self.grid_options["_failed_count"] = combined_output_dict["_failed_count"] self.grid_options["_failed_prob"] = combined_output_dict["_failed_prob"] self.grid_options["_failed_systems_error_codes"] = list( set(combined_output_dict["_failed_systems_error_codes"]) ) self.grid_options["_errors_exceeded"] = combined_output_dict["_errors_exceeded"] self.grid_options["_errors_found"] = combined_output_dict["_errors_found"] self.grid_options["_probtot"] = combined_output_dict["_probtot"] self.grid_options["_count"] = combined_output_dict["_count"] self.grid_options["_total_mass_run"] = combined_output_dict["_total_mass_run"] self.grid_options[ "_total_probability_weighted_mass_run" ] = combined_output_dict["_total_probability_weighted_mass_run"] def _evolve_system_mp(self, full_system_dict): """ Function that the multiprocessing evolution method calls to evolve a system this function is called by _process_run_population_grid """ binary_cmdline_string = self._return_argline(full_system_dict) persistent_data_memaddr = -1 if self.bse_options.get("ensemble", 0) == 1: persistent_data_memaddr = self.persistent_data_memory_dict[self.process_ID] # print("thread {}: persistent_data_memaddr: ".format(self.process_ID), persistent_data_memaddr) # Get results binary_c out = _binary_c_bindings.run_system( argstring=binary_cmdline_string, custom_logging_func_memaddr=self.grid_options[ "custom_logging_func_memaddr" ], store_memaddr=self.grid_options["_store_memaddr"], population=1, # since this system is part of a population, we set this flag to prevent the store from being freed persistent_data_memaddr=persistent_data_memaddr, ) # Check for errors _ = self._check_binary_c_error(out, full_system_dict) # Have some user-defined function do stuff with the data. if self.grid_options["parse_function"]: self.grid_options["parse_function"](self, out) def _process_run_population_grid(self, job_queue, result_queue, ID): """ Function that loops over the whole generator, but only runs systems that fit to: if (localcounter+ID) % self.grid_options["amt_cores"] == 0 That way with e.g. 4 processes, process 1 runs sytem 0, 4, 8... process 2 runs system 1, 5, 9..., etc This function is called by _evolve_population_grid """ # set start timer start_process_time = datetime.datetime.now() # self.process_ID = ( ID # Store the ID as a object property again, lets see if that works. ) stream_logger = self.get_stream_logger() stream_logger.debug(f"Setting up processor: process-{self.process_ID}") # Set to starting up with open( os.path.join( self.grid_options["tmp_dir"], "process_status", "process_{}.txt".format(self.process_ID), ), "w", ) as f: f.write("STARTING") # lets try out making stores for all the grids: self.grid_options["_store_memaddr"] = _binary_c_bindings.return_store_memaddr() verbose_print( "Process {} started at {}.\tUsing store memaddr {}".format( ID, datetime.datetime.now().isoformat(), self.grid_options["_store_memaddr"], ), self.grid_options["verbosity"], 0, ) # Set the ensemble memaddr if self.bse_options.get("ensemble", 0) == 1: # set persistent data memaddr if necessary. persistent_data_memaddr = ( _binary_c_bindings.return_persistent_data_memaddr() ) self.persistent_data_memory_dict = { self.process_ID: persistent_data_memaddr } verbose_print( "\tUsing persistent_data memaddr: {}".format(persistent_data_memaddr), self.grid_options["verbosity"], 0, ) # Set up local variables localcounter = ( 0 # global counter for the whole loop. (need to be ticked every loop) ) probability_of_systems_run = ( 0 # counter for the probability of the actual systems this tread ran ) number_of_systems_run = ( 0 # counter for the actual amt of systems this thread ran ) total_time_calling_binary_c = 0 total_mass_run = 0 total_probability_weighted_mass_run = 0 # Go over the queue for system_number, system_dict in iter(job_queue.get, "STOP"): if localcounter == 0: # Set status to running with open( os.path.join( self.grid_options["tmp_dir"], "process_status", "process_{}.txt".format(self.process_ID), ), "w", ) as f: f.write("RUNNING") # Combine that with the other settings full_system_dict = self.bse_options.copy() full_system_dict.update(system_dict) # In the first system, explicitly check all the keys that are passed to see if # they match the keys known to binary_c. # Won't do that every system cause that is a bit of a waste of computing time. if number_of_systems_run == 0: # TODO: Put this someplace else and wrap in a functioncall for key in full_system_dict.keys(): if not key in self.available_keys: # Deal with special keys if not any( [ True if ( key.startswith(param[:-2]) and len(param[:-2]) < len(key) ) else False for param in self.special_params ] ): msg = "Error: Found a parameter unknown to binary_c: {}. Abort".format( key ) raise ValueError(msg) # self._print_info( # i + 1, self.grid_options["_total_starcount"], full_system_dict # ) # verbose_print( "Process {} is handling system {}".format(ID, system_number), self.grid_options["verbosity"], 2, ) # In some cases, the whole run crashes. To be able to figure out which system # that was on, we log each current system to a file (each thread has one). # Each new system overrides the previous with open( os.path.join( self.grid_options["tmp_dir"], "current_system", "process_{}.txt".format(self.process_ID), ), "w", ) as f: binary_cmdline_string = self._return_argline(full_system_dict) f.write(binary_cmdline_string) start_runtime_binary_c = time.time() # Evolve the system if self.grid_options["_actually_evolve_system"]: self._evolve_system_mp(full_system_dict) end_runtime_binary_c = time.time() total_time_calling_binary_c += ( end_runtime_binary_c - start_runtime_binary_c ) # keep track of total binary_c call time # Debug line: logging all the lines if self.grid_options["log_runtime_systems"] == 1: with open( os.path.join( self.grid_options["tmp_dir"], "runtime_systems", "process_{}.txt".format(self.process_ID), ), "a+", ) as f: binary_cmdline_string = self._return_argline(full_system_dict) f.write( "{} {} '{}'\n".format( start_runtime_binary_c, end_runtime_binary_c - start_runtime_binary_c, binary_cmdline_string, ) ) # Keep track of systems: probability_of_systems_run += full_system_dict["probability"] number_of_systems_run += 1 localcounter += 1 # Tally up some numbers total_mass_system = ( full_system_dict.get("M_1", 0) + full_system_dict.get("M_2", 0) + full_system_dict.get("M_3", 0) + full_system_dict.get("M_4", 0) ) total_mass_run += total_mass_system total_probability_weighted_mass_run += ( total_mass_system * full_system_dict["probability"] ) # Set status to finishing with open( os.path.join( self.grid_options["tmp_dir"], "process_status", "process_{}.txt".format(self.process_ID), ), "w", ) as f: f.write("FINISHING") stream_logger.debug(f"Process-{self.process_ID} is finishing.") # Handle ensemble output: is ensemble==1, then either directly write that data to a file, or combine everything into 1 file. ensemble_json = {} # Make sure it exists already if self.bse_options.get("ensemble", 0) == 1: verbose_print( "Process {}: is freeing ensemble output (using persisten_data memaddr {})".format( ID, self.persistent_data_memory_dict[self.process_ID] ), self.grid_options["verbosity"], 2, ) ensemble_raw_output = ( _binary_c_bindings.free_persistent_data_memaddr_and_return_json_output( self.persistent_data_memory_dict[self.process_ID] ) ) if ensemble_raw_output is None: verbose_print( "Process {}: Warning! Ensemble output is empty. ".format(ID), self.grid_options["verbosity"], 1, ) # if self.grid_options["combine_ensemble_with_thread_joining"] is True: verbose_print( "Process {}: Extracting ensemble info from raw string".format(ID), self.grid_options["verbosity"], 2, ) ensemble_json["ensemble"] = extract_ensemble_json_from_string( ensemble_raw_output ) # Load this into a dict so that we can combine it later else: # If we do not allow this, automatically we will export this to the data_dir, in # some formatted way output_file = os.path.join( self.custom_options["data_dir"], "ensemble_output_{}_{}.json".format( self.grid_options["_population_id"], self.process_ID ), ) # Write to file with open(output_file, "w") as f: f.write(ensemble_raw_output) print( "Thread {}: Wrote ensemble results directly to file: {}".format( self.process_ID, output_file ) ) # free store mem: _binary_c_bindings.free_store_memaddr(self.grid_options["_store_memaddr"]) # Return a set of results and errors output_dict = { "results": self.grid_results, "ensemble_results": ensemble_json, "_failed_count": self.grid_options["_failed_count"], "_failed_prob": self.grid_options["_failed_prob"], "_failed_systems_error_codes": self.grid_options[ "_failed_systems_error_codes" ], "_errors_exceeded": self.grid_options["_errors_exceeded"], "_errors_found": self.grid_options["_errors_found"], "_probtot": probability_of_systems_run, "_count": number_of_systems_run, "_total_mass_run": total_mass_run, "_total_probability_weighted_mass_run": total_probability_weighted_mass_run, } end_process_time = datetime.datetime.now() verbose_print( "Process {} finished:\n\tgenerator started at {}, done at {} (total: {}s of which {}s interfacing with binary_c).\n\tRan {} systems with a total probability of {}.\n\tThis thread had {} failing systems with a total probability of {}".format( ID, start_process_time.isoformat(), end_process_time.isoformat(), (end_process_time - start_process_time).total_seconds(), total_time_calling_binary_c, number_of_systems_run, probability_of_systems_run, self.grid_options["_failed_count"], self.grid_options["_failed_prob"], ), self.grid_options["verbosity"], 0, ) # Write summary summary_dict = { "population_id": self.grid_options["_population_id"], "process_id": self.process_ID, "start_process_time": start_process_time.timestamp(), "end_process_time": end_process_time.timestamp(), "total_time_calling_binary_c": total_time_calling_binary_c, "number_of_systems_run": number_of_systems_run, "probability_of_systems_run": probability_of_systems_run, "failed_systems": self.grid_options["_failed_count"], "failed_probability": self.grid_options["_failed_prob"], "failed_system_error_codes": self.grid_options[ "_failed_systems_error_codes" ], } with open( os.path.join( self.grid_options["tmp_dir"], "process_summary", "process_{}.json".format(self.process_ID), ), "w", ) as f: f.write(json.dumps(summary_dict, indent=4)) # Set status to running with open( os.path.join( self.grid_options["tmp_dir"], "process_status", "process_{}.txt".format(self.process_ID), ), "w", ) as f: f.write("FINISHED") result_queue.put(output_dict) stream_logger.debug(f"Process-{self.process_ID} is finished.") return # Single system def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any: """ Function to run a single system, based on the settings in the grid_options The output of the run gets returned, unless a parse function is given to this function. Args: clean_up_custom_logging_files: whether the clean up all the custom_logging files. returns: either returns the raw binary_c output, or whatever the parse_function does """ ### Custom logging code: self._set_custom_logging() # Get argument line argline = self._return_argline(self.bse_options) verbose_print("Running {}".format(argline), self.grid_options["verbosity"], 1) # Run system out = _binary_c_bindings.run_system( argstring=argline, custom_logging_func_memaddr=self.grid_options[ "custom_logging_func_memaddr" ], store_memaddr=self.grid_options["_store_memaddr"], population=0, ) # TODO: add call to function that cleans up the temp customlogging dir, # and unloads the loaded libraries. # TODO: make a switch to turn this off if clean_up_custom_logging_files: self._clean_up_custom_logging(evol_type="single") # Parse if self.grid_options["parse_function"]: return self.grid_options["parse_function"](self, out) return out def _setup(self): """ Function to set up the necessary stuff for the population evolution. The idea is to do all the stuff that is necessary for a population to run. Since we have different methods of running a population, this setup function will do different things depending on different settings Tasks: TODO: Make other kinds of populations possible. i.e, read out type of grid, and set up accordingly TODO: make this function more general. Have it explicitly set the system_generator function """ # Make sure the subdirs of the tmp dir exist: os.makedirs( os.path.join(self.grid_options["tmp_dir"], "failed_systems"), exist_ok=True ) os.makedirs( os.path.join(self.grid_options["tmp_dir"], "current_system"), exist_ok=True ) os.makedirs( os.path.join(self.grid_options["tmp_dir"], "process_status"), exist_ok=True ) os.makedirs( os.path.join(self.grid_options["tmp_dir"], "process_summary"), exist_ok=True ) os.makedirs( os.path.join(self.grid_options["tmp_dir"], "runtime_systems"), exist_ok=True ) # Check for parse function if not self.grid_options["parse_function"]: print("Warning: No parse function set. Make sure you intended to do this.") # ####################### # ### Custom logging code: self._set_custom_logging() # ### Load store: Make sure this actually works. ### ensemble: make some checks for this ## check the settings and set all the warnings. if self.bse_options.get("ensemble", None): if not self.bse_options.get("ensemble_defer", 0) == 1: verbose_print( "Error, if you want to run an ensemble in a population, the output needs to be deferred. Please set 'ensemble_defer' to 1", self.grid_options["verbosity"], 0, ) raise ValueError if not any( [key.startswith("ensemble_filter_") for key in self.bse_options] ): verbose_print( "Warning: Running the ensemble without any filter requires alot of available RAM", self.grid_options["verbosity"], 0, ) if self.bse_options.get("ensemble_filters_off", 0) != 1: verbose_print( "Warning: Running the ensemble without any filter requires alot of available RAM", self.grid_options["verbosity"], 0, ) if self.grid_options["combine_ensemble_with_thread_joining"] == False: if not self.custom_options.get("data_dir", None): verbose_print( "Error: chosen to write the ensemble output directly to files but data_dir isnt set", self.grid_options["verbosity"], 0, ) raise ValueError # Check which type of population generation if self.grid_options["evolution_type"] == "grid": ####################### # Dry run and getting starcount self.grid_options["_probtot"] = 0 # Put in check if len(self.grid_options["_grid_variables"]) == 0: print("Error: you havent defined any grid variables! Aborting") raise ValueError # Set up the grid code with a dry run option to see total probability self._generate_grid_code(dry_run=True) # Load the grid code self._load_grid_function() # Do a dry run self._dry_run() print( "Total starcount for this run will be: {}".format( self.grid_options["_total_starcount"] ) ) ####################### # Reset values and prepare the grid function self.grid_options[ "_probtot" ] = 0 # To make sure that the values are reset. TODO: fix this in a cleaner way self.grid_options[ "_start_time_evolution" ] = time.time() # Setting start time of grid # # Making sure the loaded grid code isn't lingering in the main PID # self._generate_grid_code(dry_run=False) # # # self._load_grid_function() # self.grid_options["_system_generator"] = None # Source file elif self.grid_options["evolution_type"] == "source_file": ####################### # Dry run and getting starcount self.grid_options["_probtot"] = 0 # Load the grid code # TODO: fix this function # self._load_source_file_function() # Do a dry run self._dry_run_source_file() print( "Total starcount for this run will be: {}".format( self.grid_options["_total_starcount"] ) ) ####################### # Reset values and prepare the grid function self.grid_options[ "_probtot" ] = 0 # To make sure that the values are reset. TODO: fix this in a cleaner way self.grid_options[ "_start_time_evolution" ] = time.time() # Setting start time of grid # # TODO: fix this function # self._load_source_file_function() # self._load_source_file_function(dry_run=False) # self._load_grid_function() # elif self.grid_options["evolution_type"] == "mc": ####### def _cleanup(self): """ Function that handles all the cleaning up after the grid has been generated and/or run - reset values to 0 - remove grid file - unload grid function/module - remove dry grid file - unload dry grid function/module """ # Reset values self.grid_options["_count"] = 0 self.grid_options["_probtot"] = 0 self.grid_options["_system_generator"] = None self.grid_options["_failed_count"] = 0 self.grid_options["_failed_prob"] = 0 self.grid_options["_errors_found"] = False self.grid_options["_errors_exceeded"] = False self.grid_options["_failed_systems_error_codes"] = [] self.grid_options["_total_mass_run"] = 0 self.grid_options["_total_probability_weighted_mass_run"] = 0 # Remove files # TODO: remove files # Unload functions # TODO: unload functions # Unload/free custom_logging_code # TODO: cleanup custom logging code. ################################################### # Gridcode functions # # Function below are used to run populations with # a variable grid ################################################### def _generate_grid_code(self, dry_run=False): """ Function that generates the code from which the population will be made. dry_run: when True, it will return the starcount at the end so that we know what the total amount of systems is. The phasevol values are handled by generating a second array # TODO: Add correct logging everywhere # TODO: add part to handle separation if orbital_period is added. Idea. use default values # for orbital parameters and possibly overwrite those or something. # TODO: add centering center left right for the spacing. # TODO: add sensible description to this function. # TODO: Check whether all the probability and phasevol values are correct. # TODO: import only the necessary packages/functions # TODO: Put all the masses, eccentricities and periods in there already # TODO: Put the certain blocks that are repeated in some subfunctions Results in a generated file that contains a system_generator function. """ verbose_print("Generating grid code", self.grid_options["verbosity"], 1) # Some local values code_string = "" depth = 0 indent = " " total_grid_variables = len(self.grid_options["_grid_variables"]) # Import packages code_string += "import math\n" code_string += "import numpy as np\n" code_string += "from binarycpython.utils.distribution_functions import *\n" code_string += "from binarycpython.utils.spacing_functions import *\n" code_string += "from binarycpython.utils.useful_funcs import *\n" code_string += "\n\n" # Make the function code_string += "def grid_code(self, print_results=True):\n" # Increase depth depth += 1 # Write some info in the function code_string += ( indent * depth + "# Grid code generated on {}\n".format( datetime.datetime.now().isoformat() ) + indent * depth + "# This function generates the systems that will be evolved with binary_c\n\n" ) # Set some values in the generated code: code_string += indent * depth + "# Setting initial values\n" code_string += indent * depth + "_total_starcount = 0\n" code_string += indent * depth + "starcounts = [0 for i in range({})]\n".format( total_grid_variables ) code_string += indent * depth + "probabilities = {}\n" code_string += ( indent * depth + "probabilities_list = [0 for i in range({})]\n".format( total_grid_variables ) ) code_string += ( indent * depth + "probabilities_sum = [0 for i in range({})]\n".format( total_grid_variables ) ) code_string += indent * depth + "parameter_dict = {}\n" code_string += indent * depth + "phasevol = 1\n" code_string += indent * depth + "\n" # Setting up the system parameters code_string += indent * depth + "M_1 = None\n" code_string += indent * depth + "M_2 = None\n" code_string += indent * depth + "M_3 = None\n" code_string += indent * depth + "M_4 = None\n" code_string += indent * depth + "orbital_period = None\n" code_string += indent * depth + "orbital_period_triple = None\n" code_string += indent * depth + "orbital_period_quadruple = None\n" code_string += indent * depth + "eccentricity = None\n" code_string += indent * depth + "eccentricity2 = None\n" code_string += indent * depth + "eccentricity3 = None\n" code_string += indent * depth + "\n" # Prepare the probability code_string += indent * depth + "# setting probability lists\n" for grid_variable_el in sorted( self.grid_options["_grid_variables"].items(), key=lambda x: x[1]["grid_variable_number"], ): # Make probabilities dict grid_variable = grid_variable_el[1] code_string += indent * depth + 'probabilities["{}"] = 0\n'.format( grid_variable["parameter_name"] ) ################################################################################# # Start of code generation ################################################################################# code_string += indent * depth + "\n" # Generate code print("Generating grid code") for loopnr, grid_variable_el in enumerate( sorted( self.grid_options["_grid_variables"].items(), key=lambda x: x[1]["grid_variable_number"], ) ): print("Constructing/adding: {}".format(grid_variable_el[0])) grid_variable = grid_variable_el[1] ######################### # Setting up the forloop # Add comment for forloop code_string += ( indent * depth + "# for loop for {}".format(grid_variable["parameter_name"]) + "\n" ) code_string += ( indent * depth + "sampled_values_{} = {}".format( grid_variable["name"], grid_variable["spacingfunc"] ) + "\n" ) # TODO: Make clear that the phasevol only works good # TODO: add option to ignore this phasevol calculation and set it to 1 # if you sample linearly in that thing. # if phasevol is <= 0 then we SKIP that whole loop. Its not working then. if ( not grid_variable["dphasevol"] == -1 ): # A method to turn off this calculation and allow a phasevol = 1 code_string += ( indent * depth + "phasevol_{} = sampled_values_{}[1]-sampled_values_{}[0]".format( grid_variable["name"], grid_variable["name"], grid_variable["name"], ) + "\n" ) else: code_string += indent * depth + "phasevol_{} = 1\n".format( grid_variable["name"] ) # # Some print statement # code_string += ( # indent * depth # + "print('phasevol_{}:', phasevol_{})".format(grid_variable["name"], # grid_variable["name"]) # + "\n" # ) # Adding for loop structure code_string += ( indent * depth + "for {} in sampled_values_{}:".format( grid_variable["name"], grid_variable["name"] ) + "\n" ) ################################################################################# # Check condition and generate forloop # If the grid variable has a condition, write the check and the action if grid_variable["condition"]: # Add comment code_string += ( indent * (depth + 1) + "# Condition for {}".format(grid_variable["parameter_name"]) + "\n" ) # Add condition check code_string += ( indent * (depth + 1) + "if not {}:".format(grid_variable["condition"]) + "\n" ) # Add condition failed action: code_string += ( indent * (depth + 2) + 'print("Condition for {} not met!")'.format( grid_variable["parameter_name"] ) + "\n" ) code_string += indent * (depth + 2) + "continue" + "\n" # Add some whiteline code_string += indent * (depth + 1) + "\n" ##############3 # Add phasevol check: code_string += ( indent * (depth + 1) + "if phasevol_{} <= 0:".format(grid_variable["name"]) + "\n" ) # Add phasevol check action: code_string += ( indent * (depth + 2) + 'print("phasevol_{} <= 0!")'.format(grid_variable["name"]) + "\n" ) code_string += indent * (depth + 2) + "continue" + "\n" # Add some whiteline code_string += indent * (depth + 1) + "\n" ######################### # Setting up pre-code and value in some cases # Add pre-code if grid_variable["precode"]: code_string += ( indent * (depth + 1) + "{}".format( grid_variable["precode"].replace( "\n", "\n" + indent * (depth + 1) ) ) + "\n" ) # Set phasevol code_string += indent * (depth + 1) + "phasevol *= phasevol_{}\n".format( grid_variable["name"], ) ####################### # Probabilities # Calculate probability code_string += indent * (depth + 1) + "\n" code_string += indent * (depth + 1) + "# Setting probabilities\n" code_string += ( indent * (depth + 1) + "d{} = phasevol_{} * {}".format( grid_variable["name"], grid_variable["name"], grid_variable["probdist"], ) + "\n" ) # Saving probability sum code_string += ( indent * (depth + 1) + "probabilities_sum[{}] += d{}".format( grid_variable["grid_variable_number"], grid_variable["name"] ) + "\n" ) if grid_variable["grid_variable_number"] == 0: code_string += ( indent * (depth + 1) + "probabilities_list[0] = d{}".format(grid_variable["name"]) + "\n" ) else: code_string += ( indent * (depth + 1) + "probabilities_list[{}] = probabilities_list[{}] * d{}".format( grid_variable["grid_variable_number"], grid_variable["grid_variable_number"] - 1, grid_variable["name"], ) + "\n" ) ####################### # Increment starcount for this parameter code_string += "\n" code_string += indent * ( depth + 1 ) + "# Increment starcount for {}\n".format(grid_variable["parameter_name"]) code_string += ( indent * (depth + 1) + "starcounts[{}] += 1".format( grid_variable["grid_variable_number"], ) + "\n" ) # Add value to dict code_string += ( indent * (depth + 1) + 'parameter_dict["{}"] = {}'.format( grid_variable["parameter_name"], grid_variable["parameter_name"] ) + "\n" ) # Add some space code_string += "\n" # The final parts of the code, where things are returned, are within the deepest loop, # but in some cases code from a higher loop needs to go under it again # SO I think its better to put an ifstatement here that checks # whether this is the last loop. if loopnr == len(self.grid_options["_grid_variables"]) - 1: code_string = self._write_gridcode_system_call( code_string, indent, depth, grid_variable, dry_run, grid_variable["branchpoint"], ) # increment depth depth += 1 depth -= 1 code_string += "\n" # Write parts to write below the part that yield the results. # this has to go in a reverse order: # Here comes the stuff that is put after the deepest nested part that calls returns stuff. # Here we will have a reverse_sorted_grid_variables = sorted( self.grid_options["_grid_variables"].items(), key=lambda x: x[1]["grid_variable_number"], reverse=True, ) for loopnr, grid_variable_el in enumerate(reverse_sorted_grid_variables): grid_variable = grid_variable_el[1] code_string += indent * (depth + 1) + "#" * 40 + "\n" code_string += ( indent * (depth + 1) + "# Code below is for finalising the handling of this iteration of the parameter\n" ) # Set phasevol # TODO: fix. this isnt supposed to be the value that we give it here. discuss code_string += indent * (depth + 1) + "phasevol /= phasevol_{}\n".format( grid_variable["name"] ) code_string += indent * (depth + 1) + "\n" depth -= 1 # Check the branchpoint part here. The branchpoint makes sure that we can construct # a grid with several multiplicities and still can make the system calls for each # multiplicity without reconstructing the grid each time if grid_variable["branchpoint"] == 1: # Add comment code_string += ( indent * (depth + 1) + "# Condition for branchpoint at {}".format( reverse_sorted_grid_variables[loopnr + 1][1]["parameter_name"] ) + "\n" ) # Add condition check code_string += ( indent * (depth + 1) + "if not {}:".format(grid_variable["condition"]) + "\n" ) code_string = self._write_gridcode_system_call( code_string, indent, depth + 1, reverse_sorted_grid_variables[loopnr + 1][1], dry_run, grid_variable["branchpoint"], ) code_string += "\n" ################ # Finalising print statements # # code_string += indent * (depth + 1) + "\n" code_string += indent * (depth + 1) + "#" * 40 + "\n" code_string += indent * (depth + 1) + "if print_results:\n" code_string += ( indent * (depth + 2) + "print('Grid has handled {} stars'.format(_total_starcount))\n" ) code_string += ( indent * (depth + 2) + "print('with a total probability of {}'.format(self.grid_options['_probtot']))\n" ) ################ # Finalising return statement for dry run. # if dry_run: code_string += indent * (depth + 1) + "return _total_starcount\n" ################################################################################# # Stop of code generation. Here the code is saved and written # Save the gridcode to the grid_options verbose_print( "Saving grid code to grid_options", self.grid_options["verbosity"], 1 ) self.grid_options["code_string"] = code_string # Write to file gridcode_filename = os.path.join( self.grid_options["tmp_dir"], "binary_c_grid_{}.py".format(self.grid_options["_population_id"]), ) self.grid_options["gridcode_filename"] = gridcode_filename verbose_print( "Writing grid code to {}".format(gridcode_filename), self.grid_options["verbosity"], 1, ) with open(gridcode_filename, "w") as file: file.write(code_string) def _write_gridcode_system_call( self, code_string, indent, depth, grid_variable, dry_run, branchpoint ): ################################################################################# # Here are the calls to the queuing or other solution. this part is for every system # Add comment code_string += indent * (depth + 1) + "#" * 40 + "\n" if branchpoint: code_string += ( indent * (depth + 1) + "# Code below will get evaluated for every system at this level of multiplicity (last one of that being {})\n" ).format(grid_variable["name"]) else: code_string += ( indent * (depth + 1) + "# Code below will get evaluated for every generated system\n" ) # Calculate value code_string += ( indent * (depth + 1) + 'probability = self.grid_options["weight"] * probabilities_list[{}]'.format( grid_variable["grid_variable_number"] ) + "\n" ) # TODO: ask rob if just replacing this with probability is enough code_string += ( indent * (depth + 1) # + 'repeat_probability = probability / self.grid_options["repeat"]' + 'probability = probability / self.grid_options["repeat"]' + "\n" ) # For each repeat of the system this has to be done yes. code_string += ( indent * (depth + 1) + 'for _ in range(self.grid_options["repeat"]):' + "\n" ) code_string += indent * (depth + 2) + "_total_starcount += 1\n" # set probability and phasevol values code_string += ( indent * (depth + 2) + 'parameter_dict["{}"] = {}'.format("probability", "probability") + "\n" ) code_string += ( indent * (depth + 2) + 'parameter_dict["{}"] = {}'.format("phasevol", "phasevol") + "\n" ) # Some prints. will be removed # code_string += indent * (depth + 1) + "print(probabilities)\n" # code_string += ( # indent * (depth + 1) + 'print("_total_starcount: ", _total_starcount)\n' # ) # code_string += indent * (depth + 1) + "print(probability)\n" # Increment total probability code_string += indent * (depth + 2) + "self._increment_probtot(probability)\n" if not dry_run: # Handling of what is returned, or what is not. # TODO: think of whether this is a good method # code_string += indent * (depth + 2) + "if (self.grid_options['multiplicity'] >= 2): print('phasevol_q: ',phasevol_q); print('phasevol_log10per: ',phasevol_log10per);\n" # code_string += indent * (depth + 2) + "print('phasevol_lnm1: ',phasevol_lnm1); print('phasevol_multiplicity: ',phasevol_multiplicity);\n" # code_string += indent * (depth + 2) + "print(probabilities_list)\n" # code_string += indent * (depth + 2) + "print(parameter_dict)\n" # code_string += indent * (depth + 2) + "print('YOO IK GA LEKKER NOG EEN RONDJE')\n" code_string += indent * (depth + 2) + "yield(parameter_dict)\n" # If its a dry run, dont do anything with it else: # code_string += indent * (depth + 2) + "if (self.grid_options['multiplicity'] >= 2): print(phasevol_q)\n" # code_string += indent * (depth + 2) + "print(parameter_dict)\n" code_string += indent * (depth + 2) + "pass\n" code_string += indent * (depth + 1) + "#" * 40 + "\n" return code_string def _load_grid_function(self): """ Functon that loads the script containing the grid code. TODO: Update this description Test function to run grid stuff. mostly to test the import """ # Code to load the verbose_print( message="Loading grid code function from {}".format( self.grid_options["gridcode_filename"] ), verbosity=self.grid_options["verbosity"], minimal_verbosity=1, ) spec = importlib.util.spec_from_file_location( "binary_c_python_grid", os.path.join(self.grid_options["gridcode_filename"]), ) grid_file = importlib.util.module_from_spec(spec) spec.loader.exec_module(grid_file) generator = grid_file.grid_code self.grid_options["_system_generator"] = generator verbose_print("Grid code loaded", self.grid_options["verbosity"], 1) def _dry_run(self): """ Function to dry run the grid and know how many stars it will run Requires the grid to be built as a dry run grid """ system_generator = self.grid_options["_system_generator"] total_starcount = system_generator(self) self.grid_options["_total_starcount"] = total_starcount def _print_info(self, run_number, total_systems, full_system_dict): """ Function to print info about the current system and the progress of the grid. # color info tricks from https://ozzmaker.com/add-colour-to-text-in-python/ https://stackoverflow.com/questions/287871/how-to-print-colored-text-in-terminal-in-python """ # Define frequency if self.grid_options["verbosity"] == 1: print_freq = 1 else: print_freq = 10 # Calculate amount of time left # calculate amount of time passed # time_passed = time.time() - self.grid_options["_start_time_evolution"] if run_number % print_freq == 0: binary_cmdline_string = self._return_argline(full_system_dict) info_string = "{color_part_1} \ {text_part_1}{end_part_1}{color_part_2} \ {text_part_2}{end_part_2}".format( color_part_1="\033[1;32;41m", text_part_1="{}/{}".format(run_number, total_systems), end_part_1="\033[0m", color_part_2="\033[1;32;42m", text_part_2="{}".format(binary_cmdline_string), end_part_2="\033[0m", ) print(info_string) ################################################### # Montecarlo functions # # Functions below are used to run populations with # Monte carlo ################################################### ################################################### # Population from file functions # # Functions below are used to run populations from # a file containing binary_c calls ################################################### def _dry_run_source_file(self): """ Function to go through the source_file and count the amount of lines and the total probability """ system_generator = self.grid_options["_system_generator"] total_starcount = 0 total_probability = 0 contains_probability = False for line in system_generator: total_starcount += 1 total_starcount = system_generator(self) self.grid_options["_total_starcount"] = total_starcount def _load_source_file(self, check=False): """ Function that loads the source_file that contains a binary_c calls """ if not os.path.isfile(self.grid_options["source_file_filename"]): verbose_print("Source file doesnt exist", self.grid_options["verbosity"], 0) verbose_print( message="Loading source file from {}".format( self.grid_options["gridcode_filename"] ), verbosity=self.grid_options["verbosity"], minimal_verbosity=1, ) # We can choose to perform a check on the sourcefile, which checks if the lines start with 'binary_c' if check: source_file_check_filehandle = open( self.grid_options["source_file_filename"], "r" ) for line in source_file_check_filehandle: if not line.startswith("binary_c"): failed = True break if failed: verbose_print( "Error, sourcefile contains lines that do not start with binary_c", self.grid_options["verbosity"], 0, ) raise ValueError source_file_filehandle = open(self.grid_options["source_file_filename"], "r") self.grid_options["_system_generator"] = source_file_filehandle verbose_print("Source file loaded", self.grid_options["verbosity"], 1) def _dict_from_line_source_file(self, line): """ Function that creates a dict from a binary_c argline """ if line.startswith("binary_c "): line = line.replace("binary_c ", "") split_line = line.split() arg_dict = {} for i in range(0, len(split_line), 2): if "." in split_line[i + 1]: arg_dict[split_line[i]] = float(split_line[i + 1]) else: arg_dict[split_line[i]] = int(split_line[i + 1]) return arg_dict ################################################### # SLURM functions # # subroutines to run SLURM grids ################################################### # def _slurm_grid(self): # """ # Main function that manages the SLURM setup. # Has three stages: # - setup # - evolve # - join # Which stage is used is determined by the value of grid_options['slurm_command']: # <empty>: the function will know its the user that executed the script and # it will set up the necessary condor stuff # 'evolve': evolve_population is called to evolve the population of stars # 'join': We will attempt to join the output # """ # # Check version # # TODO: Put in function # slurm_version = get_slurm_version() # if not slurm_version: # verbose_print( # "SLURM: Error: No installation of slurm found", # self.grid_options["verbosity"], # 0, # ) # else: # major_version = int(slurm_version.split(".")[0]) # minor_version = int(slurm_version.split(".")[1]) # if major_version > 17: # verbose_print( # "SLURM: Found version {} which is new enough".format(slurm_version), # self.grid_options["verbosity"], # 1, # ) # else: # verbose_print( # "SLURM: Found version {} which is too old (we require 17+)".format( # slurm_version # ), # self.grid_options["verbosity"], # 0, # ) # verbose_print( # "SLURM: Running slurm grid. command={}".format( # self.grid_options["slurm_command"] # ), # self.grid_options["verbosity"], # 1, # ) # if not self.grid_options["slurm_command"]: # # Setting up # verbose_print( # "SLURM: Main controller script. Setting up", # self.grid_options["verbosity"], # 1, # ) # # Set up working directories: # verbose_print( # "SLURM: creating working directories", self.grid_options["verbosity"], 1 # ) # create_directories_hpc(self.grid_options["slurm_dir"]) # # Create command # python_details = get_python_details() # scriptname = path_of_calling_script() # command = "{} {}".format(python_details["executable"], scriptname) # command += ' --cmdline "{}"'.format( # " ".join( # [ # "{}".format(self.grid_options["_commandline_input"]), # "offset=$jobarrayindex", # "modulo={}".format(self.grid_options["slurm_njobs"]), # "vb={}".format(self.grid_options["verbosity"]), # "slurm_jobid=$jobid", # "slurm_jobarrayindex=$jobarrayindex", # "slurm_jobname='binary_grid_'$jobid'.'$jobarrayindex", # "slurm_njobs={}".format(self.grid_options["slurm_njobs"]), # "slurm_dir={}".format(self.grid_options["slurm_dir"]), # "rungrid=1", # "slurm_command=evolve", # ] # ).strip() # ) # # Construct dict with settings for the script while checking the settings at the same time # # Check settings: # # TODO: check settings # # Create SLURM_DIR script: # slurm_script_options = {} # slurm_script_options["n"] = self.grid_options["slurm_njobs"] # slurm_script_options["njobs"] = self.grid_options["slurm_njobs"] # slurm_script_options["dir"] = self.grid_options["slurm_dir"] # slurm_script_options["memory"] = self.grid_options["slurm_memory"] # slurm_script_options["working_dir"] = self.grid_options[ # "slurm_dir" # ] # TODO: check this # slurm_script_options["command"] = command # # slurm_script_options['streams'] = self.grid_options['streams'] # # Construct the script # slurm_script_contents = "" # slurm_script_contents += "#!/bin/bash\n" # slurm_script_contents += "# Slurm file for binary_grid and slurm\n" # slurm_script_contents += "#SBATCH --error={}/stderr/%A.%a\n".format( # self.grid_options["slurm_dir"] # ) # slurm_script_contents += "#SBATCH --output={}/stdout/%A.%a\n".format( # self.grid_options["slurm_dir"] # ) # slurm_script_contents += "#SBATCH --job-name={}\n".format( # self.grid_options["slurm_jobname"] # ) # slurm_script_contents += "#SBATCH --partition={}\n".format( # self.grid_options["slurm_partition"] # ) # slurm_script_contents += "#SBATCH --time={}\n".format( # self.grid_options["slurm_time"] # ) # slurm_script_contents += "#SBATCH --mem={}\n".format( # self.grid_options["slurm_memory"] # ) # slurm_script_contents += "#SBATCH --ntasks={}\n".format( # self.grid_options["slurm_ntasks"] # ) # slurm_script_contents += "#SBATCH --array={}\n".format( # self.grid_options["slurm_array"] # ) # slurm_script_contents += "\n" # if self.grid_options["slurm_extra_settings"]: # slurm_script_contents += "# Extra settings by user:" # slurm_script_contents += "\n".join( # [ # "--{}={}".format( # key, self.grid_options["slurm_extra_settings"][key] # ) # for key in self.grid_options["slurm_extra_settings"] # ] # ) # slurm_script_contents += '# set status to "running"\n' # slurm_script_contents += ( # 'echo "running" > {}/status/$jobid.$jobarrayindex\n\n'.format( # self.grid_options["slurm_dir"] # ) # ) # slurm_script_contents += "# run grid of stars\n" # slurm_script_contents += "{}\n\n".format(command) # slurm_script_contents += '# set status to "finished"\n' # slurm_script_contents += ( # 'echo "finished" > {}/status/$jobid.$jobarrayindex\n'.format( # self.grid_options["slurm_dir"] # ) # ) # slurm_script_contents += "\n" # if self.grid_options["slurm_postpone_join"]: # slurm_script_contents += "{} rungrid=0 results_hash_dumpfile={}/results/$jobid.all slurm_command=join\n".format( # command, self.grid_options["slurm_dir"] # ) # # Write script to file # slurm_script_filename = os.path.join( # self.grid_options["slurm_dir"], "slurm_script" # ) # with open(slurm_script_filename, "w") as slurm_script_file: # slurm_script_file.write(slurm_script_contents) # # Execute or postpone # if self.grid_options["slurm_postpone_sbatch"]: # # Execute or postpone the real call to sbatch # sbatch_command = "sbatch {}".format(slurm_script_filename) # verbose_print( # "running slurm script {}".format(slurm_script_filename), # self.grid_options["verbosity"], # 0, # ) # # subprocess.Popen(sbatch_command, close_fds=True) # # subprocess.Popen(sbatch_command, creationflags=subprocess.DETACHED_PROCESS) # verbose_print("Submitted scripts.", self.grid_options["verbosity"], 0) # else: # verbose_print( # "Slurm script is in {} but hasnt been executed".format( # slurm_script_filename # ), # self.grid_options["verbosity"], # 0, # ) # verbose_print("all done!", self.grid_options["verbosity"], 0) # exit() # elif self.grid_options["slurm_command"] == "evolve": # # Part to evolve the population. # # TODO: decide how many CPUs # verbose_print( # "SLURM: Evolving population", self.grid_options["verbosity"], 1 # ) # # # self.evolve_population() # elif self.grid_options["slurm_command"] == "join": # # Joining the output. # verbose_print("SLURM: Joining results", self.grid_options["verbosity"], 1) ################################################### # CONDOR functions # # subroutines to run CONDOR grids ################################################### # def _condor_grid(self): # """ # Main function that manages the CONDOR setup. # Has three stages: # - setup # - evolve # - join # Which stage is used is determined by the value of grid_options['condor_command']: # <empty>: the function will know its the user that executed the script and # it will set up the necessary condor stuff # 'evolve': evolve_population is called to evolve the population of stars # 'join': We will attempt to join the output # """ # # TODO: Put in function # condor_version = get_condor_version() # if not condor_version: # verbose_print( # "CONDOR: Error: No installation of condor found", # self.grid_options["verbosity"], # 0, # ) # else: # major_version = int(condor_version.split(".")[0]) # minor_version = int(condor_version.split(".")[1]) # if (major_version == 8) and (minor_version > 4): # verbose_print( # "CONDOR: Found version {} which is new enough".format( # condor_version # ), # self.grid_options["verbosity"], # 0, # ) # elif major_version > 9: # verbose_print( # "CONDOR: Found version {} which is new enough".format( # condor_version # ), # self.grid_options["verbosity"], # 0, # ) # else: # verbose_print( # "CONDOR: Found version {} which is too old (we require 8.3/8.4+)".format( # condor_version # ), # self.grid_options["verbosity"], # 0, # ) # verbose_print( # "Running Condor grid. command={}".format( # self.grid_options["condor_command"] # ), # self.grid_options["verbosity"], # 1, # ) # if not self.grid_options["condor_command"]: # # Setting up # verbose_print( # "CONDOR: Main controller script. Setting up", # self.grid_options["verbosity"], # 1, # ) # # Set up working directories: # verbose_print( # "CONDOR: creating working directories", # self.grid_options["verbosity"], # 1, # ) # create_directories_hpc(self.grid_options["condor_dir"]) # # Create command # current_workingdir = os.getcwd() # python_details = get_python_details() # scriptname = path_of_calling_script() # # command = "".join([ # # "{}".python_details['executable'], # # "{}".scriptname, # # "offset=$jobarrayindex", # # "modulo={}".format(self.grid_options['condor_njobs']), # # "vb={}".format(self.grid_options['verbosity']) # # "results_hash_dumpfile=$self->{_grid_options}{slurm_dir}/results/$jobid.$jobarrayindex", # # 'slurm_jobid='.$jobid, # # 'slurm_jobarrayindex='.$jobarrayindex, # # 'slurm_jobname=binary_grid_'.$jobid.'.'.$jobarrayindex, # # "slurm_njobs=$njobs", # # "slurm_dir=$self->{_grid_options}{slurm_dir}", # # ); # # Create directory with info for the condor script. By creating this directory we also check whether all the values are set correctly # # TODO: create the condor script. # condor_script_options = {} # # condor_script_options['n'] = # condor_script_options["njobs"] = self.grid_options["condor_njobs"] # condor_script_options["dir"] = self.grid_options["condor_dir"] # condor_script_options["memory"] = self.grid_options["condor_memory"] # condor_script_options["working_dir"] = self.grid_options[ # "condor_working_dir" # ] # condor_script_options["command"] = self.grid_options["command"] # condor_script_options["streams"] = self.grid_options["streams"] # # TODO: condor works with running an executable. # # Create script contents # condor_script_contents = "" # condor_script_contents += """ # ################################################# # # # # Condor script to run a binary_grid via python # # # ################################################# # """ # condor_script_contents += "Executable\t= {}".format(executable) # condor_script_contents += "arguments\t= {}".format(arguments) # condor_script_contents += "environment\t= {}".format(environment) # condor_script_contents += "universe\t= {}".format( # self.grid_options["condor_universe"] # ) # condor_script_contents += "\n" # condor_script_contents += "output\t= {}/stdout/$id\n".format( # self.grid_options["condor_dir"] # ) # condor_script_contents += "error\t={}/sterr/$id".format( # self.grid_options["condor_dir"] # ) # condor_script_contents += "log\t={}\n".format( # self.grid_options["condor_dir"] # ) # condor_script_contents += "initialdir\t={}\n".format(current_workingdir) # condor_script_contents += "remote_initialdir\t={}\n".format( # current_workingdir # ) # condor_script_contents += "\n" # condor_script_contents += "steam_output\t={}".format(stream) # condor_script_contents += "steam_error\t={}".format(stream) # condor_script_contents += "+WantCheckpoint = False" # condor_script_contents += "\n" # condor_script_contents += "request_memory\t={}".format( # self.grid_options["condor_memory"] # ) # condor_script_contents += "ImageSize\t={}".format( # self.grid_options["condor_memory"] # ) # condor_script_contents += "\n" # if self.grid_options["condor_extra_settings"]: # slurm_script_contents += "# Extra settings by user:" # slurm_script_contents += "\n".join( # [ # "{}\t={}".format( # key, self.grid_options["condor_extra_settings"][key] # ) # for key in self.grid_options["condor_extra_settings"] # ] # ) # condor_script_contents += "\n" # # request_memory = $_[0]{memory} # # ImageSize = $_[0]{memory} # # Requirements = (1) \&\& (". # # $self->{_grid_options}{condor_requirements}.")\n"; # # # # file name: my_program.condor # # Condor submit description file for my_program # # Executable = my_program # # Universe = vanilla # # Error = logs/err.$(cluster) # # Output = logs/out.$(cluster) # # Log = logs/log.$(cluster) # # should_transfer_files = YES # # when_to_transfer_output = ON_EXIT # # transfer_input_files = files/in1,files/in2 # # Arguments = files/in1 files/in2 files/out1 # # Queue # # Write script contents to file # if self.grid_options["condor_postpone_join"]: # condor_script_contents += "{} rungrid=0 results_hash_dumpfile={}/results/$jobid.all condor_command=join\n".format( # command, self.grid_options["condor_dir"] # ) # condor_script_filename = os.path.join( # self.grid_options["condor_dir"], "condor_script" # ) # with open(condor_script_filename, "w") as condor_script_file: # condor_script_file.write(condor_script_contents) # if self.grid_options["condor_postpone_sbatch"]: # # Execute or postpone the real call to sbatch # submit_command = "condor_submit {}".format(condor_script_filename) # verbose_print( # "running condor script {}".format(condor_script_filename), # self.grid_options["verbosity"], # 0, # ) # # subprocess.Popen(sbatch_command, close_fds=True) # # subprocess.Popen(sbatch_command, creationflags=subprocess.DETACHED_PROCESS) # verbose_print("Submitted scripts.", self.grid_options["verbosity"], 0) # else: # verbose_print( # "Condor script is in {} but hasnt been executed".format( # condor_script_filename # ), # self.grid_options["verbosity"], # 0, # ) # verbose_print("all done!", self.grid_options["verbosity"], 0) # exit() # elif self.grid_options["condor_command"] == "evolve": # # TODO: write this function # # Part to evolve the population. # # TODO: decide how many CPUs # verbose_print( # "CONDOR: Evolving population", self.grid_options["verbosity"], 1 # ) # # # self.evolve_population() # elif self.grid_options["condor_command"] == "join": # # TODO: write this function # # Joining the output. # verbose_print("CONDOR: Joining results", self.grid_options["verbosity"], 1) # pass ################################################### # Unordered functions # # Functions that arent ordered yet ################################################### def write_binary_c_calls_to_file( self, output_dir: Union[str, None] = None, output_filename: Union[str, None] = None, include_defaults: bool = False, ) -> None: """ Function that loops over the gridcode and writes the generated parameters to a file. In the form of a commandline call Only useful when you have a variable grid as system_generator. MC wouldnt be that useful Also, make sure that in this export there are the basic parameters like m1,m2,sep, orb-per, ecc, probability etc. On default this will write to the datadir, if it exists WARNING; dont use yet. not fully tested. Tasks: - TODO: test this function - TODO: make sure the binary_c_python .. output file has a unique name Args: output_dir: (optional, default = None) directory where to write the file to. If custom_options['data_dir'] is present, then that one will be used first, and then the output_dir output_filename: (optional, default = None) filename of the output. If not set it will be called "binary_c_calls.txt" include_defaults: (optional, default = None) whether to include the defaults of binary_c in the lines that are written. Beware that this will result in very long lines, and it might be better to just export the binary_c defaults and keep them in a seperate file. """ # Check if there is no compiled grid yet. If not, lets try to build it first. if not self.grid_options["_system_generator"]: ## check the settings: if self.bse_options.get("ensemble", None): if self.bse_options["ensemble"] == 1: if not self.bse_options.get("ensemble_defer", 0) == 1: verbose_print( "Error, if you want to run an ensemble in a population, the output needs to be deferred", self.grid_options["verbosity"], 0, ) raise ValueError # Put in check if len(self.grid_options["_grid_variables"]) == 0: print("Error: you havent defined any grid variables! Aborting") raise ValueError # self._generate_grid_code(dry_run=False) # self._load_grid_function() # then if the _system_generator is present, we go through it if self.grid_options["_system_generator"]: # Check if there is an output dir configured if self.custom_options.get("data_dir", None): binary_c_calls_output_dir = self.custom_options["data_dir"] # otherwise check if theres one passed to the function else: if not output_dir: print( "Error. No data_dir configured and you gave no output_dir. Aborting" ) raise ValueError binary_c_calls_output_dir = output_dir # check if theres a filename passed to the function if output_filename: binary_c_calls_filename = output_filename # otherwise use default value else: binary_c_calls_filename = "binary_c_calls.txt" binary_c_calls_full_filename = os.path.join( binary_c_calls_output_dir, binary_c_calls_filename ) print("Writing binary_c calls to {}".format(binary_c_calls_full_filename)) # Write to file with open(binary_c_calls_full_filename, "w") as file: # Get defaults and clean them, then overwrite them with the set values. if include_defaults: # TODO: make sure that the defaults here are cleaned up properly cleaned_up_defaults = self.cleaned_up_defaults full_system_dict = cleaned_up_defaults.copy() full_system_dict.update(self.bse_options.copy()) else: full_system_dict = self.bse_options.copy() for system in self.grid_options["_system_generator"](self): # update values with current system values full_system_dict.update(system) binary_cmdline_string = self._return_argline(full_system_dict) file.write(binary_cmdline_string + "\n") else: print("Error. No grid function found!") raise ValueError def _cleanup_defaults(self): """ Function to clean up the default values: from a dictionary, removes the entries that have the following values: - "NULL" - "" - "Function" Uses the function from utils.functions TODO: Rethink this functionality. seems a bit double, could also be just outside of the class """ binary_c_defaults = self._return_binary_c_defaults().copy() cleaned_dict = filter_arg_dict(binary_c_defaults) return cleaned_dict def _clean_up_custom_logging(self, evol_type): """ Function to clean up the custom logging. Has two types: 'single': - removes the compiled shared library (which name is stored in grid_options['_custom_logging_shared_library_file']) - TODO: unloads/frees the memory allocated to that shared library (which is stored in grid_options['custom_logging_func_memaddr']) - sets both to None 'multiple': - TODO: make this and design this """ if evol_type == "single": verbose_print( "Cleaning up the custom logging stuff. type: single", self.grid_options["verbosity"], 1, ) # TODO: Unset custom logging code # TODO: Unset function memory adress # print(self.grid_options["custom_logging_func_memaddr"]) # remove shared library files if self.grid_options["_custom_logging_shared_library_file"]: remove_file( self.grid_options["_custom_logging_shared_library_file"], self.grid_options["verbosity"], ) if evol_type == "population": verbose_print( "Cleaning up the custom logging stuffs. type: population", self.grid_options["verbosity"], 1, ) # TODO: make sure that these also work. not fully sure if necessary tho. # whether its a single file, or a dict of files/memaddresses if evol_type == "MC": pass def _increment_probtot(self, prob): """ Function to add to the total probability. For now not used """ self.grid_options["_probtot"] += prob def _increment_count(self): """ Function to add to the total amount of stars. For now not used """ self.grid_options["_count"] += 1 def _set_loggers(self): """ Function to set the loggers for the execution of the grid """ # Set logfile binary_c_logfile = self.grid_options["log_file"] # Create directory os.makedirs(os.path.dirname(binary_c_logfile), exist_ok=True) # Set up logger self.logger = logging.getLogger("binary_c_python_logger") self.logger.setLevel(self.grid_options["verbosity"]) # Reset handlers self.logger.handlers = [] # Set formatting of output log_formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) # Make and add filehandlers # make handler for output to file handler_file = logging.FileHandler(filename=os.path.join(binary_c_logfile)) handler_file.setFormatter(log_formatter) handler_file.setLevel(logging.INFO) # Make handler for output to stdout handler_stdout = logging.StreamHandler(sys.stdout) handler_stdout.setFormatter(log_formatter) handler_stdout.setLevel(logging.INFO) # Add the loggers self.logger.addHandler(handler_file) self.logger.addHandler(handler_stdout) # def join_result_dicts(self): # """ # Function to join the result dictionaries # """ def _check_binary_c_error(self, binary_c_output, system_dict): """ Function to check whether binary_c throws an error and handle accordingly. """ if binary_c_output: if (binary_c_output.splitlines()[0].startswith("SYSTEM_ERROR")) or ( binary_c_output.splitlines()[-1].startswith("SYSTEM_ERROR") ): verbose_print( "FAILING SYSTEM FOUND", self.grid_options["verbosity"], 0, ) # Keep track of the amount of failed systems and their error codes self.grid_options["_failed_prob"] += system_dict["probability"] self.grid_options["_failed_count"] += 1 self.grid_options["_errors_found"] = True # Try catching the error code and keep track of the unique ones. try: error_code = int( binary_c_output.splitlines()[0] .split("with error code")[-1] .split(":")[0] .strip() ) if ( not error_code in self.grid_options["_failed_systems_error_codes"] ): self.grid_options["_failed_systems_error_codes"].append( error_code ) except ValueError: verbose_print( "Failed to extract the error-code", self.grid_options["verbosity"], 1, ) # Check if we have exceeded the amount of errors if ( self.grid_options["_failed_count"] > self.grid_options["failed_systems_threshold"] ): if not self.grid_options["_errors_exceeded"]: verbose_print( "Process {} exceeded the maximum ({}) amount of failing systems. Stopped logging them to files now".format( self.process_ID, self.grid_options["failed_systems_threshold"], ), self.grid_options["verbosity"], 1, ) self.grid_options["_errors_exceeded"] = True # If not, write the failing systems to files unique to each process else: # Write arglines to file argstring = self._return_argline(system_dict) with open( os.path.join( self.grid_options["tmp_dir"], "failed_systems", "process_{}.txt".format(self.process_ID), ), "a+", ) as f: f.write(argstring + "\n") else: verbose_print( "binary_c had 0 output. Which is strange", self.grid_options["verbosity"], 2, ) ################################################################################################ def Moe_de_Stefano_2017(self, options=None): """ Class method of the population class Takes a dictionary as its only argument The dictionary should contain a file to """ # Require input options if not options: print("Please provide a options dictionary.") raise ValueError # TODO: put this in defaults default_options = copy.deepcopy(moe_distefano_default_options) # Take the option dictionary that was given and override. options = update_dicts(default_options, options) # Write options to a file with open("/tmp/moeopts.dat", "w") as f: f.write(json.dumps(options, indent=4)) json_data = get_moe_distefano_dataset(options) # entry of log10M1 is a list containing 1 dict. We can take the dict out of the list json_data["log10M1"] = json_data["log10M1"][0] # Get all the masses logmasses = sorted(json_data["log10M1"].keys()) if not logmasses: print("The table does not contain masses") raise ValueError with open("/tmp/moe.log", "w") as logfile: logfile.write("log₁₀Masses(M☉) {}\n".format(logmasses)) # Get all the periods and see if they are all consistently present logperiods = [] for logmass in logmasses: if not logperiods: logperiods = sorted(json_data["log10M1"][logmass]["logP"]) dlog10P = float(logperiods[1]) - float(logperiods[0]) current_logperiods = sorted(json_data["log10M1"][logmass]["logP"]) if not (logperiods == current_logperiods): print("Period values are not consistent throughout the dataset") raise ValueError ############################################################ # log10period binwidth : of course this assumes a fixed # binwidth, so we check for this too. for i in range(len(current_logperiods) - 1): if not dlog10P == ( float(current_logperiods[i + 1]) - float(current_logperiods[i]) ): print("Period spacing is not consistent throughout the dataset") raise ValueError with open("/tmp/moe.log", "a") as logfile: logfile.write("log₁₀Periods(days) {}\n".format(logperiods)) # Fill the global dict for logmass in logmasses: # print("Logmass: {}".format(logmass)) # Create the multiplicity table if not Moecache.get("multiplicity_table", None): Moecache["multiplicity_table"] = [] # multiplicity as a function of primary mass Moecache["multiplicity_table"].append( [ float(logmass), json_data["log10M1"][logmass]["f_multi"], json_data["log10M1"][logmass]["single star fraction"], json_data["log10M1"][logmass]["binary star fraction"], json_data["log10M1"][logmass]["triple/quad star fraction"], ] ) print("Size multiplicity table: {}", len(Moecache["multiplicity_table"])) ############################################################ # a small log10period which we can shift just outside the # table to force integration out there to zero epslog10P = 1e-8 * dlog10P ############################################################ # loop over either binary or triple-outer periods first = 1 # Go over the periods for logperiod in logperiods: # print("logperiod: {}".format(logperiod)) ############################################################ # distributions of binary and triple star fractions # as a function of mass, period. # # Note: these should be per unit log10P, hence we # divide by $dlog10P if first: first = 0 # Create the multiplicity table if not Moecache.get("period_distributions", None): Moecache["period_distributions"] = [] ############################################################ # lower bound the period distributions to zero probability Moecache["period_distributions"].append( [ float(logmass), float(logperiod) - 0.5 * dlog10P - epslog10P, 0.0, 0.0, ] ) Moecache["period_distributions"].append( [ float(logmass), float(logperiod) - 0.5 * dlog10P, json_data["log10M1"][logmass]["logP"][logperiod][ "normed_bin_frac_p_dist" ] / dlog10P, json_data["log10M1"][logmass]["logP"][logperiod][ "normed_tripquad_frac_p_dist" ] / dlog10P, ] ) Moecache["period_distributions"].append( [ float(logmass), float(logperiod), json_data["log10M1"][logmass]["logP"][logperiod][ "normed_bin_frac_p_dist" ] / dlog10P, json_data["log10M1"][logmass]["logP"][logperiod][ "normed_tripquad_frac_p_dist" ] / dlog10P, ] ) print( "Size period_distributions table: {}", len(Moecache["period_distributions"]), ) ############################################################ # distributions as a function of mass, period, q # # First, get a list of the qs given by Moe # qs = sorted(json_data["log10M1"][logmass]["logP"][logperiod]["q"]) qdata = {} I_q = 0.0 # get the q distribution from M&S, which ranges 0.15 to 0.95 for q in qs: val = json_data["log10M1"][logmass]["logP"][logperiod]["q"][q] qdata[q] = val I_q += val # hence normalize to 1.0 for q in qs: qdata[q] = qdata[q] / I_q # Create the multiplicity table if not Moecache.get("q_distributions", None): Moecache["q_distributions"] = [] for q in qs: Moecache["q_distributions"].append( [float(logmass), float(logperiod), float(q), qdata[q]] ) ############################################################ # eccentricity distributions as a function of mass, period, ecc eccs = sorted(json_data["log10M1"][logmass]["logP"][logperiod]["e"]) ecc_data = {} I_e = 0 # Read out the data for ecc in eccs: val = json_data["log10M1"][logmass]["logP"][logperiod]["e"][ecc] ecc_data[ecc] = val I_e += val # Normalize the data for ecc in eccs: ecc_data[ecc] = ecc_data[ecc] / I_e # Create the multiplicity table if not Moecache.get("ecc_distributions", None): Moecache["ecc_distributions"] = [] for ecc in eccs: Moecache["ecc_distributions"].append( [float(logmass), float(logperiod), float(ecc), ecc_data[ecc]] ) ############################################################ # upper bound the period distributions to zero probability Moecache["period_distributions"].append( [ float(logmass), float(logperiods[-1]) + 0.5 * dlog10P, json_data["log10M1"][logmass]["logP"][logperiods[-1]][ "normed_bin_frac_p_dist" ] / dlog10P, json_data["log10M1"][logmass]["logP"][logperiods[-1]][ "normed_tripquad_frac_p_dist" ] / dlog10P, ] ) Moecache["period_distributions"].append( [ float(logmass), float(logperiods[-1]) + 0.5 * dlog10P + epslog10P, 0.0, 0.0, ] ) print( "Size period_distributions table: {}".format( len(Moecache["period_distributions"]) ) ) # Write to logfile with open("/tmp/moecache.json", "w") as cache_filehandle: cache_filehandle.write(json.dumps(Moecache, indent=4)) # TODO: remove this and make the plotting of the multiplicity fractions more modular # options['M1'] = 10 # # multiplicity_fractions = Moe_de_Stefano_2017_multiplicity_fractions(options) # import matplotlib.pyplot as plt # mass_range = range(1, 80) # multiplicity_dict = {} # for mass in mass_range: # options["M1"] = mass # multiplicity_fractions = Moe_de_Stefano_2017_multiplicity_fractions(options) # multiplicity_dict[mass] = multiplicity_fractions # single_values = [multiplicity_dict[key][0] for key in multiplicity_dict.keys()] # binary_values = [multiplicity_dict[key][1] for key in multiplicity_dict.keys()] # triple_values = [multiplicity_dict[key][2] for key in multiplicity_dict.keys()] # quad_values = [multiplicity_dict[key][3] for key in multiplicity_dict.keys()] # print("mass = {}".format(list(mass_range))) # print("single_values={}".format(single_values)) # print("binary_values={}".format(binary_values)) # print("triple_values={}".format(triple_values)) # print("quad_values={}".format(quad_values)) # plt.plot(mass_range, single_values, label="single") # plt.plot(mass_range, binary_values, label="binary") # plt.plot(mass_range, triple_values, label="triple") # plt.plot(mass_range, quad_values, label="Quadruple") # plt.legend() # plt.xscale("log") # plt.show() ############################################################ # construct the grid here # ############################################################ # make a version of the options which can be passed to # the probability density function in the gridcode # TODO: check if that works, if we just pass the options as a dict ############################################################ # first, the multiplicity, this is 1,2,3,4, ... # for singles, binaries, triples, quadruples, ... max_multiplicity = 0 for i in range(1, 5): mod = options["multiplicity_modulator"][i - 1] print("Check mod {} = {}".format(i, mod)) if mod > 0: max_multiplicity = i print("Max multiplicity = {}".format(max_multiplicity)) ###### # Setting up the grid variables # Multiplicity self.add_grid_variable( name="multiplicity", parameter_name="multiplicity", longname="multiplicity", valuerange=[1, max_multiplicity], resolution=1, spacingfunc="range(1, 5)", precode='self.grid_options["multiplicity"] = multiplicity; options={}'.format( options ), condition="({}[multiplicity-1] > 0)".format( str(options["multiplicity_modulator"]) ), gridtype="edge", dphasevol=-1, probdist=1, ) ############################################################ # always require M1, for all systems # # TODO: put in option for the time-adaptive grid. # log-spaced m1 with given resolution self.add_grid_variable( name="lnm1", parameter_name="M_1", longname="Primary mass", resolution="options['resolutions']['M'][0]", spacingfunc="const(np.log({}), np.log({}), {})".format( options["ranges"]["M"][0], options["ranges"]["M"][1], options["resolutions"]["M"][0], ), valuerange=[ "np.log({})".format(options["ranges"]["M"][0]), "np.log({})".format(options["ranges"]["M"][1]), ], gridtype="centred", dphasevol="dlnm1", precode='M_1 = np.exp(lnm1); options["M1"]=M_1', probdist="Moe_de_Stefano_2017_pdf({{{}, {}, {}}}) if multiplicity == 1 else 1".format( str(options)[1:-1], "'multiplicity': multiplicity", "'M1': M_1" ), ) # Go to higher multiplicities if max_multiplicity >= 2: # binaries: period self.add_grid_variable( name="log10per", parameter_name="orbital_period", longname="log10(Orbital_Period)", resolution=options["resolutions"]["logP"][0], probdist=1.0, condition='(self.grid_options["multiplicity"] >= 2)', # branchpoint=1, gridtype="centred", dphasevol="({} * dlog10per)".format(LOG_LN_CONVERTER), valuerange=[options["ranges"]["logP"][0], options["ranges"]["logP"][1]], spacingfunc="const({}, {}, {})".format( options["ranges"]["logP"][0], options["ranges"]["logP"][1], options["resolutions"]["logP"][0], ), precode="""orbital_period = 10.0**log10per qmin={}/M_1 qmax=maximum_mass_ratio_for_RLOF(M_1, orbital_period) """.format( options.get("Mmin", 0.07) ), ) # binaries: mass ratio self.add_grid_variable( name="q", parameter_name="M_2", longname="Mass ratio", valuerange=[ options["ranges"]["q"][0] if options.get("ranges", {}).get("q", None) else "options.get('Mmin', 0.07)/M_1", options["ranges"]["q"][1] if options.get("ranges", {}).get("q", None) else "qmax", ], resolution=options["resolutions"]["M"][1], probdist=1, gridtype="centred", dphasevol="dq", precode=""" M_2 = q * M_1 sep = calc_sep_from_period(M_1, M_2, orbital_period) """, spacingfunc="const({}, {}, {})".format( options["ranges"]["q"][0] if options.get("ranges", {}).get("q", [None, None])[0] else "{}/M_1".format(options.get("Mmin", 0.07)), options["ranges"]["q"][1] if options.get("ranges", {}).get("q", [None, None])[1] else "qmax", options["resolutions"]["M"][1], ), ) # (optional) binaries: eccentricity if options["resolutions"]["ecc"][0] > 0: self.add_grid_variable( name="ecc", parameter_name="eccentricity", longname="Eccentricity", resolution=options["resolutions"]["ecc"][0], probdist=1, gridtype="centred", dphasevol="decc", precode="eccentricity=ecc", valuerange=[ options["ranges"]["ecc"][0], # Just fail if not defined. options["ranges"]["ecc"][1], ], spacingfunc="const({}, {}, {})".format( options["ranges"]["ecc"][0], # Just fail if not defined. options["ranges"]["ecc"][1], options["resolutions"]["ecc"][0], ), ) # Now for triples and quadruples if max_multiplicity >= 3: # Triple: period self.add_grid_variable( name="log10per2", parameter_name="orbital_period_triple", longname="log10(Orbital_Period2)", resolution=options["resolutions"]["logP"][1], probdist=1.0, condition='(self.grid_options["multiplicity"] >= 3)', branchpoint=1, gridtype="centred", dphasevol="({} * dlog10per2)".format(LOG_LN_CONVERTER), valuerange=[ options["ranges"]["logP"][0], options["ranges"]["logP"][1], ], spacingfunc="const({}, {}, {})".format( options["ranges"]["logP"][0], options["ranges"]["logP"][1], options["resolutions"]["logP"][1], ), precode="""orbital_period_triple = 10.0**log10per2 q2min={}/(M_1+M_2) q2max=maximum_mass_ratio_for_RLOF(M_1+M_2, orbital_period_triple) """.format( options.get("Mmin", 0.07) ), ) # Triples: mass ratio # Note, the mass ratio is M_outer/M_inner print("M2 resolution {}".format(options["resolutions"]["M"][2])) self.add_grid_variable( name="q2", parameter_name="M_3", longname="Mass ratio outer/inner", valuerange=[ options["ranges"]["q"][0] if options.get("ranges", {}).get("q", None) else "options.get('Mmin', 0.07)/(M_1+M_2)", options["ranges"]["q"][1] if options.get("ranges", {}).get("q", None) else "q2max", ], resolution=options["resolutions"]["M"][2], probdist=1, gridtype="centred", dphasevol="dq2", precode=""" M_3 = q2 * (M_1 + M_2) sep2 = calc_sep_from_period((M_1+M_2), M_3, orbital_period_triple) eccentricity2=0 """, spacingfunc="const({}, {}, {})".format( options["ranges"]["q"][0] if options.get("ranges", {}).get("q", None) else "options.get('Mmin', 0.07)/(M_1+M_2)", options["ranges"]["q"][1] if options.get("ranges", {}).get("q", None) else "q2max", options["resolutions"]["M"][2], ), ) # (optional) triples: eccentricity if options["resolutions"]["ecc"][1] > 0: self.add_grid_variable( name="ecc2", parameter_name="eccentricity2", longname="Eccentricity of the triple", resolution=options["resolutions"]["ecc"][1], probdist=1, gridtype="centred", dphasevol="decc2", precode="eccentricity2=ecc2", valuerange=[ options["ranges"]["ecc"][0], # Just fail if not defined. options["ranges"]["ecc"][1], ], spacingfunc="const({}, {}, {})".format( options["ranges"]["ecc"][0], # Just fail if not defined. options["ranges"]["ecc"][1], options["resolutions"]["ecc"][1], ), ) if max_multiplicity == 4: # Quadruple: period self.add_grid_variable( name="log10per3", parameter_name="orbital_period_quadruple", longname="log10(Orbital_Period3)", resolution=options["resolutions"]["logP"][2], probdist=1.0, condition='(self.grid_options["multiplicity"] >= 4)', branchpoint=1, gridtype="centred", dphasevol="({} * dlog10per3)".format(LOG_LN_CONVERTER), valuerange=[ options["ranges"]["logP"][0], options["ranges"]["logP"][1], ], spacingfunc="const({}, {}, {})".format( options["ranges"]["logP"][0], options["ranges"]["logP"][1], options["resolutions"]["logP"][2], ), precode="""orbital_period_quadruple = 10.0**log10per3 q3min={}/(M_3) q3max=maximum_mass_ratio_for_RLOF(M_3, orbital_period_quadruple) """.format( options.get("Mmin", 0.07) ), ) # Quadruple: mass ratio : M_outer / M_inner print("M3 resolution {}".format(options["resolutions"]["M"][3])) self.add_grid_variable( name="q3", parameter_name="M_4", longname="Mass ratio outer low/outer high", valuerange=[ options["ranges"]["q"][0] if options.get("ranges", {}).get("q", None) else "options.get('Mmin', 0.07)/(m3)", options["ranges"]["q"][1] if options.get("ranges", {}).get("q", None) else "q3max", ], resolution=options["resolutions"]["M"][3], probdist=1, gridtype="centred", dphasevol="dq3", precode=""" M_4 = q3 * M_3 sep3 = calc_sep_from_period((M_3), M_4, orbital_period_quadruple) eccentricity3=0 """, spacingfunc="const({}, {}, {})".format( options["ranges"]["q"][0] if options.get("ranges", {}).get("q", None) else "options.get('Mmin', 0.07)/(m3)", options["ranges"]["q"][1] if options.get("ranges", {}).get("q", None) else "q3max", options["resolutions"]["M"][2], ), ) # TODO: Ask about which periods should be used for the calc_sep_from_period # (optional) triples: eccentricity if options["resolutions"]["ecc"][2] > 0: self.add_grid_variable( name="ecc3", parameter_name="eccentricity3", longname="Eccentricity of the triple+quadruple/outer binary", resolution=options["resolutions"]["ecc"][2], probdist=1, gridtype="centred", dphasevol="decc3", precode="eccentricity3=ecc3", valuerange=[ options["ranges"]["ecc"][ 0 ], # Just fail if not defined. options["ranges"]["ecc"][1], ], spacingfunc="const({}, {}, {})".format( options["ranges"]["ecc"][ 0 ], # Just fail if not defined. options["ranges"]["ecc"][1], options["resolutions"]["ecc"][2], ), ) # Now we are at the last part. # Here we should combine all the information that we calculate and update the options # dictionary. This will then be passed to the Moe_de_Stefano_2017_pdf to calculate # the real probability. The trick we use is to strip the options_dict as a string # and add some keys to it: probdist_addition = "Moe_de_Stefano_2017_pdf({{{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}})".format( str(options)[1:-1], '"multiplicity": multiplicity', '"M1": M_1', '"M2": M_2', '"M3": M_3', '"M4": M_4', '"P": orbital_period', '"P2": orbital_period_triple', '"P3": orbital_period_quadruple', '"ecc": eccentricity', '"ecc2": eccentricity2', '"ecc3": eccentricity3', ) # and finally the probability calculator self.grid_options["_grid_variables"][self.last_grid_variable()][ "probdist" ] = probdist_addition