__author__ = "grburgess"
import functools
import itertools
import numpy as np
from astromodels import use_astromodels_memoization
from joblib import Parallel, delayed
from threeML.config import threeML_config
from threeML.io.logging import setup_logger
from threeML.parallel.parallel_client import ParallelClient
from threeML.utils.progress_bar import tqdm
log = setup_logger(__name__)
class GenericFittedSourceHandler(object):
    def __init__(
        self,
        analysis_result,
        new_function,
        parameter_names,
        parameters,
        confidence_level,
        equal_tailed,
        *independent_variable_range
    ):
        """
        A generic 3ML fitted source post-processor. This should be sub-classed in general

        :param analysis_result: a 3ML analysis result
        :param new_function: the function to use the fitted values to compute new values
        :param parameter_names: a list of parameter names
        :param parameters: astromodels parameter dictionary
        :param confidence_level: the confidence level to compute error
        :param equal_tailed: if True use equal-tailed error intervals, otherwise
            use highest posterior density intervals
        :param independent_variable_range: the range(s) of independent values to compute the new function over
        """

        # bind the class properties
        self._analysis_results = analysis_result
        self._analysis = analysis_result
        self._independent_variable_range = independent_variable_range

        self._cl = confidence_level
        self._equal_tailed = equal_tailed
        self._function = new_function
        self._parameter_names = parameter_names
        self._parameters = parameters

        # if only 1-D then we must place into its own tuple to
        # keep from confusing itertools
        if len(self._independent_variable_range) == 1:
            self._independent_variable_range = (
                self._independent_variable_range[0],
            )

        # figure out the output shape of the best fit and errors
        self._out_shape = tuple(map(len, self._independent_variable_range))

        # construct the propagated function
        self._build_propagated_function()

        # fold the function through its independent values
        self._evaluate()

    def __add__(self, other):
        """
        The basics of adding are handled in the VariatesContainer

        :param other: another fitted source handler
        :return: a VariatesContainer with the summed values
        """

        # assure that the shapes will be the same
        if other._out_shape != self._out_shape:
            log.error("cannot sum together arrays with different shapes!")
            raise RuntimeError(
                "cannot sum together arrays with different shapes!"
            )

        # this will get the value container for the other values
        return self.values + other.values

    def __radd__(self, other):
        """
        Reflected add so that sum() (which seeds with 0) works.

        :param other: 0 or another fitted source handler
        :return: self if other is 0, otherwise the summed VariatesContainer
        """

        if other == 0:
            # sum() starts from 0; adding 0 is the identity
            return self
        else:
            return self.values + other.values

    def _transform(self, value):
        """
        dummy transform to be overridden in a subclass

        :param value:
        :return: transformed value
        """

        return value

    def update_tag(self, tag, value):
        # no-op hook; subclasses may override to react to tag updates
        pass

    def _build_propagated_function(self):
        """
        builds a propagated function using RandomVariates propagation

        :return:
        """

        arguments = {}

        # first probe one free parameter to check the number of samples
        for par in list(self._parameters.values()):
            if par.free:
                test_par = par
                break
        else:
            # the for/else fires only when no free parameter was found
            log.error("There are no free parameters in the model!")
            raise RuntimeError("There are no free parameters in the model!")

        test_variate = self._analysis_results.get_variates(test_par.path)

        if len(test_variate) > threeML_config.point_source.max_number_samples:
            # draw one common subsample index set so that all parameters
            # stay aligned sample-by-sample
            choices = np.random.choice(
                range(len(test_variate)),
                size=threeML_config.point_source.max_number_samples,
            )

        # because we might be using composite functions,
        # we have to keep track of parameter names in a non-elegant way
        for par, name in zip(
            list(self._parameters.values()), self._parameter_names
        ):
            if par.free:
                this_variate = self._analysis_results.get_variates(par.path)

                # Do not use more than max_number_samples values (would make
                # the computation too slow for nothing)
                if (
                    len(this_variate)
                    > threeML_config.point_source.max_number_samples
                ):
                    this_variate = this_variate[choices]

                arguments[name] = this_variate
            else:
                # use the fixed value rather than a variate
                arguments[name] = par.value

        # create the propagator
        self._propagated_function = self._analysis_results.propagate(
            self._function, **arguments
        )

    def _evaluate(self):
        """
        calculate the best or mean fit of the new function or
        quantity

        :return:
        """

        # if there are independent variables
        if self._independent_variable_range:
            variates = []

            # scroll through the cartesian product of the independent variables
            with use_astromodels_memoization(False):
                variables = list(
                    itertools.product(*self._independent_variable_range)
                )

                if len(variables) > 1:
                    if threeML_config.parallel.use_parallel:

                        def execute(v):
                            return self._propagated_function(*v)

                        client = ParallelClient()
                        view = client.load_balanced_view()
                        view.register_joblib_backend(make_default=True)

                        variates = Parallel()(
                            delayed(execute)(v)
                            for v in tqdm(variables, desc="Propagating error")
                        )
                    else:
                        for v in tqdm(variables, desc="Propagating errors"):
                            variates.append(self._propagated_function(*v))
                else:
                    for v in variables:
                        variates.append(self._propagated_function(*v))

        # otherwise just evaluate
        else:
            variates = self._propagated_function()

        # create a variates container
        self._propagated_variates = VariatesContainer(
            variates,
            self._out_shape,
            self._cl,
            self._transform,
            self._equal_tailed,
        )

    @property
    def values(self):
        """
        :return: The VariatesContainer
        """
        return self._propagated_variates

    @property
    def samples(self):
        """
        :return: the raw samples of the variates
        """
        return self._propagated_variates.samples

    @property
    def median(self):
        """
        :return: the median of the variates
        """
        return self._propagated_variates.median

    @property
    def average(self):
        """
        :return: the average of the variates
        """
        return self._propagated_variates.average

    @property
    def upper_error(self):
        """
        :return: the upper error of the variates
        """
        return self._propagated_variates.upper_error

    @property
    def lower_error(self):
        """
        :return: the lower error of the variates
        """
        return self._propagated_variates.lower_error
class VariatesContainer(object):
    def __init__(self, values, out_shape, cl, transform, equal_tailed=True):
        """
        A container to store a *list* of RandomVariates and transform their outputs
        to the appropriate shape. This cannot be done with normal numpy array operations
        because an array of RandomVariates becomes a normal ndarray. Therefore, we calculate
        the averages, errors, etc, and transform those.

        Additionally, any unit association must be done post calculation as well because the
        numpy array constructor sees a unit array as a regular array and again loses the RandomVariates
        properties. Therefore, the transform method is used which applies a function to the output properties,
        e.g., a unit association and or conversion.

        :param values: a flat list of RandomVariates
        :param out_shape: the array shape for the output variables
        :param cl: the confidence level to calculate error intervals on
        :param transform: a method to transform the outputs
        :param equal_tailed: whether to use equal-tailed error intervals or not
        """

        self._values = values  # type: list
        self._out_shape = out_shape  # type: tuple
        self._cl = cl  # type: float
        self._equal_tailed = equal_tailed  # type: bool
        self._transform = transform  # type: callable

        # calculate mean and median and transform them into the provided
        # output shape
        self._average = np.array(
            [val.average for val in self._values]
        ).reshape(self._out_shape)

        self._median = np.array(
            [val.median for val in self._values]
        ).reshape(self._out_shape)

        # construct the error intervals
        upper_error = []
        lower_error = []

        if self._equal_tailed:
            # equal-tailed errors requested
            for val in self._values:
                low, high = val.equal_tail_interval(self._cl)
                upper_error.append(high)
                lower_error.append(low)
        else:
            # else use the highest posterior density interval
            for val in self._values:
                low, high = val.highest_posterior_density_interval(self._cl)
                upper_error.append(high)
                lower_error.append(low)

        # reshape the errors into the output shape
        self._upper_error = np.array(upper_error).reshape(self._out_shape)
        self._lower_error = np.array(lower_error).reshape(self._out_shape)

        # collect the raw samples; the trailing axis holds the sample draws
        samples = [val.samples for val in self._values]

        n_samples = len(samples[0])
        self._samples_shape = tuple(list(self._out_shape) + [n_samples])

        self._samples = np.array(samples).reshape(self._samples_shape)

    @property
    def values(self):
        """
        :return: the list of RandomVariates
        """

        return self._values

    @property
    @transform
    def samples(self):
        """
        :return: the transformed raw samples
        """

        return self._samples

    @property
    @transform
    def average(self):
        """
        :return: the transformed average
        """

        return self._average

    @property
    @transform
    def median(self):
        """
        :return: the transformed median
        """

        return self._median

    @property
    @transform
    def upper_error(self):
        """
        :return: the transformed upper error
        """

        return self._upper_error

    @property
    @transform
    def lower_error(self):
        """
        :return: the transformed lower error
        """

        return self._lower_error

    def __add__(self, other):
        """
        Sum with another VariatesContainer of the same output shape.

        :param other: another VariatesContainer
        :return: a new VariatesContainer with element-wise summed variates
        """

        # raise (rather than assert, which is stripped under -O), matching the
        # error-handling style used by GenericFittedSourceHandler.__add__
        if other._out_shape != self._out_shape:
            log.error("cannot sum together arrays with different shapes!")
            raise RuntimeError(
                "cannot sum together arrays with different shapes!"
            )

        # this will get the value container for the other values
        other_values = other.values

        summed_values = [v + vo for v, vo in zip(self._values, other_values)]

        return VariatesContainer(
            summed_values,
            self._out_shape,
            self._cl,
            self._transform,
            self._equal_tailed,
        )

    def __radd__(self, other):
        """
        Reflected add so that sum() (which seeds with 0) works.

        :param other: 0 or another VariatesContainer
        :return: self if other is 0, otherwise the summed VariatesContainer
        """

        if other == 0:
            # sum() starts from 0; adding 0 is the identity
            return self

        # delegate to __add__ so the shape check applies here as well
        return self.__add__(other)