Source code for threeML.parallel.parallel_client

# Custom warning
import math
import signal
import subprocess
import time
from typing import Optional
import warnings
from contextlib import contextmanager
import shutil
from pathlib import Path

from threeML.config.config import threeML_config
from import setup_logger
from threeML.utils.progress_bar import tqdm

import sys

log = setup_logger(__name__)

    from subprocess import DEVNULL  # py3k
except ImportError:
    import os

    DEVNULL = open(os.devnull, "wb")

# Check whether we have a parallel system or not

has_parallel = False


    from ipyparallel import Client

except ImportError:

    has_parallel = False


    has_parallel = True

[docs] def get_base_prefix_compat() -> str: """Get base/real prefix, or sys.prefix if there is none.""" return ( getattr(sys, "base_prefix", None) or getattr(sys, "real_prefix", None) or sys.prefix )
[docs] def in_virtualenv() -> bool: return get_base_prefix_compat() != sys.prefix
[docs] class NoParallelEnvironment(UserWarning): pass
# Set up the warnings module to always display our custom warning (otherwise it would only be displayed once) warnings.simplefilter("always", NoParallelEnvironment)
[docs] @contextmanager def parallel_computation( profile: Optional[str] = None, start_cluster: bool = True, n_jobs: Optional[int] = None, ) -> None: """ A context manager which turns on parallel execution temporarily :param profile: the profile to use, if different from the default :param start_cluster: True or False. Whether to start a new cluster. If False, try to use an existing one for the same profile :return: """ # Memorize the state of the use-parallel config old_state = bool(threeML_config.parallel.use_parallel) old_profile = str(threeML_config.parallel.profile_name) # Set the use_parallel feature on, if available if has_parallel: threeML_config.parallel.use_parallel = True else: # No parallel environment available. Issue a warning and continue with serial computation log.warning( "You requested parallel computation, but no parallel environment is available. You need " "to install the ipyparallel package. Continuing with serial computation...", ) threeML_config.parallel.use_parallel = False # Now use the specified profile (if any), otherwise the default one if profile is not None: threeML_config.parallel.profile_name = str(profile) # Here is where the content of the with parallel_computation statement gets # executed # See if we need to start the ipyparallel cluster first if start_cluster: # Get the command line together # First find out path of ipcluster # first let's see if we are in a virtaul env if in_virtualenv(): ipcluster_path = Path(sys.prefix) / "bin" / "ipcluster" if not ipcluster_path.exists(): log.warning(f"you are using the virtualenv {sys.prefix}") log.warning("but no ipcluster executable was found!") ipcluster_path = shutil.which("ipcluster") log.warning(f"using {ipcluster_path} instead") else: ipcluster_path = shutil.which("ipcluster") cmd_line = [str(ipcluster_path), "start"] if profile is not None: cmd_line.append(f"--profile={profile}") if n_jobs is not None: cmd_line.append(f"-n {n_jobs}") # Start process asynchronously with Popen, suppressing all output"Starting ipyparallel cluster with this command line:")" ".join(cmd_line)) ipycluster_process = subprocess.Popen( cmd_line, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT ) rc = Client(profile=profile) # Wait for the engines to become available while True: try: view = rc[:] except Exception as e:"waiting on cluster to start") time.sleep(0.5) continue else:"{len(view)} engines are active") break # Do whatever we need to do try: yield finally: # This gets executed in any case, even if there is an exception"\nShutting down ipcluster...") ipycluster_process.send_signal(signal.SIGINT) ipycluster_process.wait() else: # Using an already started cluster yield # Revert back threeML_config.parallel.use_parallel = old_state threeML_config.parallel.profile_name = old_profile
[docs] def is_parallel_computation_active() -> bool: return bool(threeML_config.parallel.use_parallel)
if has_parallel: class ParallelClient(Client): def __init__(self, *args, **kwargs) -> None: """ Wrapper around the IPython Client class, which forces the use of dill for object serialization :param args: same as IPython Client :param kwargs: same as IPython Client :return: """ # Just a wrapper around the IPython Client class # forcing the use of dill for object serialization # (more robust, and allows for serialization of class # methods) if "profile" not in kwargs.keys(): kwargs["profile"] = threeML_config.parallel.profile_name super(ParallelClient, self).__init__(*args, **kwargs) # This will propagate the use_dill to all running # engines _ = self.direct_view().use_dill()
[docs] def get_number_of_engines(self): return len(self.direct_view())
def _interactive_map( self, worker, items_to_process, ordered=True, chunk_size=None ): """ Subdivide the work among the active engines, taking care of dividing it among them :param worker: the function to be applied :param items_to_process: the items to apply the function to :param ordered: whether to keep the order of output (default: True). Using False can be much faster, but you need to have a way to re-estabilish the order if you care about it, after the fact. :param chunk_size: determine how many items should an engine process before reporting back. Use None for an automatic choice. :return: a AsyncResult object """ # Split the work evenly between the engines n_total_engines = self.get_number_of_engines() n_items = len(items_to_process) # Get a load-balanced view with the appropriate number of engines if n_items < n_total_engines: log.warning("More engines than items to process") # Limit the view to the needed engines lview = self.load_balanced_view(range(n_items)) n_active_engines = n_items chunk_size = 1 else: # Use all engines lview = self.load_balanced_view() n_active_engines = n_total_engines if chunk_size is None: chunk_size = int( math.ceil(n_items / float(n_active_engines) / 20) ) # We need this to keep the instance alive self._current_amr = lview.imap( worker, items_to_process, # chunksize=chunk_size, ordered=ordered, ) return self._current_amr
[docs] def execute_with_progress_bar( self, worker, items, chunk_size=None, name="progress" ): # Let's make a wrapper which will allow us to recover the order def wrapper(x): (id, item) = x return (id, worker(item)) items_wrapped = [(i, item) for i, item in enumerate(items)] amr = self._interactive_map( wrapper, items_wrapped, ordered=False, chunk_size=chunk_size ) results = [] for i, res in enumerate(tqdm(amr, desc=name)): results.append(res) # Reorder the list according to the id return list( map(lambda x: x[1], sorted(results, key=lambda x: x[0])) )
else: # NO parallel environment available. Make a dumb object to avoid import problems, but this object will never # be really used because the context manager will not activate the parallel mode (see above)
[docs] class ParallelClient(object): def __init__(self, *args, **kwargs): raise RuntimeError( "No parallel environment and attempted to use the ParallelClient class, which should " "never happen. Please open an issue at" )