Source code for threeML.plugins.experimental.CastroLike

from __future__ import division
from past.utils import old_div
from builtins import object
from threeML.plugin_prototype import PluginPrototype
from threeML.exceptions.custom_exceptions import custom_warnings

import numpy as np
import scipy.integrate
import scipy.interpolate
import scipy.optimize

import matplotlib.pyplot as plt

[docs]class IntervalContainer(object): def __init__( self, start, stop, parameter_values, likelihood_values, n_integration_points ): # Make sure there is no NaN or infinity assert np.all( np.isfinite(likelihood_values) ), "Infinity or NaN in likelihood values" likelihood_values = np.asarray(likelihood_values) parameter_values = np.asarray(parameter_values) self._start = start self._stop = stop # Make sure the number of integration points is uneven, and that there are at minimum 11 points # n_integration_points = max(int(n_integration_points), 11) if n_integration_points % 2 == 0: # n_points is even, it shouldn't be otherwise things like Simpson rule will have problems n_integration_points += 1 custom_warnings.warn( "The number of integration points should not be even. Adding +1" ) self._n_integration_points = int(n_integration_points) # Build interpolation of the likelihood curve self._minus_likelihood_interp = scipy.interpolate.InterpolatedUnivariateSpline( np.log10(parameter_values), -likelihood_values, k=1, ext=0 ) # Find maximum of loglike idx = likelihood_values.argmax() self._min_par_value = parameter_values.min() self._max_par_value = parameter_values.max() res = scipy.optimize.minimize_scalar( self._minus_likelihood_interp, bounds=(np.log10(self._min_par_value), np.log10(self._max_par_value)), method="bounded", options={"maxiter": 10000, "disp": True, "xatol": 1e-3}, ) # res = scipy.optimize.minimize(self._minus_likelihood_interp, x0=[np.log10(parameter_values[idx])], # jac=lambda x:self._minus_likelihood_interp.derivative(1)(x), # # bounds=(self._min_par_value, self._max_par_value), # # method='bounded', # tol=1e-3, # options={'maxiter': 10000, 'disp': True}) assert res.success, "Could not find minimum" self._minimum = (10 ** res.x, float( @property def start(self): return self._start @property def stop(self): return self._stop @property def n_integration_points(self): return self._n_integration_points def __call__(self, parameter_value): return -self._minus_likelihood_interp(np.log10(parameter_value))
[docs] def get_measurement( self, delta_log_like=0.5, ul_log_like=2.71 / 2.0, low_bound_extreme=0.0, hi_bound_extreme=np.inf, ): # Find when the likelihood changes by delta_log_like unit bounding_f = ( lambda x: self._minus_likelihood_interp(np.log10(x)) - self._minimum[1] - delta_log_like ) if bounding_f(self._min_par_value) <= 0: # This is an upper limit measurement, i.e., there is no lower bound on the confidence. Use # low_bound_extreme low_bound_cl = low_bound_extreme else: # Look for negative bound using BRENTQ low_bound_cl, res = scipy.optimize.brentq( bounding_f, self._min_par_value, self._minimum[0], full_output=True ) assert res.converged, "Could not find lower bound" if bounding_f(self._max_par_value) <= 0.0: # This is a lower limit measurement, i.e., there is no upper bound on the confidence. Use # hi_bound_extreme hi_bound_cl = hi_bound_extreme else: # If there was no lower limit, then compute upper bound for 95% confidence if low_bound_cl == low_bound_extreme: bounding_f = ( lambda x: self._minus_likelihood_interp(np.log10(x)) - self._minimum[1] - ul_log_like ) # Look for positive bound using BRENTQ hi_bound_cl, res = scipy.optimize.brentq( bounding_f, self._minimum[0], self._max_par_value, full_output=True ) assert res.converged, "Could not find upper bound" return low_bound_cl, self._minimum[0], hi_bound_cl
[docs]class CastroLike(PluginPrototype): def __init__(self, name, interval_containers): self._interval_containers = sorted(interval_containers, key=lambda x: x.start) # By default all containers are active self._active_containers = self._interval_containers self._likelihood_model = None self._all_xx, self._all_xx_split, self._splits = self._setup_x_values() super(CastroLike, self).__init__(name, {}) def _setup_x_values(self): # Create a list of all x values for each container xxs = [] splits = [] # This will keep the total number of x values, so we can check for overlapping intervals total_n = 0 # Loop over the active containers and fill the list for container in self._active_containers: xxs.append( np.logspace( np.log10(container.start), np.log10(container.stop), container.n_integration_points, ) ) total_n += container.n_integration_points splits.append(total_n) all_xx = np.concatenate(xxs) assert ( all_xx.shape[0] == total_n ), "One or more containers are overlapping. This is not supported." return all_xx, np.split(all_xx, splits), splits
[docs] def set_active_measurements(self, tmin, tmax): self._active_containers = [] for interval_container in self._interval_containers: if interval_container.start >= tmin and interval_container.stop <= tmax: self._active_containers.append(interval_container) # Reset the global xx self._all_xx, self._all_xx_split, self._splits = self._setup_x_values() return len(self._active_containers)
@property def start(self): return min([x.start for x in self._active_containers]) @property def stop(self): return max([x.stop for x in self._active_containers]) @property def active_containers(self): return self._active_containers
[docs] def set_model(self, likelihood_model_instance): """ Set the model to be used in the joint minimization. Must be a LikelihoodModel instance. """ self._likelihood_model = likelihood_model_instance
[docs] def get_log_like(self): """ Return the value of the log-likelihood with the current values for the parameters """ log_l = 0.0 # Evaluate once for all all_yy = np.split( self._likelihood_model.get_total_flux(self._all_xx), self._splits ) for i, interval_container in enumerate(self._active_containers): # Get integral of model between start and stop for this interval # xx = np.logspace(np.log10(interval_container.start), # np.log10(interval_container.stop), # interval_container.n_integration_points) # # yy = self._likelihood_model.get_total_flux(xx) # # assert np.allclose(yy, all_yy[i]) xx = self._all_xx_split[i] yy = all_yy[i] length = interval_container.stop - interval_container.start expected_flux = old_div(scipy.integrate.simps(yy, xx), length) this_log_l = interval_container(expected_flux) log_l += this_log_l return log_l
[docs] def inner_fit(self): """ This is used for the profile likelihood. Keeping fixed all parameters in the LikelihoodModel, this method minimize the logLike over the remaining nuisance parameters, i.e., the parameters belonging only to the model for this particular detector. If there are no nuisance parameters, simply return the logLike value. """ return self.get_log_like()
@staticmethod def _plot(containers, sub, color): xs = [] xerrs = [] ys = [] yerrs = [[], []] uls_xs = [] uls_xerrs = [] uls_ys = [] uls_yerrs = [] for interval_container in containers: t1, t2 = interval_container.start, interval_container.stop tc = (t2 + t1) / 2.0 dt = (t2 - t1) / 2.0 y_low, y, y_hi = interval_container.get_measurement() if y_low > 0.0: # A normal point xs.append(tc) xerrs.append(dt) ys.append(y) yerrs[0].append((y - y_low)) yerrs[1].append((y_hi - y)) else: # Upper limit uls_xs.append(tc) uls_xerrs.append(dt) uls_ys.append(y_hi) # Make an errorbar that is constant length in log space dy_ = np.log10(y_hi) - 0.2 dy = y_hi - 10 ** dy_ uls_yerrs.append(dy) sub.errorbar(xs, ys, xerr=xerrs, yerr=yerrs, fmt=",", ecolor=color, mfc=color) sub.errorbar( uls_xs, uls_ys, xerr=uls_xerrs, yerr=uls_yerrs, uplims=True, fmt=",", ecolor=color, mfc=color, ) xxs = np.append(xs, uls_xs) xxerrs = np.append(xerrs, uls_xerrs) idx = np.argsort(xxs) xxs = xxs[idx] xxerrs = xxerrs[idx] return xxs, xxerrs
[docs] def plot(self, plot_model=True, n_points=1000, fig=None, sub=None): if fig is None: fig, sub = plt.subplots() else: if sub is None: assert len(fig.axes) > 0 sub = fig.axes[0] xs, xerrs = self._plot(self._active_containers, sub, "blue") # Find out which containers are not active inactive_containers = [] for container in self._interval_containers: if container not in self._active_containers: inactive_containers.append(container) if len(inactive_containers) > 0: self._plot(inactive_containers, sub, "gray") sub.set_xscale("log") sub.set_yscale("log") if plot_model: xs = np.asarray(xs) xerrs = np.asarray(xerrs) min_idx = xs.argmin() max_idx = xs.argmax() xx = np.logspace( np.log10(xs[min_idx] - xerrs[min_idx]), np.log10(xs[max_idx] + xerrs[max_idx]), n_points, ) yy = self._likelihood_model.get_total_flux(xx) _ = sub.plot(xx, yy, linestyle="--", color="red") return fig