diff --git a/binarycpython/utils/functions.py b/binarycpython/utils/functions.py index f16992d31ef71784fb8d5ddc5166e45436f79df1..f7e4532b73130630798e868a4e0f5643901c1c7f 100644 --- a/binarycpython/utils/functions.py +++ b/binarycpython/utils/functions.py @@ -18,6 +18,8 @@ import sys import subprocess import time import types +import resource +import psutil from io import StringIO from typing import Union, Any @@ -39,6 +41,36 @@ import py_rinterpolate ######################################################## # Unsorted ######################################################## + +def mem_use(): + """ + Return current process memory use in MB. (Takes no arguments) Note: this is per-thread only. + """ + + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0 + +def trem(dt, count, dn, n): + """ + Estimate time remaining (seconds) given a differential time and count (i.e. progress = $count/$n). $dt is the time since the last call, $count is the current progress count, $dn is the number run since the last call, and $n is the total number required. + """ + tpr = dt / max(1,dn) + etasecs = tpr * (n - count) + (eta, units) = conv_time_units(etasecs) + return (eta, units, tpr, etasecs) + +def conv_time_units(t): + """ + Converts time (t, in seconds, passing in as the only argument) to seconds, minutes or hours depending on its magnitude. Returns a tuple (t,units). + """ + units='s'; # default to seconds + if t > 60: + t /= 60; + units='m'; + if t > 60: + t /=60; + units='h'; + return (t, units) + class AutoVivificationDict(dict): """ Implementation of perl's autovivification feature, by overriding the @@ -505,6 +537,9 @@ def remove_file(file: str, verbosity: int = 0) -> None: ) +def get_username(): + return psutil.Process().username() + def temp_dir(*args: str) -> str: """ Function to create directory within the TMP directory of the file system @@ -519,7 +554,8 @@ def temp_dir(*args: str) -> str: """ tmp_dir = tempfile.gettempdir() - path = os.path.join(tmp_dir, "binary_c_python") + username = get_username() + path = os.path.join(tmp_dir, "binary_c_python-{}".format(username)) # loop over the other paths if there are any: if args: @@ -531,7 +567,6 @@ def temp_dir(*args: str) -> str: return path - def create_hdf5(data_dir: str, name: str) -> None: """ Function to create an hdf5 file from the contents of a directory: diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index d695262ce35ffa79f238b89731bb05b04e4f0b25..37ab96be1a536372b2d7571abc7165075b4da4ed 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -71,6 +71,9 @@ from binarycpython.utils.functions import ( multiply_values_dict, format_ensemble_results, AutoVivificationDict, + trem, + conv_time_units, + mem_use ) # from binarycpython.utils.hpc_functions import ( @@ -824,12 +827,14 @@ class Population: self.grid_options["_population_id"] = uuid.uuid4().hex # set previous logging time - self.shared_memory["prev_log_time"] = multiprocessing.Value('d',time.time()) + self.shared_memory["prev_log_time"] = multiprocessing.Value('d', time.time()) + # set previous logging system number to 0 - self.shared_memory["prev_log_system_number"] = multiprocessing.Value('i',0) + self.shared_memory["prev_log_system_number"] = multiprocessing.Value('i', 0) + # array to store memory use per-thread - mem = self._mem_use() - self.shared_memory["memory_use_per_thread"] = multiprocessing.Array('d',[1.0*mem]*self.grid_options["amt_cores"]) + mem = mem_use() + self.shared_memory["memory_use_per_thread"] = multiprocessing.Array('d', [1.0 * mem] * self.grid_options["amt_cores"]) def clean( self @@ -1296,15 +1301,19 @@ class Population: zero_prob_stars_skipped = 0 total_time_calling_binary_c = 0 - start_grid_time = time.time() - next_log_time = self.shared_memory["prev_log_time"].value + self.grid_options["log_dt"] - next_mem_update_time = start_grid_time + self.grid_options["log_dt"] total_mass_run = 0 total_probability_weighted_mass_run = 0 + # variables for the statu bar prints + start_grid_time = time.time() + next_log_time = self.shared_memory["prev_log_time"].value + self.grid_options["log_dt"] + next_mem_update_time = start_grid_time + self.grid_options["log_dt"] + + ############################################################ # Go over the queue for system_number, system_dict in iter(job_queue.get, "STOP"): + # At the first system set the status of the thread to running if localcounter == 0: # Set status to running @@ -1318,25 +1327,16 @@ class Population: ) as f: f.write("RUNNING") - # if system_number%10==0: - # print("_process_run_population_grid: system_number: {}".format(system_number)) - # bytes_size_Moecache = get_size(Moecache) - # print("\t_process_run_population_grid: Size moecache: {}".format(convert_bytes(bytes_size_Moecache))) - - # bytes_size_distribution_constants = get_size(distribution_constants) - # print("\t_process_run_population_grid: Size distribution_constants: {}".format(convert_bytes(bytes_size_distribution_constants))) - - # bytes_size_self = get_size(dir(self)) - # print("\t_process_run_population_grid: Size dir(self): {}".format(convert_bytes(bytes_size_self))) - - # Combine that with the other settings full_system_dict = self.bse_options.copy() full_system_dict.update(system_dict) + + # In the first system, explicitly check all the keys that are passed to see if # they match the keys known to binary_c. # Won't do that every system cause that is a bit of a waste of computing time. + # TODO: check if we can rename the below var if number_of_systems_run == 0: # TODO: Put this someplace else and wrap in a function call for key in full_system_dict.keys(): @@ -1362,33 +1362,44 @@ class Population: # i + 1, self.grid_options["_total_starcount"], full_system_dict # ) - # + # verbose_print( + # "Process {} is handling system {}".format(ID, system_number), + # self.grid_options["verbosity"], + # 1, + # ) -# verbose_print( -# "Process {} is handling system {}".format(ID, system_number), -# self.grid_options["verbosity"], -# 1, -# ) + ###################### + # Print status of runs # save the current time (used often) now = time.time() # update memory use stats every log_dt seconds (not every time, this is likely a bit expensive) if now > next_mem_update_time: - self.shared_memory["memory_use_per_thread"][ID] = self._mem_use() - next_mem_update = now + self.grid_options["log_dt"] + self.shared_memory["memory_use_per_thread"][ID] = mem_use() + next_mem_update_time = now + self.grid_options["log_dt"] # calculate the next logging time next_log_time = self.shared_memory["prev_log_time"].value + self.grid_options["log_dt"] + # Check if we need to log info again + # TODO: Check if we can put this functionality elsewhere if now > next_log_time: # we have exceeded the next log time : output and update timers + # Lock the threads. TODO: Do we need to release this? lock = multiprocessing.Lock() - self.vb1print(ID,now,system_number) + + # Do the printing itself + self.vb1print(ID, now, system_number) + + # Set some values for next time next_log_time = now + self.grid_options["log_dt"] self.shared_memory["prev_log_time"].value = now self.shared_memory["prev_log_system_number"].value = system_number + ############### + # Log current system info + # In some cases, the whole run crashes. To be able to figure out which system # that was on, we log each current system to a file (each thread has one). # Each new system overrides the previous @@ -1403,6 +1414,9 @@ class Population: binary_cmdline_string = self._return_argline(full_system_dict) f.write(binary_cmdline_string) + + ############## + # Running the system start_runtime_binary_c = time.time() # If we want to actually evolve the systems @@ -1425,6 +1439,9 @@ class Population: end_runtime_binary_c - start_runtime_binary_c ) # keep track of total binary_c call time + ############ + # Logging runtime + # Debug line: logging all the lines if self.grid_options["log_runtime_systems"] == 1: with open( @@ -1444,6 +1461,9 @@ class Population: ) ) + #################### + # Tallying system information + # Keep track of systems: probability_of_systems_run += full_system_dict["probability"] number_of_systems_run += 1 @@ -1475,7 +1495,10 @@ class Population: if self.grid_options['verbosity'] >= _LOGGER_VERBOSITY_LEVEL: stream_logger.debug(f"Process-{self.process_ID} is finishing.") - # Handle ensemble output: is ensemble==1, then either directly write that data to a file, or combine everything into 1 file. + #################### + # Handle ensemble outut + + # if ensemble==1, then either directly write that data to a file, or combine everything into 1 file. ensemble_json = {} # Make sure it exists already if self.bse_options.get("ensemble", 0) == 1: verbose_print( @@ -1531,6 +1554,9 @@ class Population: ) ) + ######## + # Clean up and return + # free store memory: _binary_c_bindings.free_store_memaddr(self.grid_options["_store_memaddr"]) @@ -4175,29 +4201,10 @@ eccentricity3=0 return multiplicity_fraction_dict[system_dict['multiplicity']] - def _trem(self,dt,count,dn,n): - """ - Estimate time remaining (seconds) given a differential time and count (i.e. progress = $count/$n). $dt is the time since the last call, $count is the current progress count, $dn is the number run since the last call, and $n is the total number required. - """ - tpr = dt / max(1,dn) - etasecs = tpr * (n - count) - (eta,units) = self._conv_time_units(etasecs) - return (eta,units,tpr,etasecs) + ###################### + # Status logging - def _conv_time_units(self,t): - """ - Converts time (t, in seconds, passing in as the only argument) to seconds, minutes or hours depending on its magnitude. Returns a tuple (t,units). - """ - units='s'; # default to seconds - if t > 60: - t /= 60; - units='m'; - if t > 60: - t /=60; - units='h'; - return (t,units) - - def vb1print(self,ID,now,system_number): + def vb1print(self, ID, now, system_number): """ Verbosity-level 1 printing, to keep an eye on a grid. Arguments: @@ -4205,14 +4212,20 @@ eccentricity3=0 now : the time now as a UNIX-style epoch in seconds (float) system_number : the system number """ + # calculate estimated time of arrive (eta and eta_secs), time per run (tpr) localtime = time.localtime(now) + dt = now - self.shared_memory["prev_log_time"].value dn = system_number - self.shared_memory["prev_log_system_number"].value - (eta,units,tpr,eta_secs) = self._trem(dt, - system_number, - dn, - self.grid_options["_total_starcount"]) + + eta, units, tpr, eta_secs = trem( + dt, + system_number, + dn, + self.grid_options["_total_starcount"] + ) + if eta_secs < secs_per_day: fintime = time.localtime(now + eta_secs) etf = "{hours:02d}:{minutes:02d}:{seconds:02d}".format(hours = fintime.tm_hour, @@ -4223,7 +4236,7 @@ eccentricity3=0 if d == 1: etf = "Tomorrow" else: - etf = "In " + d + " days" + etf = "In {} days".format(d) # modulo information if self.grid_options['modulo'] == 1: @@ -4232,8 +4245,9 @@ eccentricity3=0 modulo = '%' + str(self.grid_options['modulo']) # add up memory use from each thread - mem_use = sum(self.shared_memory["memory_use_per_thread"]) + total_mem_use = sum(self.shared_memory["memory_use_per_thread"]) + # verbose_print( "{system_number}/{total_starcount}{modulo} {complete:5.1f}% complete {hours:02d}:{minutes:02d}:{seconds:02d} ETA={eta:7.1f}{units} tpr={tpr:2.2e} ETF={etf} mem:{mem_use:.1f}MB".format( system_number = system_number, @@ -4247,14 +4261,8 @@ eccentricity3=0 units = units, tpr = tpr, etf = etf, - mem_use = mem_use + mem_use = total_mem_use ), self.grid_options["verbosity"], 1 ) - - def _mem_use(self): - """ - Return current process memory use in MB. (Takes no arguments) Note: this is per-thread only. - """ - return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0