diff --git a/README.md b/README.md index 4d0dc5b92ef07bdad8ee37fd78372c5f4c512691..7ad8f6a3c0a41cd123f13c77c91a790acff0285a 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Docstring coverage: Test coverage:  -Binary population synthesis code that interfaces with binary_c. Based on a original work by Jeff Andrews (can be found in old_solution/ directory). Updated and extended for Python3 by David Hendriks, Robert Izzard. +Binary population synthesis code that interfaces with binary_c. Based on a original work by Jeff Andrews. Updated and extended for Python3 by David Hendriks, Robert Izzard. The current release is version [version](VERSION), make sure to use that version number when installing! diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 690ce34a24489727fd7ef861c94ae690f481ffdf..213a89aa7ee9c98c1ac2d7b1341af7052b929512 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -22,21 +22,26 @@ Tasks: import argparse import bz2 +import compress_pickle import copy +import datasize import datetime import json import gc import gzip import importlib.util +import lib_programname import logging import msgpack import multiprocessing import os import pathlib +import psutil import py_rinterpolate import re import resource import setproctitle +import stat import strip_ansi import subprocess import sys @@ -359,25 +364,26 @@ class Population: # (attempt to) convert if old_value_found: - try: - verbose_print( - "Converting type of {} from {} to {}".format( - parameter, type(value), type(old_value) - ), - self.grid_options["verbosity"], - 2, - ) - value = type(old_value)(value) - verbose_print("Success!", self.grid_options["verbosity"], 2) - - except ValueError: - verbose_print( - "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( - parameter, value, type(old_value), old_value - ), - self.grid_options["verbosity"], - 0, - ) + if old_value != None: + try: + verbose_print( + "Converting type of {} from {} to {}".format( + parameter, type(value), type(old_value) + ), + self.grid_options["verbosity"], + 2, + ) + value = type(old_value)(value) + verbose_print("Success!", self.grid_options["verbosity"], 2) + + except ValueError: + verbose_print( + "Tried to convert the given parameter {}/value {} to its correct type {} (from old value {}). But that wasn't possible.".format( + parameter, value, type(old_value), old_value + ), + self.grid_options["verbosity"], + 0, + ) # Add to dict cmdline_dict[parameter] = value @@ -390,10 +396,14 @@ class Population: string, dir="process_status", format_statment="process_{}.txt", - ID=self.process_ID()): + ID=None): """ function to set the status string in its appropriate file """ + + if ID is None: + ID = self.process_ID + if self.grid_options['status_dir']: with open( os.path.join( @@ -407,10 +417,10 @@ class Population: f.close() # custom logging functions - if self.grid_options['slurm']==1: - self.set_slurm_status(self,string) + if self.grid_options['slurm'] >= 1: + self.set_slurm_status(string) # if self.grid_options['condor']==1: -# self.set_condor_status(self,string) +# self.set_condor_status(string) def _return_argline(self, parameter_dict=None): """ @@ -996,7 +1006,6 @@ class Population: """ # empty results - self.grid_options["results"] = {} self.grid_results = AutoVivificationDict() self.grid_ensemble_results = {} @@ -1044,8 +1053,7 @@ class Population: def evolve(self) -> None: """ Entry point function of the whole object. From here, based on the settings, - we set up a SLURM or CONDOR grid, or if no setting is given we go straight - to evolving the population. + we set up a grid and (probably) evolve the population. There are no direct arguments to this function, rather it is based on the grid_options settings: grid_options['slurm']: integer Boolean whether to use a slurm_grid evolution @@ -1055,29 +1063,23 @@ class Population: (that doesn't mean this cannot be run on a server with many cores) Returns an dictionary containing the analytics of the run - - TODO: change the way this is done. Slurm & CONDOR should probably do this differently - NOTE: SLURM and CONDOR options are not working properly yet """ - # do not evolve if rungrid is False - if self.grid_options['rungrid'] is False: - return - # Just to make sure we don't have stuff from a previous run hanging around self._pre_run_cleanup() - # Check which type: - if self.grid_options["slurm"] == 1: - # Execute Slurm subroutines - # self._slurm_grid() - raise ValueError("Slurm evolution not available at this moment") + if self.grid_options["slurm"]>=1: + self.grid_options["symlink latest gridcode"] = False - elif self.grid_options["condor"] == 1: + if self.grid_options["condor"] >= 1: # Execute condor subroutines # self._condor_grid() raise ValueError("Condor evolution not available at this moment") - + elif self.grid_options["slurm"] == 1: + # Slurm setup grid + self.slurm_grid() + # and then exit + sys.exit() else: # Execute population evolution subroutines self._evolve_population() @@ -1104,14 +1106,24 @@ class Population: "zero_prob_stars_skipped": self.grid_options["_zero_prob_stars_skipped"], } - # Add analytics dict to the metadata too: - self.grid_ensemble_results["metadata"].update(analytics_dict) + if 'metadata' in self.grid_ensemble_results: + # Add analytics dict to the metadata too: + self.grid_ensemble_results["metadata"].update(analytics_dict) + + # Save object to a pickle file + if self.grid_options['save_population_object']: + self.save_population_object() + + # if we're running a slurm grid, exit here + if self.grid_options["slurm"] >= 1: + sys.exit() ## # Clean up code: remove files, unset values, unload interpolators etc. This is placed in the general evolve function, # because that makes for easier control self._cleanup() + return analytics_dict def _evolve_population(self): @@ -1132,17 +1144,26 @@ class Population: # set custom logging, set up store_memaddr, build grid code. dry run grid code. self._setup() - ## - # Evolve systems: via grid_options one can choose to do this linearly, or - # multiprocessing method. - if ( + # special cases + if self.grid_options['evolution_type'] == 'join': + joinfiles = self.joinfiles() + + if self.can_join(joinfiles): + # join object files + print("can join : all tasks are finished") + self.join_from_files(joinfiles) + else: + print("cannot join : other tasks are not yet finished") + return + + ############################################################ + # Evolve systems + elif ( self.grid_options["evolution_type"] in self.grid_options["_evolution_type_options"] ): - if self.grid_options["evolution_type"] == "grid": - self._evolve_population_grid() - elif self.grid_options["evolution_type"] == "custom_generator": - # Use the same as the normal grid evolution but just a different generator + if self.grid_options["evolution_type"] == "grid" or \ + self.grid_options["evolution_type"] == "custom_generator": self._evolve_population_grid() # elif self.grid_options["evolution_type"] == "mc": @@ -1486,7 +1507,7 @@ class Population: # setproctitle.setthreadtitle(name) # Set to starting up - self.set_status("STARTING") + self.set_status("starting") # lets try out making stores for all the grids: self.grid_options["_store_memaddr"] = _binary_c_bindings.return_store_memaddr() @@ -1542,23 +1563,12 @@ class Population: ) next_mem_update_time = start_grid_time + self.grid_options["log_dt"] + # Set status to running + self.set_status('running') + ############################################################ # Go over the queue for system_number, system_dict in iter(job_queue.get, "STOP"): - # At the first system set the status of the thread to running - if localcounter == 0: - - # Set status to running - with open( - os.path.join( - self.grid_options["tmp_dir"], - "process_status", - "process_{}.txt".format(self.process_ID), - ), - "w", - ) as f: - f.write("RUNNING") - f.close() # Combine that with the other settings full_system_dict = self.bse_options.copy() @@ -1745,7 +1755,7 @@ class Population: ) # Set status to finishing - self.set_status("FINISHING") + self.set_status("finishing") if self.grid_options["verbosity"] >= _LOGGER_VERBOSITY_LEVEL: stream_logger.debug(f"Process-{self.process_ID} is finishing.") @@ -1910,7 +1920,7 @@ class Population: f.close() # Set status to finished - self.set_status("FINISHED") + self.set_status("finished") verbose_print( "process {} queue put output_dict ".format(ID), @@ -3578,7 +3588,7 @@ class Population: "Unable to determine file type from ensemble filename {} : it should be .json or .msgpack." ).format(output_file) sys.exit() - elif file_type is "JSON": + elif file_type == "JSON": # JSON output if compression == "gzip": # gzip @@ -3591,7 +3601,7 @@ class Population: f = open(output_file, "wt") f.write(json.dumps(data, sort_keys=sort_keys, indent=indent)) - elif file_type is "msgpack": + elif file_type == "msgpack": # msgpack output if compression == "gzip": f = gzip.open(output_file, "wb") @@ -5000,39 +5010,64 @@ eccentricity3=0 return os.path.abspath(os.path.join(self.grid_options['slurm_dir'],path)) def slurm_status_file(self, - jobid=self.grid_options['slurm_jobid'], - jobarrayindex=self.grid_options['slurm_jobarrayindex']): + jobid=None, + jobarrayindex=None): """ Return the slurm status file corresponding to the jobid and jobarrayindex, which default to grid_options slurm_jobid and slurm_jobarrayindex, respectively. """ - return os.path.join(slurmpath(self,'status') - self.grid_options['slurm_jobid'] + '.' + self.grid_options['slurm_jobarrayindex']) + if jobid is None: + jobid= self.grid_options['slurm_jobid'] + if jobarrayindex is None: + jobarrayindex = self.grid_options['slurm_jobarrayindex'] + if jobid and jobarrayindex: + return os.path.join(self.slurmpath('status'), + self.grid_options['slurm_jobid'] + '.' + self.grid_options['slurm_jobarrayindex']) + else: + return None def set_slurm_status(self,string): """ Set the slurm status corresponing to the self object, which should have slurm_jobid and slurm_jobarrayindex set. """ - file = slurm_status_file(self) + file = self.slurm_status_file() + print('status file ',file) if file: with open(file,'w') as f: f.write(string) f.close() def get_slurm_status(self, - jobid=self.grid_options['jobid'], - jobarrayindex=self.grid_options['jobarrayindex']): + jobid=None, + jobarrayindex=None): """ Get and return the slurm status corresponing to the self object, or jobid.jobarrayindex if they are passed in. If no status is found, returns None. """ - path = pathlib.Path(slurm_status_file(self,jobid=jobid,jobarrayindex=jobarrayindex)) + if jobid is None: + jobid=self.grid_options['slurm_jobid'] + if jobarrayindex is None: + jobarrayindex=self.grid_options['slurm_jobarrayindex'] + + if jobid is None or jobid or jobarrayindex is None : + return None + + path = pathlib.Path(self.slurm_status_file(jobid=jobid, + jobarrayindex=jobarrayindex)) if path: return path.read_text() else: return None - def slurm_outfile(self,jobid): - # return a standard filename for the slurm chunk files - return os.path.abspath(os.path.join(self.grid_options['slurm_dir'],'joining',jobid)) + def slurm_outfile(self): + """ + return a standard filename for the slurm chunk files + """ + file = "{jobid}.{jobarrayindex}.gz".format( + jobid=self.grid_options['slurm_jobid'], + jobarrayindex=self.grid_options['slurm_jobarrayindex'] + ) + return os.path.abspath(os.path.join(self.grid_options['slurm_dir'], + 'results', + file)) def make_slurm_dirs(self): @@ -5045,45 +5080,68 @@ eccentricity3=0 # output, status files, etc. dirs = [] for dir in ['scripts','stdout','stderr','results','logs','status','joining']: - dirs.append(slurmpath(self,dir)) + dirs.append(self.slurmpath(dir)) - # make the directories + # make the directories: we do not allow these to already exist + # as the slurm directory should be a fresh location for each set of jobs for dir in dirs: - pathlib.Path(slurmpath(self,dir)).mkdir(exit_ok=True,parents=True) + pathlib.Path(self.slurmpath(dir)).mkdir(exist_ok=False, + parents=True) - # check that they have been made and exist + # check that they have been made and exist: we need this + # because on network mounts (NFS) there's often a delay between the mkdir + # above and the actual directory being made. This shouldn't be too long... fail = True + count = 0 + count_warn = 10 while fail is True: fail = False + count += 1 + if count > count_warn: + print("Warning: Have been waiting about {} seconds for Slurm directories to be made, there seems to be significant delay...".format(count)) for dir in dirs: if os.path.isdir(dir) is False: fail = True - sleep 1 + time.sleep(1) break # break the "for dir in dirs:" def slurm_grid(self): - # function to launch a grid of stellar selfs using - # Slurm + # function to be called when running grids when grid_options['slurm']>=1 + # + # if slurm=1, we set up the slurm script and launch the jobs + # if slurm=2, we are being called from the jobs + + print("SLURM GRID",self.grid_options['slurm'],self.grid_options['evolution_type']) + # if slurm=1, we should have no evolution type, we just + # set up a load of slurm scripts and get them evolving + # in slurm array + if self.grid_options['slurm'] == 1: + self.grid_options['evolution_type'] = None - if self.grid_options['slurm_command'] is 'run_grid': + print("OVERRIDE",self.grid_options['evolution_type']) + + if self.grid_options['evolution_type'] == 'grid': # run a grid of stars only, leaving the results # in a file + print("Run grid") # get number of cpu cores available to us - ncpus = max(1,psutil.cpu_count(logical=True)-1) + ncpus = max(1,psutil.cpu_count(logical=True)) # use them all self.custom_options['num_cores'] = ncpus - self.evolve() + # set output file + self.grid_options['save_population_object'] = slurm_outfile() - return + return self.evolve() - elif self.grid_options['slurm_command'] is 'join': + elif self.grid_options['evolution_type'] == 'join': # join the data from multiple grid runs - joinfile = os.path.join(slurmpath(self),'joining', self.grid_options['slurm_jobid']) + # TODO : fix this + joinfile = os.path.join(self.slurmpath(),'joining', self.grid_options['slurm_jobid']) if os.path.exists(joinfile): print("Another process is already joining") @@ -5095,9 +5153,8 @@ eccentricity3=0 file=results_dumpfle, jobid=self.grid_options['slurm_jobid'], n=n)) - status = get_slurm_status(self, - jobid=self.grid_options['slurm_jobid'], - jobarrayindex=n) + status = self.get_slurm_status(jobid=self.grid_options['slurm_jobid'], + jobarrayindex=n) if status != "finished": print("... is not finished") return @@ -5112,51 +5169,72 @@ eccentricity3=0 # already joining return - self.grid_options['rungrid'] = False - + self.grid_options['rungrid'] = 0 + return else: # setup and launch slurm jobs - make_slurm_dirs() + print("Setup and slurm") + self.make_slurm_dirs() # check we're not using too much RAM - if self.grid_options['slurm_memory'] > self.grid_options['slurm_warn_max_memory']: - print("WARNING: you want to use > {} MB of RAM : this is unlikely to be correct. If you believe it is, set slurm_warn_max_memory to something very large (it is currently {} MB)\n".format( + print("datasize slurm_memory",datasize.DataSize(self.grid_options['slurm_memory'])) + print("datasize slurm_warn_max_memory",datasize.DataSize(self.grid_options['slurm_warn_max_memory'])) + + if datasize.DataSize(self.grid_options['slurm_memory']) > datasize.DataSize(self.grid_options['slurm_warn_max_memory']): + print("WARNING: you want to use {} MB of RAM : this is unlikely to be correct. If you believe it is, set slurm_warn_max_memory to something very large (it is currently {} MB)\n".format( self.grid_options['slurm_memory'], self.grid_options['slurm_warn_max_memory'])) - os.exit() + sys.exit() # set slurm_array - slurm_array = self.grid_options['slurm_array'] or "1 - {njobs}\%{njobs}".format(self.grid_options['slurm_njobs']) + slurm_array = self.grid_options['slurm_array'] or "1-{njobs}\%{njobs}".format(njobs=self.grid_options['slurm_njobs']) # get job id (might be passed in) jobid = self.grid_options['slurm_jobid'] if self.grid_options['slurm_jobid'] != "" else '$SLURM_ARRAY_JOB_ID' # get job array index - jobarrayindex = self.grid_options['slurm_jobarrayindex'] if self.grid_options['slurm_jobarrayindex'] != "" else '$SLURM_ARRAY_TASK_ID' + jobarrayindex = self.grid_options['slurm_jobarrayindex'] + + if jobarrayindex is None: + jobarrayindex = '$SLURM_ARRAY_TASK_ID' + print("INDEX",jobarrayindex) + + if self.grid_options['slurm_njobs'] == 0: + print("binary_c-python Slurm : You must set grid_option slurm_njobs to be non-zero") + sys.exit() # build the grid command grid_command = [ "/usr/bin/env", - os.path.abspath(sys.argv[0]), - 'run_flexigrid=1', - 'offset=' + jobarrayindex, - 'modulo=' + njobs, - 'results_dumpfile=' os.path.join(slurmpath('results'), jobid + '.' + jobarrayindex), - 'slurm_njobs=' + slurm_njobs, + sys.executable, + str(lib_programname.get_path_executed_script()),#sys.argv[0],#os.path.abspath(__main__.__file__), + ] + sys.argv[1:] + [ + 'slurm=2', + 'offset=' + str(jobarrayindex), + 'modulo=' + str(self.grid_options['slurm_njobs']), + 'results_dumpfile=' + os.path.join(self.slurmpath('results'), jobid + '.' + jobarrayindex), + 'slurm_njobs=' + str(self.grid_options['slurm_njobs']), 'slurm_dir=' + self.grid_options['slurm_dir'], - 'verbosity=' + self.grid_options['verbosity'] + 'verbosity=' + str(self.grid_options['verbosity']) ] grid_command = ' '.join(grid_command) # make slurm script - scriptpath = slurmpath('slurm_script') + scriptpath = self.slurmpath('slurm_script') try: script = open(scriptpath,'w') except IOError: print("Could not open Slurm script at {path} for writing: please check you have set {slurm_dir} correctly (it is currently {slurm_dir} and can write to this directory.".format(path=scriptpath, slurm_dir=self.grid_options['slurm_dir'])) + # the joinfile contains the list of chunk files to be joined + joinfile = "{slurm_dir}/results/{jobid}.all".format( + slurm_dir=self.grid_options['slurm_dir'], + jobid=jobid + ) + + lines = [ "#!/bin/bash\n", "# Slurm file for binary_grid2 and slurm\n", @@ -5164,31 +5242,180 @@ eccentricity3=0 "#SBATCH --output={slurm_dir}/stdout/\%A.\%a\n".format(slurm_dir=self.grid_options['slurm_dir']), "#SBATCH --job-name={slurm_jobname}\n".format(slurm_jobname=self.grid_options['slurm_jobname']), "#SBATCH --partition={slurm_partition}\n".format(slurm_partition=self.grid_options['slurm_partition']), - "#SBATCH --time={slurm_time}\n".format(slurm_time=self.grid_options['slurm_ntime']), + "#SBATCH --time={slurm_time}\n".format(slurm_time=self.grid_options['slurm_time']), "#SBATCH --mem={slurm_memory}\n".format(slurm_memory=self.grid_options['slurm_memory']), "#SBATCH --ntasks={slurm_ntasks}\n".format(slurm_ntasks=self.grid_options['slurm_ntasks']), "#SBATCH --array={slurm_array}\n".format(slurm_array=slurm_array), "\n# set status to \"running\"\n", - "echo \"running\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n".format(slurm_dir=self.grid_options['slurm_dir'],jobid=jobid,jobarrayindex=jobarrayindex), - "\n# run grid of stars\n{grid_command} rungrid=1 slurm_command=run_flexigrid\n".format(grid_command=grid_command), - "\n# set status to \"finished\"\necho \"finished\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n\n".format(slurm_dir=self.grid_options['slurm_dir'],jobid=jobid,jobarrayindex=jobarrayindex) + "echo \"running\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n".format(slurm_dir=self.grid_options['slurm_dir'], + jobid=jobid, + jobarrayindex=jobarrayindex), + "\n# make list of files\n", + "\necho {slurm_dir}/results/{jobid}.{jobarrayindex}.gz >> {slurm_dir}/results/{jobid}.all\n".format(slurm_dir=self.grid_options['slurm_dir'], + jobid=jobid, + jobarrayindex=jobarrayindex, + joinfile=joinfile), + "\n# run grid of stars", + "\n{grid_command} evolution_type=grid save_population_object={slurm_dir}/results/{jobid}.{jobarrayindex}.gz\n".format(slurm_dir=self.grid_options['slurm_dir'], + jobid=jobid, + jobarrayindex=jobarrayindex, + grid_command=grid_command), + "\n# set status to \"finished\"\necho \"finished\" > {slurm_dir}/status/{jobid}.{jobarrayindex}\n\n\n".format(slurm_dir=self.grid_options['slurm_dir'], + jobid=jobid, + jobarrayindex=jobarrayindex) ] + + + if not self.grid_options['slurm_postpone_join']: - lines.append("\n# check if we can join\n{grid_command} rungrid=0 results_hash_dumpfile={slurm_dir}/results/{jobid}.all slurm_command=join".format( + lines.append("\n# check if we can join\n{grid_command} slurm=2 evolution_type=join joinlist={joinfile}\n\n".format( grid_command=grid_command, - slurm_dir=self.grid_options['slurm_dir'], - jobid=jobid)) + joinfile=joinfile + )) - # write to script and close it + # write to script, close it and make it executable by + # all (so the slurm user can pick it up) script.writelines(lines) script.close() + os.chmod(scriptpath, + stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC | \ + stat.S_IRGRP | stat.S_IXGRP | \ + stat.S_IROTH | stat.S_IXOTH) + time.sleep(1) if not self.grid_options['slurm_postpone_sbatch']: - # launch scripts - cmd = ' '.join("sbatch",scriptpath) - subprocess.call(cmd) + # call the slurm script to launch the jobs + cmd = "sbatch " + scriptpath + os.system(cmd) else: # just say we would have print("Slurm script is at {path} but has not been launched".format(path=scriptpath)) print("All done in slurm_grid().") + + def save_population_object(self,object=None,filename=None,confirmation=True,compression='gzip'): + """ + Save pickled Population object to file at filename or, if filename is None, whatever is set at self.grid_options['save_population_object'] + + Args: + object : the object to be saved to the file. If object is None, use self. + filename : the name of the file to be saved. If not set, use self.grid_options['save_population_object'] + confirmation : if True, a file "filename.saved" is touched just after the dump, so we know it is finished. + + Compression is performed according to the filename, as stated in the + compress_pickle documentation at + https://lucianopaz.github.io/compress_pickle/html/ + + Shared memory, stored in the object.shared_memory dict, is not saved. + + """ + if object is None: + # default to using self + object = self + + if filename is None: + # get filename from self + filename = self.grid_options['save_population_object'] + + if filename: + + print("Save pickle to ",filename) + print("pop is ",self.grid_options["_population_id"]) + print("probtot ",object.grid_options['_probtot']) + # remove shared memory + shared_memory = object.shared_memory + object.shared_memory = None + + # delete system generator + system_generator = object.grid_options["_system_generator"] + object.grid_options["_system_generator"] = None + + # dump pickle file + compress_pickle.dump(object, + filename) + + # restore data + object.shared_memory = shared_memory + object.grid_options["_system_generator"] = system_generator + + # touch 'saved' file + pathlib.Path(filename + '.saved').touch(exist_ok=True) + + def load_population_object(self,filename): + """ + returns the Population object loaded from filename + """ + print("loading population object from",filename) + if filename is None: + return None + else: + obj = compress_pickle.load(filename) + print("loaded obj",obj) + return obj + + def merge_grid_object_results(self,refpop,newpop): + """ + merge newpop's results data into refpop's results data + + Args: + refpop : the original "reference" Population object to be added to + newpop : Population object containing the new data + + Returns: + nothing + + Note: + The file should be saved using save_population_object() + """ + print("merge dicts") + print("left: ",refpop.grid_results) + print("right:",newpop.grid_results) + refpop.grid_results = merge_dicts(refpop.grid_results, + newpop.grid_results) + print("probs left ",refpop.grid_options["_probtot"],"right",newpop.grid_options["_probtot"]) + for key in ["_probtot"]: + refpop.grid_options[key] += newpop.grid_options[key] + + def merge_grid_object_results_from_file(self,refpop,filename): + """ + Wrapper for merge_grid_object_results so it can be done directly + from a file. + + Args: + refpop : the original "reference" Population object to be added to + filename : file containing the Population object containing the new data + + Note: + The file should be saved using save_population_object() + """ + self.merge_grid_object_results(refpop, + self.load_population_object(filename)) + print("done merge from file") + + def joinfiles(self): + """ + Function to load in the joinlist to an array + """ + print("Loading joinfiles from ",self.grid_options['joinlist']) + f = open(self.grid_options['joinlist'],'r') + list = f.read().splitlines() + f.close() + return list + + def join_from_files(self,joinfiles): + # merge the results from many object files + for file in joinfiles: + self.merge_grid_object_results_from_file(self, + file) + print("done join from files") + + def can_join(self,joinfiles): + # check the joinfiles to make sure they all exist + # and their .saved equivalents also exist + for file in joinfiles: + print("check for ",file) + if os.path.exists(file) == False: + return False + elif os.path.exists(file + '.saved') == False: + return False + return True diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index e3c5f4b73872448142e19fece8895ccc8ac4c9c3..ddc07e22431e91d1e7770d6bc327db311de57e62 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -33,6 +33,7 @@ grid_options_defaults_dict = { "parse_function": None, # Function to parse the output with. "multiplicity_fraction_function": 0, # Which multiplicity fraction function to use. 0: None, 1: Arenou 2010, 2: Rhagavan 2010, 3: Moe and di Stefano 2017 "tmp_dir": temp_dir(), # Setting the temp dir of the program + "status_dir" : None, # "_main_pid": -1, # Placeholder for the main process id of the run. "save_ensemble_chunks": True, # Force the ensemble chunk to be saved even if we are joining a thread (just in case the joining fails) "combine_ensemble_with_thread_joining": True, # Flag on whether to combine everything and return it to the user or if false: write it to data_dir/ensemble_output_{population_id}_{thread_id}.json @@ -128,6 +129,8 @@ grid_options_defaults_dict = { "_grid_variables": {}, # grid variables "gridcode_filename": None, # filename of gridcode "symlink latest gridcode": True, # symlink to latest gridcode + "save_population_object" : None, # filename to which we should save a pickled grid object as the final thing we do + 'joinlist' : None, ## Monte carlo type evolution # TODO: make MC options ## Evolution from source file @@ -164,11 +167,11 @@ grid_options_defaults_dict = { "slurm_dir": "", # working directory containing scripts output logs etc. "slurm_njobs": 0, # number of scripts; set to 0 as default "slurm_jobid": "", # slurm job id (%A) - "slurm_memory": 512, # in MB, the memory use of the job - "slurm_warn_max_memory": 1024, # in MB : warn if mem req. > this + "slurm_memory": '512MB', # memory required for the job + "slurm_warn_max_memory": '1024MB', # warn if we set it to more than this (usually by accident) "slurm_use_all_node_CPUs": 0, # 1 = use all of a node's CPUs. 0 = use a given number of CPUs "slurm_postpone_join": 0, # if 1 do not join on slurm, join elsewhere. want to do it off the slurm grid (e.g. with more RAM) - "slurm_jobarrayindex": "", # slurm job array index (%a) + "slurm_jobarrayindex": None, # slurm job array index (%a) "slurm_jobname": "binary_grid", # default "slurm_partition": None, "slurm_time": 0, # total time. 0 = infinite time @@ -197,24 +200,24 @@ grid_options_defaults_dict = { "condor_universe": "vanilla", # usually vanilla universe "condor_extra_settings": {}, # Place to put extra configuration for the CONDOR submit file. The key and value of the dict will become the key and value of the line in te slurm batch file. Will be put in after all the other settings (and before the command). Take care not to overwrite something without really meaning to do so. # snapshots and checkpoints - condor_snapshot_on_kill:0, # if 1 snapshot on SIGKILL before exit - condor_load_from_snapshot:0, # if 1 check for snapshot .sv file and load it if found - condor_checkpoint_interval:0, # checkpoint interval (seconds) - condor_checkpoint_stamp_times:0, # if 1 then files are given timestamped names + 'condor_snapshot_on_kill':0, # if 1 snapshot on SIGKILL before exit + 'condor_load_from_snapshot':0, # if 1 check for snapshot .sv file and load it if found + 'condor_checkpoint_interval':0, # checkpoint interval (seconds) + 'condor_checkpoint_stamp_times':0, # if 1 then files are given timestamped names # (warning: lots of files!), otherwise just store the lates - condor_streams:0, # stream stderr/stdout by default (warning: might cause heavy network load) - condor_save_joined_file:0, # if 1 then results/joined contains the results + 'condor_streams':0, # stream stderr/stdout by default (warning: might cause heavy network load) + 'condor_save_joined_file':0, # if 1 then results/joined contains the results # (useful for debugging, otherwise a lot of work) - condor_requirements:'', # used? + 'condor_requirements':'', # used? # # resubmit options : if the status of a condor script is # # either 'finished','submitted','running' or 'crashed', # # decide whether to resubmit it. # # NB Normally the status is empty, e.g. on the first run. # # These are for restarting runs. # condor_resubmit_finished:0, - condor_resubmit_submitted:0, - condor_resubmit_running:0, - condor_resubmit_crashed:0, + 'condor_resubmit_submitted':0, + 'condor_resubmit_running':0, + 'condor_resubmit_crashed':0, ########################## # Unordered. Need to go through this. Copied from the perl implementation. ########################## @@ -271,7 +274,7 @@ grid_options_defaults_dict = { # C_auto_logging : undef, # custom_output_C_function_pointer : binary_c_function_bind(), # # control flow - rungrid : True, # usually run the grid, but can be 0 + 'rungrid' : 1, # usually run the grid, but can be 0 # # to skip it (e.g. for condor/slurm runs) # merge_datafiles:'', # merge_datafiles_filelist:'', @@ -450,6 +453,7 @@ grid_options_defaults_dict = { # Grid containing the descriptions of the options # TODO: add input types for all of them grid_options_descriptions = { "tmp_dir": "Directory where certain types of output are stored. The grid code is stored in that directory, as well as the custom logging libraries. Log files and other diagnostics will usually be written to this location, unless specified otherwise", # TODO: improve this + "status_dir" : "Directory where grid status is stored", "_binary_c_dir": "Director where binary_c is stored. This options are not really used", "_binary_c_config_executable": "Full path of the binary_c-config executable. This options is not used in the population object.", "_binary_c_executable": "Full path to the binary_c executable. This options is not used in the population object.", @@ -489,7 +493,7 @@ grid_options_descriptions = { "slurm": "Int flag whether to use a Slurm type population evolution.", # TODO: describe this in more detail "weight": "Weight factor for each system. The calculated probability is multiplied by this. If the user wants each system to be repeated several times, then this variable should not be changed, rather change the _repeat variable instead, as that handles the reduction in probability per system. This is useful for systems that have a process with some random element in it.", # TODO: add more info here, regarding the evolution splitting. "repeat": "Factor of how many times a system should be repeated. Consider the evolution splitting binary_c argument for supernovae kick repeating.", # TODO: make sure this is used. - "evolution_type": "Variable containing the type of evolution used of the grid. Multiprocessing or linear processing", + "evolution_type": "Variable containing the type of evolution used of the grid. Multiprocessing, linear processing or possibly something else (e.g. for Slurm or Condor).", "combine_ensemble_with_thread_joining": "Boolean flag on whether to combine everything and return it to the user or if false: write it to data_dir/ensemble_output_{population_id}_{thread_id}.json", "log_runtime_systems": "Whether to log the runtime of the systems . Each systems run by the thread is logged to a file and is stored in the tmp_dir. (1 file per thread). Don't use this if you are planning to run a lot of systems. This is mostly for debugging and finding systems that take long to run. Integer, default = 0. if value is 1 then the systems are logged", "_total_mass_run": "To count the total mass that thread/process has ran", diff --git a/setup.py b/setup.py index 624e08e87911755a945068ddd26fda7620148017..721dd841d81ef5ec0906bac3b2075e4a4602cb02 100644 --- a/setup.py +++ b/setup.py @@ -263,9 +263,12 @@ setup( install_requires=[ "astropy", "colorama", + "compress_pickle", + "datasize", "h5py", "halo", "humanize", + "lib_programname", "matplotlib", "msgpack", "numpy",