import math
from typing import Optional, Literal
from packaging.version import Version
import numpy as np
from astromodels import use_astromodels_memoization
from threeML.bayesian.sampler_base import UnitCubeSampler
from threeML.config.config import threeML_config
from threeML.io.logging import setup_logger
from threeML.parallel.parallel_client import ParallelClient
try:
from dynesty import DynamicNestedSampler, NestedSampler
import dynesty
DYNESTY_DOC_URL = (
f"https://dynesty.readthedocs.io/en/v{dynesty.__version__}/api.html"
)
except Exception: # pragma: no cover
has_dynesty = False
else:
has_dynesty = True
log = setup_logger(__name__)
[docs]
def fill_docs(**kwargs):
def decorator(func):
if func.__doc__:
func.__doc__ = func.__doc__.format(**kwargs)
return func
return decorator
[docs]
class DynestyPool(object):
"""A simple wrapper for `dview`."""
def __init__(self, dview):
self.dview = dview
self.size = len(dview)
[docs]
def map(self, function, tasks):
return self.dview.map_sync(function, tasks)
[docs]
class DynestyNestedSampler(UnitCubeSampler):
def __init__(self, likelihood_model=None, data_list=None, **kwargs):
assert has_dynesty, "You must install Dynesty to use this sampler"
super(DynestyNestedSampler, self).__init__(
likelihood_model, data_list, **kwargs
)
[docs]
@fill_docs(BASE_URL=DYNESTY_DOC_URL)
def setup(
self,
nlive: int = 500,
bound: Optional[Literal["multi", "single", "none", "balls", "cubes"]] = "multi",
history_filename: Optional[str] = None,
**kwargs,
):
"""
Setup the Dynesty nested sampler.
All available parameters can be found in the respective version of
{BASE_URL}#dynesty.dynesty.NestedSampler
:param nlive: Number of live points. Defaults to 500.
:type nlive: int
:param bound: Method to approximately bound the prior using the current set of
live points. Options are "multi", "single", "none", "balls" or "cubes".
Defaults to "multi".
:param history_filename: Path to save the history. Defaults to None
:type history_filename: str
:param kwargs: Additional keyword arguments - must be same name and type as
paramters in constructor of the dynesty.NestedSampler class.
Defaults to the values used by dynesty.
:type kwargs: dict
"""
log.debug("Setup dynesty sampler")
if history_filename is not None:
if Version(dynesty.__version__) < Version("1.2.0"): # pragma: no cover
log.warning(
f"Your dynesty version is {dynesty.__version__} but "
+ "saving to a file was introduced in version 1.2.0. We will "
+ "ignore your input."
)
history_filename = None
self._sampler_kwargs = {}
self._kwargs = {}
self._kwargs["nlive"] = nlive
self._kwargs["bound"] = bound
self._kwargs["history_filename"] = history_filename
self._kwargs.update(kwargs)
self._is_setup = True
[docs]
def sample(self, quiet: bool = False, **kwargs):
"""Sample using the Dynesty NestedSampler class
:param quiet: verbosity. Defaults to False.
:type quiet: bool
:param kwargs: Additional keywords that get passed to the run_nested() function.
:type kwargs: dict
:rtype:
:returns:
"""
if not self._is_setup:
log.info("You forgot to setup the sampler!")
return
loud = not quiet
self._sampler_kwargs.update(kwargs)
self._update_free_parameters()
param_names = list(self._free_parameters.keys())
ndim = len(param_names)
self._kwargs["ndim"] = ndim
loglike, dynesty_prior = self._construct_unitcube_posterior(return_copy=True)
# check if we are doing to do things in parallel
if threeML_config["parallel"]["use_parallel"]:
c = ParallelClient()
view = c[:]
self._kwargs["pool"] = view
self._kwargs["queue_size"] = len(view)
sampler = NestedSampler(loglike, dynesty_prior, **self._kwargs)
self._sampler_kwargs["print_progress"] = loud
with use_astromodels_memoization(False):
log.debug("Start dynesty run")
sampler.run_nested(**self._sampler_kwargs)
log.debug("Dynesty run done")
self._sampler = sampler
results = self._sampler.results
# draw posterior samples
weights = np.exp(results["logwt"] - results["logz"][-1])
SQRTEPS = math.sqrt(float(np.finfo(np.float64).eps))
rstate = np.random
if abs(np.sum(weights) - 1.0) > SQRTEPS: # same tol as in np.random.choice.
raise ValueError("Weights do not sum to 1.")
# Make N subdivisions and choose positions with a consistent random offset.
nsamples = len(weights)
positions = (rstate.random() + np.arange(nsamples)) / nsamples
# Resample the data.
idx = np.zeros(nsamples, dtype=int)
cumulative_sum = np.cumsum(weights)
i, j = 0, 0
while i < nsamples:
if positions[i] < cumulative_sum[j]:
idx[i] = j
i += 1
else:
j += 1
samples_dynesty = results["samples"][idx]
self._raw_samples = samples_dynesty
# now do the same for the log likes
logl_dynesty = results["logl"][idx]
self._log_like_values = logl_dynesty
self._log_probability_values = self._log_like_values + np.array(
[self._log_prior(samples) for samples in self._raw_samples]
)
self._marginal_likelihood = self._sampler.results["logz"][-1] / np.log(10.0)
self._build_samples_dictionary()
self._build_results()
# Display results
if loud:
self._results.display()
# now get the marginal likelihood
return self.samples
[docs]
class DynestyDynamicSampler(UnitCubeSampler):
def __init__(self, likelihood_model=None, data_list=None, **kwargs):
assert has_dynesty, "You must install Dynesty to use this sampler"
super(DynestyDynamicSampler, self).__init__(
likelihood_model, data_list, **kwargs
)
[docs]
@fill_docs(BASE_URL=DYNESTY_DOC_URL)
def setup(
self,
nlive: int = 500,
history_filename=None,
**kwargs,
):
"""
Setup the Dynesty dynamic nested sampler.
All available parameters can be found in the respective version of
{BASE_URL}#dynesty.dynesty.DynamicNestedSampler
:param nlive: Number of live points used during the inital nested sampling run
:type nlive: int
:param history_filename: Path to save the history. Defaults to None
:type history_filename: str
:param kwargs: Additional keyword arguments - must be same name and type as
paramters in constructor of the dynesty.DynamicNestedSampler class.
Defaults to the values used by dynesty.
:type kwargs: dict
"""
log.debug("Setup dynesty dynamic sampler")
if history_filename is not None:
if Version(dynesty.__version__) < Version("1.2.0"): # pragma: no cover
log.warning(
f"Your dynesty version is {dynesty.__version__} but "
+ "saving to a file was introduced in version 1.2.0"
)
history_filename = None
self._kwargs = {}
self._sampler_kwargs = {}
self._kwargs["nlive"] = nlive
self._kwargs.update(kwargs)
self._is_setup = True
[docs]
def sample(self, quiet: bool = False, **kwargs):
"""Sample using the Dynestey DynamicNestedSampler class.
:param quiet: verbosity. Defaults to False.
:type quiet: bool
:param kwargs: Additional keywords that get passed to the run_nested() function.
:type kwargs: dict
:rtype:
:returns:
"""
if not self._is_setup:
log.info("You forgot to setup the sampler!")
return
loud = not quiet
self._sampler_kwargs.update(kwargs)
self._update_free_parameters()
param_names = list(self._free_parameters.keys())
ndim = len(param_names)
self._kwargs["ndim"] = ndim
loglike, dynesty_prior = self._construct_unitcube_posterior(return_copy=True)
# check if we are doing to do things in parallel
if threeML_config["parallel"]["use_parallel"]:
c = ParallelClient()
view = c[:]
self._kwargs["pool"] = view
self._kwargs["queue_size"] = len(view)
sampler = DynamicNestedSampler(loglike, dynesty_prior, **self._kwargs)
self._sampler_kwargs["print_progress"] = loud
with use_astromodels_memoization(False):
log.debug("Start dynestsy run")
sampler.run_nested(**self._sampler_kwargs)
log.debug("Dynesty run done")
self._sampler = sampler
results = self._sampler.results
# draw posterior samples
weights = np.exp(results["logwt"] - results["logz"][-1])
SQRTEPS = math.sqrt(float(np.finfo(np.float64).eps))
rstate = np.random
if abs(np.sum(weights) - 1.0) > SQRTEPS: # same tol as in np.random.choice.
raise ValueError("Weights do not sum to 1.")
# Make N subdivisions and choose positions with a consistent random offset.
nsamples = len(weights)
positions = (rstate.random() + np.arange(nsamples)) / nsamples
# Resample the data.
idx = np.zeros(nsamples, dtype=int)
cumulative_sum = np.cumsum(weights)
i, j = 0, 0
while i < nsamples:
if positions[i] < cumulative_sum[j]:
idx[i] = j
i += 1
else:
j += 1
samples_dynesty = results["samples"][idx]
self._raw_samples = samples_dynesty
# now do the same for the log likes
logl_dynesty = results["logl"][idx]
self._log_like_values = logl_dynesty
self._log_probability_values = self._log_like_values + np.array(
[self._log_prior(samples) for samples in self._raw_samples]
)
self._marginal_likelihood = self._sampler.results["logz"][-1] / np.log(10.0)
self._build_samples_dictionary()
self._build_results()
# Display results
if loud:
self._results.display()
# now get the marginal likelihood
return self.samples