# Custom warning
import math
import shutil
import signal
import subprocess
import sys
import time
import warnings
from contextlib import contextmanager
from pathlib import Path
from typing import Optional
from threeML.config.config import threeML_config
from threeML.io.logging import setup_logger
from threeML.utils.progress_bar import tqdm
log = setup_logger(__name__)
# Import DEVNULL from subprocess where it exists. If that import fails, fall
# back to a handle on the OS null device opened for binary writing (this
# handle is never closed in this module).
try:
    from subprocess import DEVNULL  # py3k
except ImportError:
    import os

    DEVNULL = open(os.devnull, "wb")
# Probe for the optional ipyparallel dependency. ``has_parallel`` records
# whether the import succeeded; the rest of the module branches on it
try:
    from ipyparallel import Client

    has_parallel = True
except ImportError:
    has_parallel = False
def get_base_prefix_compat() -> str:
    """Return the base prefix of the running interpreter.

    ``sys.base_prefix`` and ``sys.real_prefix`` are tried in that order and the
    first one that exists with a truthy value is returned. If neither
    qualifies, ``sys.prefix`` is returned instead.

    :return: the selected prefix
    """
    for attribute_name in ("base_prefix", "real_prefix"):
        candidate = getattr(sys, attribute_name, None)
        if candidate:
            return candidate

    return sys.prefix
def in_virtualenv() -> bool:
    """Tell whether the interpreter is running inside a virtual environment.

    :return: True when ``sys.prefix`` differs from the prefix reported by
        :func:`get_base_prefix_compat`, False otherwise
    """
    base_prefix = get_base_prefix_compat()

    return sys.prefix != base_prefix
class NoParallelEnvironment(UserWarning):
    """Custom warning category; it adds nothing to :class:`UserWarning`."""
# Register an "always" filter for NoParallelEnvironment, so that the warning is
# displayed every time it is issued (otherwise it would only be displayed once)
warnings.simplefilter("always", NoParallelEnvironment)
@contextmanager
def parallel_computation(
    profile: Optional[str] = None,
    start_cluster: bool = True,
    n_jobs: Optional[int] = None,
) -> None:
    """A context manager which turns on parallel execution temporarily.

    On entry the ``use_parallel`` and ``profile_name`` entries of
    ``threeML_config.parallel`` are saved. They are restored on exit, also
    when the body of the ``with`` statement (or the cluster start-up) raises.

    If ipyparallel could not be imported, a warning is logged and the body is
    executed with ``use_parallel`` set to False; no cluster is started in that
    case, whatever the value of ``start_cluster``.

    :param profile: the profile to use, if different from the default
    :param start_cluster: True or False. Whether to start a new cluster.
        If False, try to use an existing one for the same profile
    :param n_jobs: number of engines, handed to ``ipcluster start`` through its
        ``-n`` option. If None the option is not passed at all
    :return: None (nothing is yielded to the ``with`` statement)
    :raises FileNotFoundError: if a cluster has to be started but no
        ``ipcluster`` executable can be located
    :raises RuntimeError: if ``ipcluster`` exits with a non-zero status while
        no engine is available yet
    """
    # Memorize the state of the parallel configuration
    old_state = bool(threeML_config.parallel.use_parallel)
    old_profile = str(threeML_config.parallel.profile_name)

    try:
        # Set the use_parallel feature on, if available
        if has_parallel:
            threeML_config.parallel.use_parallel = True

        else:
            # No parallel environment available. Issue a warning and continue
            # with serial computation
            log.warning(
                "You requested parallel computation, but no parallel environment is "
                "available. You need to install the ipyparallel package. "
                "Continuing with serial computation...",
            )

            threeML_config.parallel.use_parallel = False

        # Now use the specified profile (if any), otherwise the default one
        if profile is not None:
            threeML_config.parallel.profile_name = str(profile)

        if not (has_parallel and start_cluster):
            # Either an already started cluster is used, or there is no
            # parallel environment at all: nothing to start or to shut down.
            # This is where the content of the with statement gets executed
            yield

            return

        # We need to start the ipyparallel cluster first.
        # Find out the path of ipcluster: inside a virtual env its own copy is
        # preferred, otherwise whatever is found on the PATH
        if in_virtualenv():
            ipcluster_path = Path(sys.prefix) / "bin" / "ipcluster"

            if not ipcluster_path.exists():
                log.warning(f"you are using the virtualenv {sys.prefix}")
                log.warning("but no ipcluster executable was found!")

                ipcluster_path = shutil.which("ipcluster")

                log.warning(f"using {ipcluster_path} instead")

        else:
            ipcluster_path = shutil.which("ipcluster")

        if ipcluster_path is None:
            # shutil.which found nothing: fail with an explicit message rather
            # than trying to execute a command literally called "None"
            raise FileNotFoundError(
                "Could not find the ipcluster executable, which is needed to "
                "start the ipyparallel cluster. Make sure it is on your PATH, or "
                "use start_cluster=False to connect to an already running cluster."
            )

        # Get the command line together
        cmd_line = [str(ipcluster_path), "start"]

        if profile is not None:
            cmd_line.append(f"--profile={profile}")

        if n_jobs is not None:
            # Option and value are separate arguments, as they would be when
            # typed in a shell
            cmd_line.extend(["-n", str(n_jobs)])

        # Start process asynchronously with Popen, suppressing all output
        log.info("Starting ipyparallel cluster with this command line:")
        log.info(" ".join(cmd_line))

        ipycluster_process = subprocess.Popen(
            cmd_line, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )

        # From here on the child process exists, so everything (including the
        # connection and the wait for the engines) is protected by the
        # finally clause which stops it
        rc = None

        try:
            rc = Client(profile=profile)

            # Wait for the engines to become available
            while True:
                try:
                    view = rc[:]

                except Exception as err:
                    exit_status = ipycluster_process.poll()

                    # A non-zero exit status means ipcluster itself failed:
                    # stop waiting instead of looping forever. (None means
                    # still running; 0 is tolerated and we keep waiting.)
                    if exit_status not in (None, 0):
                        raise RuntimeError(
                            f"ipcluster exited with status {exit_status} before "
                            "any engine became available"
                        ) from err

                    log.info("waiting on cluster to start")
                    time.sleep(0.5)

                else:
                    log.info(f"{len(view)} engines are active")
                    break

            # Here is where the content of the with parallel_computation
            # statement gets executed
            yield

        finally:
            # This gets executed in any case, even if there is an exception

            # Shut down via Client API first so engines exit cleanly (avoid
            # SIGTERM which causes "Event loop stopped before Future
            # completed" in engine logs)
            log.info("\nShutting down ipcluster...")

            if rc is not None:
                try:
                    rc.shutdown(hub=True, block=True)

                except Exception as e:
                    log.debug(
                        "Client shutdown failed (cluster may already be down): %s", e
                    )

            # If the process is still running (e.g. ipcluster parent), stop it
            if ipycluster_process.poll() is None:
                ipycluster_process.send_signal(signal.SIGINT)

                try:
                    ipycluster_process.wait(timeout=15)

                except subprocess.TimeoutExpired:
                    ipycluster_process.kill()
                    ipycluster_process.wait()

    finally:
        # Revert back, no matter how the block above was left
        threeML_config.parallel.use_parallel = old_state
        threeML_config.parallel.profile_name = old_profile
def is_parallel_computation_active() -> bool:
    """Report whether parallel computation is currently switched on.

    :return: the ``use_parallel`` entry of ``threeML_config.parallel``,
        converted to bool
    """
    parallel_flag = threeML_config.parallel.use_parallel

    return bool(parallel_flag)
if has_parallel:

    class ParallelClient(Client):
        """ipyparallel Client which defaults to the configured profile and
        switches the engines to dill serialization."""

        def __init__(self, *args, **kwargs) -> None:
            """Wrapper around the IPython Client class, which forces the use of
            dill for object serialization.

            :param args: same as IPython Client
            :param kwargs: same as IPython Client. If ``profile`` is not given,
                the profile name stored in ``threeML_config.parallel`` is used
            :return:
            """
            # Just a wrapper around the IPython Client class
            # forcing the use of dill for object serialization
            # (more robust, and allows for serialization of class
            # methods)
            kwargs.setdefault("profile", threeML_config.parallel.profile_name)

            super().__init__(*args, **kwargs)

            # This will propagate the use_dill to all running
            # engines
            _ = self.direct_view().use_dill()

        def get_number_of_engines(self):
            """Return the number of engines seen by a direct view."""
            return len(self.direct_view())

        def _interactive_map(
            self, worker, items_to_process, ordered=True, chunk_size=None
        ):
            """Subdivide the work among the active engines, taking care of
            dividing it among them.

            :param worker: the function to be applied
            :param items_to_process: the items to apply the function to
            :param ordered: whether to keep the order of output
                (default: True). Using False can be much faster, but
                you need to have a way to re-establish the order if
                you care about it, after the fact.
            :param chunk_size: accepted for backward compatibility, but
                currently ignored: it is not forwarded to ``imap``
            :return: the object returned by ``imap`` on the load-balanced
                view, which the caller iterates to collect the results
            """
            n_total_engines = self.get_number_of_engines()
            n_items = len(items_to_process)

            # Get a load-balanced view with the appropriate number of engines
            if n_items < n_total_engines:
                log.warning("More engines than items to process")

                # Limit the view to the needed engines. Use the ids of the
                # registered engines: they are not guaranteed to be
                # 0..n_items-1
                lview = self.load_balanced_view(self.ids[:n_items])

            else:
                # Use all engines
                lview = self.load_balanced_view()

            # We need this to keep the instance alive
            self._current_amr = lview.imap(
                worker,
                items_to_process,
                ordered=ordered,
            )

            return self._current_amr

        def execute_with_progress_bar(
            self, worker, items, chunk_size=None, name="progress"
        ):
            """Apply ``worker`` to every item on the engines, showing a
            progress bar, and return the results in the order of ``items``.

            :param worker: the function to be applied to each item
            :param items: the items to process
            :param chunk_size: passed on to ``_interactive_map``
            :param name: description shown by the progress bar
            :return: list with one result per item, in input order
            """

            # Let's make a wrapper which will allow us to recover the order:
            # results are collected unordered, so each one carries its index
            def wrapper(x):
                index, item = x

                return (index, worker(item))

            items_wrapped = list(enumerate(items))

            amr = self._interactive_map(
                wrapper, items_wrapped, ordered=False, chunk_size=chunk_size
            )

            results = list(tqdm(amr, desc=name))

            # Reorder according to the index (sorting on the index only, so
            # that the results themselves are never compared)
            results.sort(key=lambda pair: pair[0])

            return [result for _, result in results]

else:
    # NO parallel environment available. Make a dumb object to avoid import
    # problems, but this object will never be really used because the context
    # manager will not activate the parallel mode (see above)

    class ParallelClient:
        """Placeholder used when ipyparallel cannot be imported; creating an
        instance raises RuntimeError."""

        def __init__(self, *args, **kwargs):
            raise RuntimeError(
                "No parallel environment and attempted to use the ParallelClient class,"
                " which should never happen. Please open an issue at "
                "https://github.com/threeML/threeML/issues"
            )