Source code for threeML.plugins.UnbinnedPoissonLike

import types
from collections.abc import Iterable
from typing import Optional, Tuple, Union

import astromodels
import numba as nb
import numpy as np

from threeML.io.logging import setup_logger
from threeML.plugin_prototype import PluginPrototype

__instrument_name = "n.a."


log = setup_logger(__name__)

_tiny = np.float64(np.finfo(1.).tiny)


[docs]class EventObservation(object): def __init__( self, events: np.ndarray, exposure: float, start: Union[float, np.ndarray], stop: Union[float, np.ndarray], for_timeseries: bool = False ): self._events = np.array(events) self._exposure: float = exposure if isinstance(start, Iterable) or isinstance(stop, Iterable): assert isinstance(start, Iterable) assert isinstance(stop, Iterable) assert len(start) == len(stop) for i, v in enumerate(start): assert v < stop[i] self._start: np.ndarray = start self._stop: np.ndarray = stop self._is_multi_interval: bool = True else: assert start < stop self._start: float = float(start) self._stop: float = float(stop) self._is_multi_interval: bool = False self._n_events: int = len(self._events) log.debug(f"created event observation with") log.debug(f"{self._start} {self._stop}") self._for_timeseries = for_timeseries if for_timeseries: log.debug("This EventObservation is for a time series fit!") @property def events(self) -> np.ndarray: return self._events @property def n_events(self) -> int: return self._n_events @property def exposure(self) -> float: return self._exposure @property def start(self) -> Union[float, np.ndarray]: return self._start @property def stop(self) -> Union[float, np.ndarray]: return self._stop @property def is_multi_interval(self) -> bool: return self._is_multi_interval @property def for_timeseries(self) -> bool: return self._for_timeseries
[docs]class UnbinnedPoissonLike(PluginPrototype): def __init__( self, name: str, observation: EventObservation, source_name: Optional[str] = None, ) -> None: """ This is a generic likelihood for unbinned Poisson data. It is very slow for many events. :param name: the plugin name :param observation: and EventObservation container :param source_name: option source name to apply to the source """ assert isinstance(observation, EventObservation) self._observation: EventObservation = observation self._source_name: str = source_name self._n_events: int = self._observation.n_events if self._observation.for_timeseries: total_dt = 0 if self._observation.is_multi_interval: for start, stop in zip(self._observation.start, self._observation.stop): total_dt += stop-start else: total_dt = self._observation.stop-self._observation.start self._dead_corr = self._observation.exposure/total_dt else: self._dead_corr = 1. super(UnbinnedPoissonLike, self).__init__( name=name, nuisance_parameters={})
[docs] def set_model(self, model: astromodels.Model) -> None: """ Set the model to be used in the joint minimization. Must be a LikelihoodModel instance. """ self._like_model: astromodels.Model = model # We assume there are no extended sources, since we cannot handle them here assert self._like_model.get_number_of_extended_sources() == 0, ( "SpectrumLike plugins do not support " "extended sources" ) # check if we set a source name that the source is in the model if self._source_name is not None: assert self._source_name in self._like_model.sources, ( "Source %s is not contained in " "the likelihood model" % self._source_name ) differential, integral = self._get_diff_and_integral(self._like_model) self._integral_model = integral self._model = differential
def _get_diff_and_integral( self, likelihood_model: astromodels.Model ) -> Tuple[types.FunctionType, types.FunctionType]: if self._source_name is None: n_point_sources = likelihood_model.get_number_of_point_sources() # Make a function which will stack all point sources (OGIP do not support spatial dimension) def differential(energies): fluxes = likelihood_model.get_point_source_fluxes( 0, energies, tag=self._tag ) # If we have only one point source, this will never be executed for i in range(1, n_point_sources): fluxes += likelihood_model.get_point_source_fluxes( i, energies, tag=self._tag ) return fluxes else: # This SpectrumLike dataset refers to a specific source # Note that we checked that self._source_name is in the model when the model was set try: def differential_flux(energies): return likelihood_model.sources[self._source_name]( energies, tag=self._tag ) except KeyError: raise KeyError( "This plugin has been assigned to source %s, " "which does not exist in the current model" % self._source_name ) # New way with simpson rule. # Make sure to not calculate the model twice for the same energies def integral(e1, e2): # Simpson's rule # single energy values given return ( (e2 - e1) / 6.0 * ( differential(e1) + 4 * differential((e2 + e1) / 2.0) + differential(e2) ) ) return differential, integral
[docs] def get_log_like(self) -> float: """ Return the value of the log-likelihood with the current values for the parameters """ n_expected_counts: float = 0. if self._observation.is_multi_interval: for start, stop in zip(self._observation.start, self._observation.stop): n_expected_counts += self._integral_model(start, stop) else: n_expected_counts += self._integral_model( self._observation.start, self._observation.stop ) M = self._model(self._observation.events) negative_mask = M < 0 if negative_mask.sum() > 0: M[negative_mask] = 0.0 # use numba to sum the events sum_logM = _evaluate_logM_sum(M, self._n_events) minus_log_like = -n_expected_counts*self._dead_corr + sum_logM return minus_log_like
[docs] def inner_fit(self) -> float: """ This is used for the profile likelihood. Keeping fixed all parameters in the LikelihoodModel, this method minimize the logLike over the remaining nuisance parameters, i.e., the parameters belonging only to the model for this particular detector. If there are no nuisance parameters, simply return the logLike value. """ return self.get_log_like()
[docs] def get_number_of_data_points(self): return self._n_events
@nb.njit(fastmath=True) def _evaluate_logM_sum(M, size): # Evaluate the logarithm with protection for negative or small # numbers, using a smooth linear extrapolation (better than just a sharp # cutoff) non_tiny_mask = M > 2.0 * _tiny tink_mask = np.logical_not(non_tiny_mask) if tink_mask.sum() > 0: logM = np.zeros(size) logM[tink_mask] = (np.abs(M[tink_mask])/_tiny) + np.log(_tiny) - 1 logM[non_tiny_mask] = np.log(M[non_tiny_mask]) else: logM = np.log(M) return logM.sum()