Source code for threeML.parallel.parallel_client

# Custom warning
import math
import shutil
import signal
import subprocess
import sys
import time
import warnings
from contextlib import contextmanager
from pathlib import Path
from typing import Optional

from threeML.config.config import threeML_config
from threeML.io.logging import setup_logger
from threeML.utils.progress_bar import tqdm

log = setup_logger(__name__)

try:
    from subprocess import DEVNULL  # py3k
except ImportError:
    import os

    DEVNULL = open(os.devnull, "wb")

# Check whether we have a parallel system or not

has_parallel = False

try:
    from ipyparallel import Client

except ImportError:
    has_parallel = False

else:
    has_parallel = True


[docs] def get_base_prefix_compat() -> str: """Get base/real prefix, or sys.prefix if there is none.""" return ( getattr(sys, "base_prefix", None) or getattr(sys, "real_prefix", None) or sys.prefix )
[docs] def in_virtualenv() -> bool: return get_base_prefix_compat() != sys.prefix
[docs] class NoParallelEnvironment(UserWarning): pass
# Set up the warnings module to always display our custom warning (otherwise it would # only be displayed once) warnings.simplefilter("always", NoParallelEnvironment)
[docs] @contextmanager def parallel_computation( profile: Optional[str] = None, start_cluster: bool = True, n_jobs: Optional[int] = None, ) -> None: """A context manager which turns on parallel execution temporarily. :param profile: the profile to use, if different from the default :param start_cluster: True or False. Whether to start a new cluster. If False, try to use an existing one for the same profile :return: """ # Memorize the state of the use-parallel config old_state = bool(threeML_config.parallel.use_parallel) old_profile = str(threeML_config.parallel.profile_name) # Set the use_parallel feature on, if available if has_parallel: threeML_config.parallel.use_parallel = True else: # No parallel environment available. Issue a warning and continue with serial # computation log.warning( "You requested parallel computation, but no parallel environment is " "available. You need to install the ipyparallel package. Continuing with " "serial computation...", ) threeML_config.parallel.use_parallel = False # Now use the specified profile (if any), otherwise the default one if profile is not None: threeML_config.parallel.profile_name = str(profile) # Here is where the content of the with parallel_computation statement gets # executed # See if we need to start the ipyparallel cluster first if start_cluster: # Get the command line together # First find out path of ipcluster # first let's see if we are in a virtaul env if in_virtualenv(): ipcluster_path = Path(sys.prefix) / "bin" / "ipcluster" if not ipcluster_path.exists(): log.warning(f"you are using the virtualenv {sys.prefix}") log.warning("but no ipcluster executable was found!") ipcluster_path = shutil.which("ipcluster") log.warning(f"using {ipcluster_path} instead") else: ipcluster_path = shutil.which("ipcluster") cmd_line = [str(ipcluster_path), "start"] if profile is not None: cmd_line.append(f"--profile={profile}") if n_jobs is not None: cmd_line.append(f"-n {n_jobs}") # Start process asynchronously with Popen, suppressing all output log.info("Starting ipyparallel cluster with this command line:") log.info(" ".join(cmd_line)) ipycluster_process = subprocess.Popen( cmd_line, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT ) rc = Client(profile=profile) # Wait for the engines to become available while True: try: view = rc[:] except Exception: log.info("waiting on cluster to start") time.sleep(0.5) continue else: log.info(f"{len(view)} engines are active") break # Do whatever we need to do try: yield finally: # This gets executed in any case, even if there is an exception # Shut down via Client API first so engines exit cleanly (avoid SIGTERM # which causes "Event loop stopped before Future completed" in engine logs) log.info("\nShutting down ipcluster...") try: rc.shutdown(hub=True, block=True) except Exception as e: log.debug("Client shutdown failed (cluster may already be down): %s", e) # If the process is still running (e.g. ipcluster parent), stop it if ipycluster_process.poll() is None: ipycluster_process.send_signal(signal.SIGINT) try: ipycluster_process.wait(timeout=15) except subprocess.TimeoutExpired: ipycluster_process.kill() ipycluster_process.wait() else: # Using an already started cluster yield # Revert back threeML_config.parallel.use_parallel = old_state threeML_config.parallel.profile_name = old_profile
[docs] def is_parallel_computation_active() -> bool: return bool(threeML_config.parallel.use_parallel)
if has_parallel: class ParallelClient(Client): def __init__(self, *args, **kwargs) -> None: """Wrapper around the IPython Client class, which forces the use of dill for object serialization. :param args: same as IPython Client :param kwargs: same as IPython Client :return: """ # Just a wrapper around the IPython Client class # forcing the use of dill for object serialization # (more robust, and allows for serialization of class # methods) if "profile" not in kwargs.keys(): kwargs["profile"] = threeML_config.parallel.profile_name super(ParallelClient, self).__init__(*args, **kwargs) # This will propagate the use_dill to all running # engines _ = self.direct_view().use_dill()
[docs] def get_number_of_engines(self): return len(self.direct_view())
def _interactive_map( self, worker, items_to_process, ordered=True, chunk_size=None ): """Subdivide the work among the active engines, taking care of dividing it among them. :param worker: the function to be applied :param items_to_process: the items to apply the function to :param ordered: whether to keep the order of output (default: True). Using False can be much faster, but you need to have a way to re-estabilish the order if you care about it, after the fact. :param chunk_size: determine how many items should an engine process before reporting back. Use None for an automatic choice. :return: a AsyncResult object """ # Split the work evenly between the engines n_total_engines = self.get_number_of_engines() n_items = len(items_to_process) # Get a load-balanced view with the appropriate number of engines if n_items < n_total_engines: log.warning("More engines than items to process") # Limit the view to the needed engines lview = self.load_balanced_view(range(n_items)) n_active_engines = n_items chunk_size = 1 else: # Use all engines lview = self.load_balanced_view() n_active_engines = n_total_engines if chunk_size is None: chunk_size = int(math.ceil(n_items / float(n_active_engines) / 20)) # We need this to keep the instance alive self._current_amr = lview.imap( worker, items_to_process, # chunksize=chunk_size, ordered=ordered, ) return self._current_amr
[docs] def execute_with_progress_bar( self, worker, items, chunk_size=None, name="progress" ): # Let's make a wrapper which will allow us to recover the order def wrapper(x): id, item = x return (id, worker(item)) items_wrapped = [(i, item) for i, item in enumerate(items)] amr = self._interactive_map( wrapper, items_wrapped, ordered=False, chunk_size=chunk_size ) results = [] for i, res in enumerate(tqdm(amr, desc=name)): results.append(res) # Reorder the list according to the id return list(map(lambda x: x[1], sorted(results, key=lambda x: x[0])))
else: # NO parallel environment available. Make a dumb object to avoid import problems, # but this object will never be really used because the context manager will not # activate the parallel mode (see above)
[docs] class ParallelClient(object): def __init__(self, *args, **kwargs): raise RuntimeError( "No parallel environment and attempted to use the ParallelClient class," " which should never happen. Please open an issue at " "https://github.com/threeML/threeML/issues" )