diff --git a/binarycpython/tests/main.py b/binarycpython/tests/main.py index 72e871b885d3ae07f67af3d3ebf3acd800712834..62da6bb66f7d3097db09733e6c8d378586ccdd4c 100755 --- a/binarycpython/tests/main.py +++ b/binarycpython/tests/main.py @@ -54,7 +54,13 @@ from binarycpython.tests.test_grid import ( test__return_argline, test_return_population_settings, test_return_binary_c_defaults, - test_return_all_info + test_return_all_info, + test_export_all_info, + test__cleanup_defaults, + test__increment_probtot, + test__increment_count, + test__dict_from_line_source_file, + test_evolve_single ) from binarycpython.tests.test_plot_functions import ( test_color_by_index, @@ -65,10 +71,10 @@ from binarycpython.tests.test_run_system_wrapper import * from binarycpython.tests.tests_population_extensions.test__distribution_functions import ( test_flat, test_number, - test_const + test_const_distribution ) from binarycpython.tests.tests_population_extensions.test__grid_options_defaults import ( - test_grid_options_help, + test_grid_options_help, test_grid_options_description_checker, test_write_grid_options_to_rst_file, ) diff --git a/binarycpython/tests/test_dicts.py b/binarycpython/tests/test_dicts.py index 538077775627af6f5b23df2ba60372b90b53d3d8..ce9ab0b0a688fcd0d9d1aa4ed6cfaca520bf97de 100644 --- a/binarycpython/tests/test_dicts.py +++ b/binarycpython/tests/test_dicts.py @@ -283,4 +283,7 @@ class test_inspect_dict(unittest.TestCase): "function": os.path.isfile, "dict": {"int": 1, "float": 1.2}, } - output_dict = inspect_dict(input_dict, print_structure=True) \ No newline at end of file + output_dict = inspect_dict(input_dict, print_structure=True) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/test_ensemble.py b/binarycpython/tests/test_ensemble.py index 0f392d5a1c25274f062cbd087fbcafbe45a8b477..ec19b99cc70c812bfab069549f57424057d1486e 100644 --- a/binarycpython/tests/test_ensemble.py +++ b/binarycpython/tests/test_ensemble.py @@ -69,4 +69,7 @@ class test_handle_ensemble_string_to_json(unittest.TestCase): self.assertTrue(isinstance(output_dict, dict)) self.assertTrue(output_dict["ding"] == 10) - self.assertTrue(output_dict["list_example"] == [1, 2, 3]) \ No newline at end of file + self.assertTrue(output_dict["list_example"] == [1, 2, 3]) + +if __name__ == "__main__": + unittest.main() diff --git a/binarycpython/tests/test_functions.py b/binarycpython/tests/test_functions.py index 86c52958f0868b3d43967e98a61ac34e01a17759..152df888399bd76b8d82f2abdbb88aedcf180b2a 100644 --- a/binarycpython/tests/test_functions.py +++ b/binarycpython/tests/test_functions.py @@ -35,7 +35,6 @@ from binarycpython.utils.functions import ( TMP_DIR = temp_dir("tests", "test_functions") - class test_verbose_print(unittest.TestCase): """ Unittests for verbose_print diff --git a/binarycpython/tests/test_grid.py b/binarycpython/tests/test_grid.py index 019f91e88288067fd62696273cf8f956e6fad431..f836d0b74b5a0906edf8b21147e6aaf281f359d9 100644 --- a/binarycpython/tests/test_grid.py +++ b/binarycpython/tests/test_grid.py @@ -3,7 +3,6 @@ Unit tests for the grid module TODO: jobID TODO: exit -TODO: export_all_info TODO: _set_nprocesses TODO: _pre_run_setup TODO: clean @@ -15,19 +14,17 @@ TODO: _evolve_system_mp TODO: _parent_signal_handler TODO: _child_signal_handler TODO: _process_run_population_grid -TODO: evolve_single TODO: _cleanup TODO: _dry_run TODO: _dry_run_source_file TODO: _load_source_file -TODO: _dict_from_line_source_file -TODO: _cleanup_defaults -TODO: _increment_probtot -TODO: _increment_count TODO: was_killed TODO: _check_binary_c_error + +TODO: Before running the non-unit tests to cover functions like evolve, we need to run the unit tests """ +import sys import unittest from binarycpython.utils.functions import ( @@ -40,6 +37,67 @@ from binarycpython.utils.grid import Population TMP_DIR = temp_dir("tests", "test_grid") TEST_VERBOSITY = 1 + +def parse_function_test_grid_evolve_2_threads_with_custom_logging(self, output): + """ + Simple parse function that directly appends all the output to a file + """ + + # Get some information from the + data_dir = self.custom_options["data_dir"] + + # make outputfilename + output_filename = os.path.join( + data_dir, + "test_grid_evolve_2_threads_with_custom_logging_outputfile_population_{}_thread_{}.dat".format( + self.grid_options["_population_id"], self.process_ID + ), + ) + + # Check directory, make if necessary + os.makedirs(data_dir, exist_ok=True) + + if not os.path.exists(output_filename): + with open(output_filename, "w") as first_f: + first_f.write(output + "\n") + else: + with open(output_filename, "a") as first_f: + first_f.write(output + "\n") + + +def parse_function_adding_results(self, output): + """ + Example parse function + """ + + seperator = " " + + parameters = ["time", "mass", "zams_mass", "probability", "stellar_type"] + + self.grid_results["example"]["count"] += 1 + + # Go over the output. + for line in output.splitlines(): + headerline = line.split()[0] + + # CHeck the header and act accordingly + if headerline == "EXAMPLE_OUTPUT": + values = line.split()[1:] + + # Bin the mass probability + self.grid_results["example"]["mass"][ + bin_data(float(values[2]), binwidth=0.5) + ] += float(values[3]) + + # + if not len(parameters) == len(values): + print("Number of column names isnt equal to number of columns") + raise ValueError + + # record the probability of this line (Beware, this is meant to only be run once for each system. its a controls quantity) + self.grid_results["example"]["probability"] += float(values[3]) + + class test__setup(unittest.TestCase): """ Unittests for _setup function @@ -256,4 +314,863 @@ class test_return_all_info(unittest.TestCase): self.assertNotEqual(all_info["population_settings"], {}) self.assertNotEqual(all_info["binary_c_defaults"], {}) self.assertNotEqual(all_info["binary_c_version_info"], {}) - self.assertNotEqual(all_info["binary_c_help_all"], {}) \ No newline at end of file + self.assertNotEqual(all_info["binary_c_help_all"], {}) + + +class test_export_all_info(unittest.TestCase): + """ + Unittests for export_all_info function + """ + + def test_export_all_info(self): + with Capturing() as output: + self._test_export_all_info() + + def _test_export_all_info(self): + """ + Unittests for the function export_all_info + """ + + test_pop = Population() + + test_pop.set(metallicity=0.02, verbosity=TEST_VERBOSITY) + test_pop.set(M_1=10) + test_pop.set(num_cores=2) + test_pop.set(data_dir=TMP_DIR) + + # datadir + settings_filename = test_pop.export_all_info(use_datadir=True) + self.assertTrue(os.path.isfile(settings_filename)) + with open(settings_filename, "r") as f: + all_info = json.loads(f.read()) + + # + self.assertIn("population_settings", all_info) + self.assertIn("binary_c_defaults", all_info) + self.assertIn("binary_c_version_info", all_info) + self.assertIn("binary_c_help_all", all_info) + + # + self.assertNotEqual(all_info["population_settings"], {}) + self.assertNotEqual(all_info["binary_c_defaults"], {}) + self.assertNotEqual(all_info["binary_c_version_info"], {}) + self.assertNotEqual(all_info["binary_c_help_all"], {}) + + # custom name + # datadir + settings_filename = test_pop.export_all_info( + use_datadir=False, + outfile=os.path.join(TMP_DIR, "example_settings.json"), + ) + self.assertTrue(os.path.isfile(settings_filename)) + with open(settings_filename, "r") as f: + all_info = json.loads(f.read()) + + # + self.assertIn("population_settings", all_info) + self.assertIn("binary_c_defaults", all_info) + self.assertIn("binary_c_version_info", all_info) + self.assertIn("binary_c_help_all", all_info) + + # + self.assertNotEqual(all_info["population_settings"], {}) + self.assertNotEqual(all_info["binary_c_defaults"], {}) + self.assertNotEqual(all_info["binary_c_version_info"], {}) + self.assertNotEqual(all_info["binary_c_help_all"], {}) + + # wrong filename + self.assertRaises( + ValueError, + test_pop.export_all_info, + use_datadir=False, + outfile=os.path.join(TMP_DIR, "example_settings.txt"), + ) + + +class test__cleanup_defaults(unittest.TestCase): + """ + Unittests for _cleanup_defaults function + """ + + def test__cleanup_defaults(self): + with Capturing() as output: + self._test__cleanup_defaults() + + def _test__cleanup_defaults(self): + """ + Unittests for the function _cleanup_defaults + """ + + test_pop = Population() + cleaned_up_defaults = test_pop._cleanup_defaults() + self.assertNotIn("help_all", cleaned_up_defaults) + + +class test__increment_probtot(unittest.TestCase): + """ + Unittests for _increment_probtot function + """ + + def test__increment_probtot(self): + with Capturing() as output: + self._test__increment_probtot() + + def _test__increment_probtot(self): + """ + Unittests for the function _increment_probtot + """ + + test_pop = Population() + test_pop._increment_probtot(0.5) + self.assertEqual(test_pop.grid_options["_probtot"], 0.5) + + +class test__increment_count(unittest.TestCase): + """ + Unittests for _increment_count function + """ + + def test__increment_count(self): + with Capturing() as output: + self._test__increment_count() + + def _test__increment_count(self): + """ + Unittests for the function _increment_count + """ + + test_pop = Population() + test_pop._increment_count() + self.assertEqual(test_pop.grid_options["_count"], 1) + + +class test__dict_from_line_source_file(unittest.TestCase): + """ + Unittests for _dict_from_line_source_file function + """ + + def test__dict_from_line_source_file(self): + with Capturing() as output: + self._test__dict_from_line_source_file() + + def _test__dict_from_line_source_file(self): + """ + Unittests for the function _dict_from_line_source_file + """ + + source_file = os.path.join(TMP_DIR, "example_source_file.txt") + + # write + with open(source_file, "w") as f: + f.write("binary_c M_1 10 metallicity 0.02\n") + + test_pop = Population() + + # readout + with open(source_file, "r") as f: + for line in f.readlines(): + argdict = test_pop._dict_from_line_source_file(line) + + self.assertTrue(argdict["M_1"] == 10) + self.assertTrue(argdict["metallicity"] == 0.02) + + +class test_evolve_single(unittest.TestCase): + """ + Unittests for evolve_single function + """ + + def test_evolve_single(self): + with Capturing() as output: + self._test_evolve_single() + + def _test_evolve_single(self): + """ + Unittests for the function evolve_single + """ + + CUSTOM_LOGGING_STRING_MASSES = """ + Printf("TEST_CUSTOM_LOGGING_1 %30.12e %g %g %g %g\\n", + // + stardata->model.time, // 1 + + // masses + stardata->common.zero_age.mass[0], // + stardata->common.zero_age.mass[1], // + + stardata->star[0].mass, + stardata->star[1].mass + ); + """ + + test_pop = Population() + test_pop.set( + M_1=10, + M_2=5, + orbital_period=100000, + metallicty=0.02, + max_evolution_time=15000, + verbosity=TEST_VERBOSITY, + ) + + test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_MASSES) + + output = test_pop.evolve_single() + + # + self.assertTrue(len(output.splitlines()) > 1) + self.assertIn("TEST_CUSTOM_LOGGING_1", output) + + # + custom_logging_dict = {"TEST_CUSTOM_LOGGING_2": ["star[0].mass", "model.time"]} + test_pop_2 = Population() + test_pop_2.set( + M_1=10, + M_2=5, + orbital_period=100000, + metallicty=0.02, + max_evolution_time=15000, + verbosity=TEST_VERBOSITY, + ) + + test_pop_2.set(C_auto_logging=custom_logging_dict) + + output_2 = test_pop_2.evolve_single() + + # + self.assertTrue(len(output_2.splitlines()) > 1) + self.assertIn("TEST_CUSTOM_LOGGING_2", output_2) + + + + + + +######## +# Some tests that are not really -unit- tests +class test_resultdict(unittest.TestCase): + """ + Unittests for bin_data + """ + + def test_adding_results(self): + """ + Function to test whether the results are properly added and combined + """ + + # Create custom logging statement + custom_logging_statement = """ + if (stardata->model.time < stardata->model.max_evolution_time) + { + Printf("EXAMPLE_OUTPUT %30.16e %g %g %30.12e %d\\n", + // + stardata->model.time, // 1 + stardata->star[0].mass, // 2 + stardata->common.zero_age.mass[0], // 3 + stardata->model.probability, // 4 + stardata->star[0].stellar_type // 5 + ); + }; + /* Kill the simulation to save time */ + stardata->model.max_evolution_time = stardata->model.time - stardata->model.dtm; + """ + + example_pop = Population() + example_pop.set(verbosity=0) + example_pop.set( + max_evolution_time=15000, # bse_options + # grid_options + num_cores=3, + tmp_dir=TMP_DIR, + # Custom options + data_dir=os.path.join(TMP_DIR, "test_resultdict"), # custom_options + C_logging_code=custom_logging_statement, + parse_function=parse_function_adding_results, + ) + + # Add grid variables + resolution = {"M_1": 10} + + # Mass + example_pop.add_grid_variable( + name="lnm1", + longname="Primary mass", + valuerange=[2, 150], + samplerfunc="const(math.log(2), math.log(150), {})".format( + resolution["M_1"] + ), + precode="M_1=math.exp(lnm1)", + probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 150, -1.3, -2.3, -2.3)*M_1", + dphasevol="dlnm1", + parameter_name="M_1", + condition="", # Impose a condition on this grid variable. Mostly for a check for yourself + ) + + ## Executing a population + ## This uses the values generated by the grid_variables + analytics = example_pop.evolve() + + # + grid_prob = analytics["total_probability"] + result_dict_prob = example_pop.grid_results["example"]["probability"] + + # amt systems + grid_count = analytics["total_count"] + result_dict_count = example_pop.grid_results["example"]["count"] + + # Check if the total probability matches + self.assertAlmostEqual( + grid_prob, + result_dict_prob, + places=12, + msg="Total probability from grid {} and from result dict {} are not equal".format( + grid_prob, result_dict_prob + ), + ) + + # Check if the total count matches + self.assertEqual( + grid_count, + result_dict_count, + msg="Total count from grid {} and from result dict {} are not equal".format( + grid_count, result_dict_count + ), + ) + + # Check if the structure is what we expect. Note: this depends on the probability calculation. if that changes we need to recalibrate this + test_case_dict = { + 2.25: 0.01895481306515, + 3.75: 0.01081338190204, + 5.75: 0.006168841009268, + 9.25: 0.003519213484031, + 13.75: 0.002007648361756, + 21.25: 0.001145327489437, + 33.25: 0.0006533888518775, + 50.75: 0.0003727466560393, + 78.25: 0.000212645301782, + 120.75: 0.0001213103421247, + } + + self.assertEqual( + test_case_dict, dict(example_pop.grid_results["example"]["mass"]) + ) + + + + +# class test_grid_evolve(unittest.TestCase): +# """ +# Unittests for function Population.evolve() +# """ + +# def test_grid_evolve_1_thread(self): +# with Capturing() as output: +# self._test_grid_evolve_1_thread() + +# def _test_grid_evolve_1_thread(self): +# """ +# Unittests to see if 1 thread does all the systems +# """ + +# test_pop_evolve_1_thread = Population() +# test_pop_evolve_1_thread.set( +# num_cores=1, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY +# ) + +# resolution = {"M_1": 10} + +# test_pop_evolve_1_thread.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics = test_pop_evolve_1_thread.evolve() +# self.assertLess( +# np.abs(analytics["total_probability"] - 0.10820655287892997), +# 1e-10, +# msg=analytics["total_probability"], +# ) +# self.assertTrue(analytics["total_count"] == 10) + +# def test_grid_evolve_2_threads(self): +# with Capturing() as output: +# self._test_grid_evolve_2_threads() + +# def _test_grid_evolve_2_threads(self): +# """ +# Unittests to see if multiple threads handle the all the systems correctly +# """ + +# test_pop = Population() +# test_pop.set( +# num_cores=2, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY +# ) + +# resolution = {"M_1": 10} + +# test_pop.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics = test_pop.evolve() +# self.assertLess( +# np.abs(analytics["total_probability"] - 0.10820655287892997), +# 1e-10, +# msg=analytics["total_probability"], +# ) # +# self.assertTrue(analytics["total_count"] == 10) + +# def test_grid_evolve_2_threads_with_custom_logging(self): +# with Capturing() as output: +# self._test_grid_evolve_2_threads_with_custom_logging() + +# def _test_grid_evolve_2_threads_with_custom_logging(self): +# """ +# Unittests to see if multiple threads do the custom logging correctly +# """ + +# data_dir_value = os.path.join(TMP_DIR, "grid_tests") +# num_cores_value = 2 +# custom_logging_string = 'Printf("MY_STELLAR_DATA_TEST_EXAMPLE %g %g %g %g\\n",((double)stardata->model.time),((double)stardata->star[0].mass),((double)stardata->model.probability),((double)stardata->model.dt));' + +# test_pop = Population() + +# test_pop.set( +# num_cores=num_cores_value, +# verbosity=TEST_VERBOSITY, +# M_2=1, +# orbital_period=100000, +# data_dir=data_dir_value, +# C_logging_code=custom_logging_string, # input it like this. +# parse_function=parse_function_test_grid_evolve_2_threads_with_custom_logging, +# ) +# test_pop.set(ensemble=0) +# resolution = {"M_1": 2} + +# test_pop.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics = test_pop.evolve() +# output_names = [ +# os.path.join( +# data_dir_value, +# "test_grid_evolve_2_threads_with_custom_logging_outputfile_population_{}_thread_{}.dat".format( +# analytics["population_name"], thread_id +# ), +# ) +# for thread_id in range(num_cores_value) +# ] + +# for output_name in output_names: +# self.assertTrue(os.path.isfile(output_name)) + +# with open(output_name, "r") as f: +# output_string = f.read() + +# self.assertIn("MY_STELLAR_DATA_TEST_EXAMPLE", output_string) + +# remove_file(output_name) + +# def test_grid_evolve_with_condition_error(self): +# with Capturing() as output: +# self._test_grid_evolve_with_condition_error() + +# def _test_grid_evolve_with_condition_error(self): +# """ +# Unittests to see if the threads catch the errors correctly. +# """ + +# test_pop = Population() +# test_pop.set( +# num_cores=2, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY +# ) + +# # Set the amt of failed systems that each thread will log +# test_pop.set(failed_systems_threshold=4) + +# CUSTOM_LOGGING_STRING_WITH_EXIT = """ +# Exit_binary_c(BINARY_C_NORMAL_EXIT, "testing exits. This is part of the testing, don't worry"); +# Printf("TEST_CUSTOM_LOGGING_1 %30.12e %g %g %g %g\\n", +# // +# stardata->model.time, // 1 + +# // masses +# stardata->common.zero_age.mass[0], // +# stardata->common.zero_age.mass[1], // + +# stardata->star[0].mass, +# stardata->star[1].mass +# ); +# """ + +# test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_WITH_EXIT) + +# resolution = {"M_1": 10} +# test_pop.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics = test_pop.evolve() +# self.assertLess( +# np.abs(analytics["total_probability"] - 0.10820655287892997), +# 1e-10, +# msg=analytics["total_probability"], +# ) # +# self.assertEqual(analytics["failed_systems_error_codes"], [0]) +# self.assertTrue(analytics["total_count"] == 10) +# self.assertTrue(analytics["failed_count"] == 10) +# self.assertTrue(analytics["errors_found"] == True) +# self.assertTrue(analytics["errors_exceeded"] == True) + +# # test to see if 1 thread does all the systems + +# test_pop = Population() +# test_pop.set( +# num_cores=2, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY +# ) +# test_pop.set(failed_systems_threshold=4) +# test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_WITH_EXIT) + +# resolution = {"M_1": 10, "q": 2} + +# test_pop.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# test_pop.add_grid_variable( +# name="q", +# longname="Mass ratio", +# valuerange=["0.1/M_1", 1], +# samplerfunc="const(0.1/M_1, 1, {})".format(resolution["q"]), +# probdist="flatsections(q, [{'min': 0.1/M_1, 'max': 1.0, 'height': 1}])", +# dphasevol="dq", +# precode="M_2 = q * M_1", +# parameter_name="M_2", +# # condition="M_1 in dir()", # Impose a condition on this grid variable. Mostly for a check for yourself +# condition="'random_var' in dir()", # This will raise an error because random_var is not defined. +# ) + +# # TODO: why should it raise this error? It should probably raise a valueerror when the limit is exceeded right? +# # DEcided to turn it off for now because there is not raise VAlueError in that chain of functions. +# # NOTE: Found out why this test was here. It is to do with the condition random_var in dir(), but I changed the behaviour from raising an error to continue. This has to do with the moe&distefano code that will loop over several multiplicities +# # TODO: make sure the continue behaviour is what we actually want. + +# # self.assertRaises(ValueError, test_pop.evolve) + +# def test_grid_evolve_no_grid_variables(self): +# with Capturing() as output: +# self._test_grid_evolve_no_grid_variables() + +# def _test_grid_evolve_no_grid_variables(self): +# """ +# Unittests to see if errors are raised if there are no grid variables +# """ + +# test_pop = Population() +# test_pop.set( +# num_cores=1, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY +# ) + +# resolution = {"M_1": 10} +# self.assertRaises(ValueError, test_pop.evolve) + +# def test_grid_evolve_2_threads_with_ensemble_direct_output(self): +# with Capturing() as output: +# self._test_grid_evolve_2_threads_with_ensemble_direct_output() + +# def _test_grid_evolve_2_threads_with_ensemble_direct_output(self): +# """ +# Unittests to see if multiple threads output the ensemble information to files correctly +# """ + +# data_dir_value = TMP_DIR +# num_cores_value = 2 + +# test_pop = Population() +# test_pop.set( +# num_cores=num_cores_value, +# verbosity=TEST_VERBOSITY, +# M_2=1, +# orbital_period=100000, +# ensemble=1, +# ensemble_defer=1, +# ensemble_filters_off=1, +# ensemble_filter_STELLAR_TYPE_COUNTS=1, +# ensemble_dt=1000, +# ) +# test_pop.set( +# data_dir=TMP_DIR, +# ensemble_output_name="ensemble_output.json", +# combine_ensemble_with_thread_joining=False, +# ) + +# resolution = {"M_1": 10} + +# test_pop.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics = test_pop.evolve() +# output_names = [ +# os.path.join( +# data_dir_value, +# "ensemble_output_{}_{}.json".format( +# analytics["population_name"], thread_id +# ), +# ) +# for thread_id in range(num_cores_value) +# ] + +# for output_name in output_names: +# self.assertTrue(os.path.isfile(output_name)) + +# with open(output_name, "r") as f: +# file_content = f.read() + +# ensemble_json = json.loads(file_content) + +# self.assertTrue(isinstance(ensemble_json, dict)) +# self.assertNotEqual(ensemble_json, {}) + +# self.assertIn("number_counts", ensemble_json) +# self.assertNotEqual(ensemble_json["number_counts"], {}) + +# def test_grid_evolve_2_threads_with_ensemble_combining(self): +# with Capturing() as output: +# self._test_grid_evolve_2_threads_with_ensemble_combining() + +# def _test_grid_evolve_2_threads_with_ensemble_combining(self): +# """ +# Unittests to see if multiple threads correclty combine the ensemble data and store them in the grid +# """ + +# data_dir_value = TMP_DIR +# num_cores_value = 2 + +# test_pop = Population() +# test_pop.set( +# num_cores=num_cores_value, +# verbosity=TEST_VERBOSITY, +# M_2=1, +# orbital_period=100000, +# ensemble=1, +# ensemble_defer=1, +# ensemble_filters_off=1, +# ensemble_filter_STELLAR_TYPE_COUNTS=1, +# ensemble_dt=1000, +# ) +# test_pop.set( +# data_dir=TMP_DIR, +# combine_ensemble_with_thread_joining=True, +# ensemble_output_name="ensemble_output.json", +# ) + +# resolution = {"M_1": 10} + +# test_pop.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics = test_pop.evolve() + +# self.assertTrue(isinstance(test_pop.grid_ensemble_results["ensemble"], dict)) +# self.assertNotEqual(test_pop.grid_ensemble_results["ensemble"], {}) + +# self.assertIn("number_counts", test_pop.grid_ensemble_results["ensemble"]) +# self.assertNotEqual( +# test_pop.grid_ensemble_results["ensemble"]["number_counts"], {} +# ) + +# def test_grid_evolve_2_threads_with_ensemble_comparing_two_methods(self): +# with Capturing() as output: +# self._test_grid_evolve_2_threads_with_ensemble_comparing_two_methods() + +# def _test_grid_evolve_2_threads_with_ensemble_comparing_two_methods(self): +# """ +# Unittests to compare the method of storing the combined ensemble data in the object and writing them to files and combining them later. they have to be the same +# """ + +# data_dir_value = TMP_DIR +# num_cores_value = 2 + +# # First +# test_pop_1 = Population() +# test_pop_1.set( +# num_cores=num_cores_value, +# verbosity=TEST_VERBOSITY, +# M_2=1, +# orbital_period=100000, +# ensemble=1, +# ensemble_defer=1, +# ensemble_filters_off=1, +# ensemble_filter_STELLAR_TYPE_COUNTS=1, +# ensemble_dt=1000, +# ) +# test_pop_1.set( +# data_dir=TMP_DIR, +# combine_ensemble_with_thread_joining=True, +# ensemble_output_name="ensemble_output.json", +# ) + +# resolution = {"M_1": 10} + +# test_pop_1.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics_1 = test_pop_1.evolve() +# ensemble_output_1 = test_pop_1.grid_ensemble_results + +# # second +# test_pop_2 = Population() +# test_pop_2.set( +# num_cores=num_cores_value, +# verbosity=TEST_VERBOSITY, +# M_2=1, +# orbital_period=100000, +# ensemble=1, +# ensemble_defer=1, +# ensemble_filters_off=1, +# ensemble_filter_STELLAR_TYPE_COUNTS=1, +# ensemble_dt=1000, +# ) +# test_pop_2.set( +# data_dir=TMP_DIR, +# ensemble_output_name="ensemble_output.json", +# combine_ensemble_with_thread_joining=False, +# ) + +# resolution = {"M_1": 10} + +# test_pop_2.add_grid_variable( +# name="lnm1", +# longname="Primary mass", +# valuerange=[1, 100], +# samplerfunc="const(math.log(1), math.log(100), {})".format( +# resolution["M_1"] +# ), +# precode="M_1=math.exp(lnm1)", +# probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", +# dphasevol="dlnm1", +# parameter_name="M_1", +# condition="", # Impose a condition on this grid variable. Mostly for a check for yourself +# ) + +# analytics_2 = test_pop_2.evolve() +# output_names_2 = [ +# os.path.join( +# data_dir_value, +# "ensemble_output_{}_{}.json".format( +# analytics_2["population_name"], thread_id +# ), +# ) +# for thread_id in range(num_cores_value) +# ] +# ensemble_output_2 = {} + +# for output_name in output_names_2: +# self.assertTrue(os.path.isfile(output_name)) + +# with open(output_name, "r") as f: +# file_content = f.read() + +# ensemble_json = json.loads(file_content) + +# ensemble_output_2 = merge_dicts(ensemble_output_2, ensemble_json) + +# for key in ensemble_output_1["ensemble"]["number_counts"]["stellar_type"]["0"]: +# self.assertIn(key, ensemble_output_2["number_counts"]["stellar_type"]["0"]) + +# # compare values +# self.assertLess( +# np.abs( +# ensemble_output_1["ensemble"]["number_counts"]["stellar_type"]["0"][ +# key +# ] +# - ensemble_output_2["number_counts"]["stellar_type"]["0"][key] +# ), +# 1e-8, +# ) + +if __name__ == "__main__": + unittest.main() diff --git a/binarycpython/tests/test_run_system_wrapper.py b/binarycpython/tests/test_run_system_wrapper.py index ef4cc754d43de33d02c0cf748caa16f2e9147a30..66bfbc41ab68baae3ea9bbf3ba662d7fd885134c 100644 --- a/binarycpython/tests/test_run_system_wrapper.py +++ b/binarycpython/tests/test_run_system_wrapper.py @@ -3,3 +3,10 @@ Unittests for run_system_wrapper """ # TODO: write tests for run_system_wrapper + +import unittest + +from binarycpython.utils.run_system_wrapper import run_system + +if __name__ == "__main__": + unittest.main() diff --git a/binarycpython/tests/test_stellar_types.py b/binarycpython/tests/test_stellar_types.py index 2fc5ceb44973cd342c3a7688b68e8ba6f52b5d79..9f2f9adbf182a36765a8c8b9642b462bed57cabd 100644 --- a/binarycpython/tests/test_stellar_types.py +++ b/binarycpython/tests/test_stellar_types.py @@ -2,4 +2,9 @@ Unittests for stellar_types module """ -from binarycpython.utils.stellar_types import STELLAR_TYPE_DICT, STELLAR_TYPE_DICT_SHORT \ No newline at end of file +import unittest + +from binarycpython.utils.stellar_types import STELLAR_TYPE_DICT, STELLAR_TYPE_DICT_SHORT + +if __name__ == "__main__": + unittest.main() diff --git a/binarycpython/tests/tests_population_extensions/test__HPC.py b/binarycpython/tests/tests_population_extensions/test__HPC.py index 24180dc883b912e56b973aa658ae93a9669a437c..4794a1ac2b17213958a330deaa5d4a2bd4323601 100644 --- a/binarycpython/tests/tests_population_extensions/test__HPC.py +++ b/binarycpython/tests/tests_population_extensions/test__HPC.py @@ -28,4 +28,9 @@ TODO: HPC_touch TODO: HPC_status TODO: HPC_dump_status TODO: HPC_queue_stats -""" \ No newline at end of file +""" + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__Moe_di_Stefano_2017.py b/binarycpython/tests/tests_population_extensions/test__Moe_di_Stefano_2017.py index f3308f8f0231fdec15f0bcc34437d635bd9e81e2..94ad7e8db9963cbf4a4fca2adba2bb83f541b679 100644 --- a/binarycpython/tests/tests_population_extensions/test__Moe_di_Stefano_2017.py +++ b/binarycpython/tests/tests_population_extensions/test__Moe_di_Stefano_2017.py @@ -10,4 +10,9 @@ TODO: _calculate_multiplicity_fraction TODO: get_moe_di_stefano_dataset TODO: get_Moe_di_Stefano_2017_default_options TODO: get_Moe_di_Stefano_2017_default_options_description -""" \ No newline at end of file +""" + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__analytics.py b/binarycpython/tests/tests_population_extensions/test__analytics.py index f5146f02bd8d24c133157db18ac366f522af10e5..152d27312d5c2408adf717e2d52330278c9efcd7 100644 --- a/binarycpython/tests/tests_population_extensions/test__analytics.py +++ b/binarycpython/tests/tests_population_extensions/test__analytics.py @@ -6,3 +6,8 @@ TODO: set_time TODO: time_elapsed TODO: CPU_time """ + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__cachce.py b/binarycpython/tests/tests_population_extensions/test__cache.py similarity index 70% rename from binarycpython/tests/tests_population_extensions/test__cachce.py rename to binarycpython/tests/tests_population_extensions/test__cache.py index 00e6f350ca4a37e9e7939206924ed96c8297f962..19e526381c97a1e844b9de8a2fdc48a0a7d1a8aa 100644 --- a/binarycpython/tests/tests_population_extensions/test__cachce.py +++ b/binarycpython/tests/tests_population_extensions/test__cache.py @@ -6,3 +6,8 @@ TODO: NullCache TODO: setup_function_cache TODO: test_caches """ + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__condor.py b/binarycpython/tests/tests_population_extensions/test__condor.py index 835c8025cc30980a898e4c9492b0e57b18cb938b..4a449764fb0c17dee75edaf9427c7b8ba788a00b 100644 --- a/binarycpython/tests/tests_population_extensions/test__condor.py +++ b/binarycpython/tests/tests_population_extensions/test__condor.py @@ -12,4 +12,9 @@ TODO: ondor_outfile TODO: make_condor_dirs TODO: condor_grid TODO: condor_queue_stats -""" \ No newline at end of file +""" + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__dataIO.py b/binarycpython/tests/tests_population_extensions/test__dataIO.py index 98718fdfb10037a4a3d81e1e437812e3ee46b279..aa3a2ad3f19083c55c3a1ac4d2e04f7ff357662f 100644 --- a/binarycpython/tests/tests_population_extensions/test__dataIO.py +++ b/binarycpython/tests/tests_population_extensions/test__dataIO.py @@ -19,4 +19,9 @@ TODO: NFS_flush_hack TODO: compression_type TODO: open TODO: NFSpath -""" \ No newline at end of file +""" + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__distribution_functions.py b/binarycpython/tests/tests_population_extensions/test__distribution_functions.py index 8fd20789956def67161fddaa063cd83479064946..30095a950914e85d02638d261810b80b3ed02fd3 100644 --- a/binarycpython/tests/tests_population_extensions/test__distribution_functions.py +++ b/binarycpython/tests/tests_population_extensions/test__distribution_functions.py @@ -106,7 +106,7 @@ class test_const_distribution(unittest.TestCase): def test_const_distribution(self): with Capturing() as output: - self._test_const() + self._test_const_distribution() def _test_const_distribution(self): """ diff --git a/binarycpython/tests/tests_population_extensions/test__grid_logging.py b/binarycpython/tests/tests_population_extensions/test__grid_logging.py index 7c2a2c4dff0ee51097ccababb250745fa92556c3..584edf446f7b3c3c6c266b36a85cbe29eb3d219d 100644 --- a/binarycpython/tests/tests_population_extensions/test__grid_logging.py +++ b/binarycpython/tests/tests_population_extensions/test__grid_logging.py @@ -11,3 +11,8 @@ TODO: _boxed TODO: _get_stream_logger TODO: _clean_up_custom_logging """ + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__grid_options_defaults.py b/binarycpython/tests/tests_population_extensions/test__grid_options_defaults.py index 69cd20bc68cad3dc96bc97538cbf514165edf9f5..c6ba68fe37e0ecd3d47525f98dda8ea9a57f4864 100644 --- a/binarycpython/tests/tests_population_extensions/test__grid_options_defaults.py +++ b/binarycpython/tests/tests_population_extensions/test__grid_options_defaults.py @@ -17,7 +17,6 @@ from binarycpython.utils.grid import Population TMP_DIR = temp_dir("tests", "test_grid_options_defaults") - class test_grid_options_help(unittest.TestCase): """ Unit tests for the grid_options_help function diff --git a/binarycpython/tests/tests_population_extensions/test__gridcode.py b/binarycpython/tests/tests_population_extensions/test__gridcode.py index 85a0a5ede8b7ef8ac43d26c5ed9b7412250d431e..0be209d358883ade4ab1c81a8ae2a939b5112c69 100644 --- a/binarycpython/tests/tests_population_extensions/test__gridcode.py +++ b/binarycpython/tests/tests_population_extensions/test__gridcode.py @@ -72,3 +72,6 @@ class test_add_grid_variable(unittest.TestCase): self.assertIn("q", test_pop.grid_options["_grid_variables"]) self.assertIn("lnm1", test_pop.grid_options["_grid_variables"]) self.assertEqual(len(test_pop.grid_options["_grid_variables"]), 2) + +if __name__ == "__main__": + unittest.main() diff --git a/binarycpython/tests/tests_population_extensions/test__hpc_functions.py b/binarycpython/tests/tests_population_extensions/test__hpc_functions.py deleted file mode 100644 index e5fe16c34ea86965592827255a6649a54de2c149..0000000000000000000000000000000000000000 --- a/binarycpython/tests/tests_population_extensions/test__hpc_functions.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Unittests for hpc_functions module -""" - -from binarycpython.utils.hpc_functions import * - -# TODO: write tests for hpc functions diff --git a/binarycpython/tests/tests_population_extensions/test__metadata.py b/binarycpython/tests/tests_population_extensions/test__metadata.py index 5c6ef8a601d5fc42ecca11fe385230ac8de9881d..95f6e81f197c8555e413f75c356ba08541d198cd 100644 --- a/binarycpython/tests/tests_population_extensions/test__metadata.py +++ b/binarycpython/tests/tests_population_extensions/test__metadata.py @@ -5,3 +5,8 @@ TODO: add_system_metadata TODO: add_ensemble_metadata TODO: _metadata_keylist """ + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__slurm.py b/binarycpython/tests/tests_population_extensions/test__slurm.py index 2d1ea37b1e09d76ff15055b111a6065911ddb1f2..d2734f9ce0ae58368ce9d365a45efe89471f091a 100644 --- a/binarycpython/tests/tests_population_extensions/test__slurm.py +++ b/binarycpython/tests/tests_population_extensions/test__slurm.py @@ -13,3 +13,8 @@ TODO: make_slurm_dirs TODO: slurm_grid TODO: slurm_queue_stats """ + +import unittest + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/binarycpython/tests/tests_population_extensions/test__version_info.py b/binarycpython/tests/tests_population_extensions/test__version_info.py index e893d4e0f918d7fd3e68ab50ca0d92ed3d9052b8..7599dc6502dd1536a9a4dac62d78a4b4db0d689f 100644 --- a/binarycpython/tests/tests_population_extensions/test__version_info.py +++ b/binarycpython/tests/tests_population_extensions/test__version_info.py @@ -4,7 +4,6 @@ Unit tests for the _version_info Population extension module TODO: minimum_stellar_mass """ -import os import unittest from binarycpython.utils.functions import ( @@ -138,4 +137,7 @@ class test_parse_binary_c_version_info(unittest.TestCase): self.assertIsNotNone(parsed_info["isotopes"]) if parsed_info["macros"]["NUCSYN_ID_SOURCES"] == "on": - self.assertIsNotNone(parsed_info["nucleosynthesis_sources"]) \ No newline at end of file + self.assertIsNotNone(parsed_info["nucleosynthesis_sources"]) + +if __name__ == "__main__": + unittest.main() diff --git a/binarycpython/tests/tmp_functions.py b/binarycpython/tests/tmp_functions.py index 6d8add997aaa35831d083a1e84b17f825fa216e2..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/binarycpython/tests/tmp_functions.py +++ b/binarycpython/tests/tmp_functions.py @@ -1,912 +0,0 @@ -""" -Test cases for the grid - -Tasks: - TODO: write tests for load_from_sourcefile -""" - -import os -import sys -import json -import unittest -import numpy as np - -from binarycpython.utils.grid import Population - -from binarycpython.utils.functions import ( - temp_dir, - remove_file, - Capturing, - bin_data, -) - -from binarycpython.utils.ensemble import ( - extract_ensemble_json_from_string, -) -from binarycpython.utils.dicts import ( - merge_dicts, -) - -from binarycpython.utils.custom_logging_functions import binary_c_log_code - - -def parse_function_test_grid_evolve_2_threads_with_custom_logging(self, output): - """ - Simple parse function that directly appends all the output to a file - """ - - # Get some information from the - data_dir = self.custom_options["data_dir"] - - # make outputfilename - output_filename = os.path.join( - data_dir, - "test_grid_evolve_2_threads_with_custom_logging_outputfile_population_{}_thread_{}.dat".format( - self.grid_options["_population_id"], self.process_ID - ), - ) - - # Check directory, make if necessary - os.makedirs(data_dir, exist_ok=True) - - if not os.path.exists(output_filename): - with open(output_filename, "w") as first_f: - first_f.write(output + "\n") - else: - with open(output_filename, "a") as first_f: - first_f.write(output + "\n") - - -class test_Population(unittest.TestCase): - """ - Unittests for function - """ - - def test_export_all_info(self): - with Capturing() as output: - self._test_export_all_info() - - def _test_export_all_info(self): - """ - Unittests for the function export_all_info - """ - - test_pop = Population() - - test_pop.set(metallicity=0.02, verbosity=TEST_VERBOSITY) - test_pop.set(M_1=10) - test_pop.set(num_cores=2) - test_pop.set(data_dir=TMP_DIR) - - # datadir - settings_filename = test_pop.export_all_info(use_datadir=True) - self.assertTrue(os.path.isfile(settings_filename)) - with open(settings_filename, "r") as f: - all_info = json.loads(f.read()) - - # - self.assertIn("population_settings", all_info) - self.assertIn("binary_c_defaults", all_info) - self.assertIn("binary_c_version_info", all_info) - self.assertIn("binary_c_help_all", all_info) - - # - self.assertNotEqual(all_info["population_settings"], {}) - self.assertNotEqual(all_info["binary_c_defaults"], {}) - self.assertNotEqual(all_info["binary_c_version_info"], {}) - self.assertNotEqual(all_info["binary_c_help_all"], {}) - - # custom name - # datadir - settings_filename = test_pop.export_all_info( - use_datadir=False, - outfile=os.path.join(TMP_DIR, "example_settings.json"), - ) - self.assertTrue(os.path.isfile(settings_filename)) - with open(settings_filename, "r") as f: - all_info = json.loads(f.read()) - - # - self.assertIn("population_settings", all_info) - self.assertIn("binary_c_defaults", all_info) - self.assertIn("binary_c_version_info", all_info) - self.assertIn("binary_c_help_all", all_info) - - # - self.assertNotEqual(all_info["population_settings"], {}) - self.assertNotEqual(all_info["binary_c_defaults"], {}) - self.assertNotEqual(all_info["binary_c_version_info"], {}) - self.assertNotEqual(all_info["binary_c_help_all"], {}) - - # wrong filename - self.assertRaises( - ValueError, - test_pop.export_all_info, - use_datadir=False, - outfile=os.path.join(TMP_DIR, "example_settings.txt"), - ) - - def test__cleanup_defaults(self): - with Capturing() as output: - self._test__cleanup_defaults() - - def _test__cleanup_defaults(self): - """ - Unittests for the function _cleanup_defaults - """ - - test_pop = Population() - cleaned_up_defaults = test_pop._cleanup_defaults() - self.assertNotIn("help_all", cleaned_up_defaults) - - def test__increment_probtot(self): - with Capturing() as output: - self._test__increment_probtot() - - def _test__increment_probtot(self): - """ - Unittests for the function _increment_probtot - """ - - test_pop = Population() - test_pop._increment_probtot(0.5) - self.assertEqual(test_pop.grid_options["_probtot"], 0.5) - - def test__increment_count(self): - with Capturing() as output: - self._test__increment_count() - - def _test__increment_count(self): - """ - Unittests for the function _increment_probtot - """ - - test_pop = Population() - test_pop._increment_count() - self.assertEqual(test_pop.grid_options["_count"], 1) - - def test__dict_from_line_source_file(self): - with Capturing() as output: - self._test__dict_from_line_source_file() - - def _test__dict_from_line_source_file(self): - """ - Unittests for the function _dict_from_line_source_file - """ - - source_file = os.path.join(TMP_DIR, "example_source_file.txt") - - # write - with open(source_file, "w") as f: - f.write("binary_c M_1 10 metallicity 0.02\n") - - test_pop = Population() - - # readout - with open(source_file, "r") as f: - for line in f.readlines(): - argdict = test_pop._dict_from_line_source_file(line) - - self.assertTrue(argdict["M_1"] == 10) - self.assertTrue(argdict["metallicity"] == 0.02) - - def test_evolve_single(self): - with Capturing() as output: - self._test_evolve_single() - - def _test_evolve_single(self): - """ - Unittests for the function evolve_single - """ - - CUSTOM_LOGGING_STRING_MASSES = """ - Printf("TEST_CUSTOM_LOGGING_1 %30.12e %g %g %g %g\\n", - // - stardata->model.time, // 1 - - // masses - stardata->common.zero_age.mass[0], // - stardata->common.zero_age.mass[1], // - - stardata->star[0].mass, - stardata->star[1].mass - ); - """ - - test_pop = Population() - test_pop.set( - M_1=10, - M_2=5, - orbital_period=100000, - metallicty=0.02, - max_evolution_time=15000, - verbosity=TEST_VERBOSITY, - ) - - test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_MASSES) - - output = test_pop.evolve_single() - - # - self.assertTrue(len(output.splitlines()) > 1) - self.assertIn("TEST_CUSTOM_LOGGING_1", output) - - # - custom_logging_dict = {"TEST_CUSTOM_LOGGING_2": ["star[0].mass", "model.time"]} - test_pop_2 = Population() - test_pop_2.set( - M_1=10, - M_2=5, - orbital_period=100000, - metallicty=0.02, - max_evolution_time=15000, - verbosity=TEST_VERBOSITY, - ) - - test_pop_2.set(C_auto_logging=custom_logging_dict) - - output_2 = test_pop_2.evolve_single() - - # - self.assertTrue(len(output_2.splitlines()) > 1) - self.assertIn("TEST_CUSTOM_LOGGING_2", output_2) - - -class test_grid_evolve(unittest.TestCase): - """ - Unittests for function Population.evolve() - """ - - def test_grid_evolve_1_thread(self): - with Capturing() as output: - self._test_grid_evolve_1_thread() - - def _test_grid_evolve_1_thread(self): - """ - Unittests to see if 1 thread does all the systems - """ - - test_pop_evolve_1_thread = Population() - test_pop_evolve_1_thread.set( - num_cores=1, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY - ) - - resolution = {"M_1": 10} - - test_pop_evolve_1_thread.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics = test_pop_evolve_1_thread.evolve() - self.assertLess( - np.abs(analytics["total_probability"] - 0.10820655287892997), - 1e-10, - msg=analytics["total_probability"], - ) - self.assertTrue(analytics["total_count"] == 10) - - def test_grid_evolve_2_threads(self): - with Capturing() as output: - self._test_grid_evolve_2_threads() - - def _test_grid_evolve_2_threads(self): - """ - Unittests to see if multiple threads handle the all the systems correctly - """ - - test_pop = Population() - test_pop.set( - num_cores=2, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY - ) - - resolution = {"M_1": 10} - - test_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics = test_pop.evolve() - self.assertLess( - np.abs(analytics["total_probability"] - 0.10820655287892997), - 1e-10, - msg=analytics["total_probability"], - ) # - self.assertTrue(analytics["total_count"] == 10) - - def test_grid_evolve_2_threads_with_custom_logging(self): - with Capturing() as output: - self._test_grid_evolve_2_threads_with_custom_logging() - - def _test_grid_evolve_2_threads_with_custom_logging(self): - """ - Unittests to see if multiple threads do the custom logging correctly - """ - - data_dir_value = os.path.join(TMP_DIR, "grid_tests") - num_cores_value = 2 - custom_logging_string = 'Printf("MY_STELLAR_DATA_TEST_EXAMPLE %g %g %g %g\\n",((double)stardata->model.time),((double)stardata->star[0].mass),((double)stardata->model.probability),((double)stardata->model.dt));' - - test_pop = Population() - - test_pop.set( - num_cores=num_cores_value, - verbosity=TEST_VERBOSITY, - M_2=1, - orbital_period=100000, - data_dir=data_dir_value, - C_logging_code=custom_logging_string, # input it like this. - parse_function=parse_function_test_grid_evolve_2_threads_with_custom_logging, - ) - test_pop.set(ensemble=0) - resolution = {"M_1": 2} - - test_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics = test_pop.evolve() - output_names = [ - os.path.join( - data_dir_value, - "test_grid_evolve_2_threads_with_custom_logging_outputfile_population_{}_thread_{}.dat".format( - analytics["population_name"], thread_id - ), - ) - for thread_id in range(num_cores_value) - ] - - for output_name in output_names: - self.assertTrue(os.path.isfile(output_name)) - - with open(output_name, "r") as f: - output_string = f.read() - - self.assertIn("MY_STELLAR_DATA_TEST_EXAMPLE", output_string) - - remove_file(output_name) - - def test_grid_evolve_with_condition_error(self): - with Capturing() as output: - self._test_grid_evolve_with_condition_error() - - def _test_grid_evolve_with_condition_error(self): - """ - Unittests to see if the threads catch the errors correctly. - """ - - test_pop = Population() - test_pop.set( - num_cores=2, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY - ) - - # Set the amt of failed systems that each thread will log - test_pop.set(failed_systems_threshold=4) - - CUSTOM_LOGGING_STRING_WITH_EXIT = """ -Exit_binary_c(BINARY_C_NORMAL_EXIT, "testing exits. This is part of the testing, don't worry"); -Printf("TEST_CUSTOM_LOGGING_1 %30.12e %g %g %g %g\\n", - // - stardata->model.time, // 1 - - // masses - stardata->common.zero_age.mass[0], // - stardata->common.zero_age.mass[1], // - - stardata->star[0].mass, - stardata->star[1].mass -); - """ - - test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_WITH_EXIT) - - resolution = {"M_1": 10} - test_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics = test_pop.evolve() - self.assertLess( - np.abs(analytics["total_probability"] - 0.10820655287892997), - 1e-10, - msg=analytics["total_probability"], - ) # - self.assertEqual(analytics["failed_systems_error_codes"], [0]) - self.assertTrue(analytics["total_count"] == 10) - self.assertTrue(analytics["failed_count"] == 10) - self.assertTrue(analytics["errors_found"] == True) - self.assertTrue(analytics["errors_exceeded"] == True) - - # test to see if 1 thread does all the systems - - test_pop = Population() - test_pop.set( - num_cores=2, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY - ) - test_pop.set(failed_systems_threshold=4) - test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_WITH_EXIT) - - resolution = {"M_1": 10, "q": 2} - - test_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - test_pop.add_grid_variable( - name="q", - longname="Mass ratio", - valuerange=["0.1/M_1", 1], - samplerfunc="const(0.1/M_1, 1, {})".format(resolution["q"]), - probdist="flatsections(q, [{'min': 0.1/M_1, 'max': 1.0, 'height': 1}])", - dphasevol="dq", - precode="M_2 = q * M_1", - parameter_name="M_2", - # condition="M_1 in dir()", # Impose a condition on this grid variable. Mostly for a check for yourself - condition="'random_var' in dir()", # This will raise an error because random_var is not defined. - ) - - # TODO: why should it raise this error? It should probably raise a valueerror when the limit is exceeded right? - # DEcided to turn it off for now because there is not raise VAlueError in that chain of functions. - # NOTE: Found out why this test was here. It is to do with the condition random_var in dir(), but I changed the behaviour from raising an error to continue. This has to do with the moe&distefano code that will loop over several multiplicities - # TODO: make sure the continue behaviour is what we actually want. - - # self.assertRaises(ValueError, test_pop.evolve) - - def test_grid_evolve_no_grid_variables(self): - with Capturing() as output: - self._test_grid_evolve_no_grid_variables() - - def _test_grid_evolve_no_grid_variables(self): - """ - Unittests to see if errors are raised if there are no grid variables - """ - - test_pop = Population() - test_pop.set( - num_cores=1, M_2=1, orbital_period=100000, verbosity=TEST_VERBOSITY - ) - - resolution = {"M_1": 10} - self.assertRaises(ValueError, test_pop.evolve) - - def test_grid_evolve_2_threads_with_ensemble_direct_output(self): - with Capturing() as output: - self._test_grid_evolve_2_threads_with_ensemble_direct_output() - - def _test_grid_evolve_2_threads_with_ensemble_direct_output(self): - """ - Unittests to see if multiple threads output the ensemble information to files correctly - """ - - data_dir_value = TMP_DIR - num_cores_value = 2 - - test_pop = Population() - test_pop.set( - num_cores=num_cores_value, - verbosity=TEST_VERBOSITY, - M_2=1, - orbital_period=100000, - ensemble=1, - ensemble_defer=1, - ensemble_filters_off=1, - ensemble_filter_STELLAR_TYPE_COUNTS=1, - ensemble_dt=1000, - ) - test_pop.set( - data_dir=TMP_DIR, - ensemble_output_name="ensemble_output.json", - combine_ensemble_with_thread_joining=False, - ) - - resolution = {"M_1": 10} - - test_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics = test_pop.evolve() - output_names = [ - os.path.join( - data_dir_value, - "ensemble_output_{}_{}.json".format( - analytics["population_name"], thread_id - ), - ) - for thread_id in range(num_cores_value) - ] - - for output_name in output_names: - self.assertTrue(os.path.isfile(output_name)) - - with open(output_name, "r") as f: - file_content = f.read() - - ensemble_json = json.loads(file_content) - - self.assertTrue(isinstance(ensemble_json, dict)) - self.assertNotEqual(ensemble_json, {}) - - self.assertIn("number_counts", ensemble_json) - self.assertNotEqual(ensemble_json["number_counts"], {}) - - def test_grid_evolve_2_threads_with_ensemble_combining(self): - with Capturing() as output: - self._test_grid_evolve_2_threads_with_ensemble_combining() - - def _test_grid_evolve_2_threads_with_ensemble_combining(self): - """ - Unittests to see if multiple threads correclty combine the ensemble data and store them in the grid - """ - - data_dir_value = TMP_DIR - num_cores_value = 2 - - test_pop = Population() - test_pop.set( - num_cores=num_cores_value, - verbosity=TEST_VERBOSITY, - M_2=1, - orbital_period=100000, - ensemble=1, - ensemble_defer=1, - ensemble_filters_off=1, - ensemble_filter_STELLAR_TYPE_COUNTS=1, - ensemble_dt=1000, - ) - test_pop.set( - data_dir=TMP_DIR, - combine_ensemble_with_thread_joining=True, - ensemble_output_name="ensemble_output.json", - ) - - resolution = {"M_1": 10} - - test_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics = test_pop.evolve() - - self.assertTrue(isinstance(test_pop.grid_ensemble_results["ensemble"], dict)) - self.assertNotEqual(test_pop.grid_ensemble_results["ensemble"], {}) - - self.assertIn("number_counts", test_pop.grid_ensemble_results["ensemble"]) - self.assertNotEqual( - test_pop.grid_ensemble_results["ensemble"]["number_counts"], {} - ) - - def test_grid_evolve_2_threads_with_ensemble_comparing_two_methods(self): - with Capturing() as output: - self._test_grid_evolve_2_threads_with_ensemble_comparing_two_methods() - - def _test_grid_evolve_2_threads_with_ensemble_comparing_two_methods(self): - """ - Unittests to compare the method of storing the combined ensemble data in the object and writing them to files and combining them later. they have to be the same - """ - - data_dir_value = TMP_DIR - num_cores_value = 2 - - # First - test_pop_1 = Population() - test_pop_1.set( - num_cores=num_cores_value, - verbosity=TEST_VERBOSITY, - M_2=1, - orbital_period=100000, - ensemble=1, - ensemble_defer=1, - ensemble_filters_off=1, - ensemble_filter_STELLAR_TYPE_COUNTS=1, - ensemble_dt=1000, - ) - test_pop_1.set( - data_dir=TMP_DIR, - combine_ensemble_with_thread_joining=True, - ensemble_output_name="ensemble_output.json", - ) - - resolution = {"M_1": 10} - - test_pop_1.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics_1 = test_pop_1.evolve() - ensemble_output_1 = test_pop_1.grid_ensemble_results - - # second - test_pop_2 = Population() - test_pop_2.set( - num_cores=num_cores_value, - verbosity=TEST_VERBOSITY, - M_2=1, - orbital_period=100000, - ensemble=1, - ensemble_defer=1, - ensemble_filters_off=1, - ensemble_filter_STELLAR_TYPE_COUNTS=1, - ensemble_dt=1000, - ) - test_pop_2.set( - data_dir=TMP_DIR, - ensemble_output_name="ensemble_output.json", - combine_ensemble_with_thread_joining=False, - ) - - resolution = {"M_1": 10} - - test_pop_2.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[1, 100], - samplerfunc="const(math.log(1), math.log(100), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - analytics_2 = test_pop_2.evolve() - output_names_2 = [ - os.path.join( - data_dir_value, - "ensemble_output_{}_{}.json".format( - analytics_2["population_name"], thread_id - ), - ) - for thread_id in range(num_cores_value) - ] - ensemble_output_2 = {} - - for output_name in output_names_2: - self.assertTrue(os.path.isfile(output_name)) - - with open(output_name, "r") as f: - file_content = f.read() - - ensemble_json = json.loads(file_content) - - ensemble_output_2 = merge_dicts(ensemble_output_2, ensemble_json) - - for key in ensemble_output_1["ensemble"]["number_counts"]["stellar_type"]["0"]: - self.assertIn(key, ensemble_output_2["number_counts"]["stellar_type"]["0"]) - - # compare values - self.assertLess( - np.abs( - ensemble_output_1["ensemble"]["number_counts"]["stellar_type"]["0"][ - key - ] - - ensemble_output_2["number_counts"]["stellar_type"]["0"][key] - ), - 1e-8, - ) - - -def parse_function_adding_results(self, output): - """ - Example parse function - """ - - seperator = " " - - parameters = ["time", "mass", "zams_mass", "probability", "stellar_type"] - - self.grid_results["example"]["count"] += 1 - - # Go over the output. - for line in output.splitlines(): - headerline = line.split()[0] - - # CHeck the header and act accordingly - if headerline == "EXAMPLE_OUTPUT": - values = line.split()[1:] - - # Bin the mass probability - self.grid_results["example"]["mass"][ - bin_data(float(values[2]), binwidth=0.5) - ] += float(values[3]) - - # - if not len(parameters) == len(values): - print("Number of column names isnt equal to number of columns") - raise ValueError - - # record the probability of this line (Beware, this is meant to only be run once for each system. its a controls quantity) - self.grid_results["example"]["probability"] += float(values[3]) - - -class test_resultdict(unittest.TestCase): - """ - Unittests for bin_data - """ - - def test_adding_results(self): - """ - Function to test whether the results are properly added and combined - """ - - # Create custom logging statement - custom_logging_statement = """ - if (stardata->model.time < stardata->model.max_evolution_time) - { - Printf("EXAMPLE_OUTPUT %30.16e %g %g %30.12e %d\\n", - // - stardata->model.time, // 1 - stardata->star[0].mass, // 2 - stardata->common.zero_age.mass[0], // 3 - stardata->model.probability, // 4 - stardata->star[0].stellar_type // 5 - ); - }; - /* Kill the simulation to save time */ - stardata->model.max_evolution_time = stardata->model.time - stardata->model.dtm; - """ - - example_pop = Population() - example_pop.set(verbosity=0) - example_pop.set( - max_evolution_time=15000, # bse_options - # grid_options - num_cores=3, - tmp_dir=TMP_DIR, - # Custom options - data_dir=os.path.join(TMP_DIR, "test_resultdict"), # custom_options - C_logging_code=custom_logging_statement, - parse_function=parse_function_adding_results, - ) - - # Add grid variables - resolution = {"M_1": 10} - - # Mass - example_pop.add_grid_variable( - name="lnm1", - longname="Primary mass", - valuerange=[2, 150], - samplerfunc="const(math.log(2), math.log(150), {})".format( - resolution["M_1"] - ), - precode="M_1=math.exp(lnm1)", - probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 150, -1.3, -2.3, -2.3)*M_1", - dphasevol="dlnm1", - parameter_name="M_1", - condition="", # Impose a condition on this grid variable. Mostly for a check for yourself - ) - - ## Executing a population - ## This uses the values generated by the grid_variables - analytics = example_pop.evolve() - - # - grid_prob = analytics["total_probability"] - result_dict_prob = example_pop.grid_results["example"]["probability"] - - # amt systems - grid_count = analytics["total_count"] - result_dict_count = example_pop.grid_results["example"]["count"] - - # Check if the total probability matches - self.assertAlmostEqual( - grid_prob, - result_dict_prob, - places=12, - msg="Total probability from grid {} and from result dict {} are not equal".format( - grid_prob, result_dict_prob - ), - ) - - # Check if the total count matches - self.assertEqual( - grid_count, - result_dict_count, - msg="Total count from grid {} and from result dict {} are not equal".format( - grid_count, result_dict_count - ), - ) - - # Check if the structure is what we expect. Note: this depends on the probability calculation. if that changes we need to recalibrate this - test_case_dict = { - 2.25: 0.01895481306515, - 3.75: 0.01081338190204, - 5.75: 0.006168841009268, - 9.25: 0.003519213484031, - 13.75: 0.002007648361756, - 21.25: 0.001145327489437, - 33.25: 0.0006533888518775, - 50.75: 0.0003727466560393, - 78.25: 0.000212645301782, - 120.75: 0.0001213103421247, - } - - self.assertEqual( - test_case_dict, dict(example_pop.grid_results["example"]["mass"]) - ) - - -if __name__ == "__main__": - unittest.main()