diff --git a/binarycpython/utils/dicts.py b/binarycpython/utils/dicts.py index fd5a27ad8dada2a82f8e54070b920e884de0e819..60825dff28a68d481b6160b4dd45a89f8ef15550 100644 --- a/binarycpython/utils/dicts.py +++ b/binarycpython/utils/dicts.py @@ -401,6 +401,10 @@ def merge_dicts(dict_1: dict, dict_2: dict) -> dict: - dictionaries will be merged by calling recursively calling this function again - numbers will be added - (opt) lists will be appended + - booleans are merged with logical OR + - identical strings are just set to the string + - non-identical strings are concatenated + - NoneTypes are set to None - In the case that the instances do not match: for now I will raise an error Args: @@ -464,10 +468,26 @@ def merge_dicts(dict_1: dict, dict_2: dict) -> dict: ): new_dict[key] = merge_dicts(dict_1[key], dict_2[key]) + # string-int clash : convert both to ints and save + elif isinstance(dict_1[key], str) and isinstance(dict_2[key], int) or \ + isinstance(dict_1[key], int) and isinstance(dict_2[key], str): + try: + new_dict[key] = int(dict_1[key]) + int(dict_2[key]) + except: + print("key",key,": Failed to convert string, one of \"{}\" or \"{}\" to an int".format(dict_1[key],dict_2[key])) + + # string-float clash : convert both to floats and save + elif isinstance(dict_1[key], str) and isinstance(dict_2[key], float) or \ + isinstance(dict_1[key], float) and isinstance(dict_2[key], str): + try: + new_dict[key] = float(dict_1[key]) + float(dict_2[key]) + except: + print("key",key,": Failed to convert string, one of \"{}\" or \"{}\" to a float".format(dict_1[key],dict_2[key])) + # If the above cases have not dealt with it, then we should raise an error else: print( - "Error key: {} value: {} type: {} and key: {} value: {} type: {} are not of the same type and cannot be merged".format( + "merge_dicts error: key: {} value: {} type: {} and key: {} value: {} type: {} are not of the same type and cannot be merged".format( key, dict_1[key], type(dict_1[key]), @@ -508,6 +528,19 @@ def merge_dicts(dict_1: dict, dict_2: dict) -> dict: elif isinstance(dict_1[key], dict) and isinstance(dict_2[key], dict): new_dict[key] = merge_dicts(dict_1[key], dict_2[key]) + # strings + elif isinstance(dict_1[key], str) and isinstance(dict_2[key], str): + if dict_1[key] == dict_2[key]: + # same strings + new_dict[key] = dict_1[key] + else: + # different strings: just concatenate them + new_dict[key] = dict_1[key] + dict_2[key] + + # None types + elif dict_1[key] is None and dict_2[key] is None: + new_dict[key] = None + else: print( "Object types {}: {} ({}), {} ({}) not supported.".format( diff --git a/binarycpython/utils/functions.py b/binarycpython/utils/functions.py index 1480a418e8e1aaf142b59a535621ba040bb015f2..093ab1753f025635a213696cd4499435abe18eda 100644 --- a/binarycpython/utils/functions.py +++ b/binarycpython/utils/functions.py @@ -46,6 +46,9 @@ import binarycpython.utils.moe_di_stefano_2017_data as moe_di_stefano_2017_data from binarycpython import _binary_c_bindings from binarycpython.utils.dicts import filter_dict, filter_dict_through_values +from collections import ( + OrderedDict, +) ######################################################## # Unsorted @@ -276,6 +279,26 @@ def imports(): yield val.__name__ +def isfloat(x): + """ + Return True if the "number" x, which could be a string, is a float, otherwise return False. + """ + try: + y = float(x) + return True + except: + return False + +def isint(x): + """ + Return True if the "number" x, which could be a string, is an int, otherwise return False. + """ + try: + y = int(x) + return True + except: + return False + def convfloat(x): """ Convert scalar x to a float if we can, in which case return the float, otherwise just return x without changing it. Usually, x is a string, but could be anything that float() can handle without failure. @@ -747,14 +770,42 @@ def parse_binary_c_version_info(version_info_string: str) -> dict: param_type = split_info[0] new_split = "".join(split_info[1:]).split(" is ") - param_name = new_split[0] + param_name = new_split[0].strip() param_value = " is ".join(new_split[1:]) + param_value = param_value.strip() + + #print("macro ",param_name,"=",param_value," float?",isfloat(param_value)," int?",isint(param_value)) + + # If we're trying to set the value to "on", check that + # it doesn't already exist. If it does, do nothing, as the + # extra information is better than just "on" + if param_name in macros_dict: + #print("already exists (is ",macros_dict[param_name]," float? ",isfloat(macros_dict[param_name]),", int? ",isint(macros_dict[param_name]),") : check that we can improve it") + if macros_dict[param_name] == "on": + # update with better value + store = True + elif isfloat(macros_dict[param_name]) == False and isfloat(param_value) == True: + # store the number we now have to replace the non-number we had + store = True + else: + # don't override existing number + store = False + + #if store: + # print("Found improved macro value of param",param_name,", was ",macros_dict[param_name],", is",param_value) + #else: + # print("Cannot improve: use old value") + else: + store = True + + if store: + # Sometimes the macros have extra information behind it. + # Needs an update in outputting by binary_c (RGI: what does this mean David???) + try: + macros_dict[param_name] = param_type_dict[param_type](param_value) + except ValueError: + macros_dict[param_name] = str(param_value) - # Sometimes the macros have extra information behind it. Needs an update in outputting by binary_c - try: - macros_dict[param_name] = param_type_dict[param_type](param_value) - except ValueError: - macros_dict[param_name] = str(param_value) version_info_dict["macros"] = macros_dict if macros_dict else None ########################## diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 3692971e4064f722e9076c519fcb9ac8c5728c39..43339aff42c8bfc5494dda276d7ed5110e644f8f 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -104,6 +104,7 @@ from binarycpython.utils.dicts import ( AutoVivificationDict, custom_sort_dict, merge_dicts, + multiply_float_values, multiply_values_dict, recursive_change_key_to_float, recursive_change_key_to_string, @@ -378,13 +379,23 @@ class Population: verbose_print("Success!", self.grid_options["verbosity"], 2) except ValueError: - verbose_print( - "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( - parameter, value, type(old_value), old_value - ), - self.grid_options["verbosity"], - 0, - ) + + # might be able to eval the parameter, e.g. + # an expression like "2-1" can eval to "1" + # which would be valid + try: + evaled = eval(value) + value = type(old_value)(evaled) + verbose_print("Success! (evaled)", self.grid_options["verbosity"], 2) + + except ValueError: + verbose_print( + "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( + parameter, value, type(old_value), old_value + ), + self.grid_options["verbosity"], + 0, + ) # Add to dict cmdline_dict[parameter] = value @@ -1001,6 +1012,20 @@ class Population: # Evolution functions ################################################### + def _set_ncores(self): + # If num_cores <= 0, set automatically + # + # if num_cores is 0, we use as many as we have available + if self.grid_options['num_cores'] == 0: + # use all logical cores available to us + self.grid_options['num_processes'] = max(1,psutil.cpu_count(logical=True)) + elif self.grid_options['num_cores'] == -1: + # use all physical cores available to us + self.grid_options['num_processes'] = max(1,psutil.cpu_count(logical=False)) + else: + # manually specify number of cores made available + self.grid_options['num_processes'] = self.grid_options['num_cores'] + def _pre_run_cleanup(self) -> None: """ Function to clean up some stuff in the grid before a run (like results, ensemble results etc) @@ -1010,6 +1035,9 @@ class Population: self.grid_results = AutoVivificationDict() self.grid_ensemble_results = {} + # set number of cores we want to use + self._set_ncores() + # Reset the process ID (should not have a value initially, but can't hurt if it does) self.process_ID = 0 @@ -1034,10 +1062,10 @@ class Population: # arrays to store memory and max memory use per-thread mem = 1.0 * mem_use() self.shared_memory["memory_use_per_thread"] = multiprocessing.Array( - "d", [mem] * self.grid_options["num_cores"] + "d", [mem] * self.grid_options["num_processes"] ) self.shared_memory["max_memory_use_per_thread"] = multiprocessing.Array( - "d", [mem] * self.grid_options["num_cores"] + "d", [mem] * self.grid_options["num_processes"] ) def clean(self) -> None: @@ -1088,31 +1116,39 @@ class Population: self._evolve_population() print("do analytics") - # Put all interesting stuff in a variable and output that afterwards, as analytics of the run. - analytics_dict = { - "population_name": self.grid_options["_population_id"], - "evolution_type": self.grid_options["evolution_type"], - "failed_count": self.grid_options["_failed_count"], - "failed_prob": self.grid_options["_failed_prob"], - "failed_systems_error_codes": self.grid_options[ - "_failed_systems_error_codes" - ].copy(), - "errors_exceeded": self.grid_options["_errors_exceeded"], - "errors_found": self.grid_options["_errors_found"], - "total_probability": self.grid_options["_probtot"], - "total_count": self.grid_options["_count"], - "start_timestamp": self.grid_options["_start_time_evolution"], - "end_timestamp": self.grid_options["_end_time_evolution"], - "total_mass_run": self.grid_options["_total_mass_run"], - "total_probability_weighted_mass_run": self.grid_options[ - "_total_probability_weighted_mass_run" - ], - "zero_prob_stars_skipped": self.grid_options["_zero_prob_stars_skipped"], - } - if 'metadata' in self.grid_ensemble_results: - # Add analytics dict to the metadata too: - self.grid_ensemble_results["metadata"].update(analytics_dict) + if self.grid_options['do_analytics']: + # Put all interesting stuff in a variable and output that afterwards, as analytics of the run. + analytics_dict = { + "population_name": self.grid_options["_population_id"], + "evolution_type": self.grid_options["evolution_type"], + "failed_count": self.grid_options["_failed_count"], + "failed_prob": self.grid_options["_failed_prob"], + "failed_systems_error_codes": self.grid_options[ + "_failed_systems_error_codes" + ].copy(), + "errors_exceeded": self.grid_options["_errors_exceeded"], + "errors_found": self.grid_options["_errors_found"], + "total_probability": self.grid_options["_probtot"], + "total_count": self.grid_options["_count"], + "start_timestamp": self.grid_options["_start_time_evolution"], + "end_timestamp": self.grid_options["_end_time_evolution"], + "total_mass_run": self.grid_options["_total_mass_run"], + "total_probability_weighted_mass_run": self.grid_options[ + "_total_probability_weighted_mass_run" + ], + "zero_prob_stars_skipped": self.grid_options["_zero_prob_stars_skipped"], + } + + if 'metadata' in self.grid_ensemble_results: + # Add analytics dict to the metadata too: + self.grid_ensemble_results["metadata"].update(analytics_dict) + else: + # use existing analytics dict + try: + analytics_dict = self.grid_ensemble_results["metadata"] + except: + analytics_dict = {} # should never happen # Save object to a pickle file if self.grid_options['save_population_object']: @@ -1157,11 +1193,18 @@ class Population: try: pathlib.Path(joiningfile).touch(exist_ok=False) print("can join : all tasks are finished") - self.join_from_files(joinfiles) + try: + self.join_from_files(self,joinfiles) + except Exception as e: + print("Join gave exception",e) + # disable analytics calculations : use the + # values we just loaded + self.grid_options['do_analytics'] = False + return except: pass else: - print("cannot join : other tasks are not yet finished") + print("cannot join : other tasks are not yet finished\n") print("Finished this job : exiting") sys.exit() @@ -1199,8 +1242,8 @@ class Population: string2 = "It took a total of {dtsecs} to run {starcount} systems on {ncores} cores\n = {totaldtsecs} of CPU time.\nMaximum memory use {memuse:.3f} MB".format( dtsecs=timedelta(dtsecs), starcount=self.grid_options["_total_starcount"], - ncores=self.grid_options["num_cores"], - totaldtsecs=timedelta(dtsecs * self.grid_options["num_cores"]), + ncores=self.grid_options["num_processes"], + totaldtsecs=timedelta(dtsecs * self.grid_options["num_processes"]), memuse=sum(self.shared_memory["max_memory_use_per_thread"]), ) verbose_print(self._boxed(string1, string2), self.grid_options["verbosity"], 0) @@ -1280,17 +1323,36 @@ class Population: self, print_results=False ) - # TODO: build in method to handle with the HPC. # Continuously fill the queue + + + # start_at can be an expression : we should eval it + # prior to running the loop + self.grid_options['start_at'] = eval(str(self.grid_options['start_at'])) + for system_number, system_dict in enumerate(generator): - # skip systems before start_at, and apply modulo + # skip systems before start_at + if system_number < self.grid_options["start_at"]: + verbose_print("skip system {n} because < start_at = {start}".format( + n=system_number, + start=self.grid_options["start_at"]), + self.grid_options['verbosity'], + 3) + continue + + # apply modulo if not ( - system_number >= self.grid_options["start_at"] - and (system_number - self.grid_options["start_at"]) - % self.grid_options["modulo"] - == 0 + (system_number - self.grid_options["start_at"]) % self.grid_options["modulo"] == 0 ): + verbose_print("skip system {n} because modulo {mod} == {donemod}".format( + n=system_number, + mod=self.grid_options["modulo"], + donemod=(system_number - self.grid_options["start_at"]) % self.grid_options["modulo"] + ), + self.grid_options['verbosity'], + 3) + continue # Put job in queue @@ -1316,7 +1378,7 @@ class Population: First we set up the multiprocessing manager and the job and result queue. - Then we spawn <self.grid_options["num_cores"]> number of process instances, + Then we spawn <self.grid_options["num_processes"]> number of process instances, and signal them to start. While the processes are waiting for their instructions, we start the queue filler, @@ -1339,13 +1401,14 @@ class Population: # backwards compatibility if "amt_cores" in self.grid_options: + self.grid_options["num_processes"] = self.grid_options["amt_cores"] self.grid_options["num_cores"] = self.grid_options["amt_cores"] - result_queue = manager.Queue(maxsize=self.grid_options["num_cores"]) + result_queue = manager.Queue(maxsize=self.grid_options["num_processes"]) # Create process instances processes = [] - for ID in range(self.grid_options["num_cores"]): + for ID in range(self.grid_options["num_processes"]): processes.append( multiprocessing.Process( target=self._process_run_population_grid, @@ -1358,7 +1421,7 @@ class Population: p.start() # Set up the system_queue - self._system_queue_filler(job_queue, num_cores=self.grid_options["num_cores"]) + self._system_queue_filler(job_queue, num_cores=self.grid_options["num_processes"]) # Join the processes for p in processes: @@ -1408,7 +1471,7 @@ class Population: if self.grid_options["ensemble_factor_in_probability_weighted_mass"]: multiply_values_dict( self.grid_ensemble_results["ensemble"], - 1 + 1.0 / self.grid_ensemble_results["metadata"][ "total_probability_weighted_mass" ], @@ -2214,6 +2277,12 @@ class Population: self.grid_options["_total_mass_run"] = 0 self.grid_options["_total_probability_weighted_mass_run"] = 0 + # Xinit is overcounted + if 'Xinit' in self.grid_ensemble_results['ensemble']: + multiply_float_values(self.grid_ensemble_results['ensemble']['Xinit'], + 1.0/float(self.grid_options['num_processes'])) + + # Remove files # TODO: remove files @@ -2229,6 +2298,24 @@ class Population: # Function below are used to run populations with # a variable grid ################################################### + def _gridcode_filename(self): + if self.grid_options['slurm'] > 0: + filename = os.path.join( + self.grid_options["tmp_dir"], + "binary_c_grid_{population_id}.{jobid}.{jobarrayindex}.py".format( + population_id=self.grid_options["_population_id"], + jobid=self.grid_options['slurm_jobid'], + jobarrayindex=self.grid_options['slurm_jobarrayindex'], + ) + ) + else: + filename = os.path.join( + self.grid_options["tmp_dir"], + "binary_c_grid_{population_id}.py".format( + population_id=self.grid_options["_population_id"] + ), + ) + return filename def _add_code(self, *args, indent=0): """ @@ -2355,7 +2442,6 @@ class Population: vb = False # Generate code - print("Generating grid code") for loopnr, grid_variable_el in enumerate( sorted( self.grid_options["_grid_variables"].items(), @@ -2808,10 +2894,7 @@ class Population: self.grid_options["code_string"] = self.code_string # Write to file - gridcode_filename = os.path.join( - self.grid_options["tmp_dir"], - "binary_c_grid_{}.py".format(self.grid_options["_population_id"]), - ) + gridcode_filename = self._gridcode_filename() self.grid_options["gridcode_filename"] = gridcode_filename verbose_print( @@ -2829,14 +2912,17 @@ class Population: file.write(self.code_string) # perhaps create symlink - if self.grid_options["symlink_latest_gridcode"] and self.grid_options["slurm"] == 0: + if self.grid_options['slurm']==0 and \ + self.grid_options["symlink_latest_gridcode"]: global _count symlink = os.path.join( self.grid_options["tmp_dir"], "binary_c_grid-latest" + str(_count) ) _count += 1 - if os.path.exists(symlink): + try: os.unlink(symlink) + except: + pass try: os.symlink(gridcode_filename, symlink) @@ -3080,226 +3166,6 @@ class Population: return arg_dict - ################################################### - # SLURM functions - # - # subroutines to run SLURM grids - ################################################### - - # def _slurm_grid(self): - # """ - # Main function that manages the SLURM setup. - - # Has three stages: - - # - setup - # - evolve - # - join - - # Which stage is used is determined by the value of grid_options['slurm_command']: - - # <empty>: the function will know its the user that executed the script and - # it will set up the necessary condor stuff - - # 'evolve': evolve_population is called to evolve the population of stars - - # 'join': We will attempt to join the output - # """ - - # # Check version - # # TODO: Put in function - # slurm_version = get_slurm_version() - # if not slurm_version: - # verbose_print( - # "SLURM: Error: No installation of slurm found", - # self.grid_options["verbosity"], - # 0, - # ) - # else: - # major_version = int(slurm_version.split(".")[0]) - # minor_version = int(slurm_version.split(".")[1]) - - # if major_version > 17: - # verbose_print( - # "SLURM: Found version {} which is new enough".format(slurm_version), - # self.grid_options["verbosity"], - # 1, - # ) - # else: - # verbose_print( - # "SLURM: Found version {} which is too old (we require 17+)".format( - # slurm_version - # ), - # self.grid_options["verbosity"], - # 0, - # ) - - # verbose_print( - # "SLURM: Running slurm grid. command={}".format( - # self.grid_options["slurm_command"] - # ), - # self.grid_options["verbosity"], - # 1, - # ) - - # if not self.grid_options["slurm_command"]: - # # Setting up - # verbose_print( - # "SLURM: Main controller script. Setting up", - # self.grid_options["verbosity"], - # 1, - # ) - - # # Set up working directories: - # verbose_print( - # "SLURM: creating working directories", self.grid_options["verbosity"], 1 - # ) - # create_directories_hpc(self.grid_options["slurm_dir"]) - - # # Create command - # python_details = get_python_details() - # scriptname = path_of_calling_script() - # command = "{} {}".format(python_details["executable"], scriptname) - # command += '{}'.format( - # " ".join( - # [ - # "{}".format(self.grid_options["_commandline_input"]), - # "offset=$jobarrayindex", - # "modulo={}".format(self.grid_options["slurm_njobs"]), - # "vb={}".format(self.grid_options["verbosity"]), - # "slurm_jobid=$jobid", - # "slurm_jobarrayindex=$jobarrayindex", - # "slurm_jobname='binary_grid_'$jobid'.'$jobarrayindex", - # "slurm_njobs={}".format(self.grid_options["slurm_njobs"]), - # "slurm_dir={}".format(self.grid_options["slurm_dir"]), - # "rungrid=1", - # "slurm_command=evolve", - # ] - # ).strip() - # ) - - # # Construct dict with settings for the script while checking the settings at the same time - # # Check settings: - # # TODO: check settings - # # Create SLURM_DIR script: - # slurm_script_options = {} - # slurm_script_options["n"] = self.grid_options["slurm_njobs"] - # slurm_script_options["njobs"] = self.grid_options["slurm_njobs"] - # slurm_script_options["dir"] = self.grid_options["slurm_dir"] - # slurm_script_options["memory"] = self.grid_options["slurm_memory"] - # slurm_script_options["working_dir"] = self.grid_options[ - # "slurm_dir" - # ] # TODO: check this - # slurm_script_options["command"] = command - # # slurm_script_options['streams'] = self.grid_options['streams'] - - # # Construct the script - # slurm_script_contents = "" - # slurm_script_contents += "#!/bin/bash\n" - # slurm_script_contents += "# Slurm file for binary_grid and slurm\n" - # slurm_script_contents += "#SBATCH --error={}/stderr/%A.%a\n".format( - # self.grid_options["slurm_dir"] - # ) - # slurm_script_contents += "#SBATCH --output={}/stdout/%A.%a\n".format( - # self.grid_options["slurm_dir"] - # ) - # slurm_script_contents += "#SBATCH --job-name={}\n".format( - # self.grid_options["slurm_jobname"] - # ) - # slurm_script_contents += "#SBATCH --partition={}\n".format( - # self.grid_options["slurm_partition"] - # ) - # slurm_script_contents += "#SBATCH --time={}\n".format( - # self.grid_options["slurm_time"] - # ) - # slurm_script_contents += "#SBATCH --mem={}\n".format( - # self.grid_options["slurm_memory"] - # ) - # slurm_script_contents += "#SBATCH --ntasks={}\n".format( - # self.grid_options["slurm_ntasks"] - # ) - # slurm_script_contents += "#SBATCH --array={}\n".format( - # self.grid_options["slurm_array"] - # ) - # slurm_script_contents += "\n" - - # if self.grid_options["slurm_extra_settings"]: - # slurm_script_contents += "# Extra settings by user:" - # slurm_script_contents += "\n".join( - # [ - # "--{}={}".format( - # key, self.grid_options["slurm_extra_settings"][key] - # ) - # for key in self.grid_options["slurm_extra_settings"] - # ] - # ) - - # slurm_script_contents += '# set status to "running"\n' - # slurm_script_contents += ( - # 'echo "running" > {}/status/$jobid.$jobarrayindex\n\n'.format( - # self.grid_options["slurm_dir"] - # ) - # ) - # slurm_script_contents += "# run grid of stars\n" - # slurm_script_contents += "{}\n\n".format(command) - # slurm_script_contents += '# set status to "finished"\n' - # slurm_script_contents += ( - # 'echo "finished" > {}/status/$jobid.$jobarrayindex\n'.format( - # self.grid_options["slurm_dir"] - # ) - # ) - # slurm_script_contents += "\n" - - # if self.grid_options["slurm_postpone_join"]: - # slurm_script_contents += "{} rungrid=0 results_hash_dumpfile={}/results/$jobid.all slurm_command=join\n".format( - # command, self.grid_options["slurm_dir"] - # ) - - # # Write script to file - # slurm_script_filename = os.path.join( - # self.grid_options["slurm_dir"], "slurm_script" - # ) - # with open(slurm_script_filename, "w") as slurm_script_file: - # slurm_script_file.write(slurm_script_contents) - - # # Execute or postpone - # if self.grid_options["slurm_postpone_sbatch"]: - # # Execute or postpone the real call to sbatch - # sbatch_command = "sbatch {}".format(slurm_script_filename) - # verbose_print( - # "running slurm script {}".format(slurm_script_filename), - # self.grid_options["verbosity"], - # 0, - # ) - # # subprocess.Popen(sbatch_command, close_fds=True) - # # subprocess.Popen(sbatch_command, creationflags=subprocess.DETACHED_PROCESS) - # verbose_print("Submitted scripts.", self.grid_options["verbosity"], 0) - # else: - # verbose_print( - # "Slurm script is in {} but hasnt been executed".format( - # slurm_script_filename - # ), - # self.grid_options["verbosity"], - # 0, - # ) - - # verbose_print("all done!", self.grid_options["verbosity"], 0) - # sys.exit() - - # elif self.grid_options["slurm_command"] == "evolve": - # # Part to evolve the population. - # # TODO: decide how many CPUs - # verbose_print( - # "SLURM: Evolving population", self.grid_options["verbosity"], 1 - # ) - - # # - # self.evolve_population() - - # elif self.grid_options["slurm_command"] == "join": - # # Joining the output. - # verbose_print("SLURM: Joining results", self.grid_options["verbosity"], 1) - ################################################### # CONDOR functions # @@ -4921,7 +4787,7 @@ eccentricity3=0 ) # compensate for multithreading and modulo - tpr *= self.grid_options["num_cores"] * self.grid_options["modulo"] + tpr *= self.grid_options["num_processes"] * self.grid_options["modulo"] if eta_secs < secs_per_day: fintime = time.localtime(now + eta_secs) @@ -5039,7 +4905,6 @@ eccentricity3=0 Set the slurm status corresponing to the self object, which should have slurm_jobid and slurm_jobarrayindex set. """ file = self.slurm_status_file() - print('status file ',file) if file: with open(file,'w') as f: f.write(string) @@ -5052,9 +4917,9 @@ eccentricity3=0 Get and return the slurm status corresponing to the self object, or jobid.jobarrayindex if they are passed in. If no status is found, returns None. """ if jobid is None: - jobid=self.grid_options['slurm_jobid'] + jobid = self.grid_options['slurm_jobid'] if jobarrayindex is None: - jobarrayindex=self.grid_options['slurm_jobarrayindex'] + jobarrayindex = self.grid_options['slurm_jobarrayindex'] if jobid is None or jobid or jobarrayindex is None : return None @@ -5071,8 +4936,8 @@ eccentricity3=0 return a standard filename for the slurm chunk files """ file = "{jobid}.{jobarrayindex}.gz".format( - jobid=self.grid_options['slurm_jobid'], - jobarrayindex=self.grid_options['slurm_jobarrayindex'] + jobid = self.grid_options['slurm_jobid'], + jobarrayindex = self.grid_options['slurm_jobarrayindex'] ) return os.path.abspath(os.path.join(self.grid_options['slurm_dir'], 'results', @@ -5115,12 +4980,12 @@ eccentricity3=0 break # break the "for dir in dirs:" def slurm_grid(self): - # function to be called when running grids when grid_options['slurm']>=1 - # - # if slurm=1, we set up the slurm script and launch the jobs - # if slurm=2, we are being called from the jobs + """ + function to be called when running grids when grid_options['slurm']>=1 - print("SLURM GRID",self.grid_options['slurm'],self.grid_options['evolution_type']) + if grid_options['slurm']==1, we set up the slurm script and launch the jobs, then exit. + if grid_options['slurm']==2, we are being called from the jobs to run the grids + """ # if slurm=1, we should have no evolution type, we just # set up a load of slurm scripts and get them evolving @@ -5128,36 +4993,22 @@ eccentricity3=0 if self.grid_options['slurm'] == 1: self.grid_options['evolution_type'] = None - print("OVERRIDE",self.grid_options['evolution_type']) - if self.grid_options['evolution_type'] == 'grid': # run a grid of stars only, leaving the results # in a file - print("Run grid") - - # get number of cpu cores available to us - ncpus = max(1,psutil.cpu_count(logical=True)) - - # use them all - self.custom_options['num_cores'] = ncpus # set output file self.grid_options['save_population_object'] = slurm_outfile() return self.evolve() - elif self.grid_options['evolution_type'] == 'join': # should not happen! return else: # setup and launch slurm jobs - print("Setup and slurm") self.make_slurm_dirs() # check we're not using too much RAM - print("datasize slurm_memory",datasize.DataSize(self.grid_options['slurm_memory'])) - print("datasize slurm_warn_max_memory",datasize.DataSize(self.grid_options['slurm_warn_max_memory'])) - if datasize.DataSize(self.grid_options['slurm_memory']) > datasize.DataSize(self.grid_options['slurm_warn_max_memory']): print("WARNING: you want to use {} MB of RAM : this is unlikely to be correct. If you believe it is, set slurm_warn_max_memory to something very large (it is currently {} MB)\n".format( self.grid_options['slurm_memory'], @@ -5175,7 +5026,6 @@ eccentricity3=0 if jobarrayindex is None: jobarrayindex = '$SLURM_ARRAY_TASK_ID' - print("INDEX",jobarrayindex) if self.grid_options['slurm_njobs'] == 0: print("binary_c-python Slurm : You must set grid_option slurm_njobs to be non-zero") @@ -5185,16 +5035,17 @@ eccentricity3=0 grid_command = [ "/usr/bin/env", sys.executable, - str(lib_programname.get_path_executed_script()),#sys.argv[0],#os.path.abspath(__main__.__file__), + str(lib_programname.get_path_executed_script()), ] + sys.argv[1:] + [ 'slurm=2', - 'offset=' + str(jobarrayindex), + 'start_at=' + str(jobarrayindex) + '-1', # do we need the -1? 'modulo=' + str(self.grid_options['slurm_njobs']), - 'results_dumpfile=' + os.path.join(self.slurmpath('results'), jobid + '.' + jobarrayindex), 'slurm_njobs=' + str(self.grid_options['slurm_njobs']), 'slurm_dir=' + self.grid_options['slurm_dir'], - 'verbosity=' + str(self.grid_options['verbosity']) + 'verbosity=' + str(self.grid_options['verbosity']), + 'num_cores=' + str(self.grid_options['num_processes']) ] + grid_command = ' '.join(grid_command) # make slurm script @@ -5203,7 +5054,7 @@ eccentricity3=0 script = open(scriptpath,'w') except IOError: print("Could not open Slurm script at {path} for writing: please check you have set {slurm_dir} correctly (it is currently {slurm_dir} and can write to this directory.".format(path=scriptpath, - slurm_dir=self.grid_options['slurm_dir'])) + slurm_dir = self.grid_options['slurm_dir'])) # the joinfile contains the list of chunk files to be joined joinfile = "{slurm_dir}/results/{jobid}.all".format( @@ -5211,7 +5062,6 @@ eccentricity3=0 jobid=jobid ) - lines = [ "#!/bin/bash\n", "# Slurm file for binary_grid2 and slurm\n", @@ -5223,6 +5073,17 @@ eccentricity3=0 "#SBATCH --mem={slurm_memory}\n".format(slurm_memory=self.grid_options['slurm_memory']), "#SBATCH --ntasks={slurm_ntasks}\n".format(slurm_ntasks=self.grid_options['slurm_ntasks']), "#SBATCH --array={slurm_array}\n".format(slurm_array=slurm_array), + "#SBATCH --cpus-per-task={ncpus}\n".format(ncpus=self.grid_options['num_processes']) + ] + + for key in self.grid_options['slurm_extra_settings']: + lines += [ "#SBATCH --{key} = {value}\n".format( + key=key, + value=self.grid_options['slurm_extra_settings'][key] + )] + + + lines += [ "\n# set status to \"running\"\n", "echo \"running\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n".format(slurm_dir=self.grid_options['slurm_dir'], jobid=jobid, @@ -5242,8 +5103,6 @@ eccentricity3=0 jobarrayindex=jobarrayindex) ] - - if not self.grid_options['slurm_postpone_join']: lines.append("\n# check if we can join\n{grid_command} slurm=2 evolution_type=join joinlist={joinfile}\n\n".format( grid_command=grid_command, @@ -5258,14 +5117,13 @@ eccentricity3=0 stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC | \ stat.S_IRGRP | stat.S_IXGRP | \ stat.S_IROTH | stat.S_IXOTH) - time.sleep(1) if not self.grid_options['slurm_postpone_sbatch']: - # call the slurm script to launch the jobs + # call sbatch to launch the jobs cmd = "sbatch " + scriptpath os.system(cmd) else: - # just say we would have + # just say we would have (use this for testing) print("Slurm script is at {path} but has not been launched".format(path=scriptpath)) print("All done in slurm_grid().") @@ -5296,10 +5154,10 @@ eccentricity3=0 if filename: - print("Save pickle to ",filename) - print("pop is ",self.grid_options["_population_id"]) - print("probtot ",object.grid_options['_probtot']) - #print("grid_ensemble_results",pprint.pprint(object.grid_ensemble_results, sort_dicts=False)) + print("Save population {id}, probtot {probtot} to pickle in {filename}".format( + id=self.grid_options["_population_id"], + probtot=object.grid_options['_probtot'], + filename=filename)) # remove shared memory shared_memory = object.shared_memory @@ -5309,6 +5167,18 @@ eccentricity3=0 system_generator = object.grid_options["_system_generator"] object.grid_options["_system_generator"] = None + # add datestamp + if not 'metadata' in object.grid_ensemble_results: + object.grid_ensemble_results['metadata'] = {} + object.grid_ensemble_results['metadata']['save_population_time'] = datetime.datetime.now().strftime("%m/%d/%Y %H:%M:%S") + + # add max memory use of the grid + try: + object.grid_ensemble_results['metadata']['max_memory_use'] = copy.deepcopy(sum(shared_memory["max_memory_use_per_thread"])) + except Exception as e: + print("save_population_object : Error: ",e) + pass + # dump pickle file compress_pickle.dump(object, filename) @@ -5316,6 +5186,7 @@ eccentricity3=0 # restore data object.shared_memory = shared_memory object.grid_options["_system_generator"] = system_generator + del object.grid_ensemble_results['metadata']['save_population_time'] # touch 'saved' file pathlib.Path(filename + '.saved').touch(exist_ok=True) @@ -5324,14 +5195,17 @@ eccentricity3=0 """ returns the Population object loaded from filename """ - print("loading population object from",filename) if filename is None: - return None + obj = None else: - obj = compress_pickle.load(filename) - return obj + try: + obj = compress_pickle.load(filename) + except Exception as e: + obj = None + + return obj - def merge_grid_object_results(self,refpop,newpop): + def merge_populations(self,refpop,newpop): """ merge newpop's results data into refpop's results data @@ -5345,23 +5219,58 @@ eccentricity3=0 Note: The file should be saved using save_population_object() """ - print("merge dicts") - print("left: ",refpop.grid_results) - print("right:",newpop.grid_results) # combine data - refpop.grid_results = merge_dicts(refpop.grid_results, - newpop.grid_results) - refpop.grid_ensemble_results = merge_dicts(refpop.grid_ensemble_results, - newpop.grid_ensemble_results) + try: + refpop.grid_results = merge_dicts(refpop.grid_results, + newpop.grid_results) + except Exception as e: + print("Error merging grid_results:",e) + + # special cases + try: + maxmem = max(refpop.grid_ensemble_results['metadata']['max_memory_use'], + newpop.grid_ensemble_results['metadata']['max_memory_use']) + except: + maxmem = 0 + + try: + # special cases: + # copy the Xinit and settings: these should just be overridden + try: + settings = copy.deepcopy(newpop.grid_ensemble_results['metadata']['settings']) + except: + settings = None + try: + Xinit = copy.deepcopy(newpop.grid_ensemble_results['ensemble']['Xinit']) + except: + Xinit = None + + # merge the ensemble dicts + refpop.grid_ensemble_results = merge_dicts(refpop.grid_ensemble_results, + newpop.grid_ensemble_results) + + # set special cases + try: + refpop.grid_ensemble_results['metadata']['max_memory_use'] = maxmem + if settings: + refpop.grid_ensemble_results['metadata']['settings'] = settings + if Xinit: + refpop.grid_ensemble_results['ensemble']['Xinit'] = Xinit + except: + pass + + except Exception as e: + print("Error merging grid_ensemble_results:",e) - print("probs left ",refpop.grid_options["_probtot"],"right",newpop.grid_options["_probtot"]) for key in ["_probtot"]: refpop.grid_options[key] += newpop.grid_options[key] - def merge_grid_object_results_from_file(self,refpop,filename): + return + + def merge_populations_from_file(self,refpop,filename): """ - Wrapper for merge_grid_object_results so it can be done directly + Wrapper for merge_populations so it can be done directly from a file. Args: @@ -5371,37 +5280,74 @@ eccentricity3=0 Note: The file should be saved using save_population_object() """ - self.merge_grid_object_results(refpop, - self.load_population_object(filename)) - print("done merge from file") + + newpop = self.load_population_object(filename) + + + # merge with refpop + try: + self.merge_populations(refpop, + newpop) + except Exception as e: + print("merge_populations gave exception",e) + + return def joinfiles(self): """ Function to load in the joinlist to an array """ - print("Loading joinfiles from ",self.grid_options['joinlist']) f = open(self.grid_options['joinlist'],'r') list = f.read().splitlines() f.close() return list - def join_from_files(self,joinfiles): + def join_from_files(self,newobj,joinfiles): # merge the results from many object files + # into newobj for file in joinfiles: - self.merge_grid_object_results_from_file(self, - file) - print("done join from files") + print("join data in",file) + self.merge_populations_from_file(newobj, + file) + return newobj - def can_join(self,joinfiles,joiningfile): + def can_join(self,joinfiles,joiningfile,vb=False): # check the joinfiles to make sure they all exist # and their .saved equivalents also exist if os.path.exists(joiningfile): + if vb: + print("cannot join: joiningfile exists at {}".format(joiningfile)) return False + elif vb: + print("joiningfile does not exist") for file in joinfiles: - print("check for ",file) + if vb: + print("check for {}".format(file)) if os.path.exists(file) == False: + if vb: + print("cannot join: {} does not exist".format(file)) return False - elif os.path.exists(file + '.saved') == False: + savedfile = file + '.saved' + if vb: + print("check for {}".format(savedfile)) + if os.path.exists(savedfile) == False: + if vb: + print("cannot join: {} does not exist".format(savedfile)) return False + + # found both files + if vb: + print("found {} and {}".format(file,savedfile)) + + # check for joiningfile again + if os.path.exists(joiningfile): + if vb: + print("cannot join: joiningfile exists at {}".format(joiningfile)) + return False + elif vb: + print("joiningfile does not exist") + + if vb: + print("returning True from can_join()") return True diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index ed37dc8df1925746cf3e06755f5bbb06f34d74c4..cfd9460ad89e9d0c86a1c11bb40808be5a90a15e 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -30,6 +30,7 @@ grid_options_defaults_dict = { # general (or unordered..) ########################## "num_cores": 1, # total number of cores used to evolve the population + "num_cores_available": None, # set automatically, not by the user "parse_function": None, # Function to parse the output with. "multiplicity_fraction_function": 0, # Which multiplicity fraction function to use. 0: None, 1: Arenou 2010, 2: Rhagavan 2010, 3: Moe and di Stefano 2017 "tmp_dir": temp_dir(), # Setting the temp dir of the program @@ -131,6 +132,7 @@ grid_options_defaults_dict = { "symlink_latest_gridcode": True, # symlink to latest gridcode "save_population_object" : None, # filename to which we should save a pickled grid object as the final thing we do 'joinlist' : None, + 'do_analytics' : True, # if True, calculate analytics prior to return ## Monte carlo type evolution # TODO: make MC options ## Evolution from source file @@ -161,15 +163,13 @@ grid_options_defaults_dict = { ######################################## # Slurm stuff ######################################## - "slurm": 0, # dont use the slurm by default. 1 = use slurm - "slurm_ntasks": 1, # CPUs required per array job: usually only need this - "slurm_command": "", # Command that slurm runs (e.g. evolve or join_datafiles) + "slurm": 0, # dont use the slurm by default, 0 = no slurm, 1 = launch slurm jobs, 2 = run slurm jobs + "slurm_ntasks": 1, # CPUs required per array job: usually only need this to be 1 "slurm_dir": "", # working directory containing scripts output logs etc. "slurm_njobs": 0, # number of scripts; set to 0 as default "slurm_jobid": "", # slurm job id (%A) "slurm_memory": '512MB', # memory required for the job "slurm_warn_max_memory": '1024MB', # warn if we set it to more than this (usually by accident) - "slurm_use_all_node_CPUs": 0, # 1 = use all of a node's CPUs. 0 = use a given number of CPUs "slurm_postpone_join": 0, # if 1 do not join on slurm, join elsewhere. want to do it off the slurm grid (e.g. with more RAM) "slurm_jobarrayindex": None, # slurm job array index (%a) "slurm_jobname": "binary_grid", # default @@ -177,14 +177,7 @@ grid_options_defaults_dict = { "slurm_time": 0, # total time. 0 = infinite time "slurm_postpone_sbatch": 0, # if 1: don't submit, just make the script "slurm_array": None, # override for --array, useful for rerunning jobs - "slurm_use_all_node_CPUs": 0, # if given nodes, set to 1 - # # if given CPUs, set to 0 - # # you will want to use this if your Slurm SelectType is e.g. linear - # # which means it allocates all the CPUs in a node to the job - "slurm_control_CPUs": 0, # if so, leave this many for Pythons control (0) - "slurm_array": None, # override for --array, useful for rerunning jobs - "slurm_partition": None, # MUST be defined - "slurm_extra_settings": {}, # Place to put extra configuration for the SLURM batch file. The key and value of the dict will become the key and value of the line in te slurm batch file. Will be put in after all the other settings (and before the command). Take care not to overwrite something without really meaning to do so. + "slurm_extra_settings": {}, # Dictionary of extra settings for Slurm to put in its launch script. ######################################## # Condor stuff ######################################## @@ -459,8 +452,9 @@ grid_options_descriptions = { "_binary_c_executable": "Full path to the binary_c executable. This options is not used in the population object.", "_binary_c_shared_library": "Full path to the libbinary_c file. This options is not used in the population object", "verbosity": "Verbosity of the population code. Default is 0, by which only errors will be printed. Higher values will show more output, which is good for debugging.", - "binary": "Set this to 1 if the population contains binaries. Input: int", # TODO: write what effect this has. - "num_cores": "The number of cores that the population grid will use. The multiprocessing is useful but make sure to figure out how many logical cores the machine has (use e.g. psutil.cpu_count(logical=False) to find the true number of cores, psutil.cpu_count(logical=True) to find the number of logical cores). The core is multi processed, not multi threaded, and will gain no extra speed when num_cores exceeds the number of logical cores. Input: int", + # deprecated: "binary": "Set this to 1 if the population contains binaries. Input: int", # TODO: write what effect this has. + "num_cores": "The number of cores that the population grid will use. You can set this manually by entering an integer great than 0. When 0 uses all logical cores. When -1 uses all physical cores. Input: int", + "num_processes" : "Number of processes launched by multiprocessing. This should be set automatically by binary_c-python, not by the user.", "_start_time_evolution": "Variable storing the start timestamp of the population evolution. Set by the object itself.", # TODO: make sure this is logged to a file "_end_time_evolution": "Variable storing the end timestamp of the population evolution. Set by the object itself", # TODO: make sure this is logged to a file "_total_starcount": "Variable storing the total number of systems in the generator. Used and set by the population object.", diff --git a/binarycpython/utils/spacing_functions.py b/binarycpython/utils/spacing_functions.py index c3e0be388abf1320089604e61e8831a722480141..1e4c2ecf72297696b36d304ade77f05baaff531b 100644 --- a/binarycpython/utils/spacing_functions.py +++ b/binarycpython/utils/spacing_functions.py @@ -228,6 +228,11 @@ def const_dt( num_cores=self.grid_options["num_cores"], max_stellar_type_1=10, save_ensemble_chunks=False, + symlink_latest_gridcode=False, + modulo=1, + start_at=0, + slurm=0, + condor=0, ) # make a grid in M1