From 581cd0029b6ff25eea34736400579b8841ee25f0 Mon Sep 17 00:00:00 2001 From: David Hendriks <davidhendriks93@gmail.com> Date: Mon, 11 Jan 2021 22:25:03 +0000 Subject: [PATCH] updated tests for ensemble, and for grid stuff --- binarycpython/tests/test_c_bindings.py | 431 ++++++------------ binarycpython/tests/test_functions.py | 17 +- binarycpython/tests/test_grid.py | 220 ++++++++- .../tests/test_grid_options_defaults.py | 6 +- binarycpython/tests/test_spacing_functions.py | 4 +- 5 files changed, 368 insertions(+), 310 deletions(-) diff --git a/binarycpython/tests/test_c_bindings.py b/binarycpython/tests/test_c_bindings.py index 5d97ffc91..942ae169c 100644 --- a/binarycpython/tests/test_c_bindings.py +++ b/binarycpython/tests/test_c_bindings.py @@ -4,6 +4,7 @@ import time import json import textwrap import unittest +import numpy as np from binarycpython import _binary_c_bindings @@ -13,13 +14,14 @@ from binarycpython.utils.functions import ( inspect_dict, merge_dicts, handle_ensemble_string_to_json, + verbose_print ) # https://docs.python.org/3/library/unittest.html TMP_DIR = temp_dir() os.makedirs(os.path.join(TMP_DIR, "test"), exist_ok=True) -#### +#### some useful functions def return_argstring( m1=15.0, m2=14.0, @@ -56,6 +58,28 @@ ensemble_filters_off {8} ensemble_filter_{9} 1 probability 0.1" return argstring +def extract_ensemble_json_from_string(binary_c_output): + """ + Function to extract the ensemble_json information from a raw binary_c output string + """ + + json = None + + try: + ensemble_jsons_strings = [ + line for line in binary_c_output.splitlines() if line.startswith("ENSEMBLE_JSON") + ] + + json = handle_ensemble_string_to_json( + ensemble_jsons_strings[0][len("ENSEMBLE_JSON ") :] + ) + + if len(ensemble_jsons_strings)>1: + verbose_print("Warning: There is more than one line starting with ENSEMBLE_JSON. Taking the first, but you should check this out.",1, 0) + except IndexError: + verbose_print("Error: Couldn't extract the ensemble information from the output string", 0, 0) + + return json ####################################################################################################################################################### ### General run_system test @@ -123,336 +147,189 @@ class test_return_store_memaddr(unittest.TestCase): class TestEnsemble(unittest.TestCase): + """ + Unittests for handling the ensemble outputs and adding those + """ + + def __init__(self, *args, **kwargs): + super(TestEnsemble, self).__init__(*args, **kwargs) + + def test_return_persistent_data_memaddr(self): + """ + Test case to check if the memory adress has been created succesfully + """ + + output = _binary_c_bindings.return_persistent_data_memaddr() + + self.assertIsInstance(output, int, msg="memory adress has to be an integer") + self.assertNotEqual( + output, 0, "memory adress seems not to have a correct value" + ) + def test_minimal_ensemble_output(self): """ - Tase case to check if the ensemble output is correctly written to the buffer instead of printed + test_case to check if the ensemble output is correctly output """ m1 = 2 # Msun m2 = 0.1 # Msun - ############################################################################################# - # The 2 runs below use the ensemble but do not defer the output to anything else, so that the - # results are returned directly after the run - # Direct output commands argstring_1 = return_argstring( - m1=m1, m2=m2, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=0 + m1=m1, m2=m2, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=0 # no defer to memory location. just output it ) - # Get outputs - output_1 = _binary_c_bindings.run_system(argstring=argstring_1) - test_ensemble_jsons_string = [ - line for line in output_1.splitlines() if line.startswith("ENSEMBLE_JSON") - ] - - test_json = handle_ensemble_string_to_json( - test_ensemble_jsons_string[0][len("ENSEMBLE_JSON ") :] - ) + output_1 = _binary_c_bindings.run_system(argstring=argstring_1) - # print(test_json.keys()) - # print(test_json) + # Check if the ENSEMBLE_JSON is uberhaubt in the output + self.assertIn("ENSEMBLE_JSON", output_1) - self.assertIsNotNone( - test_json, - msg="Ensemble output not correctly written passed to the buffer in _binary_c_bindings", - ) - self.assertIn("number_counts", test_json.keys()) + test_json = extract_ensemble_json_from_string(output_1) + self.assertIn("number_counts", test_json) + self.assertNotEqual(test_json["number_counts"], {}) - def test_return_persistent_data_memaddr(self): + def test_minimal_ensemble_output_defer(self): """ - Test case to check if the memory adress has been created succesfully + test_case to check if the ensemble output is correctly output, by using defer command and freeing+outputting """ - output = _binary_c_bindings.return_persistent_data_memaddr() - - print("function: test_run_system") - print("Binary_c output:") - print(textwrap.indent(str(output), "\t")) - - self.assertIsInstance(output, int, msg="memory adress has to be an integer") - self.assertNotEqual( - output, 0, "memory adress seems not to have a correct value" - ) - - def test_passing_persistent_data_to_run_system(self): - # Function to test the passing of the persistent data memoery adress, and having ensemble_defer = True - # We should see that the results of multiple systems have been added to the one output json - - # Make argstrings - argstring_1 = return_argstring(defer_ensemble=0) - argstring_1_deferred = return_argstring(defer_ensemble=1) - argstring_2 = return_argstring(defer_ensemble=0) + m1 = 2 # Msun + m2 = 0.1 # Msun - # persistent_data_memaddr = _binary_c_bindings.return_persistent_data_memaddr() - output_1 = _binary_c_bindings.run_system(argstring=argstring_1) - ensemble_jsons_1 = [ - line for line in output_1.splitlines() if line.startswith("ENSEMBLE_JSON") - ] - json_1 = handle_ensemble_string_to_json( - ensemble_jsons_1[0][len("ENSEMBLE_JSON ") :] + # Direct output commands + argstring_1 = return_argstring( + m1=m1, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=1 # no defer to memory location. just output it ) - # Doing 2 systems in a row. - output_1_deferred = _binary_c_bindings.run_system( - argstring=argstring_1_deferred, - persistent_data_memaddr=persistent_data_memaddr, - ) - output_2 = _binary_c_bindings.run_system( - argstring=argstring_2, persistent_data_memaddr=persistent_data_memaddr - ) - ensemble_jsons_2 = [ - line for line in output_2.splitlines() if line.startswith("ENSEMBLE_JSON") - ] - json_2 = handle_ensemble_string_to_json( - ensemble_jsons_2[0][len("ENSEMBLE_JSON ") :] - ) + output_1 = _binary_c_bindings.run_system(argstring=argstring_1, persistent_data_memaddr=persistent_data_memaddr) - # Doing system one again. - output_1_again = _binary_c_bindings.run_system(argstring=argstring_1) - ensemble_jsons_1 = [ - line - for line in output_1_again.splitlines() - if line.startswith("ENSEMBLE_JSON") - ] - json_1_again = handle_ensemble_string_to_json( - ensemble_jsons_1[0][len("ENSEMBLE_JSON ") :] - ) + # + self.assertNotIn('ENSEMBLE_JSON', output_1) - self.assertEqual( - json_1, - json_1_again, - msg="The system with the same initial settings did not give the same output", - ) - self.assertNotEqual( - json_1, - json_2, - msg="The output of the deferred two systems should not be the same as the first undeferred output", + # free memory and output the stuff. + raw_json_output = _binary_c_bindings.free_persistent_data_memaddr_and_return_json_output( + persistent_data_memaddr ) + ensemble_json_output = extract_ensemble_json_from_string(raw_json_output) - def test_adding_ensemble_output(self): + self.assertIn("number_counts", ensemble_json_output) + self.assertNotEqual(ensemble_json_output["number_counts"], {}) + + def test_add_ensembles_direct(self): """ - Function that adds the output of 2 ensembles and compares it to the output that we get by deferring the first output + test_case to check if adding the ensemble outputs works. Many things should be caught by tests in the merge_dict test, but still good to test a bit here """ m1 = 2 # Msun m2 = 0.1 # Msun - extra_mass = 10 - - ############################################################################################# - # The 2 runs below use the ensemble but do not defer the output to anything else, so that the - # results are returned directly after the run # Direct output commands argstring_1 = return_argstring( - m1=m1, m2=m2, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=0 + m1=m1, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS" # no defer to memory location. just output it ) argstring_2 = return_argstring( - m1=m1 + extra_mass, - m2=m2, - ensemble_filter="STELLAR_TYPE_COUNTS", - defer_ensemble=0, + m1=10, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS" # no defer to memory location. just output it ) - # Get outputs + # output_1 = _binary_c_bindings.run_system(argstring=argstring_1) output_2 = _binary_c_bindings.run_system(argstring=argstring_2) - test_1_ensemble_jsons_1 = [ - line for line in output_1.splitlines() if line.startswith("ENSEMBLE_JSON") - ] - test_1_ensemble_jsons_2 = [ - line for line in output_2.splitlines() if line.startswith("ENSEMBLE_JSON") - ] + # + output_json_1 = extract_ensemble_json_from_string(output_1) + output_json_2 = extract_ensemble_json_from_string(output_2) - test_1_json_1 = handle_ensemble_string_to_json( - test_1_ensemble_jsons_1[0][len("ENSEMBLE_JSON ") :] - ) - test_1_json_2 = handle_ensemble_string_to_json( - test_1_ensemble_jsons_2[0][len("ENSEMBLE_JSON ") :] - ) - - test_1_merged_dict = merge_dicts(test_1_json_1, test_1_json_2) + # + merged_dict = merge_dicts(output_json_1, output_json_2) - with open(os.path.join(TMP_DIR, "test", "adding_json_1.json"), "w") as file: - file.write(json.dumps(test_1_json_1, indent=4)) - with open(os.path.join(TMP_DIR, "test", "adding_json_2.json"), "w") as file: - file.write(json.dumps(test_1_json_2, indent=4)) - with open( - os.path.join(TMP_DIR, "test", "adding_json_merged.json"), "w" - ) as file: - file.write(json.dumps(test_1_json_2, indent=4)) + self.assertIn('number_counts', merged_dict) + self.assertIn('stellar_type', merged_dict['number_counts']) - print("Single runs done\n") + for key in output_json_1['number_counts']['stellar_type']['0']: + self.assertIn(key, merged_dict['number_counts']['stellar_type']['0']) - ############################################################################################# - # The 2 runs below use the ensemble and both defer the output so that after they are finished - # nothing is printed. After that we explicitly free the memory of the persistent_data and - # have the output returned in that way + for key in output_json_2['number_counts']['stellar_type']['0']: + self.assertIn(key, merged_dict['number_counts']['stellar_type']['0']) - # Deferred commands - argstring_1_deferred = return_argstring( - m1=m1, m2=m2, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=1 - ) - argstring_2_deferred = return_argstring( - m1=m1 + extra_mass, - m2=m2, - ensemble_filter="STELLAR_TYPE_COUNTS", - defer_ensemble=1, - ) + # compare stuff: + self.assertLess(np.abs(output_json_1['number_counts']['stellar_type']['0']['CHeB'] + output_json_2['number_counts']['stellar_type']['0']['CHeB'] - merged_dict['number_counts']['stellar_type']['0']['CHeB']), 1e-10) + self.assertLess(np.abs(output_json_1['number_counts']['stellar_type']['0']['MS'] + output_json_2['number_counts']['stellar_type']['0']['MS'] - merged_dict['number_counts']['stellar_type']['0']['MS']), 1e-10) - # Get a memory location - test_2_persistent_data_memaddr = ( - _binary_c_bindings.return_persistent_data_memaddr() - ) + def test_compare_added_systems_with_double_deferred_systems(self): + # test to run 2 systems without deferring, and merging them manually. Then run 2 systems with defer and then output them. - # Run the systems and defer the output each time - _ = _binary_c_bindings.run_system( - argstring=argstring_1_deferred, - persistent_data_memaddr=test_2_persistent_data_memaddr, - ) - _ = _binary_c_bindings.run_system( - argstring=argstring_2_deferred, - persistent_data_memaddr=test_2_persistent_data_memaddr, - ) + m1 = 2 # Msun + m2 = 0.1 # Msun - # Have the persistent_memory adress be released and have the json outputted - test_2_output = ( - _binary_c_bindings.free_persistent_data_memaddr_and_return_json_output( - test_2_persistent_data_memaddr - ) + # Direct output commands + argstring_1 = return_argstring( + m1=m1, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS" # no defer to memory location. just output it ) - test_2_ensemble_json = [ - line - for line in test_2_output.splitlines() - if line.startswith("ENSEMBLE_JSON") - ] - test_2_json = handle_ensemble_string_to_json( - test_2_ensemble_json[0][len("ENSEMBLE_JSON ") :] + argstring_2 = return_argstring( + m1=10, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS" # no defer to memory location. just output it ) - with open( - os.path.join(TMP_DIR, "test", "adding_json_deferred.json"), "w" - ) as file: - file.write(json.dumps(test_2_json, indent=4)) + # + output_1 = _binary_c_bindings.run_system(argstring=argstring_1) + output_2 = _binary_c_bindings.run_system(argstring=argstring_2) - print("Double deferred done\n") + # + output_json_1 = extract_ensemble_json_from_string(output_1) + output_json_2 = extract_ensemble_json_from_string(output_2) - ############################################################################################# - # The 2 runs below use the ensemble and the first one defers the output to the memory, - # Then the second one uses that memory to combine its results with, but doesn't defer the - # data after that, so it will print it after the second run is done + # + merged_dict = merge_dicts(output_json_1, output_json_2) - test_3_persistent_data_memaddr = ( - _binary_c_bindings.return_persistent_data_memaddr() - ) + ############################### + # Deferred setup + persistent_data_memaddr = _binary_c_bindings.return_persistent_data_memaddr() - # Run the systems and defer the output once and the second time not, so that the second run - # automatically prints out the results - _ = _binary_c_bindings.run_system( - argstring=argstring_1_deferred, - persistent_data_memaddr=test_3_persistent_data_memaddr, - ) - test_3_output_2 = _binary_c_bindings.run_system( - argstring=argstring_2, - persistent_data_memaddr=test_3_persistent_data_memaddr, + argstring_1_deferred = return_argstring( + m1=m1, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=1 # no defer to memory location. just output it ) - test_3_ensemble_jsons = [ - line - for line in test_3_output_2.splitlines() - if line.startswith("ENSEMBLE_JSON") - ] - test_3_json = handle_ensemble_string_to_json( - test_3_ensemble_jsons[0][len("ENSEMBLE_JSON ") :] + argstring_2_deferred = return_argstring( + m1=10, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=1 # no defer to memory location. just output it ) - with open( - os.path.join(TMP_DIR, "test", "adding_json_deferred_and_output.json"), "w" - ) as f: - f.write(json.dumps(test_3_json, indent=4)) - - print("Single deferred done\n") + # run + _ = _binary_c_bindings.run_system(argstring=argstring_1_deferred, persistent_data_memaddr=persistent_data_memaddr) + _ = _binary_c_bindings.run_system(argstring=argstring_2_deferred, persistent_data_memaddr=persistent_data_memaddr) - # - assert_message_1 = """ - The structure of the manually merged is not the same as the merged by double deferring - """ - assert_message_2 = """ - The structure of the manually merged is not the same as the merged by deferring once - and output on the second run - """ + # output + raw_json_output = _binary_c_bindings.free_persistent_data_memaddr_and_return_json_output( + persistent_data_memaddr + ) + ensemble_json_output = extract_ensemble_json_from_string(raw_json_output) - # - # print(json.dumps(test_1_merged_dict, indent=4)) - # print(json.dumps(test_2_json, indent=4)) + # CHeck all keys are present + for key in merged_dict['number_counts']['stellar_type']['0']: + self.assertIn(key, ensemble_json_output['number_counts']['stellar_type']['0']) - # TODO: add more asserts. - # - self.assertEqual( - inspect_dict(test_1_merged_dict, print_structure=False), - inspect_dict(test_2_json, print_structure=False), - msg=assert_message_1, - ) - # assert inspect_dict(test_1_merged_dict, print_structure=False) == inspect_dict(test_3_json, print_structure=False), assert_message_2 + # Check if they are of the same value + for key in merged_dict['number_counts']['stellar_type']['0']: + self.assertLess(np.abs(merged_dict['number_counts']['stellar_type']['0'][key]-ensemble_json_output['number_counts']['stellar_type']['0'][key]), 1e-10) - def test_free_and_json_output(self): + def test_combine_with_empty_json(self): """ - Function that tests the freeing of the memory adress and the output of the json + Test for merging with an empty dict """ m1 = 2 # Msun m2 = 0.1 # Msun - # Get argstring: argstring_1 = return_argstring( - m1=m2, m2=m2, ensemble_filter="STELLAR_TYPE_COUNTS", defer_ensemble=1 + m1=m1, m2=m2, orbital_period=1000000000, ensemble_filter="STELLAR_TYPE_COUNTS" # no defer to memory location. just output it ) - # Get a memory adress: - persistent_data_memaddr = _binary_c_bindings.return_persistent_data_memaddr() - - # Evolve and defer output - output_1_deferred = _binary_c_bindings.run_system( - argstring=argstring_1, persistent_data_memaddr=persistent_data_memaddr - ) - - # Free memory adress - json_output_by_freeing = ( - _binary_c_bindings.free_persistent_data_memaddr_and_return_json_output( - persistent_data_memaddr - ) - ) - # print(textwrap.indent(str(json_output_by_freeing), "\t")) - - parsed_json = handle_ensemble_string_to_json( - json_output_by_freeing.splitlines()[0][len("ENSEMBLE_JSON ") :], - ) - - self.assertIn( - "number_counts", - parsed_json.keys(), - msg="Output not correct. 'number_counts' not part of the keys", - ) - - def test_combine_with_empty_json(self): - """ - Test for merging with an empty dict - """ - - argstring_1 = return_argstring(defer_ensemble=0) output_1 = _binary_c_bindings.run_system(argstring=argstring_1) - ensemble_jsons_1 = [ - line for line in output_1.splitlines() if line.startswith("ENSEMBLE_JSON") - ] - json_1 = handle_ensemble_string_to_json( - ensemble_jsons_1[0][len("ENSEMBLE_JSON ") :] - ) + + output_json_1 = extract_ensemble_json_from_string(output_1) assert_message = "combining output json with empty dict should give same result as initial json" - self.assertEqual(merge_dicts(json_1, {}), json_1, assert_message) + self.assertEqual(merge_dicts(output_json_1, {}), output_json_1, assert_message) ############# def _test_full_ensemble_output(self): @@ -461,16 +338,18 @@ class TestEnsemble(unittest.TestCase): TODO: put this one back """ + m1 = 2 # Msun + m2 = 0.1 # Msun + + argstring_1 = return_argstring( + m1=m1, m2=m2, orbital_period=1000000000, ensemble_filter=0 + ) argstring_1 = return_argstring(defer_ensemble=0, ensemble_filters_off=0) - # print(argstring_1) output_1 = _binary_c_bindings.run_system(argstring=argstring_1) - ensemble_jsons_1 = [ - line for line in output_1.splitlines() if line.startswith("ENSEMBLE_JSON") - ] - json_1 = handle_ensemble_string_to_json( - ensemble_jsons_1[0][len("ENSEMBLE_JSON ") :] - ) + + # + output_json_1 = extract_ensemble_json_from_string(output_1) keys = json_1.keys() @@ -482,31 +361,9 @@ class TestEnsemble(unittest.TestCase): self.assertIn("distributions", keys) self.assertIn("scalars", keys) - -def all(): - test_run_system() - test_return_store_memaddr() - test_unload_store_memaddr() - test_minimal_ensemble_output() - test_return_persistent_data_memaddr() - test_passing_persistent_data_to_run_system() - test_full_ensemble_output() - test_adding_ensemble_output() - test_free_and_json_output() - test_combine_with_empty_json() - - -# #### -# if __name__ == "__main__": -# # test_minimal_ensemble_output() -# # test_return_persistent_data_memaddr() -# # test_passing_persistent_data_to_run_system() -# # test_full_ensemble_output() -# # test_adding_ensemble_output() -# # test_free_and_json_output() -# # test_combine_with_empty_json() -# all() -# print("Done") +####################################################################################################################################################### +### ensemble tests +####################################################################################################################################################### if __name__ == "__main__": unittest.main() diff --git a/binarycpython/tests/test_functions.py b/binarycpython/tests/test_functions.py index 83a5dd7e1..50dfd7966 100644 --- a/binarycpython/tests/test_functions.py +++ b/binarycpython/tests/test_functions.py @@ -141,22 +141,23 @@ class test_parse_binary_c_version_info(unittest.TestCase): parsed_info = parse_binary_c_version_info(info) self.assertIn("isotopes", parsed_info.keys()) - self.assertNotEqual(parsed_info["isotopes"], {}) self.assertIn("argpairs", parsed_info.keys()) - self.assertNotEqual(parsed_info["argpairs"], {}) self.assertIn("ensembles", parsed_info.keys()) - self.assertNotEqual(parsed_info["ensembles"], {}) self.assertIn("macros", parsed_info.keys()) - self.assertNotEqual(parsed_info["macros"], {}) self.assertIn("elements", parsed_info.keys()) - self.assertNotEqual(parsed_info["elements"], {}) self.assertIn("dt_limits", parsed_info.keys()) - self.assertNotEqual(parsed_info["dt_limits"], {}) self.assertIn("nucleosynthesis_sources", parsed_info.keys()) - self.assertNotEqual(parsed_info["nucleosynthesis_sources"], {}) self.assertIn("miscellaneous", parsed_info.keys()) - self.assertNotEqual(parsed_info["miscellaneous"], {}) + self.assertIsNotNone(parsed_info["argpairs"]) + self.assertIsNotNone(parsed_info["ensembles"]) + self.assertIsNotNone(parsed_info["macros"]) + self.assertIsNotNone(parsed_info["dt_limits"]) + self.assertIsNotNone(parsed_info["miscellaneous"]) + + if parsed_info['miscellaneous']['NUCSYN'] == 'on': + self.assertIsNotNone(parsed_info["isotopes"]) + self.assertIsNotNone(parsed_info["nucleosynthesis_sources"]) class test_output_lines(unittest.TestCase): """ diff --git a/binarycpython/tests/test_grid.py b/binarycpython/tests/test_grid.py index 5290067ef..b3990afae 100644 --- a/binarycpython/tests/test_grid.py +++ b/binarycpython/tests/test_grid.py @@ -11,6 +11,7 @@ import json import unittest import tempfile import datetime +import numpy as np from binarycpython.utils.grid import Population from binarycpython.utils.functions import temp_dir @@ -205,14 +206,18 @@ class test_Population(unittest.TestCase): self.assertIn("nucleosynthesis_sources", binary_c_version_info) self.assertIn("miscellaneous", binary_c_version_info) - self.assertIsNotNone(binary_c_version_info["isotopes"]) self.assertIsNotNone(binary_c_version_info["argpairs"]) self.assertIsNotNone(binary_c_version_info["ensembles"]) self.assertIsNotNone(binary_c_version_info["macros"]) self.assertIsNotNone(binary_c_version_info["dt_limits"]) - self.assertIsNotNone(binary_c_version_info["nucleosynthesis_sources"]) self.assertIsNotNone(binary_c_version_info["miscellaneous"]) + if binary_c_version_info['miscellaneous']['NUCSYN'] == 'on': + self.assertIsNotNone(binary_c_version_info["isotopes"]) + self.assertIsNotNone(binary_c_version_info["nucleosynthesis_sources"]) + + + def test__return_binary_c_defaults(self): """ Unittests for the function _return_binary_c_defaults @@ -357,7 +362,7 @@ class test_Population(unittest.TestCase): """ CUSTOM_LOGGING_STRING_MASSES = """ - Printf("MASS_PLOTTING %30.12e %g %g %g %g\\n", + Printf("TEST_CUSTOM_LOGGING_1 %30.12e %g %g %g %g\\n", // stardata->model.time, // 1 @@ -373,15 +378,214 @@ class test_Population(unittest.TestCase): test_pop = Population() test_pop.set(M_1=10, M_2=5, orbital_period=100000, metallicty=0.02, max_evolution_time = 15000) - custom_logging_code = binary_c_log_code( - CUSTOM_LOGGING_STRING_MASSES - ) - test_pop.set(custom_logging_code=custom_logging_code) + test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_MASSES) output = test_pop.evolve_single() - print(output) + # + self.assertTrue(len(output.splitlines())>1) + self.assertIn('TEST_CUSTOM_LOGGING_1', output) + + + # + custom_logging_dict = { + 'TEST_CUSTOM_LOGGING_2': ['star[0].mass', 'model.time'] + } + test_pop_2 = Population() + test_pop_2.set(M_1=10, M_2=5, orbital_period=100000, metallicty=0.02, max_evolution_time = 15000) + + test_pop_2.set(C_auto_logging=custom_logging_dict) + + output_2 = test_pop_2.evolve_single() + + # + self.assertTrue(len(output_2.splitlines())>1) + self.assertIn('TEST_CUSTOM_LOGGING_2', output_2) + +class test_grid_evolve(unittest.TestCase): + """ + Unittests for function Population.evolve() + """ + + def test_grid_evolve_1_thread(self): + # test to see if 1 thread does all the systems + + test_pop_evolve_1_thread = Population() + test_pop_evolve_1_thread.set(amt_cores=1, verbosity=1, M_2=1, orbital_period=100000) + + resolution = {"M_1": 10} + + test_pop_evolve_1_thread.add_grid_variable( + name="lnm1", + longname="Primary mass", + valuerange=[1, 100], + resolution="{}".format(resolution["M_1"]), + spacingfunc="const(math.log(1), math.log(100), {})".format( + resolution["M_1"] + ), + precode="M_1=math.exp(lnm1)", + probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", + dphasevol="dlnm1", + parameter_name="M_1", + condition="", # Impose a condition on this grid variable. Mostly for a check for yourself + ) + + analytics = test_pop_evolve_1_thread.evolve() + self.assertLess(np.abs(analytics['total_probability']-0.1503788456014623), 1e-10) + self.assertTrue(analytics['total_count']==10) + + def test_grid_evolve_2_threads(self): + # test to see if 1 thread does all the systems + + test_pop = Population() + test_pop.set(amt_cores=2, verbosity=1, M_2=1, orbital_period=100000) + + resolution = {"M_1": 10} + + test_pop.add_grid_variable( + name="lnm1", + longname="Primary mass", + valuerange=[1, 100], + resolution="{}".format(resolution["M_1"]), + spacingfunc="const(math.log(1), math.log(100), {})".format( + resolution["M_1"] + ), + precode="M_1=math.exp(lnm1)", + probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", + dphasevol="dlnm1", + parameter_name="M_1", + condition="", # Impose a condition on this grid variable. Mostly for a check for yourself + ) + + analytics = test_pop.evolve() + self.assertLess(np.abs(analytics['total_probability']-0.1503788456014623), 1e-10) # + self.assertTrue(analytics['total_count']==10) + + def test_grid_evolve_2_threads_with_custom_logging(self): + # test to see if 1 thread does all the systems + + test_pop = Population() + test_pop.set(amt_cores=2, verbosity=1, M_2=1, orbital_period=100000) + + resolution = {"M_1": 10} + + test_pop.add_grid_variable( + name="lnm1", + longname="Primary mass", + valuerange=[1, 100], + resolution="{}".format(resolution["M_1"]), + spacingfunc="const(math.log(1), math.log(100), {})".format( + resolution["M_1"] + ), + precode="M_1=math.exp(lnm1)", + probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", + dphasevol="dlnm1", + parameter_name="M_1", + condition="", # Impose a condition on this grid variable. Mostly for a check for yourself + ) + + analytics = test_pop.evolve() + self.assertLess(np.abs(analytics['total_probability']-0.1503788456014623), 1e-10) # + self.assertTrue(analytics['total_count']==10) + + + def test_grid_evolve_with_condition_error(self): + # Test to see if we can catch the errors correctly. + + test_pop = Population() + test_pop.set(amt_cores=2, verbosity=1, M_2=1, orbital_period=100000) + + # Set the amt of failed systems that each thread will log + test_pop.set(failed_systems_threshold=4) + + CUSTOM_LOGGING_STRING_WITH_EXIT = """ + Exit_binary_c(BINARY_C_NORMAL_EXIT, "testing exits"); + Printf("TEST_CUSTOM_LOGGING_1 %30.12e %g %g %g %g\\n", + // + stardata->model.time, // 1 + + // masses + stardata->common.zero_age.mass[0], // + stardata->common.zero_age.mass[1], // + + stardata->star[0].mass, + stardata->star[1].mass + ); + """ + + test_pop.set(C_logging_code=CUSTOM_LOGGING_STRING_WITH_EXIT) + + resolution = {"M_1": 10} + test_pop.add_grid_variable( + name="lnm1", + longname="Primary mass", + valuerange=[1, 100], + resolution="{}".format(resolution["M_1"]), + spacingfunc="const(math.log(1), math.log(100), {})".format( + resolution["M_1"] + ), + precode="M_1=math.exp(lnm1)", + probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", + dphasevol="dlnm1", + parameter_name="M_1", + condition="", # Impose a condition on this grid variable. Mostly for a check for yourself + ) + + analytics = test_pop.evolve() + self.assertLess(np.abs(analytics['total_probability']-0.1503788456014623), 1e-10) # + self.assertLess(np.abs(analytics['failed_prob']-0.1503788456014623), 1e-10) # + self.assertEqual(analytics['failed_systems_error_codes'], [0]) + self.assertTrue(analytics['total_count']==10) + self.assertTrue(analytics['failed_count']==10) + self.assertTrue(analytics['errors_found']==True) + self.assertTrue(analytics['errors_exceeded']==True) + + # test to see if 1 thread does all the systems + + test_pop = Population() + test_pop.set(amt_cores=2, verbosity=1, M_2=1, orbital_period=100000) + + resolution = {"M_1": 10, "q": 2} + + test_pop.add_grid_variable( + name="lnm1", + longname="Primary mass", + valuerange=[1, 100], + resolution="{}".format(resolution["M_1"]), + spacingfunc="const(math.log(1), math.log(100), {})".format( + resolution["M_1"] + ), + precode="M_1=math.exp(lnm1)", + probdist="three_part_powerlaw(M_1, 0.1, 0.5, 1.0, 100, -1.3, -2.3, -2.3)*M_1", + dphasevol="dlnm1", + parameter_name="M_1", + condition="", # Impose a condition on this grid variable. Mostly for a check for yourself + ) + + test_pop.add_grid_variable( + name="q", + longname="Mass ratio", + valuerange=["0.1/M_1", 1], + resolution="{}".format(resolution["q"]), + spacingfunc="const(0.1/M_1, 1, {})".format(resolution["q"]), + probdist="flatsections(q, [{'min': 0.1/M_1, 'max': 1.0, 'height': 1}])", + dphasevol="dq", + precode="M_2 = q * M_1", + parameter_name="M_2", + # condition="M_1 in dir()", # Impose a condition on this grid variable. Mostly for a check for yourself + condition="'random_var' in dir()", # This will raise an error because random_var is not defined. + ) + + self.assertRaises(ValueError, test_pop.evolve) + + def test_grid_evolve_no_grid_variables(self): + # test to see if 1 thread does all the systems + + test_pop = Population() + test_pop.set(amt_cores=1, verbosity=1, M_2=1, orbital_period=100000) + resolution = {"M_1": 10} + self.assertRaises(ValueError, test_pop.evolve) diff --git a/binarycpython/tests/test_grid_options_defaults.py b/binarycpython/tests/test_grid_options_defaults.py index 53723636c..23e879a6c 100644 --- a/binarycpython/tests/test_grid_options_defaults.py +++ b/binarycpython/tests/test_grid_options_defaults.py @@ -30,14 +30,14 @@ class test_grid_options_defaults(unittest.TestCase): result_2[input_2], "", msg="description should not be empty" ) - input_3 = "condor_jobid" + input_3 = "evolution_type" result_3 = grid_options_help(input_3) self.assertIn( input_3, result_3, msg="{} should be in the keys of the returned dict".format(input_3), ) - self.assertEqual(result_3[input_3], "", msg="description should be empty") + # self.assertEqual(result_3[input_3], "", msg="description should be empty") def test_grid_options_description_checker(self): """ @@ -67,7 +67,5 @@ class test_grid_options_defaults(unittest.TestCase): self.assertTrue(os.path.isfile(input_2)) - -write_grid_options_to_rst_file if __name__ == "__main__": unittest.main() diff --git a/binarycpython/tests/test_spacing_functions.py b/binarycpython/tests/test_spacing_functions.py index aff7ff139..4cf06ce27 100644 --- a/binarycpython/tests/test_spacing_functions.py +++ b/binarycpython/tests/test_spacing_functions.py @@ -10,8 +10,6 @@ class test_spacing_functions(unittest.TestCase): def test_const(self): const_return = const(1, 10, 10) - self.assertEqual( - const_return, - np.linspace(1, 10, 10), + self.assertTrue((const_return==np.linspace(1, 10, 10)).all(), msg="Output didn't contain SINGLE_STAR_LIFETIME", ) -- GitLab