Skip to content
Snippets Groups Projects
functions.py 20.9 KiB
Newer Older
from collections import defaultdict
def remove_file(file, verbose=0):
    """
    Function to remove files but with verbosity

    Does nothing when the path does not exist.

    Args:
        file: path of the file to remove.
        verbose: when larger than 0, the removal is reported on stdout.

    Raises:
        FileNotFoundError: when the path exists but could not be removed. The
            underlying OSError is chained as the cause.
    """

    if not os.path.exists(file):
        return

    try:
        os.remove(file)
    except OSError as error:
        print("Error while deleting file {}".format(file))
        # Keep raising FileNotFoundError for existing callers, but preserve
        # the real reason of the failure as the cause.
        raise FileNotFoundError(
            "Error while deleting file {}".format(file)
        ) from error

    # Only report the removal once it has actually happened
    if verbose > 0:
        print("Removed {}".format(file))


def temp_dir():
    """
    Return the directory the custom logging library shared object and script are written to.

    The directory is created when it does not exist yet. This relies on the
    exist_ok argument of os.makedirs, which requires python 3.2+
    """

    location = os.path.join(tempfile.gettempdir(), "binary_c_python")

    # exist_ok makes repeated calls harmless
    os.makedirs(location, exist_ok=True)

    return location
def output_lines(output):
    """
    Split the output that was received from the binary_c run into a list of lines.
    """
    lines = output.splitlines()
    return lines
def parse_binary_c_version_info(version_info_string):
    """
    Function that parses the version info string of binary_c into a dictionary.

    Every non-blank line is stripped and handled as follows:
    - lines containing " is " are stored as {left part: right part}
    - lines starting with a known prefix are stored under a fixed key
    - anything else is reported on stdout and left out of the result

    Args:
        version_info_string: the version information text, one item per line.

    Returns:
        dictionary with the parsed items; all values are strings, except
        "emails" which is a list of strings.
    """

    # (line prefix, dictionary key): the remainder of the line is stored
    # verbatim as the value.
    prefixed_items = (
        ("Version ", "version_number"),
        ("git URL ", "git_url"),
        # NOTE(review): the conditions belonging to these two keys were not
        # present in this block; the prefixes are inferred from the key names.
        # Confirm them against the actual binary_c version output.
        ("Compiled for ", "compiled_for"),
        ("Stack limit ", "stack_limit"),
        ("SVN URL ", "svn_url"),
        ("git branch ", "git_branch"),
        ("Random number mean ", "Random number mean"),
        ("SVN revision ", "svn_revision"),
        ("Size of :", "data_type_sizes"),
        ("git revision ", "git_revision"),
        ("BINARY_C_PRE_VERSION ", "binary_c_pre_version"),
        ("Comenv accretion:", "comenv_accretion"),
        ("Compiled in parameters:", "compiled_in_parameters"),
        ("__short__ is", "short_type"),
    )

    version_info_dict = {}

    for el in version_info_string.splitlines():
        el = el.strip()

        # Blank lines carry no information
        if not el:
            continue

        if " is " in el:
            # Generic "<name> is <value>" line
            split = el.split(" is ")
            version_info_dict[split[0].strip()] = split[1].strip()
        elif el.startswith("Binary_c/nucsyn"):
            version_info_dict["intro"] = el
        elif el.startswith("Email"):
            emails = el.split("Email ")[1].split(",")
            cleaned_emails = [email.strip() for email in emails]
            version_info_dict["emails"] = cleaned_emails
        elif el.startswith("DTlimit"):
            # The part before the first " : " is the key, the rest the value
            split = el.split(" : ")
            version_info_dict[split[0]] = ": ".join(split[1:])
        elif el.startswith("_SC_CLK_TCK"):
            # "_SC_CLK_TCK = <value>"
            version_info_dict["_SC_CLK_TCK"] = el.partition(" = ")[2]
        else:
            for prefix, key in prefixed_items:
                if el.startswith(prefix):
                    version_info_dict[key] = el[len(prefix) :]
                    break
            else:
                print("Still found unmatched items!:\n{}".format(repr(el)))

    return version_info_dict

    """
    Function to create an hdf5 file from the contents of a directory: 
     - settings file is selected by checking on files ending on settings
     - data files are selected by checking on files ending with .dat

    hdf5_filename = os.path.join(data_dir, "{}".format(name))
    print("Creating {}".format(hdf5_filename))
David Hendriks's avatar
David Hendriks committed
    f = h5py.File(hdf5_filename, "w")

    # Get content of data_dir
    content_data_dir = os.listdir(data_dir)
    # Settings
    if any([file.endswith("_settings.json") for file in content_data_dir]):
        print("Adding settings to HDF5 file")
        settings_file = os.path.join(
            data_dir,
            [file for file in content_data_dir if file.endswith("_settings.json")][0],
        )
        with open(settings_file, "r") as settings_file:
            settings_json = json.load(settings_file)
        # Create settings group
        settings_grp = f.create_group("settings")
        # Write version_string to settings_group
        settings_grp.create_dataset("used_settings", data=json.dumps(settings_json))
    # Get data files
    data_files = [el for el in content_data_dir if el.endswith(".dat")]
    if data_files:
        print("Adding data to HDF5 file")

        # Create the data group
        data_grp = f.create_group("data")

        # Write the data to the file:
        # Make sure:
        for data_file in data_files:
            # filename stuff
            filename = data_file
            full_path = os.path.join(data_dir, filename)
            base_name = os.path.splitext(os.path.basename(filename))[0]

            # Get header info
            header_name = "{base_name}_header".format(base_name=base_name)
            data_headers = np.genfromtxt(full_path, dtype="str", max_rows=1)
            data_headers = np.char.encode(data_headers)
            data_grp.create_dataset(header_name, data=data_headers)

            # Add data
            data = np.loadtxt(full_path, skiprows=1)
            data_grp.create_dataset(base_name, data=data)

        f.close()
def get_help_super(print_help=False, return_dict=True, fail_silently=True):
    """
    Function that first runs get_help_all, and then per argument also run the help function to get as much information as possible.

    Args:
        print_help: when True, print the combined dictionary as indented JSON.
        return_dict: when True, return the combined dictionary.
        fail_silently: passed on to get_help. When False, parameters whose
            description differs between get_help_all and get_help are printed.

    Returns:
        the dictionary of get_help_all, with per parameter the extra keys
        "parameter_value_input_type", "default" and (when available) "macros",
        or None when return_dict is False.
    """

    help_all_dict = get_help_all(print_help=False, return_dict=True)

    # dict.copy() is shallow: the nested parameter dictionaries are shared, so
    # the updates made to `parameter` below show up in help_all_super_dict.
    help_all_super_dict = help_all_dict.copy()

    for section in help_all_dict.values():
        for parameter_name, parameter in section["parameters"].items():
            detailed_help = get_help(
                parameter_name,
                print_help=False,
                return_dict=True,
                fail_silently=fail_silently,
            )

            if not detailed_help:
                continue

            # check whether the descriptions of help_all and detailed help are the same
            if not fail_silently:
                if not parameter["description"] == detailed_help["description"]:
                    print(json.dumps(parameter, indent=4))

            ## put values into help all super dict
            # input type
            parameter["parameter_value_input_type"] = detailed_help[
                "parameter_value_input_type"
            ]

            # default
            parameter["default"] = detailed_help["default"]

            # macros are only present for some parameters
            if "macros" in detailed_help:
                parameter["macros"] = detailed_help["macros"]

    if print_help:
        print(json.dumps(help_all_super_dict, indent=4))

    if return_dict:
        return help_all_super_dict
    return None

def get_help_all(print_help=True, return_dict=False):
    """
    Function that reads out the output of the help_all api call to binary_c

    prints all the parameters and their descriptions.

    Args:
        print_help: when True, print every section with its parameters.
        return_dict: when True, return the parsed dictionary.

    Returns:
        when return_dict is True: a dictionary keyed by lower-cased section
        name. Each value holds "section_name" and "parameters", the latter a
        dictionary keyed by parameter name with the keys "param_name",
        "description" and "rest". Sections without content are left out.
        Otherwise None.
    """

    # Call function
    help_all = binary_c_python_api.return_help_all()

    # String manipulation
    split = help_all.split(
        "############################################################\n"
    )
    cleaned = [el for el in split if not el == "\n"]

    # Indices of the section header chunks
    section_nums = [i for i, el in enumerate(cleaned) if el.startswith("#####")]

    # Create dicts
    help_all_dict = {}

    # Select the section name and the contents of that section. Note, not all sections have content!
    # Each section runs up to the next header, the last one up to the end.
    section_ends = section_nums[1:] + [len(cleaned)]
    for start, end in zip(section_nums, section_ends):
        params = cleaned[start + 1 : end]
        if not params:
            continue

        section_name = (
            cleaned[start].lstrip("#").strip().replace("Section ", "").lower()
        )

        # Clean it, replace in-text newlines with a space and then split on newlines.
        split_params = params[0].strip().replace("\n ", " ").split("\n")

        # Process params and descriptions per section
        params_dict = {}
        for el in split_params:
            split_param_info = el.split(" : ")

            # Not a "name : description : rest" line, nothing to store
            if len(split_param_info) < 3:
                continue

            # There are occasions where " : " is used in the description text
            # itself: everything between the name and the last field belongs
            # to the description.
            param_name = split_param_info[0]
            param_description = ": ".join(split_param_info[1:-1])
            rest = split_param_info[-1]

            params_dict[param_name] = {
                "param_name": param_name,
                "description": param_description,
                "rest": rest,
            }

        # Put in the total dict
        help_all_dict[section_name] = {
            "section_name": section_name,
            "parameters": params_dict,
        }

    # Print things
    if print_help:
        for section in sorted(help_all_dict.keys()):
            print(
                "##################\n###### Section {}\n##################".format(
                    section
                )
            )
            section_dict = help_all_dict[section]
            for param_name in sorted(section_dict["parameters"].keys()):
                param = section_dict["parameters"][param_name]
                print(
                    "\n{}:\n\t{}: {}".format(
                        param["param_name"], param["description"], param["rest"]
                    )
                )

    if return_dict:
        return help_all_dict
    return None

def create_arg_string(arg_dict, sort=False, filter_values=False):
    """
    Function that creates the arg string

    Options:
        sort: sort the order of the keys
        filter_values: filters the input dict on keys that have "NULL", "Function" or an empty string as value

    Returns:
        the "key value" pairs of arg_dict, separated by single spaces.
    """

    keys = sorted(arg_dict.keys()) if sort else arg_dict.keys()

    # Values that are left out when filtering (not compiled anyway)
    filtered_out = ("NULL", "Function", "")

    pairs = [
        "{key} {value}".format(key=key, value=arg_dict[key])
        for key in keys
        if not (filter_values and arg_dict[key] in filtered_out)
    ]

    return " ".join(pairs).strip()

def get_defaults(filter_values=False):
    """
    Function that calls the binaryc get args function and cast it into a dictionary
    All the values are strings

    filter_values: whether to filter out NULL and Function defaults.

    Returns:
        dictionary of {parameter name: default value}
    """

    default_output = binary_c_python_api.return_arglines()
    default_dict = {}

    for default in default_output.split("\n"):
        # Skip the markers around the argument list and blank lines
        if default in ["__ARG_BEGIN", "__ARG_END", ""]:
            continue

        # Split on the first separator only, so that a value that itself
        # contains " = " stays intact.
        key, value = default.split(" = ", 1)

        # Filter out NULLS (not compiled anyway)
        if filter_values and value in ["NULL", "Function", ""]:
            continue

        default_dict[key] = value

    return default_dict

def get_arg_keys():
    """
    Return the keys that can be given in the arg string, i.e. the keys of get_defaults()
    """

    defaults = get_defaults()
    return defaults.keys()

def get_help(param_name="", print_help=True, return_dict=False, fail_silently=False):
    """
    Function that returns the help info for a given parameter.

    Binary_c will output things in the following order;
    - Did you mean?
    - binary_c help for variable
    - default
    - available macros

    This function reads out that structure and catches the different components of this output

    Args:
        param_name: name of the parameter to get the help info for.
        print_help: whether to print the help info dictionary.
        return_dict: whether to return the help info dictionary.
        fail_silently: when True, nothing is printed for an invalid param_name.

    Returns:
        when return_dict is True and param_name is valid: a dictionary with
        the keys "parameter_name", "parameter_value_input_type", "description"
        and "default", plus "alternatives" and "macros" when binary_c
        reported them. Otherwise None.
    """

    available_arg_keys = get_arg_keys()

    if not param_name:
        print(
            "Please set the param_name to any of the following:\n {}".format(
                sorted(available_arg_keys)
            )
        )
        return None

    if param_name not in available_arg_keys:
        if not fail_silently:
            print(
                "{} is not a valid parameter name. Please choose from the following parameters:\n\t{}".format(
                    param_name, list(available_arg_keys)
                )
            )
        return None

    help_info = binary_c_python_api.return_help(param_name)
    cleaned = [el for el in help_info.split("\n") if not el == ""]

    # Get line numbers
    did_you_mean_nr = [
        i for i, el in enumerate(cleaned) if el.startswith("Did you mean")
    ]
    parameter_line_nr = [
        i for i, el in enumerate(cleaned) if el.startswith("binary_c help")
    ]
    default_line_nr = [i for i, el in enumerate(cleaned) if el.startswith("Default")]
    macros_line_nr = [i for i, el in enumerate(cleaned) if el.startswith("Available")]

    help_info_dict = {}

    # Get alternatives: the lines between "Did you mean" and the parameter line
    if did_you_mean_nr:
        alternatives = cleaned[did_you_mean_nr[0] + 1 : parameter_line_nr[0]]
        help_info_dict["alternatives"] = [el.strip() for el in alternatives]

    # Information about the parameter: after the first ":" the line holds the
    # name followed by the input type
    parameter_line = cleaned[parameter_line_nr[0]]
    name_and_type = parameter_line.split(":")[1].strip().split(" ")
    help_info_dict["parameter_name"] = name_and_type[0]
    help_info_dict["parameter_value_input_type"] = (
        " ".join(name_and_type[1:]).replace("<", "").replace(">", "")
    )

    # Description: the lines between the parameter line and the default line
    help_info_dict["description"] = " ".join(
        cleaned[parameter_line_nr[0] + 1 : default_line_nr[0]]
    )

    # Default:
    default_line = cleaned[default_line_nr[0]]
    help_info_dict["default"] = default_line.split(":")[-1].strip()

    # Get Macros: everything after the "Available" line
    if macros_line_nr:
        help_info_dict["macros"] = cleaned[macros_line_nr[0] + 1 :]

    if print_help:
        for key in help_info_dict.keys():
            print("{}:\n\t{}".format(key, help_info_dict[key]))

    if return_dict:
        return help_info_dict
    return None
def parse_output(output, selected_header):
    """
    Function that parses output of binary_c:

    This function works in two cases:
    if the caught line contains output like 'example_header time=12.32 mass=0.94 ..'
    or if the line contains output like 'example_header 12.32 0.94'

    You can give a 'selected_header' to catch any line that starts with that.
    Then the values will be put into a dictionary.

    Args:
        output: the output of binary_c, lines separated by newlines.
        selected_header: first word of the lines that should be caught.

    Returns:
        defaultdict(list) with, per key, the values (as strings) of every
        caught line. The keys are the names in front of the '=' symbols, or
        the column indices when the values are not named. None when no line
        matches the header.

    Raises:
        ValueError: when a caught line mixes named and unnamed values.

    TODO: Think about exporting to numpy array or pandas instead of a defaultdict
    """

    value_dicts = []

    # split output on newlines
    for line in output.split("\n"):
        split_line = line.split()

        # Skip any blank (or whitespace-only) lines
        if not split_line:
            continue

        # Select parts
        header = split_line[0]
        values_list = split_line[1:]

        # Catch line starting with selected header
        if header != selected_header:
            continue

        # Check which of the values contain '=' symbols:
        has_equals = ["=" in el for el in values_list]

        if all(has_equals):
            value_dict = {}
            for el in values_list:
                key, val = el.split("=")
                value_dict[key.strip()] = val.strip()
        elif any(has_equals):
            raise ValueError(
                "Caught line contains some = symbols but not all of them do. aborting run"
            )
        else:
            # Unnamed values are keyed by their column index
            value_dict = dict(enumerate(values_list))

        value_dicts.append(value_dict)

    if not value_dicts:
        print(
            "Sorry, didnt find any line matching your header {}".format(selected_header)
        )
        return None

    # The keys of the first caught line define the columns
    keys = value_dicts[0].keys()

    # Construct final dict.
    final_values_dict = defaultdict(list)
    for value_dict in value_dicts:
        for key in keys:
            final_values_dict[key].append(value_dict[key])

    return final_values_dict
    """
    Function that parses the generated logfile of binary_c
    """

    with open(logfile, "r") as f:
        logfile_data = f.readlines()

    time_list = []
    m1_list = []
    m2_list = []
    k1_list = []
    k2_list = []
    sep_list = []
    ecc_list = []
    rel_r1_list = []
    rel_r2_list = []
    event_list = []

    random_seed = logfile_data[0].split()[-2]
    random_count = logfile_data[0].split()[-1]
    probability = logfile_data[-1].split()

    for line in logfile_data[1:-1]:
        split_line = line.split()

        time_list.append(split_line[0])
        m1_list.append(split_line[1])
        m2_list.append(split_line[2])
        k1_list.append(split_line[3])
        k2_list.append(split_line[4])
        sep_list.append(split_line[5])
        ecc_list.append(split_line[6])
        rel_r1_list.append(split_line[7])
        rel_r2_list.append(split_line[8])
        event_list.append(" ".join(split_line[9:]))