Skip to content
Snippets Groups Projects
Commit 8c7b3765 authored by David Hendriks's avatar David Hendriks
Browse files

working on putting the MC sampling functionality back into the class

parent 2ea6db91
No related branches found
No related tags found
No related merge requests found
...@@ -15,7 +15,24 @@ import os ...@@ -15,7 +15,24 @@ import os
from binarycpython.utils.functions import calculate_total_mass_system from binarycpython.utils.functions import calculate_total_mass_system
import os
import datetime
import numpy as np
import importlib
from binarycpython.utils.functions import verbose_print
from binarycpython.utils.dicts import normalize_dict
from binarycpython.utils.population_extensions.distribution_functions import (
Moecache,
)
from binarycpython import Population
from types import MethodType
_numba = False _numba = False
comment_line = "###########\n"
class monte_carlo_sampling: class monte_carlo_sampling:
...@@ -37,7 +54,43 @@ class monte_carlo_sampling: ...@@ -37,7 +54,43 @@ class monte_carlo_sampling:
raise NotImplementedError("This functionality is not available yet") raise NotImplementedError("This functionality is not available yet")
def _monte_carlo_sampling_generator_filename(self): def _monte_carlo_sampling_check_mass_threshold(self, system_dict):
"""
Function to handle checking the total mass evolved and signal to stop
"""
# Only if the monte_carlo_mass_threshold is positive (default = -1)
if self.grid_options["monte_carlo_mass_threshold"] > 0:
# Add total mass of current system tot total mass evolved
self.grid_options[
"_monte_carlo_current_total_mass_evolved"
] += calculate_total_mass_system(system_dict)
# Check if exceeds threshold
if (
self.grid_options["_monte_carlo_current_total_mass_evolved"]
> self.grid_options["monte_carlo_mass_threshold"]
):
self.grid_options["_monte_carlo_threshold_reached"] = True
def _monte_carlo_sampling_check_custom_threshold(self, result_queue):
"""
Function to handle checking the content of the result queue and look for a
TODO: put in the user-defined custom threshold function
"""
# raise NotImplementedError("Functionality in not implemented")
pass
##############
# Generator functions
# Management
def _monte_carlo_sampling_get_generator_filename(self):
""" """
Returns a filename for the gridcode. Returns a filename for the gridcode.
""" """
...@@ -59,32 +112,22 @@ class monte_carlo_sampling: ...@@ -59,32 +112,22 @@ class monte_carlo_sampling:
) )
return filename return filename
def _monte_carlo_sampling_write_generator(self): def _monte_carlo_sampling_write_generator_file(self, dry_run=False):
""" """
Function that generates the monte-carlo grid file that gets imported Function to write the function that can generate the sample for the parameter
"""
# Description of montecarlo grid generator:
# - Have the code write a generator object
# - A while loop that has no end
# - a system_dict that has all the relevant orbital properties.
# - this dict gets passed around to each of the CDF sampling methods for each of the parameters, potentially used as input for the probability (dependent distributions), and updated with the chosen parameter value
# - a dictionary of CDF sampling functions, with keys being the parameter that we want to sample, the values are functions that get passed a set of arguments, and return an updated system dict
# - (optionally) a latin hypercube implementation
# - function to write the CDF sampling functions for each sampling_variable
#################### Contrary to the grid sampling variable generator code, this code is not nested, but is linear.
# Development setup: simple 1-value mass yield """
######### #########
# Set up header and start of function # Set up header and start of function
self._add_code( self._add_code(
# Import packages # Import packages
"import math\n", "import math\n",
"import copy\n",
"import numpy as np\n", "import numpy as np\n",
"from collections import OrderedDict\n", "from collections import OrderedDict\n",
"from binarycpython.utils.useful_funcs import *\n", "from binarycpython.utils.useful_funcs import *\n",
"import numba" if _numba else "",
"\n\n", "\n\n",
# Make the function # Make the function
"def monte_carlo_generator(self, print_results=True):\n", "def monte_carlo_generator(self, print_results=True):\n",
...@@ -98,67 +141,332 @@ class monte_carlo_sampling: ...@@ -98,67 +141,332 @@ class monte_carlo_sampling:
# Write some info in the function # Write some info in the function
"# Grid code generated on {}\n".format(datetime.datetime.now().isoformat()), "# Grid code generated on {}\n".format(datetime.datetime.now().isoformat()),
"# This function generates the systems that will be evolved with binary_c\n\n", "# This function generates the systems that will be evolved with binary_c\n\n",
# Set some values in the generated code: # Set some values in the generated code:
"# Set initial values\n", "# Set initial values\n",
"_total_starcount = 0\n", "_total_starcount = 0\n",
"parameter_dict = {}\n\n", "generating = True\n",
"default_system_dict = {'multiplicity': 0}\n\n",
) )
# Add method for the pre-calculation of the arrays
if self.grid_options["monte_carlo_use_pre_calculated_distributions"]:
self._add_code(
"self.handle_pre_calc(self)\n"
)
######### #########
# Set up the loop and yield calls # Set up the loop and yield calls
self._add_code( self._add_code(
# Comment # Comment
comment_line,
"# Start of the while loop\n", "# Start of the while loop\n",
# while loop # while loop
"while True:\n", "while generating:\n",
) )
# Increase indent_depth # Increase indent_depth
self._increment_indent_depth(+1) self._increment_indent_depth(+1)
self._add_code(
"system_dict = copy.copy(default_system_dict)\n\n"
)
######### #########
# Fill the while loop with a dummy sytem of a fixed mass # Run loop of sampling variable setup
for loopnr, sampling_variable_el in enumerate(
sorted(
self.grid_options["_sampling_variables"].items(),
key=lambda x: x[1]["sampling_variable_number"],
)
):
sampling_variable = sampling_variable_el[1]
#########
# Generate parameter lines
self._monte_carlo_sampling_write_generator_parameter(
sampling_variable
)
#########
# Generate yield call if branchpoint
self._monte_carlo_sampling_write_system_call(
sampling_variable,
dry_run,
sampling_variable["branchpoint"],
sampling_variable["branchcode"],
)
#########
# Write to file
monte_carlo_sampling_generator_filename = (
self._monte_carlo_sampling_get_generator_filename()
)
self.grid_options[
"_monte_carlo_sampling_generator_filename"
] = monte_carlo_sampling_generator_filename
self.verbose_print(
"{blue}Write grid code to {file} {reset}".format(
blue=self.ANSI_colours["blue"],
file=monte_carlo_sampling_generator_filename,
reset=self.ANSI_colours["reset"],
),
self.grid_options["verbosity"],
1,
)
with self.open(
monte_carlo_sampling_generator_filename, "w", encoding="utf-8"
) as file:
file.write(self.code_string)
self.code_string = ""
# Construction
def _monte_carlo_sampling_write_system_call(
self, sampling_variable, dry_run, branchpoint, branchcode
):
"""
Function to write the block of code (as string) that handles the setting the final probability, taking into account the weight and repeat settings, incrementing the total starcount and total probability.
Then if the run is a dry run we implement the dry_run_hook or pass depending on the settings. If it is not a dry run we yield the system dict
"""
if branchpoint:
self._add_code(comment_line)
if branchcode:
self._add_code(
"# Branch code\n",
"if {branchcode}:\n".format(branchcode=branchcode)
)
self._increment_indent_depth(+1)
# Factor in the custom weight input
self._add_code(
"\n",
"# Loop over the repeats\n",
'for _ in range(self.grid_options["repeat"]):' + "\n",
)
self._add_code(
"_total_starcount += 1\n",
indent=1,
)
if not dry_run:
# Handle what is returned, or what is not.
self._add_code("yield system_dict\n\n", indent=1)
# If its a dry run, dont do anything with it
else:
# run the hook function, only if given
if self.grid_options["dry_run_hook"]:
self._add_code(
"self.grid_options['dry_run_hook'](self, system_dict)\n\n",
indent=1,
)
else:
# or pass
self._add_code("pass\n\n", indent=1)
self._increment_indent_depth(-1)
return self.code_string
def _monte_carlo_sampling_write_generator_parameter(self, sampling_variable):
"""
Function to write the parameter lines and the pre and postcode around it to the monte carlo sampling generator function.
"""
self.verbose_print(
"Constructing/adding: {}".format(sampling_variable['parameter_name']),
self.grid_options["verbosity"],
2,
)
####################
# Write comment
self._add_code( self._add_code(
# Sample the variables comment_line,
"# Add M_1 variable\n", "# Sampling variable {}\n\n".format(sampling_variable['name'])
# while loop
"parameter_dict['M_1'] = 1\n\n",
) )
self.verbose_print(
"Writing sampling variable {} to monte-carlo sampling generator".format(
sampling_variable['parameter_name']
),
self.grid_options["verbosity"],
1,
)
####################
# Generate top code
if sampling_variable["topcode"]:
self._add_code(
comment_line,
sampling_variable["topcode"],
"\n\n",
)
######### #########
# Handle system yield # Generate call to pdf-cdf function to generate the parameter value
self._add_code( self._add_code(
# Comment comment_line,
"# Yield the system dict\n", "{} = self.handle_calc_sampled_value(self, sampling_variable={}{})\n".format(sampling_variable['parameter_name'], sampling_variable, ', ' + ','.join(['{}={}'.format(dependency_variable, dependency_variable) for dependency_variable in sampling_variable['dependency_variables']]) if sampling_variable['dependency_variables'] else ''),
# while loop "system_dict['{}'] = {}\n\n".format(sampling_variable['parameter_name'], sampling_variable['parameter_name'])
"yield(parameter_dict)\n",
) )
################################################################################# ####################
# Stop of code generation. Here the code is saved and written # bottom code
if sampling_variable["bottomcode"]:
self._add_code(
comment_line,
sampling_variable["bottomcode"]+"\n\n",
)
##############
# Sampling functions
# Management
def _monte_carlo_sampling_get_sampling_functions_filename(self):
"""
Returns a filename for the file containing the function to generate values for the variables.
"""
filename = os.path.join(
"binary_c_monte_carlo_sampling_sampling_functions_{population_id}.py".format(
population_id=self.grid_options["_population_id"]
),
)
return filename
def _monte_carlo_sampling_load_sampling_functions_file(self):
"""
Function to load the sampling functions
"""
# Save the grid code to the grid_options # Code to load the generator code
self.verbose_print( self.verbose_print(
"Save monte_carlo sampling generator to grid_options", message="Load monte-carlo sampling functions from {file}".format(
self.grid_options["verbosity"], file=self.grid_options["_monte_carlo_sampling_sampling_functions_filename"]
1, ),
verbosity=self.grid_options["verbosity"],
minimal_verbosity=1,
)
# Load the module from file and the functions in it.
spec = importlib.util.spec_from_file_location(
"binary_c_python_monte_carlo_sampling_functions",
os.path.join(self.grid_options["_monte_carlo_sampling_sampling_functions_filename"]),
)
monte_carlo_sampling_functions_file = importlib.util.module_from_spec(spec)
spec.loader.exec_module(monte_carlo_sampling_functions_file)
# NOTE: the way we are loading and dynamically is i think not fully correct. I have to pass the self in twice.
# TODO: fix the setting of these functions properly
# Create dictionary to hold the calc_sampled_value functions
self.monte_carlo_calc_sampled_value_functions = {}
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith('calc_sampled_value'):
# Set function as class function
setattr(self, obj, getattr(monte_carlo_sampling_functions_file, obj))
# Add to dict
self.monte_carlo_calc_sampled_value_functions[obj.replace('calc_sampled_value_', '')] = getattr(monte_carlo_sampling_functions_file, obj)
# Create dictionary to hold the calc_pdf_cdf_value_array_dict functions
self.monte_carlo_calc_pdf_cdf_value_array_dict_functions = {}
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith('calc_pdf_cdf_value_array_dict'):
# Set function as class function
setattr(self, obj, getattr(monte_carlo_sampling_functions_file, obj))
# Add to dict
self.monte_carlo_calc_pdf_cdf_value_array_dict_functions[obj.replace('calc_pdf_cdf_value_array_dict_', '')] = getattr(monte_carlo_sampling_functions_file, obj)
# Load handler function
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith('handle_calc_sampled_value'):
self.handle_calc_sampled_value = getattr(monte_carlo_sampling_functions_file, obj)
break
# Load handler function
for obj in dir(monte_carlo_sampling_functions_file):
if obj.startswith('handle_pre_calc'):
self.handle_pre_calc = getattr(monte_carlo_sampling_functions_file, obj)
# setattr(self, obj, getattr(monte_carlo_sampling_functions_file, obj))
# MethodType(func, obj)
# MethodType(getattr(monte_carlo_sampling_functions_file, obj), self)
break
def _monte_carlo_sampling_write_sampling_functions_file(self, dry_run=False):
"""
Function to write the functions that handle generating the values for each variable
"""
#########
# Set up header and start of function
self._add_code(
# Import packages
"import math\n",
"import copy\n",
"import numpy as np\n",
"from collections import OrderedDict\n",
"from binarycpython.utils.useful_funcs import *\n",
"\n\n",
) )
self.grid_options["code_string"] = self.code_string #########
# Write functions to calculate the values for each parameter.
for loopnr, sampling_variable_el in enumerate(
sorted(
self.grid_options["_sampling_variables"].items(),
key=lambda x: x[1]["sampling_variable_number"],
)
):
sampling_variable = sampling_variable_el[1]
######################
# Call to function to write the calc_sample_value function
self._monte_carlo_sampling_write_calc_pdf_cdf_value_array_dict_function(sampling_variable)
######################
# Call to function to write the calc_sample_value function
self._monte_carlo_sampling_write_calc_sampled_value_function(sampling_variable)
######################
# Call to function to write the pre_calc_sample_value function
if self.grid_options["monte_carlo_use_pre_calculated_distributions"]:
self._monte_carlo_sampling_write_pre_calc_function(sampling_variable)
######################
# Call to function to write the handle_calc_sample_value wrapper function
self._monte_carlo_sampling_write_handle_calc_sampled_value_function()
######################
# Call to function to write the handle_pre_calc function and other functions
if self.grid_options["monte_carlo_use_pre_calculated_distributions"]:
self._monte_carlo_sampling_write_handle_pre_calc_function()
self._monte_carlo_sampling_write_bin_function()
self._monte_carlo_sampling_write_center_function()
#########
# Write to file # Write to file
monte_carlo_sampling_generator_filename = ( monte_carlo_sampling_sampling_functions_filename = (
self._monte_carlo_sampling_generator_filename() self._monte_carlo_sampling_get_sampling_functions_filename()
) )
self.grid_options[ self.grid_options[
"_monte_carlo_sampling_generator_filename" "_monte_carlo_sampling_sampling_functions_filename"
] = monte_carlo_sampling_generator_filename ] = monte_carlo_sampling_sampling_functions_filename
self.verbose_print( self.verbose_print(
"{blue}Write grid code to {file} {reset}".format( "{blue}Write sampling functions code to {file} {reset}".format(
blue=self.ANSI_colours["blue"], blue=self.ANSI_colours["blue"],
file=monte_carlo_sampling_generator_filename, file=monte_carlo_sampling_sampling_functions_filename,
reset=self.ANSI_colours["reset"], reset=self.ANSI_colours["reset"],
), ),
self.grid_options["verbosity"], self.grid_options["verbosity"],
...@@ -166,12 +474,460 @@ class monte_carlo_sampling: ...@@ -166,12 +474,460 @@ class monte_carlo_sampling:
) )
with self.open( with self.open(
monte_carlo_sampling_generator_filename, "w", encoding="utf-8" monte_carlo_sampling_sampling_functions_filename, "w", encoding="utf-8"
) as file: ) as file:
file.write(self.code_string) file.write(self.code_string)
# raise NotImplementedError("This functionality is not available yet") # Reset indentation
self.code_string = ""
# Construction
def _monte_carlo_sampling_write_pdf_cdf_value_array_block(self, sampling_variable):
"""
Function to write the code-block for the pdf/cdf value array functions
"""
#######
# Set up lists
self._add_code(
comment_line,
"value_array = {}\n".format(sampling_variable['samplerfunc']), # Line for the sampler func value array sampling (not the same as parameter value array)
"parameter_value_array = []\n",
"probability_array = [0]\n\n",
)
#######
# Set up first parameter value array adding
self._add_code(
comment_line,
"{} = value_array[0]\n".format(sampling_variable['name']),
"{}\n".format(sampling_variable['precode'].replace(
"\n", "\n" + self._indent_block(0)
)) if sampling_variable['precode'] else "\n",
"parameter_value_array.append({})\n\n".format(sampling_variable['parameter_name'])
)
#######
# Set up loop for sampling the distribution function
self._add_code(
comment_line,
"for {} in value_array[1:]:\n".format(sampling_variable['name']),
)
self._increment_indent_depth(+1)
#######
# Set up conversion with precode
self._add_code(
comment_line,
"{}\n".format(sampling_variable['precode'].replace(
"\n", "\n" + self._indent_block(0)
)) if sampling_variable['precode'] else "\n",
"parameter_value_array.append({})\n\n".format(sampling_variable['parameter_name'])
)
#######
# Set up calculation of probability
self._add_code(
comment_line,
"probability = {}\n".format(sampling_variable['probdist']),
"probability_array.append(probability)\n\n",
)
self._increment_indent_depth(-1)
#######
# Construct method to calculate pdf/cdf and value array
self._add_code(
comment_line,
"probability_array = np.array(probability_array)\n",
"cdf_array = np.cumsum(probability_array) / np.sum(probability_array)\n",
"parameter_value_array = np.array(parameter_value_array)\n\n",
)
#######
# Construct pdf/cdf and array dict and return
self._add_code(
comment_line,
"pdf_cdf_value_array_dict = {'probability_array': probability_array, 'cdf_array': cdf_array, 'value_array': value_array, 'parameter_value_array': parameter_value_array}\n\n",
)
def _monte_carlo_sampling_write_bin_function(self):
"""
Function to write the bin function
"""
self._add_code(
comment_line,
"def bin(parameter_value_array, parameter_value):\n",
" index = np.digitize(parameter_value, bins=parameter_value_array, right=False)-1\n",
" center_value = (parameter_value_array[index+1]+parameter_value_array[index])/2\n",
" return center_value\n"
)
def _monte_carlo_sampling_write_center_function(self):
"""
Function to write the center function
"""
self._add_code(
comment_line,
"def center(sampling_func):\n",
" value_array = sampling_func\n",
" center_value_array = (value_array[1:]+value_array[:-1])/2\n",
" return center_value_array\n"
)
def _monte_carlo_sampling_write_handle_pre_calc_function(self):
"""
Function to write then handler function for the calc_sampled_value
"""
#######
# Construct function
self._add_code(
"def handle_pre_calc(self):\n".format(),
)
self._increment_indent_depth(+1)
#######
# Initialise dict
self._add_code(
"self.pre_calculated_pdf_cdf_value_array_dicts = {}\n",
)
#######
# Loop over variables and call them to set the pre-calculated pdf cdf information in the dictionary
for loopnr, sampling_variable_el in enumerate(
sorted(
self.grid_options["_sampling_variables"].items(),
key=lambda x: x[1]["sampling_variable_number"],
)
):
sampling_variable = sampling_variable_el[1]
#######
# Write call to specific functions to pre-calculate each pdf/cdf dict
self._add_code(
"self.pre_calculated_pdf_cdf_value_array_dicts['{}'] = pre_calculate_pdf_cdf_value_array_dict_{}(self)\n".format(sampling_variable['parameter_name'], sampling_variable['parameter_name']),
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code(
"\n\n"
)
def _monte_carlo_sampling_write_calc_sampled_value_function(self, sampling_variable):
"""
Function to write the calc_sampled_value call
"""
#######
# Construct function
self._add_code(
"def calc_sampled_value_{}(self{}):\n".format(sampling_variable['parameter_name'], ', '+ ','.join(['{}'.format(dependency_variable) for dependency_variable in sampling_variable['dependency_variables']]) if sampling_variable['dependency_variables'] else ''),
)
self._increment_indent_depth(+1)
#######
# Call to pdf_cdf_value_array_dict function
self._add_code(
comment_line,
"if self.grid_options['monte_carlo_use_pre_calculated_distributions']:\n",
)
######
# Add calls to the dependency variable's pre-calculated parameter_value_array
if sampling_variable['dependency_variables']:
for dependency_variable in sampling_variable['dependency_variables']:
# Create string for getting the parameter value array
string = "self.pre_calculated_pdf_cdf_value_array_dicts['{}']".format(dependency_variable)
if self.grid_options['_sampling_variables_parameter_names'][dependency_variable]['dependency_variables']:
for depth in range(0, len(self.grid_options['_sampling_variables_parameter_names'][dependency_variable]['dependency_variables'])):
string += "['{{:.6f}}'.format(binned_{})]".format(sampling_variable['dependency_variables'][depth])
# Add code to get the binned value of the current dependency value
self._add_code(
" binned_{} = bin({}['parameter_value_array'], {})\n".format(
dependency_variable,
string,
dependency_variable
)
)
self._add_code(
" pdf_cdf_value_array_dict = self.pre_calculated_pdf_cdf_value_array_dicts['{}']{}\n".format(sampling_variable['parameter_name'], ''.join(["['{{:.6f}}'.format(binned_{})]".format(dependency_variable) for dependency_variable in sampling_variable['dependency_variables']]) if sampling_variable['dependency_variables'] else ''),
"else:\n",
" pdf_cdf_value_array_dict = self.monte_carlo_calc_pdf_cdf_value_array_dict_functions['{}'](self{})\n\n".format(sampling_variable['parameter_name'], ', '+ ','.join(['{}'.format(dependency_variable) for dependency_variable in sampling_variable['dependency_variables']]) if sampling_variable['dependency_variables'] else ''),
comment_line,
"cdf_array = pdf_cdf_value_array_dict['cdf_array']\n",
"value_array = pdf_cdf_value_array_dict['value_array']\n",
"parameter_value_array = pdf_cdf_value_array_dict['parameter_value_array']\n\n",
)
#######
# Get random sample
self._add_code(
comment_line,
"random_sample = np.random.uniform(low=0.0, high=1.0, size=(1))\n\n",
)
#######
# Calculate indices and edge points
self._add_code(
comment_line,
"left_index = np.digitize(random_sample, bins=cdf_array, right=False)-1\n",
"right_index = left_index + 1\n\n",
)
#######
# Get probabilities of bin-edges and set
self._add_code(
comment_line,
"left_cdf = cdf_array[left_index]\n",
"right_cdf = cdf_array[right_index]\n",
"dcdf = right_cdf - left_cdf\n\n",
)
#######
# Get values of bin-edges and set
self._add_code(
comment_line,
"left_value = parameter_value_array[left_index]\n",
"right_value = parameter_value_array[right_index]\n",
"dvalue = right_value - left_value\n\n",
)
#######
# Calculate new point by interpolation
self._add_code(
comment_line,
"dvalue_dcdf = dvalue/dcdf\n",
"dist_to_left = random_sample - left_cdf\n",
"{} = left_value + (dvalue_dcdf) * dist_to_left\n\n".format(sampling_variable['parameter_name']),
)
#######
# Turn the new point into the actual parameter if we have post-code
if sampling_variable['postcode']:
self._add_code(
comment_line,
sampling_variable['postcode']+"\n\n",
)
#######
# return value
self._add_code(
comment_line,
"return {}[0]\n".format(sampling_variable['parameter_name']),
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code(
"\n\n"
)
def _monte_carlo_sampling_write_calc_pdf_cdf_value_array_dict_function(self, sampling_variable):
"""
Function to write the calc_pdf_cdf_value_array_dict_function call
"""
#######
# Construct function
self._add_code(
"def calc_pdf_cdf_value_array_dict_{}(self{}):\n".format(sampling_variable['parameter_name'], ', '+ ','.join(['{}'.format(dependency_variable) for dependency_variable in sampling_variable['dependency_variables']]) if sampling_variable['dependency_variables'] else ''),
)
self._increment_indent_depth(+1)
#######
# Write the pdf cdf value array code block
self._monte_carlo_sampling_write_pdf_cdf_value_array_block(sampling_variable)
#######
# Construct pdf/cdf and array dict and return
self._add_code(
"return pdf_cdf_value_array_dict\n",
)
#######
# De-dent and pad
self._increment_indent_depth(-1)
self._add_code(
"\n\n"
)
def _monte_carlo_sampling_write_handle_calc_sampled_value_function(self):
"""
Function to write then handler function for the calc_sampled_value
"""
#######
# Construct function
self._add_code(
"def handle_calc_sampled_value(self, sampling_variable, **kwargs):\n".format(),
)
self._increment_indent_depth(+1)
#######
# Get correct function and call the function with the arguments
self._add_code(
"calc_sampled_value_function = self.monte_carlo_calc_sampled_value_functions[sampling_variable['parameter_name']]\n",
"sampled_value = calc_sampled_value_function(self, **kwargs)\n\n",
"return sampled_value\n"
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code(
"\n\n"
)
def _monte_carlo_sampling_write_pre_calc_function(self, sampling_variable):
"""
Function to write the pre_calc function for each variable
"""
#######
# Construct function
self._add_code(
"def pre_calculate_pdf_cdf_value_array_dict_{}(self):\n".format(sampling_variable['parameter_name']),
)
self._increment_indent_depth(+1)
#######
# Handle construction if we have dependency variables
if sampling_variable['dependency_variables']:
#######
# Construct nested pdf cdf dict
self._add_code(
comment_line,
"nested_pdf_cdf_value_array_dict = {}\n\n",
)
sampling_variable_dict_based_on_parameter_name = {sampling_variable_el['parameter_name']: sampling_variable_el for sampling_variable_el in self.grid_options['_sampling_variables'].values()}
dependency_variable_list = [sampling_variable_dict_based_on_parameter_name[dependency_variable_key] for dependency_variable_key in sampling_variable['dependency_variables']]
#######
# Loop over all dependency variables
for dependency_variable_i, dependency_variable_key in enumerate(sampling_variable['dependency_variables']):
dependency_variable = sampling_variable_dict_based_on_parameter_name[dependency_variable_key]
################
# Create dict if we are in
if (not dependency_variable_i == len(sampling_variable['dependency_variables'])) and (not dependency_variable_i == 0):
self._add_code(
comment_line,
"nested_pdf_cdf_value_array_dict['{{:.6f}}'.format({})] = {{}}\n".format(sampling_variable['dependency_variables'][dependency_variable_i-1]),
)
#######
# Construct the loop over the
self._add_code(
comment_line,
"value_array_{} = {}\n\n".format(dependency_variable['name'], dependency_variable['samplerfunc']),
)
#######
# Set up loop for sampling the distribution function
self._add_code(
comment_line,
"for {} in center(value_array_{}):\n\n".format(dependency_variable['name'], dependency_variable['name']),
)
self._increment_indent_depth(+1)
self._add_code(
"{}\n".format(dependency_variable['precode'].replace(
"\n", "\n" + self._indent_block(0)
)) if dependency_variable['precode'] else "\n")
#######
# Construct parts to build the pdf cdf function
self._monte_carlo_sampling_write_pdf_cdf_value_array_block(sampling_variable)
#######
# Write code to store the dict in the nested dict
nested_dict_calls = "".join(["['{{:.6f}}'.format({})]".format(cur_dependency_variable['parameter_name']) for cur_dependency_variable in dependency_variable_list])
self._add_code(
comment_line,
"nested_pdf_cdf_value_array_dict{} = pdf_cdf_value_array_dict\n\n".format(
nested_dict_calls
)
)
#######
# de-dent the code
for dependency_variable_i, dependency_variable_key in enumerate(sampling_variable['dependency_variables'][::-1]):
self._increment_indent_depth(-1)
#######
# Construct dictionary to hold the other dictionaries
self._add_code(
"return nested_pdf_cdf_value_array_dict\n",
)
#######
# Handle construction if we don't have dependency variables
else:
#######
# Construct parts to build the pdf cdf function
self._monte_carlo_sampling_write_pdf_cdf_value_array_block(sampling_variable)
#######
# Write return statement
self._add_code(
"return pdf_cdf_value_array_dict\n",
)
#######
# De-dent and padd
self._increment_indent_depth(-1)
self._add_code(
"\n\n"
)
# Misc
def _sample_multiplicity(self, system_dict):
"""
Function to calculate the multiplicity based on grid choices and random guess
"""
##############
# If we choose to use no multiplicity fraction function then we always return multiplicity of 1
if self.grid_options["multiplicity_fraction_function"] in [0, "None"]:
return 1
# Otherwise we will get the multiplicity fraction dict sample a random value to determine which multiplicty we have
multiplicity_dict = self._get_multiplicity_dict(system_dict)
# TODO: this can probably be a oneliner
multiplicity_list, probability_list = [], []
for multiplicity, probability in sorted(multiplicity_dict.items(), key=lambda x: x[0]):
multiplicity_list.append(multiplicity)
probability_list.append(probability)
multiplicity_array = np.array(multiplicity_list)
probability_array = np.array(probability_list)
##############
# Calculate CDF and sample
probability_cdf = np.cumsum(probability_array)
random_number = np.random.uniform()
indices = np.digitize(random_number, bins=probability_cdf, right=False)
# Get multiplicity and return
multiplicity = multiplicity_array[indices]
return multiplicity
##############
# Management functions
def _monte_carlo_sampling_load_generator(self): def _monte_carlo_sampling_load_generator(self):
""" """
Function to load the monte_carlo grid Function to load the monte_carlo grid
...@@ -225,41 +981,12 @@ class monte_carlo_sampling: ...@@ -225,41 +981,12 @@ class monte_carlo_sampling:
return generator return generator
def _monte_carlo_sampling_check_mass_threshold(self, system_dict): ##############
""" # misc
Function to handle checking the total mass evolved and signal to stop
"""
# Only if the monte_carlo_mass_threshold is positive (default = -1)
if self.grid_options["monte_carlo_mass_threshold"] > 0:
# Add total mass of current system tot total mass evolved
self.grid_options[
"_monte_carlo_current_total_mass_evolved"
] += calculate_total_mass_system(system_dict)
# Check if exceeds threshold
if (
self.grid_options["_monte_carlo_current_total_mass_evolved"]
> self.grid_options["monte_carlo_mass_threshold"]
):
self.grid_options["_monte_carlo_threshold_reached"] = True
def _monte_carlo_sampling_check_custom_threshold(self, result_queue):
"""
Function to handle checking the content of the result queue and look for a
TODO: put in the user-defined custom threshold function
"""
# raise NotImplementedError("Functionality in not implemented")
pass
############## ##############
# Writing to file: # Writing binary_c calls to file:
def monte_carlo_sampling_write_binary_c_calls_to_file( def monte_carlo_sampling_write_binary_c_calls_to_file(
self, self,
output_file, output_file,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment