import copy
from builtins import object
from typing import Optional
import numba as nb
import numpy as np
from threeML.io.logging import setup_logger
from threeML.utils.numba_utils import nb_sum
from threeML.utils.statistics.likelihood_functions import (
half_chi2, poisson_log_likelihood_ideal_bkg,
poisson_observed_gaussian_background, poisson_observed_poisson_background)
log = setup_logger(__name__)
# These classes provide likelihood evaluation to SpectrumLike and children
_known_noise_models = {}
[docs]
class BinnedStatistic(object):
def __init__(self, spectrum_plugin):
"""
A class to hold the likelihood call and randomization of spectrum counts
:param spectrum_plugin: the spectrum plugin to call
"""
self._spectrum_plugin = spectrum_plugin
[docs]
def get_current_value(self):
RuntimeError("must be implemented in subclass")
[docs]
def get_randomized_source_counts(self, source_model_counts):
return None
[docs]
def get_randomized_source_errors(self):
return None
[docs]
def get_randomized_background_counts(self):
return None
[docs]
def get_randomized_background_errors(self):
return None
[docs]
class GaussianObservedStatistic(BinnedStatistic):
[docs]
def get_current_value(self, precalc_fluxes: Optional[np.array]=None):
model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes)
chi2_ = half_chi2(
self._spectrum_plugin.current_observed_counts,
self._spectrum_plugin.current_observed_count_errors,
model_counts,
)
assert np.all(np.isfinite(chi2_))
return nb_sum(chi2_) * (-1), None
[docs]
def get_randomized_source_counts(self, source_model_counts):
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning("simulated spectrum had infinite counts in first channel")
log.warning("setting to ZERO")
idx = self._spectrum_plugin.observed_count_errors > 0
randomized_source_counts = np.zeros_like(source_model_counts)
randomized_source_counts[idx] = np.random.normal(
loc=source_model_counts[idx],
scale=self._spectrum_plugin.observed_count_errors[idx],
)
# Issue a warning if the generated background is less than zero, and fix it by placing it at zero
idx = randomized_source_counts < 0 # type: np.ndarray
negative_source_n = nb_sum(idx)
if negative_source_n > 0:
log.warning(
"Generated source has negative counts "
"in %i channels. Fixing them to zero" % (negative_source_n)
)
randomized_source_counts[idx] = 0
return randomized_source_counts
[docs]
def get_randomized_source_errors(self):
return self._spectrum_plugin.observed_count_errors
[docs]
class PoissonObservedIdealBackgroundStatistic(BinnedStatistic):
[docs]
def get_current_value(self, precalc_fluxes: Optional[np.array]=None):
# In this likelihood the background becomes part of the model, which means that
# the uncertainty in the background is completely neglected
model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes)
loglike, _ = poisson_log_likelihood_ideal_bkg(
self._spectrum_plugin.current_observed_counts,
self._spectrum_plugin.current_scaled_background_counts,
model_counts,
)
return nb_sum(loglike), None
[docs]
def get_randomized_source_counts(self, source_model_counts):
# Randomize expectations for the source
# we want the unscalled background counts
# TODO: check with giacomo if this is correct!
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning("simulated spectrum had infinite counts in first channel")
log.warning("setting to ZERO")
randomized_source_counts = np.random.poisson(
source_model_counts + self._spectrum_plugin._background_counts
)
return randomized_source_counts
[docs]
def get_randomized_background_counts(self):
# No randomization for the background in this case
randomized_background_counts = self._spectrum_plugin._background_counts
return randomized_background_counts
[docs]
class PoissonObservedModeledBackgroundStatistic(BinnedStatistic):
[docs]
def get_current_value(self, precalc_fluxes: Optional[np.array]=None):
# In this likelihood the background becomes part of the model, which means that
# the uncertainty in the background is completely neglected
model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes)
# we scale the background model to the observation
background_model_counts = (
self._spectrum_plugin.get_background_model()
* self._spectrum_plugin.scale_factor
)
loglike, _ = poisson_log_likelihood_ideal_bkg(
self._spectrum_plugin.current_observed_counts,
background_model_counts,
model_counts,
)
bkg_log_like = self._spectrum_plugin.background_plugin.get_log_like()
total_log_like = nb_sum(loglike) + bkg_log_like
return total_log_like, None
[docs]
def get_randomized_source_counts(self, source_model_counts):
# first generate random source counts from the plugin
self._synthetic_background_plugin = (
self._spectrum_plugin.background_plugin.get_simulated_dataset()
)
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning("simulated spectrum had infinite counts in first channel")
log.warning("setting to ZERO")
randomized_source_counts = np.random.poisson(
source_model_counts + self._synthetic_background_plugin.observed_counts
)
return randomized_source_counts
[docs]
def get_randomized_background_errors(self):
randomized_background_count_err = None
if not self._synthetic_background_plugin.observed_spectrum.is_poisson:
randomized_background_count_err = (
self._synthetic_background_plugin.observed_count_errors
)
return randomized_background_count_err
@property
def synthetic_background_plugin(self):
return self._synthetic_background_plugin
[docs]
class PoissonObservedNoBackgroundStatistic(BinnedStatistic):
[docs]
def get_current_value(self, precalc_fluxes: Optional[np.array]=None):
# In this likelihood the background becomes part of the model, which means that
# the uncertainty in the background is completely neglected
model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes)
background_model_counts = np.zeros_like(model_counts)
loglike, _ = poisson_log_likelihood_ideal_bkg(
self._spectrum_plugin.current_observed_counts,
background_model_counts,
model_counts,
)
return nb_sum(loglike), None
[docs]
def get_randomized_source_counts(self, source_model_counts):
# Randomize expectations for the source
# we want the unscalled background counts
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning("simulated spectrum had infinite counts in first channel")
log.warning("setting to ZERO")
randomized_source_counts = np.random.poisson(source_model_counts)
return randomized_source_counts
[docs]
class PoissonObservedPoissonBackgroundStatistic(BinnedStatistic):
[docs]
def get_current_value(self, precalc_fluxes: Optional[np.array]=None):
# Scale factor between source and background spectrum
model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes)
loglike, bkg_model = poisson_observed_poisson_background(
self._spectrum_plugin.current_observed_counts,
self._spectrum_plugin.current_background_counts,
self._spectrum_plugin.scale_factor,
model_counts,
)
return nb_sum(loglike), bkg_model
[docs]
def get_randomized_source_counts(self, source_model_counts):
# Since we use a profile likelihood, the background model is conditional on the source model, so let's
# get it from the likelihood function
_, background_model_counts = self.get_current_value()
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning("simulated spectrum had infinite counts in first channel")
log.warning("setting to ZERO")
# Now randomize the expectations
# Randomize expectations for the source
randomized_source_counts = np.random.poisson(
source_model_counts + background_model_counts
)
return randomized_source_counts
[docs]
def get_randomized_background_counts(self):
# Randomize expectations for the background
_, background_model_counts = self.get_current_value()
# scale the background to the scale factor
randomized_background_counts = np.random.poisson(background_model_counts / self._spectrum_plugin.scale_factor )
return randomized_background_counts
[docs]
class PoissonObservedGaussianBackgroundStatistic(BinnedStatistic):
[docs]
def get_current_value(self, precalc_fluxes: Optional[np.array]=None):
expected_model_counts = self._spectrum_plugin.get_model(precalc_fluxes=precalc_fluxes)
loglike, bkg_model = poisson_observed_gaussian_background(
self._spectrum_plugin.current_observed_counts,
self._spectrum_plugin.current_background_counts * self._spectrum_plugin.scale_factor,
self._spectrum_plugin.current_background_count_errors * self._spectrum_plugin.scale_factor,
expected_model_counts,
)
return nb_sum(loglike), bkg_model
[docs]
def get_randomized_source_counts(self, source_model_counts):
# Since we use a profile likelihood, the background model is conditional on the source model, so let's
# get it from the likelihood function
_, background_model_counts = self.get_current_value()
# a bad background model can produce
# more background counts than observed counts
# which results in negative background model counts
# we will filter that
idx = background_model_counts < 0
background_model_counts[idx] = 0.
if np.any(np.isnan(background_model_counts)):
log.error("NaN count in background model counts")
log.error(f"{background_model_counts}")
raise RuntimeError()
if not np.all(background_model_counts >= 0):
log.error("negative count in background model counts")
log.error(f"{background_model_counts}")
raise RuntimeError()
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning("simulated spectrum had infinite counts in first channel")
log.warning("setting to ZERO")
# Now randomize the expectations
# Randomize expectations for the source
randomized_source_counts = np.random.poisson(
source_model_counts + background_model_counts
)
return randomized_source_counts
[docs]
def get_randomized_background_counts(self):
# Now randomize the expectations.
_, background_model_counts = self.get_current_value()
# We cannot generate variates with zero sigma. They variates from those channel will always be zero
# This is a limitation of this whole idea. However, remember that by construction an error of zero
# it is only allowed when the background counts are zero as well.
idx = self._spectrum_plugin.background_count_errors > 0
randomized_background_counts = np.zeros_like(background_model_counts)
randomized_background_counts[idx] = np.random.normal(
loc=background_model_counts[idx],
scale=self._spectrum_plugin.background_count_errors[idx],
)
# Issue a warning if the generated background is less than zero, and fix it by placing it at zero
idx = randomized_background_counts < 0 # type: np.ndarray
negative_background_n = nb_sum(idx)
if negative_background_n > 0:
log.warning(
"Generated background has negative counts "
"in %i channels. Fixing them to zero" % (negative_background_n)
)
randomized_background_counts[idx] = 0
return randomized_background_counts
[docs]
def get_randomized_background_errors(self):
return copy.copy(self._spectrum_plugin.background_count_errors)
[docs]
class NotAvailableStatistic(object):
def __init__(self, spectrum_plugin):
"""
"""
log.error('The required statistic is currently restricted to the IXPE plugin only.')
raise RuntimeError()
try:
from ixpe.likelihood import GaussianObservedGaussianBackgroundStatistic
log.info('IXPE plugin found. Enabling Gaussian source with Gaussian background.')
except:
GaussianObservedGaussianBackgroundStatistic = NotAvailableStatistic
statistic_lookup = {
"poisson": {
"poisson": PoissonObservedPoissonBackgroundStatistic,
"gaussian": PoissonObservedGaussianBackgroundStatistic,
"ideal": PoissonObservedIdealBackgroundStatistic,
None: PoissonObservedNoBackgroundStatistic,
"modeled": PoissonObservedModeledBackgroundStatistic,
},
"gaussian": {None: GaussianObservedStatistic,
'gaussian':GaussianObservedGaussianBackgroundStatistic},
None: {None: None},
}