import copy import json import os import h5py import tempfile from collections import defaultdict import numpy as np import binary_c_python_api def remove_file(file, verbose=0): """ Function to remove files but with verbosity """ if os.path.exists(file): try: if verbose > 0: print("Removed {}".format(file)) os.remove(file) # TODO: Put correct exception here. except: print("Error while deleting file {}".format(file)) raise FileNotFoundError def temp_dir(): """ Function to return the path the custom logging library shared object and script will be written to. Makes use of os.makedirs exist_ok which requires python 3.2+ """ tmp_dir = tempfile.gettempdir() path = os.path.join(tmp_dir, "binary_c_python") # os.makedirs(path, exist_ok=True) return path def output_lines(output): """ Function that outputs the lines that were recieved from the binary_c run. """ return output.splitlines() def parse_binary_c_version_info(version_info_string): version_info_dict = {} for el in version_info_string.splitlines(): el = el.strip() if el == "": continue if " is " in el: split = el.split(" is ") version_info_dict[split[0].strip()] = split[1].strip() else: if el.startswith("Binary_c/nucsyn"): version_info_dict["intro"] = el elif el.startswith("Email"): emails = el.split("Email ")[1].split(",") cleaned_emails = [email.strip() for email in emails] version_info_dict["emails"] = cleaned_emails elif el.startswith("DTlimit"): split = el.split(" : ") version_info_dict[split[0]] = ": ".join(split[1:]) elif el.startswith("Version"): split = el.split("Version ") version_number = split[1] version_info_dict["version_number"] = version_number elif el.startswith("git URL"): split = el.split("git URL ") git_url = split[1] version_info_dict["git_url"] = git_url elif el.startswith("Build: "): split = el.split("Build: ") build = split[1] version_info_dict["build"] = build elif el.startswith("Compiled for "): split = el.split("Compiled for ") compiled_for = split[1] version_info_dict["compiled_for"] = compiled_for elif el.startswith("Stack limit "): split = el.split("Stack limit ") stack_limit = split[1] version_info_dict["stack_limit"] = stack_limit elif el.startswith("SVN URL "): split = el.split("SVN URL ") svn_url = split[1] version_info_dict["svn_url"] = svn_url elif el.startswith("git branch "): split = el.split("git branch ") git_branch = split[1] version_info_dict["git_branch"] = git_branch elif el.startswith("_SC_CLK_TCK"): split = el.split(" = ") _SC_CLK_TCK = split[1] version_info_dict["_SC_CLK_TCK"] = _SC_CLK_TCK elif el.startswith("Random number mean "): split = el.split("Random number mean ") random_number_mean = split[1] version_info_dict["Random number mean"] = random_number_mean elif el.startswith("SVN revision "): split = el.split("SVN revision ") svn_revision = split[1] version_info_dict["svn_revision"] = svn_revision elif el.startswith("Size of :"): split = el.split("Size of :") data_type_sizes = split[1] version_info_dict["data_type_sizes"] = data_type_sizes elif el.startswith("git revision "): split = el.split("git revision ") git_revision = split[1] version_info_dict["git_revision"] = git_revision elif el.startswith("BINARY_C_PRE_VERSION "): split = el.split("BINARY_C_PRE_VERSION ") binary_c_pre_version = split[1] version_info_dict["binary_c_pre_version"] = binary_c_pre_version elif el.startswith("Comenv accretion:"): split = el.split("Comenv accretion:") comenv_accretion = split[1] version_info_dict["comenv_accretion"] = comenv_accretion elif el.startswith("Compiled in parameters:"): split = el.split("Compiled in parameters:") compiled_in_parameters = split[1] version_info_dict["compiled_in_parameters"] = compiled_in_parameters elif el.startswith("__short__ is"): split = el.split("__short__ is") short_type = split[1] version_info_dict["short_type"] = short_type else: print("Still found unmatched items!:\n{}".format(repr(el))) return version_info_dict def create_hdf5(data_dir, name): """ Function to create an hdf5 file from the contents of a directory: - settings file is selected by checking on files ending on settings - data files are selected by checking on files ending with .dat TODO: fix missing settingsfiles """ # Make HDF5: # Create the file hdf5_filename = os.path.join(data_dir, "{}".format(name)) print("Creating {}".format(hdf5_filename)) f = h5py.File(hdf5_filename, "w") # Get content of data_dir content_data_dir = os.listdir(data_dir) # Settings if any([file.endswith("_settings.json") for file in content_data_dir]): print("Adding settings to HDF5 file") settings_file = os.path.join( data_dir, [file for file in content_data_dir if file.endswith("_settings.json")][0], ) with open(settings_file, "r") as settings_file: settings_json = json.load(settings_file) # Create settings group settings_grp = f.create_group("settings") # Write version_string to settings_group settings_grp.create_dataset("used_settings", data=json.dumps(settings_json)) # Get data files data_files = [el for el in content_data_dir if el.endswith(".dat")] if data_files: print("Adding data to HDF5 file") # Create the data group data_grp = f.create_group("data") # Write the data to the file: # Make sure: for data_file in data_files: # filename stuff filename = data_file full_path = os.path.join(data_dir, filename) base_name = os.path.splitext(os.path.basename(filename))[0] # Get header info header_name = "{base_name}_header".format(base_name=base_name) data_headers = np.genfromtxt(full_path, dtype="str", max_rows=1) data_headers = np.char.encode(data_headers) data_grp.create_dataset(header_name, data=data_headers) # Add data data = np.loadtxt(full_path, skiprows=1) data_grp.create_dataset(base_name, data=data) f.close() def get_help_super(print_help=False, return_dict=True, fail_silently=True): """ Function that first runs get_help_all, and then per argument also run the help function to get as much information as possible. """ # Get help_all information help_all_dict = get_help_all(print_help=False, return_dict=True) help_all_super_dict = help_all_dict.copy() # Loop over all sections and stuff for section_name in help_all_dict.keys(): section = help_all_dict[section_name] for parameter_name in section["parameters"].keys(): parameter = section["parameters"][parameter_name] # Get detailed help info detailed_help = get_help( parameter_name, print_help=False, return_dict=True, fail_silently=fail_silently, ) if detailed_help: # check whether the descriptions of help_all and detailed help are the same if not fail_silently: if not parameter["description"] == detailed_help["description"]: print(json.dumps(parameter, indent=4)) ## put values into help all super dict # input type parameter["parameter_value_input_type"] = detailed_help[ "parameter_value_input_type" ] # default parameter["default"] = detailed_help["default"] # macros if "macros" in detailed_help.keys(): parameter["macros"] = detailed_help["macros"] if print_help: # TODO: make a pretty print print(json.dumps(help_all_super_dict, indent=4)) pass if return_dict: return help_all_super_dict def get_help_all(print_help=True, return_dict=False): """ Function that reads out the output of the help_all api call to binary_c prints all the parameters and their descriptions. return_dict: returns a dictionary """ # Call function help_all = binary_c_python_api.return_help_all() # String manipulation split = help_all.split( "############################################################\n" ) cleaned = [el for el in split if not el == "\n"] section_nums = [i for i in range(len(cleaned)) if cleaned[i].startswith("#####")] # Create dicts help_all_dict = {} # Select the section name and the contents of that section. Note, not all sections have content! for i in range(len(section_nums)): if not i == len(section_nums) - 1: params = cleaned[section_nums[i] + 1 : section_nums[i + 1]] else: params = cleaned[section_nums[i] + 1 : len(cleaned)] section_name = ( cleaned[section_nums[i]] .lstrip("#####") .strip() .replace("Section ", "") .lower() ) # params_dict = {} if params: # Clean it, replace in-text newlines with a space and then split on newlines. split_params = params[0].strip().replace("\n ", " ").split("\n") # Process params and descriptions per section for el in split_params: split_param_info = el.split(" : ") if not len(split_param_info) == 3: # there are ocassions where the semicolon is used in the description text itself. if len(split_param_info) == 4: split_param_info = [ split_param_info[0], ": ".join([split_param_info[1], split_param_info[2]]), split_param_info[3], ] # other occassions? # Put the information in a dict param_name = split_param_info[0] param_description = split_param_info[1] rest = split_param_info[2] params_dict[param_name] = { "param_name": param_name, "description": param_description, "rest": rest, } # make section_dict section_dict = { "section_name": section_name, "parameters": params_dict.copy(), } # Put in the total dict help_all_dict[section_name] = section_dict.copy() # Print things if print_help: for section in sorted(help_all_dict.keys()): print( "##################\n###### Section {}\n##################".format( section ) ) section_dict = help_all_dict[section] for param_name in sorted(section_dict["parameters"].keys()): param = section_dict["parameters"][param_name] print( "\n{}:\n\t{}: {}".format( param["param_name"], param["description"], param["rest"] ) ) # Loop over all the parameters an call the help() function on it. Takes a long time but this is for testing # for section in help_all_dict.keys(): # section_dict = help_all_dict[section] # for param in section_dict['parameters'].keys(): # get_help(param) if return_dict: return help_all_dict else: return None def create_arg_string(arg_dict, sort=False, filter_values=False): """ Function that creates the arg string Options: sort: sort the order of the keys filter_values: filters the input dict on keys that have NULL or `function` as value """ arg_string = "" # keys = sorted(arg_dict.keys()) if sort else arg_dict.keys() # for key in keys: # Filter out NULLS (not compiled anyway) if filter_values: if not arg_dict[key] in ["NULL", "Function"]: if not arg_dict[key] == "": arg_string += "{key} {value} ".format(key=key, value=arg_dict[key]) else: arg_string += "{key} {value} ".format(key=key, value=arg_dict[key]) arg_string = arg_string.strip() return arg_string def get_defaults(filter_values=False): """ Function that calls the binaryc get args function and cast it into a dictionary All the values are strings filter_values: whether to filter out NULL and Function defaults. """ default_output = binary_c_python_api.return_arglines() default_dict = {} for default in default_output.split("\n"): if not default in ["__ARG_BEGIN", "__ARG_END", ""]: key, value = default.split(" = ") # Filter out NULLS (not compiled anyway) if filter_values: if not value in ["NULL", "Function"]: if not value == "": default_dict[key] = value # On default, just show everything else: default_dict[key] = value return default_dict def get_arg_keys(): """ Function that return the list of possible keys to give in the arg string """ return get_defaults().keys() def get_help(param_name="", print_help=True, return_dict=False, fail_silently=False): """ Function that returns the help info for a given parameter. Binary_c will output things in the following order; - Did you mean? - binary_c help for variable - default - available macros This function reads out that structure and catches the different components of this output Will print a dict return_dict: wether to return the help info dictionary """ available_arg_keys = get_arg_keys() if not param_name: print( "Please set the param_name to any of the following:\n {}".format( sorted(available_arg_keys) ) ) return None else: if param_name in available_arg_keys: help_info = binary_c_python_api.return_help(param_name) cleaned = [el for el in help_info.split("\n") if not el == ""] # Get line numbers did_you_mean_nr = [ i for i, el in enumerate(cleaned) if el.startswith("Did you mean") ] parameter_line_nr = [ i for i, el in enumerate(cleaned) if el.startswith("binary_c help") ] default_line_nr = [ i for i, el in enumerate(cleaned) if el.startswith("Default") ] macros_line_nr = [ i for i, el in enumerate(cleaned) if el.startswith("Available") ] help_info_dict = {} # Get alternatives if did_you_mean_nr: alternatives = cleaned[did_you_mean_nr[0] + 1 : parameter_line_nr[0]] alternatives = [el.strip() for el in alternatives] help_info_dict["alternatives"] = alternatives # Information about the parameter parameter_line = cleaned[parameter_line_nr[0]] parameter_name = parameter_line.split(":")[1].strip().split(" ")[0] parameter_value_input_type = ( " ".join(parameter_line.split(":")[1].strip().split(" ")[1:]) .replace("<", "") .replace(">", "") ) help_info_dict["parameter_name"] = parameter_name help_info_dict["parameter_value_input_type"] = parameter_value_input_type description_line = " ".join( cleaned[parameter_line_nr[0] + 1 : default_line_nr[0]] ) help_info_dict["description"] = description_line # Default: default_line = cleaned[default_line_nr[0]] default_value = default_line.split(":")[-1].strip() help_info_dict["default"] = default_value # Get Macros: if macros_line_nr: macros = cleaned[macros_line_nr[0] + 1 :] help_info_dict["macros"] = macros if print_help: for key in help_info_dict.keys(): print("{}:\n\t{}".format(key, help_info_dict[key])) if return_dict: return help_info_dict else: if not fail_silently: print( "{} is not a valid parameter name. Please choose from the following parameters:\n\t{}".format( param_name, list(available_arg_keys) ) ) return None def parse_output(output, selected_header): """ Function that parses output of binary_c: This function works in two cases: if the caught line contains output like 'example_header time=12.32 mass=0.94 ..' or if the line contains output like 'example_header 12.32 0.94' You can give a 'selected_header' to catch any line that starts with that. Then the values will be put into a dictionary. TODO: Think about exporting to numpy array or pandas instead of a defaultdict """ value_dicts = [] val_lists = [] # split output on newlines for i, line in enumerate(output.split("\n")): # Skip any blank lines if not line == "": split_line = line.split() # Select parts header = split_line[0] values_list = split_line[1:] # print(values_list) # Catch line starting with selected header if header == selected_header: # Check if the line contains '=' symbols: value_dict = {} if all("=" in el for el in values_list): for el in values_list: key, val = el.split("=") value_dict[key.strip()] = val.strip() value_dicts.append(value_dict) else: if any("=" in el for el in values_list): raise ValueError( "Caught line contains some = symbols but not all of them do. aborting run" ) else: for i, val in enumerate(values_list): value_dict[i] = val value_dicts.append(value_dict) if len(value_dicts) == 0: print( "Sorry, didnt find any line matching your header {}".format(selected_header) ) return None keys = value_dicts[0].keys() # Construct final dict. final_values_dict = defaultdict(list) for value_dict in value_dicts: for key in keys: final_values_dict[key].append(value_dict[key]) return final_values_dict def load_logfile(logfile): """ Function that parses the generated logfile of binary_c """ with open(logfile, "r") as f: logfile_data = f.readlines() time_list = [] m1_list = [] m2_list = [] k1_list = [] k2_list = [] sep_list = [] ecc_list = [] rel_r1_list = [] rel_r2_list = [] event_list = [] random_seed = logfile_data[0].split()[-2] random_count = logfile_data[0].split()[-1] probability = logfile_data[-1].split() for line in logfile_data[1:-1]: split_line = line.split() time_list.append(split_line[0]) m1_list.append(split_line[1]) m2_list.append(split_line[2]) k1_list.append(split_line[3]) k2_list.append(split_line[4]) sep_list.append(split_line[5]) ecc_list.append(split_line[6]) rel_r1_list.append(split_line[7]) rel_r2_list.append(split_line[8]) event_list.append(" ".join(split_line[9:])) print(event_list)