Source code for threeML.utils.spectrum.spectrum_likelihood

from builtins import object
import copy
from typing import Optional

import numpy as np
import numba as nb


from threeML.utils.numba_utils import nb_sum
from threeML.utils.statistics.likelihood_functions import half_chi2
from threeML.utils.statistics.likelihood_functions import (
    poisson_log_likelihood_ideal_bkg,
)
from threeML.utils.statistics.likelihood_functions import (
    poisson_observed_gaussian_background,
)
from threeML.utils.statistics.likelihood_functions import (
    poisson_observed_poisson_background,
)

from threeML.io.logging import setup_logger

log = setup_logger(__name__)

# These classes provide likelihood evaluation to SpectrumLike and children

_known_noise_models = {}


[docs]class BinnedStatistic(object): def __init__(self, spectrum_plugin): """ A class to hold the likelihood call and randomization of spectrum counts :param spectrum_plugin: the spectrum plugin to call """ self._spectrum_plugin = spectrum_plugin
[docs] def get_current_value(self): RuntimeError("must be implemented in subclass")
[docs] def get_randomized_source_counts(self, source_model_counts): return None
[docs] def get_randomized_source_errors(self): return None
[docs] def get_randomized_background_counts(self): return None
[docs] def get_randomized_background_errors(self): return None
[docs]class GaussianObservedStatistic(BinnedStatistic):
[docs] def get_current_value(self, precalc_fluxes: Optional[np.array]=None): model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes) chi2_ = half_chi2( self._spectrum_plugin.current_observed_counts, self._spectrum_plugin.current_observed_count_errors, model_counts, ) assert np.all(np.isfinite(chi2_)) return nb_sum(chi2_) * (-1), None
[docs] def get_randomized_source_counts(self, source_model_counts): idx = self._spectrum_plugin.observed_count_errors > 0 randomized_source_counts = np.zeros_like(source_model_counts) randomized_source_counts[idx] = np.random.normal( loc=source_model_counts[idx], scale=self._spectrum_plugin.observed_count_errors[idx], ) # Issue a warning if the generated background is less than zero, and fix it by placing it at zero idx = randomized_source_counts < 0 # type: np.ndarray negative_source_n = nb_sum(idx) if negative_source_n > 0: log.warning( "Generated source has negative counts " "in %i channels. Fixing them to zero" % (negative_source_n) ) randomized_source_counts[idx] = 0 return randomized_source_counts
[docs] def get_randomized_source_errors(self): return self._spectrum_plugin.observed_count_errors
[docs]class PoissonObservedIdealBackgroundStatistic(BinnedStatistic):
[docs] def get_current_value(self, precalc_fluxes: Optional[np.array]=None): # In this likelihood the background becomes part of the model, which means that # the uncertainty in the background is completely neglected model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes) loglike, _ = poisson_log_likelihood_ideal_bkg( self._spectrum_plugin.current_observed_counts, self._spectrum_plugin.current_scaled_background_counts, model_counts, ) return nb_sum(loglike), None
[docs] def get_randomized_source_counts(self, source_model_counts): # Randomize expectations for the source # we want the unscalled background counts # TODO: check with giacomo if this is correct! randomized_source_counts = np.random.poisson( source_model_counts + self._spectrum_plugin._background_counts ) return randomized_source_counts
[docs] def get_randomized_background_counts(self): # No randomization for the background in this case randomized_background_counts = self._spectrum_plugin._background_counts return randomized_background_counts
[docs]class PoissonObservedModeledBackgroundStatistic(BinnedStatistic):
[docs] def get_current_value(self, precalc_fluxes: Optional[np.array]=None): # In this likelihood the background becomes part of the model, which means that # the uncertainty in the background is completely neglected model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes) # we scale the background model to the observation background_model_counts = ( self._spectrum_plugin.get_background_model() * self._spectrum_plugin.scale_factor ) loglike, _ = poisson_log_likelihood_ideal_bkg( self._spectrum_plugin.current_observed_counts, background_model_counts, model_counts, ) bkg_log_like = self._spectrum_plugin.background_plugin.get_log_like() total_log_like = nb_sum(loglike) + bkg_log_like return total_log_like, None
[docs] def get_randomized_source_counts(self, source_model_counts): # first generate random source counts from the plugin self._synthetic_background_plugin = ( self._spectrum_plugin.background_plugin.get_simulated_dataset() ) randomized_source_counts = np.random.poisson( source_model_counts + self._synthetic_background_plugin.observed_counts ) return randomized_source_counts
[docs] def get_randomized_background_errors(self): randomized_background_count_err = None if not self._synthetic_background_plugin.observed_spectrum.is_poisson: randomized_background_count_err = ( self._synthetic_background_plugin.observed_count_errors ) return randomized_background_count_err
@property def synthetic_background_plugin(self): return self._synthetic_background_plugin
[docs]class PoissonObservedNoBackgroundStatistic(BinnedStatistic):
[docs] def get_current_value(self, precalc_fluxes: Optional[np.array]=None): # In this likelihood the background becomes part of the model, which means that # the uncertainty in the background is completely neglected model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes) background_model_counts = np.zeros_like(model_counts) loglike, _ = poisson_log_likelihood_ideal_bkg( self._spectrum_plugin.current_observed_counts, background_model_counts, model_counts, ) return nb_sum(loglike), None
[docs] def get_randomized_source_counts(self, source_model_counts): # Randomize expectations for the source # we want the unscalled background counts randomized_source_counts = np.random.poisson(source_model_counts) return randomized_source_counts
[docs]class PoissonObservedPoissonBackgroundStatistic(BinnedStatistic):
[docs] def get_current_value(self, precalc_fluxes: Optional[np.array]=None): # Scale factor between source and background spectrum model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes) loglike, bkg_model = poisson_observed_poisson_background( self._spectrum_plugin.current_observed_counts, self._spectrum_plugin.current_background_counts, self._spectrum_plugin.scale_factor, model_counts, ) return nb_sum(loglike), bkg_model
[docs] def get_randomized_source_counts(self, source_model_counts): # Since we use a profile likelihood, the background model is conditional on the source model, so let's # get it from the likelihood function _, background_model_counts = self.get_current_value() # Now randomize the expectations # Randomize expectations for the source randomized_source_counts = np.random.poisson( source_model_counts + background_model_counts ) return randomized_source_counts
[docs] def get_randomized_background_counts(self): # Randomize expectations for the background _, background_model_counts = self.get_current_value() randomized_background_counts = np.random.poisson(background_model_counts) return randomized_background_counts
[docs]class PoissonObservedGaussianBackgroundStatistic(BinnedStatistic):
[docs] def get_current_value(self, precalc_fluxes: Optional[np.array]=None): expected_model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes) loglike, bkg_model = poisson_observed_gaussian_background( self._spectrum_plugin.current_observed_counts, self._spectrum_plugin.current_background_counts, self._spectrum_plugin.current_background_count_errors, expected_model_counts, ) return nb_sum(loglike), bkg_model
[docs] def get_randomized_source_counts(self, source_model_counts): # Since we use a profile likelihood, the background model is conditional on the source model, so let's # get it from the likelihood function _, background_model_counts = self.get_current_value() # Now randomize the expectations # Randomize expectations for the source randomized_source_counts = np.random.poisson( source_model_counts + background_model_counts ) return randomized_source_counts
[docs] def get_randomized_background_counts(self): # Now randomize the expectations. _, background_model_counts = self.get_current_value() # We cannot generate variates with zero sigma. They variates from those channel will always be zero # This is a limitation of this whole idea. However, remember that by construction an error of zero # it is only allowed when the background counts are zero as well. idx = self._spectrum_plugin.background_count_errors > 0 randomized_background_counts = np.zeros_like(background_model_counts) randomized_background_counts[idx] = np.random.normal( loc=background_model_counts[idx], scale=self._spectrum_plugin.background_count_errors[idx], ) # Issue a warning if the generated background is less than zero, and fix it by placing it at zero idx = randomized_background_counts < 0 # type: np.ndarray negative_background_n = nb_sum(idx) if negative_background_n > 0: log.warning( "Generated background has negative counts " "in %i channels. Fixing them to zero" % (negative_background_n) ) randomized_background_counts[idx] = 0 return randomized_background_counts
[docs] def get_randomized_background_errors(self): return copy.copy(self._spectrum_plugin.background_count_errors)
statistic_lookup = { "poisson": { "poisson": PoissonObservedPoissonBackgroundStatistic, "gaussian": PoissonObservedGaussianBackgroundStatistic, "ideal": PoissonObservedIdealBackgroundStatistic, None: PoissonObservedNoBackgroundStatistic, "modeled": PoissonObservedModeledBackgroundStatistic, }, "gaussian": {None: GaussianObservedStatistic}, None: {None: None}, }