From 0c88363f38f6f4122594c87a11d389617114bfe5 Mon Sep 17 00:00:00 2001
From: David Hendriks <davidhendriks93@gmail.com>
Date: Tue, 1 Jun 2021 15:58:45 +0100
Subject: [PATCH] Replaced the method where all threads use their own
 generator with one centralized job queue. Now debugging
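
The core of the change is a producer/consumer pattern: one filler keeps a
shared job queue topped up while the worker processes consume from it and
push their output dicts onto a result queue. Below is a minimal sketch of
that pattern using the standard multiprocessing module (whose API pathos
mirrors); all names in the sketch are illustrative, not the ones in grid.py:

    import multiprocessing

    def producer(job_queue, n_jobs, n_workers):
        # Fill the shared queue, then send one stop sentinel per worker
        for job_number in range(n_jobs):
            job_queue.put(job_number)
        for _ in range(n_workers):
            job_queue.put("STOP")

    def worker(job_queue, result_queue):
        # Consume jobs until the stop sentinel arrives, then report a result
        total = 0
        for job in iter(job_queue.get, "STOP"):
            total += job
        result_queue.put(total)

    if __name__ == "__main__":
        n_workers = 4
        manager = multiprocessing.Manager()
        job_queue = manager.Queue(maxsize=10)
        result_queue = manager.Queue()
        workers = [
            multiprocessing.Process(target=worker, args=(job_queue, result_queue))
            for _ in range(n_workers)
        ]
        for p in workers:
            p.start()
        producer(job_queue, n_jobs=100, n_workers=n_workers)
        for p in workers:
            p.join()
        print(sum(result_queue.get() for _ in range(n_workers)))  # 4950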

---
 binarycpython/utils/grid.py | 321 ++++++++++++++++++++----------------
 1 file changed, 182 insertions(+), 139 deletions(-)
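
A note on the result collection loop further down: because every worker
process is joined before the results are read, each worker has already put
exactly one output dict on the result queue, so the queue can be drained with
a fixed number of get() calls instead of the sentinel/empty() combination.
A runnable sketch of that idea (all names here are hypothetical, not the ones
in grid.py):

    import multiprocessing

    def worker(result_queue):
        # Hypothetical worker: put exactly one result dict on the queue
        result_queue.put({"results": {}})

    if __name__ == "__main__":
        n_workers = 2
        manager = multiprocessing.Manager()
        result_queue = manager.Queue()
        workers = [multiprocessing.Process(target=worker, args=(result_queue,))
                   for _ in range(n_workers)]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        # After join() every result is already on the queue, so a fixed number
        # of get() calls drains it without blocking
        results = [result_queue.get() for _ in range(n_workers)]
        print(results)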

diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py
index 303334b06..1b92a69fe 100644
--- a/binarycpython/utils/grid.py
+++ b/binarycpython/utils/grid.py
@@ -832,6 +832,54 @@ class Population:
                 0,
             )
 
+
+    def get_stream_logger(self, level=logging.DEBUG):
+        """Return logger with configured StreamHandler."""
+        stream_logger = logging.getLogger('stream_logger')
+        stream_logger.handlers = []
+        stream_logger.setLevel(level)
+        sh = logging.StreamHandler()
+        sh.setLevel(level)
+        fmt = '[%(asctime)s %(levelname)-8s %(processName)s] --- %(message)s'
+        formatter = logging.Formatter(fmt)
+        sh.setFormatter(formatter)
+        stream_logger.addHandler(sh)
+
+        return stream_logger
+
+    def system_queue_filler(self, job_queue, amt_cores):
+        """
+        Function that is responsible for keeping the job queue filled.
+
+        It generates systems and puts them on the queue until the generator is exhausted,
+        blocking whenever the queue is full. The queue maxsize may still need tuning.
+        """
+        stream_logger = self.get_stream_logger()
+        stream_logger.debug(f"setting up the system_queue_filler now")
+
+
+        # Setup of the generator
+        self._generate_grid_code(dry_run=False)
+
+        self._load_grid_function()
+
+        generator = self.grid_options["_system_generator"](self, print_results=False)
+
+        # TODO: build in a method to handle running on HPC systems.
+        # Continuously fill the queue
+        for system_number, system_dict in enumerate(generator):
+            stream_logger.debug(f"producing: {system_number}")  # DEBUG
+            job_queue.put((system_number, system_dict))
+
+            # Print the current queue size
+            # print("Current size: {}".format(job_queue.qsize()))
+
+
+        # Send the closing signal to the workers. When they receive this, they will terminate.
+        stream_logger.debug("Signaling stop to processes")  # DEBUG
+        for _ in range(amt_cores):
+            job_queue.put("STOP")
+
     def _evolve_population_grid(self):
         """
         Function to evolve the population with multiprocessing approach.
@@ -867,30 +915,52 @@ class Population:
         # https://www.programcreek.com/python/example/58176/multiprocessing.Value
         # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing
 
-        # Create the pool
-        pool = Pool(processes=self.grid_options["amt_cores"])
 
-        # start the processes by giving them an ID value
-        result = list(
-            pool.imap_unordered(
-                self._process_run_population_grid, range(self.grid_options["amt_cores"])
-            )
-        )
+        # Set up the manager object that can share info between processes
+        manager = pathos_multiprocess.Manager()
+        job_queue = manager.Queue(maxsize=10)
+        result_queue = manager.Queue(maxsize=self.grid_options["amt_cores"])
 
-        # Handle clean termination of the whole multiprocessing (making sure there are no zombie
-        # processes (https://en.wikipedia.org/wiki/Zombie_process))
-        pool.close()
-        pool.join()
+        # Create process instances
+        processes = []
+        for ID in range(self.grid_options["amt_cores"]):
+            processes.append(pathos_multiprocess.Process(target=self._process_run_population_grid, args=(job_queue, result_queue, ID)))
 
-        print("OUTSIDE THREAD")
-        print(Moecache.keys())
-        print("OUTSIDE THREAD")
+        # Activate the processes
+        for p in processes:
+            p.start()
+
+        # Fill the system queue; this call blocks until every system has been queued
+        self.system_queue_filler(job_queue, amt_cores=self.grid_options["amt_cores"])
+
+        # Join the processes
+        for p in processes:
+            p.join()
 
         # Handle the results by merging all the dictionaries. How that merging happens exactly is
         # described in the merge_dicts description.
         combined_output_dict = {}
-        for output_dict in result:
+        # The workers are already joined, so each has put exactly one output dict on the queue.
+        sentinel = object()  # never sent by a worker, so the loop ends via the empty() check below
+        for output_dict in iter(result_queue.get, sentinel):
             combined_output_dict = merge_dicts(combined_output_dict, output_dict)
+            if result_queue.empty():
+                break
+
+        # # Create the pool
+        # pool = Pool(processes=self.grid_options["amt_cores"])
+
+        # # start the processes by giving them an ID value
+        # result = list(
+        #     pool.imap_unordered(
+        #         self._process_run_population_grid, range(self.grid_options["amt_cores"])
+        #     )
+        # )
+
+        # # Handle clean termination of the whole multiprocessing (making sure there are no zombie
+        # # processes (https://en.wikipedia.org/wiki/Zombie_process))
+        # pool.close()
+        # pool.join()
 
         # Put the values back as object properties
         self.grid_results = combined_output_dict["results"]
@@ -945,7 +1015,7 @@ class Population:
         if self.grid_options["parse_function"]:
             self.grid_options["parse_function"](self, out)
 
-    def _process_run_population_grid(self, ID):
+    def _process_run_population_grid(self, job_queue, result_queue, ID):
         """
         Function that loops over the whole generator, but only runs
         systems that fit to: if (localcounter+ID) % self.grid_options["amt_cores"] == 0
@@ -963,6 +1033,9 @@ class Population:
             ID  # Store the ID as a object property again, lets see if that works.
         )
 
+        stream_logger = self.get_stream_logger()
+        stream_logger.debug(f"Setting up processor: process-{self.process_ID}")
+
         # Set to starting up
         with open(
             os.path.join(
@@ -1003,17 +1076,7 @@ class Population:
                 0,
             )
 
-        #
-        self._generate_grid_code(dry_run=False)
-
-        # apparently we have to re-load this for every process, otherwise NameErrors arise (seems like a bug but I'm not sure)
-        self._load_grid_function()
-
-        # Set up generator
-        generator = self.grid_options["_system_generator"](self, print_results=False)
-
         # Set up local variables
-        running = True
         localcounter = (
             0  # global counter for the whole loop. (need to be ticked every loop)
         )
@@ -1024,17 +1087,13 @@ class Population:
             0  # counter for the actual amt of systems this thread ran
         )
 
-        round_number_mod = 0  # rotating modulo
-
         total_time_calling_binary_c = 0
 
         total_mass_run = 0
         total_probability_weighted_mass_run = 0
 
-        # Go over the generator
-        while running:
-            # round_number_mod = (localcounter+1)%self.grid_options["amt_cores"]
-
+        # Go over the queue
+        for system_number, system_dict in iter(job_queue.get, 'STOP'):
             if localcounter == 0:
 
                 # Set status to running
@@ -1047,121 +1106,101 @@ class Population:
                 ) as f:
                     f.write("RUNNING")
 
-            try:
-                # Get the system
-                system = next(generator)
-
-                # Check if the ID is the correct one for this process. This is the method we use to split this calculation over many cores and or machines
-                if (localcounter + (ID + round_number_mod)) % self.grid_options[
-                    "amt_cores"
-                ] == 0:
-
-                    # Combine that with the other settings
-                    full_system_dict = self.bse_options.copy()
-                    full_system_dict.update(system)
-
-
-                    # In the first system, explicitly check all the keys that are passed to see if
-                    # they match the keys known to binary_c.
-                    # Won't do that every system cause that is a bit of a waste of computing time.
-                    if number_of_systems_run==0:
-                        # TODO: Put this someplace else and wrap in a functioncall
-                        for key in full_system_dict.keys():
-                            if not key in self.available_keys:
-                                # Deal with special keys
-                                if not any(
-                                    [
-                                        True
-                                        if (
-                                            key.startswith(param[:-2])
-                                            and len(param[:-2]) < len(key)
-                                        )
-                                        else False
-                                        for param in self.special_params
-                                    ]
-                                ):
-                                    msg = "Error: Found a parameter unknown to binary_c: {}. Abort".format(
-                                        key
-                                    )
-                                    raise ValueError(msg)
-
-                    # self._print_info(
-                    #     i + 1, self.grid_options["_total_starcount"], full_system_dict
-                    # )
-
-                    #
-                    verbose_print(
-                        "Process {} is handling system {}".format(ID, localcounter),
-                        self.grid_options["verbosity"],
-                        2,
-                    )
-
-                    # In some cases, the whole run crashes. To be able to figure out which system
-                    # that was on, we log each current system to a file (each thread has one).
-                    # Each new system overrides the previous
-                    with open(
-                        os.path.join(
-                            self.grid_options["tmp_dir"], "current_system",
-                            "process_{}.txt".format(self.process_ID),
-                        ),
-                        "w",
-                    ) as f:
-                        binary_cmdline_string = self._return_argline(full_system_dict)
-                        f.write(binary_cmdline_string)
-
-                    start_runtime_binary_c = time.time()
-
-                    # Evolve the system
-                    if self.grid_options['_actually_evolve_system']:
-                        self._evolve_system_mp(full_system_dict)
-
-                    end_runtime_binary_c = time.time()
-
-                    total_time_calling_binary_c += (
-                        end_runtime_binary_c - start_runtime_binary_c
-                    )  # keep track of total binary_c call time
-
-                    # Debug line: logging all the lines
-                    if self.grid_options["log_runtime_systems"] == 1:
-                        with open(
-                            os.path.join(
-                                self.grid_options["tmp_dir"], "runtime_systems",
-                                "process_{}.txt".format(self.process_ID),
-                            ),
-                            "a+",
-                        ) as f:
-                            binary_cmdline_string = self._return_argline(
-                                full_system_dict
-                            )
-                            f.write(
-                                "{} {} '{}'\n".format(start_runtime_binary_c,
-                                    end_runtime_binary_c - start_runtime_binary_c,
-                                    binary_cmdline_string,
+            # Combine the system dict with the rest of the BSE settings
+            full_system_dict = self.bse_options.copy()
+            full_system_dict.update(system_dict)
+
+            # For the first system, explicitly check all the keys that are passed to see if
+            # they match the keys known to binary_c.
+            # We don't do that for every system because that would be a waste of computing time.
+            if number_of_systems_run == 0:
+                # TODO: Put this someplace else and wrap in a functioncall
+                for key in full_system_dict.keys():
+                    if key not in self.available_keys:
+                        # Deal with special keys
+                        if not any(
+                            [
+                                True
+                                if (
+                                    key.startswith(param[:-2])
+                                    and len(param[:-2]) < len(key)
                                 )
+                                else False
+                                for param in self.special_params
+                            ]
+                        ):
+                            msg = "Error: Found a parameter unknown to binary_c: {}. Abort".format(
+                                key
                             )
+                            raise ValueError(msg)
 
-                    # Keep track of systems:
-                    probability_of_systems_run += full_system_dict["probability"]
-                    number_of_systems_run += 1
+            # self._print_info(
+            #     i + 1, self.grid_options["_total_starcount"], full_system_dict
+            # )
 
-                    # Tally up some numbers
-                    total_mass_system = full_system_dict.get("M_1", 0) + full_system_dict.get("M_1", 0) + full_system_dict.get("M_1", 0) + full_system_dict.get("M_1", 0)
-                    total_mass_run += total_mass_system
-                    total_probability_weighted_mass_run += total_mass_system * full_system_dict["probability"]
+            #
+            verbose_print(
+                "Process {} is handling system {}".format(ID, system_number),
+                self.grid_options["verbosity"],
+                2,
+            )
+            stream_logger.debug("Process {} is handling system {}".format(ID, system_number))
+
+            # In some cases, the whole run crashes. To be able to figure out which system
+            # caused that, we log the current system to a file (each process has its own file).
+            # Each new system overwrites the previous one.
+            with open(
+                os.path.join(
+                    self.grid_options["tmp_dir"], "current_system",
+                    "process_{}.txt".format(self.process_ID),
+                ),
+                "w",
+            ) as f:
+                binary_cmdline_string = self._return_argline(full_system_dict)
+                f.write(binary_cmdline_string)
+
+            start_runtime_binary_c = time.time()
 
-            except StopIteration:
-                running = False
+            # Evolve the system
+            if self.grid_options['_actually_evolve_system']:
+                self._evolve_system_mp(full_system_dict)
 
-            # Rotate the round number mod. The idea here is to prevent a thread from always getting the same sampled period of whatever. This just rotates everyone
-            if (localcounter + 1) % self.grid_options["amt_cores"] == 0:
-                round_number_mod += 1
+            end_runtime_binary_c = time.time()
 
-            # print("thread {} round_nr_mod {}. localcounter {}".format(ID, round_number_mod, localcounter))
+            total_time_calling_binary_c += (
+                end_runtime_binary_c - start_runtime_binary_c
+            )  # keep track of total binary_c call time
 
-            # Has to be here because this one is used for the (localcounter+ID) % (self..)
+            # Debug: log the runtime and argline of every system that was run
+            if self.grid_options["log_runtime_systems"] == 1:
+                with open(
+                    os.path.join(
+                        self.grid_options["tmp_dir"], "runtime_systems",
+                        "process_{}.txt".format(self.process_ID),
+                    ),
+                    "a+",
+                ) as f:
+                    binary_cmdline_string = self._return_argline(
+                        full_system_dict
+                    )
+                    f.write(
+                        "{} {} '{}'\n".format(start_runtime_binary_c,
+                            end_runtime_binary_c - start_runtime_binary_c,
+                            binary_cmdline_string,
+                        )
+                    )
+
+            # Keep track of systems:
+            probability_of_systems_run += full_system_dict["probability"]
+            number_of_systems_run += 1
             localcounter += 1
 
-        # Set status to running
+            # Tally up the total mass of the system (up to four stars)
+            total_mass_system = full_system_dict.get("M_1", 0) + full_system_dict.get("M_2", 0) + full_system_dict.get("M_3", 0) + full_system_dict.get("M_4", 0)
+            total_mass_run += total_mass_system
+            total_probability_weighted_mass_run += total_mass_system * full_system_dict["probability"]
+
+        # Set status to finishing
         with open(
             os.path.join(
                 self.grid_options["tmp_dir"], "process_status",
@@ -1170,6 +1209,7 @@ class Population:
             "w",
         ) as f:
             f.write("FINISHING")
+        stream_logger.debug(f"Process-{self.process_ID} is finishing.")
 
         # Handle ensemble output: is ensemble==1, then either directly write that data to a file, or combine everything into 1 file.
         ensemble_json = {}  # Make sure it exists already
@@ -1293,7 +1333,10 @@ class Population:
         ) as f:
             f.write("FINISHED")
 
-        return output_dict
+        result_queue.put(output_dict)
+        stream_logger.debug(f"Process-{self.process_ID} is finished.")
+
+        return
 
     # Single system
     def evolve_single(self, clean_up_custom_logging_files: bool = True) -> Any:
-- 
GitLab