From 5698330d26e9ed324031a64cb511ca03faad7e6b Mon Sep 17 00:00:00 2001 From: David Hendriks <davidhendriks93@gmail.com> Date: Tue, 1 Jun 2021 16:22:36 +0100 Subject: [PATCH] Small changes. Cleaning up --- binarycpython/utils/grid.py | 38 ++++++++++--------------------------- 1 file changed, 10 insertions(+), 28 deletions(-) diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index 1b92a69fe..398d6cd13 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -30,17 +30,14 @@ import copy import json import time import uuid -import copy -import types import logging import datetime import argparse -import subprocess import importlib.util +import multiprocessing from typing import Union, Any from pathos.helpers import mp as pathos_multiprocess - from pathos.pools import _ProcessPool as Pool from binarycpython.utils.grid_options_defaults import ( @@ -55,22 +52,18 @@ from binarycpython.utils.custom_logging_functions import ( ) from binarycpython.utils.functions import ( get_defaults, - parse_binary_c_version_info, remove_file, filter_arg_dict, get_help_all, return_binary_c_version_info, binaryc_json_serializer, verbose_print, - binarycDecoder, merge_dicts, update_dicts, - BinaryCEncoder, extract_ensemble_json_from_string, get_moe_distefano_dataset, ) -import binarycpython.utils.moe_distefano_data # from binarycpython.utils.hpc_functions import ( # get_condor_version, @@ -83,7 +76,6 @@ import binarycpython.utils.moe_distefano_data from binarycpython.utils.distribution_functions import ( Moecache, LOG_LN_CONVERTER, - Moe_de_Stefano_2017_multiplicity_fractions, ) from binarycpython import _binary_c_bindings @@ -915,16 +907,20 @@ class Population: # https://www.programcreek.com/python/example/58176/multiprocessing.Value # https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing - # Set up the manager object that can share info between processes - manager = pathos_multiprocess.Manager() - job_queue = manager.Queue(maxsize=10) + + # pathos_multiprocess + queue_size = 1000 + + + manager = multiprocessing.Manager() + job_queue = manager.Queue(maxsize=queue_size) result_queue = manager.Queue(maxsize=self.grid_options['amt_cores']) # Create process instances processes = [] for ID in range(self.grid_options["amt_cores"]): - processes.append(pathos_multiprocess.Process(target=self._process_run_population_grid, args=(job_queue, result_queue, ID))) + processes.append(multiprocessing.Process(target=self._process_run_population_grid, args=(job_queue, result_queue, ID))) # Activate the processes for p in processes: @@ -943,25 +939,11 @@ class Population: sentinel = object() for output_dict in iter(result_queue.get, sentinel): + print(output_dict) combined_output_dict = merge_dicts(combined_output_dict, output_dict) if result_queue.empty(): break - # # Create the pool - # pool = Pool(processes=self.grid_options["amt_cores"]) - - # # start the processes by giving them an ID value - # result = list( - # pool.imap_unordered( - # self._process_run_population_grid, range(self.grid_options["amt_cores"]) - # ) - # ) - - # # Handle clean termination of the whole multiprocessing (making sure there are no zombie - # # processes (https://en.wikipedia.org/wiki/Zombie_process)) - # pool.close() - # pool.join() - # Put the values back as object properties self.grid_results = combined_output_dict["results"] self.grid_ensemble_results = combined_output_dict[ -- GitLab