import collections
import copy
import types
from collections.abc import Iterable
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Tuple, Union
import matplotlib
import matplotlib.pyplot as plt
import numba as nb
import numpy as np
import pandas as pd
from astromodels import Model, PointSource, clone_model
from astromodels.core.parameter import Parameter
from astromodels.functions.priors import Truncated_gaussian, Uniform_prior
from past.utils import old_div
from threeML.config.config import threeML_config
from threeML.config.plotting_structure import BinnedSpectrumPlot
from threeML.exceptions.custom_exceptions import NegativeBackground
from threeML.io.logging import setup_logger
from threeML.io.package_data import get_path_of_data_file
from threeML.io.plotting.data_residual_plot import ResidualPlot
from threeML.io.plotting.light_curve_plots import (
channel_plot,
disjoint_patch_plot,
)
from threeML.io.rich_display import display
from threeML.plugin_prototype import PluginPrototype
from threeML.plugins.XYLike import XYLike
from threeML.utils.binner import Rebinner
from threeML.utils.spectrum.binned_spectrum import (
BinnedSpectrum,
ChannelSet,
Quality,
)
from threeML.utils.spectrum.pha_spectrum import PHASpectrum
from threeML.utils.spectrum.spectrum_likelihood import statistic_lookup
from threeML.utils.statistics.stats_tools import Significance
from threeML.utils.string_utils import dash_separated_string_to_tuple
if threeML_config.plotting.use_threeml_style:
plt.style.use(str(get_path_of_data_file("threeml.mplstyle")))
log = setup_logger(__name__)
NO_REBIN = 1e-99
__instrument_name = "General binned spectral data"
# This defines the known noise models for source and/or background spectra
_known_noise_models = ["poisson", "gaussian", "ideal", "modeled"]
[docs]
class SpectrumLike(PluginPrototype):
def __init__(
self,
name: str,
observation: BinnedSpectrum,
background: Optional[
Union[BinnedSpectrum, XYLike, "SpectrumLike"]
] = None,
verbose: bool = True,
background_exposure=None,
tstart: Optional[Union[float, int]] = None,
tstop: Optional[Union[float, int]] = None,
) -> None:
"""
A plugin for generic spectral data, accepts an observed binned spectrum,
and a background binned spectrum or plugin with the background data.
In the case of a binned background spectrum, the background model is profiled
out and the appropriate profile-likelihood is used to fit the total spectrum. In this
case, caution must be used when there are zero background counts in bins as the
profiled background parameters (one per channel) will then have zero information from which to
constrain the background. It is recommended to bin the spectrum such that there is one background count
per channel.
If either an SpectrumLike or XYLike instance is provided as background, it is assumed that this is the
background data and the likelihood model from this plugin is used to simultaneously fit the background
and source.
:param name: the plugin name
:param observation: the observed spectrum
:param background: the background spectrum or a plugin from which the background will be modeled
:param background_exposure: (optional) adjust the background exposure of the modeled background data comes from and
XYLike plugin
:param verbose: turn on/off verbose logging
"""
# Just a toggle for verbosity
self._verbose: bool = bool(verbose)
self._name: str = name
if not isinstance(observation, BinnedSpectrum):
log.error(
"The observed spectrum is not an instance of BinnedSpectrum"
)
# Precomputed observed (for speed)
self._observed_spectrum: BinnedSpectrum = observation
self._has_contiguous_energies: bool = observation.is_contiguous()
self._predefined_energies: np.ndarray = observation.edges
self._observed_counts: np.ndarray = self._observed_spectrum.counts
# initialize the background
background_parameters = self._background_setup(background, observation)
# unpack the parameters
(
self._background_spectrum,
self._background_plugin,
self._background_counts,
self._scaled_background_counts,
) = background_parameters
# Init everything else to None
self._like_model: Optional[Model] = None
self._rebinner: Optional[Rebinner] = None
self._source_name: Optional[str] = None
self._stokes = None
# probe the noise models and then setup the appropriate count errors
(
self._observation_noise_model,
self._background_noise_model,
) = self._probe_noise_models()
(
self._observed_count_errors,
self._back_count_errors,
) = self._count_errors_initialization()
# Init the integral methods for background and model integration to default
self._model_integrate_method: str = "simpson"
self._background_integrate_method: str = "simpson"
# Initialize a mask that selects all the data.
# We will initially use the quality mask for the PHA file
# and set any quality greater than 0 to False. We want to save
# the native quality so that we can warn the user if they decide to
# select channels that were flagged as bad.
self._mask: np.ndarray = np.asarray(
np.ones(self._observed_spectrum.n_channels), bool
)
# Now create the nuisance parameter for the effective area correction, which is fixed
# by default. This factor multiplies the model so that it can account for calibration uncertainties on the
# global effective area. By default it is limited to stay within 20%
self._nuisance_parameter: Parameter = Parameter(
"cons_%s" % name,
1.0,
min_value=0.8,
max_value=1.2,
delta=0.05,
free=False,
desc="Effective area correction for %s" % name,
)
nuisance_parameters: Dict[str, Parameter] = collections.OrderedDict()
nuisance_parameters[
self._nuisance_parameter.name
] = self._nuisance_parameter
# if we have a background model we are going
# to link all those parameters to new nuisance parameters
if self._background_plugin is not None:
log.debug(f"{name} is using a modeled background")
self._background_noise_model = "modeled"
for par_name, parameter in list(
self._background_plugin.likelihood_model.parameters.items()
):
# create a new parameters that is like the one from the background model
local_name = "bkg_%s_%s" % (par_name, name)
local_name = local_name.replace(".", "_")
nuisance_parameters[local_name] = parameter
if parameter.prior is not None:
nuisance_parameters[local_name].prior = parameter.prior
log.debug(f"{par_name} has passed its prior")
# now get the background likelihood model
differential_flux, integral = self._get_diff_flux_and_integral(
self._background_plugin.likelihood_model,
integrate_method=self._background_integrate_method,
)
self._background_integral_flux = integral
super(SpectrumLike, self).__init__(name, nuisance_parameters)
if isinstance(self._background_plugin, XYLike):
if background_exposure is None:
log.warning(
"An XYLike plugin is modeling the background but background_exposure is not set. "
"It is assumed the observation and background have the same exposure"
)
self._explict_background_exposure = self.exposure
else:
self._explicit_background_exposure = background_exposure
# The following vectors are the ones that will be really used for the computation. At the beginning they just
# point to the original ones, but if a rebinner is used and/or a mask is created through set_active_measurements,
# they will contain the rebinned and/or masked versions
self._current_observed_counts: np.ndarray = self._observed_counts
self._current_observed_count_errors: Optional[
np.array
] = self._observed_count_errors
self._current_background_counts: Optional[
np.array
] = self._background_counts
self._current_scaled_background_counts: Optional[
np.array
] = self._scaled_background_counts
self._current_back_count_errors: Optional[
np.array
] = self._back_count_errors
# This will be used to keep track of how many syntethic datasets have been generated
self._n_synthetic_datasets: int = 0
if tstart is not None:
self._tstart: float = tstart
else:
self._tstart = observation.tstart
if tstop is not None:
self._tstop: float = tstop
else:
self._tstop = observation.tstop
# This is so far not a simulated data set
self._simulation_storage = None
# set the mask to the native quality
self._mask: np.array = self._observed_spectrum.quality.good
# Apply the mask
self._apply_mask_to_original_vectors()
# this will be immeadiately changed if inherited
# calculate all scalings between area and exposure
self._precalculations()
# now create a likelihood object for the call
# we pass the current object over as well
# the likelihood object is opaque to the class and
# keeps a pointer of the plugin inside so that the current
# counts, bkg, etc. are always up to date
# This way, when evaluating the likelihood,
# no checks are involved because the appropriate
# noise models are pre-selected
self._likelihood_evaluator = statistic_lookup[
self.observation_noise_model
][self.background_noise_model](self)
def _count_errors_initialization(self) -> Tuple[np.ndarray]:
"""
compute the count errors for the observed and background spectra
:return: (observed_count_errors, background_count errors)
"""
# if there is not a background the dictionary
# will crash, so we need to do a small check
tmp_bkg_count_errors = None
if self._background_spectrum is not None:
tmp_bkg_count_errors = self._background_spectrum.count_errors
count_errors_lookup = {
"poisson": {
"poisson": (None, None),
"gaussian": (None, tmp_bkg_count_errors),
None: (None, None),
},
# gaussian source
"gaussian": {
"gaussian": (
self._observed_spectrum.count_errors,
tmp_bkg_count_errors,
),
None: (self._observed_spectrum.count_errors, None),
},
}
try:
error_tuple = count_errors_lookup[self._observation_noise_model][
self._background_noise_model
] # type: tuple
except (KeyError):
log.error(
f"The noise combination of source: {self._observation_noise_model}, background: {self._background_noise_model} is not supported"
)
RuntimeError()
for errors, counts, name in zip(
error_tuple,
[self._observed_counts, self._background_counts],
["observed", "background"],
):
# if the errors are not None then we want to make sure they make sense
if errors is not None:
zero_idx = errors == 0 # type: np.ndarray
# check that zero error => zero counts
if not np.all(errors[zero_idx] == counts[zero_idx]):
log.error(
f"Error in {name} spectrum: if the error on the background is zero, "
f"also the expected background counts must be zero"
)
raise RuntimeError()
observed_count_errors, background_count_errors = error_tuple
return observed_count_errors, background_count_errors
def _probe_noise_models(self):
"""
probe the noise models
:return: (observation_noise_model, background_noise_model)
"""
observation_noise_model, background_noise_model = None, None
# Now auto-probe the statistic to use
if self._background_spectrum is not None:
if self._observed_spectrum.is_poisson:
self._observed_count_errors = None
self._observed_counts = np.around(self._observed_counts).astype(
np.int64
)
if self._background_spectrum.is_poisson:
observation_noise_model = "poisson"
background_noise_model = "poisson"
self._background_counts = np.around(
self._background_counts
).astype(np.int64)
if not np.all(self._observed_counts >= 0):
log.error("Error in PHA: negative counts!")
raise RuntimeError()
if not np.all(self._background_counts >= 0):
log.error(
"Error in background spectrum: negative counts!"
)
raise NegativeBackground()
else:
observation_noise_model = "poisson"
background_noise_model = "gaussian"
if not np.all(self._background_counts >= 0):
log.error(
"Error in background spectrum: negative background!"
)
raise NegativeBackground()
else:
if self._background_spectrum.is_poisson:
raise NotImplementedError(
"We currently do not support Gaussian observation and Poisson background"
)
else:
observation_noise_model = "gaussian"
background_noise_model = "gaussian"
# if not np.all(self._background_counts >= 0):
# raise NegativeBackground(
# "Error in background spectrum: negative background!"
# )
else:
# this is the case for no background
self._background_counts = None
self._back_count_errors = None
self._scaled_background_counts = None
if self._observed_spectrum.is_poisson:
self._observed_count_errors = None
self._observed_counts = np.around(self._observed_counts).astype(
np.int64
)
if not np.all(self._observed_counts >= 0):
log.error("Error in PHA: negative counts!")
raise RuntimeError()
if not np.all(self._observed_counts >= 0):
log.error("Error in PHA: negative counts!")
raise RuntimeError()
observation_noise_model = "poisson"
background_noise_model = None
else:
observation_noise_model = "gaussian"
background_noise_model = None
# Print the auto-probed noise models
if self._background_plugin is not None:
log.info(
"Background modeled from plugin: %s"
% self._background_plugin.name
)
bkg_noise = self._background_plugin.observation_noise_model
else:
bkg_noise = background_noise_model
log.info("Auto-probed noise models:")
log.info("- observation: %s" % observation_noise_model)
log.info("- background: %s" % bkg_noise)
return observation_noise_model, background_noise_model
def _background_setup(
self,
background: Union[None, BinnedSpectrum, XYLike, "SpectrumLike"],
observation: BinnedSpectrum,
):
"""
:param background: background arguments (spectrum or plugin)
:param observation: observed spectrum
:return: (background_spectrum, background_plugin, background_counts, scaled_background_counts)
"""
# this is only called during once during construction
# setup up defaults as none
background_plugin: Optional[Union[XYLike, "SpectrumLike"]] = None
background_spectrum: Optional[BinnedSpectrum] = None
background_counts: Optional[np.ndarray] = None
scaled_background_counts: Optional[np.ndarray] = None
self._has_background: bool = False
if background is not None:
self._has_background = True
# If this is a plugin created from a background
# we extract the observed spectrum (it should not have a background...
# it is a background)
# we are explicitly violating duck-typing
if isinstance(background, SpectrumLike) or isinstance(
background, XYLike
):
background_plugin = background
log.debug("using a background plugin")
else:
# if the background is not a plugin then we need to make sure it is a spectrum
# and that the spectrum is the same size as the observation
if not isinstance(background, BinnedSpectrum):
log.error(
"The background spectrum is not an instance of BinnedSpectrum"
)
raise RuntimeError()
if not observation.n_channels == background.n_channels:
log.error(
"Data file and background file have different "
"number of channels"
)
raise RuntimeError()
background_spectrum = background
background_counts = background_spectrum.counts
# this assumes the observed spectrum is already set!
scaled_background_counts = (
self._get_expected_background_counts_scaled(
background_spectrum
)
) # type: np.ndarray
return (
background_spectrum,
background_plugin,
background_counts,
scaled_background_counts,
)
def _precalculations(self):
"""
pre calculate values for speed.
originally, the plugins were calculating these values on the fly, which was very slow
:return:
"""
log.debug("Starting precalculations")
# area scale factor between background and source
# and exposure ratio between background and source
if (self._background_spectrum is None) and (
self._background_plugin is None
):
# there is no background so the area scaling is unity
log.debug("no background set in precalculations")
self._area_ratio = 1.0
self._exposure_ratio = 1.0
self._background_exposure = 1.0
self._background_scale_factor = None
else:
log.debug("background set in precalculations")
if self._background_plugin is not None:
log.debug("detected background plugin")
if isinstance(self._background_plugin, SpectrumLike):
# use the background plugin's observed spectrum and exposure to scale the area and time
self._background_scale_factor = (
self._background_plugin.observed_spectrum.scale_factor
)
self._background_exposure = (
self._background_plugin.observed_spectrum.exposure
)
else:
# in this case, the XYLike data could come from anything, so area scaling is set to unity
# TODO: could this be wrong?
self._background_scale_factor = (
self._observed_spectrum.scale_factor
)
# if the background exposure is set in the constructor, then this will scale it, otherwise
# this will be unity
self._exposure_ratio = (
self._background_exposure
) = self._explict_background_exposure
else:
# this is the normal case with no background model, get the scale factor directly
log.debug("this is a normal background observation")
self._background_scale_factor = (
self._background_spectrum.scale_factor
)
self._background_exposure = self._background_spectrum.exposure
self._area_ratio = float(
self._observed_spectrum.scale_factor
) / float(self._background_scale_factor)
self._exposure_ratio = float(
self._observed_spectrum.exposure
) / float(self._background_exposure)
self._total_scale_factor = self._area_ratio * self._exposure_ratio
log.debug("completed precalculations")
# deal with background exposure and scale factor
# we run through this separately to
@property
def exposure(self) -> float:
"""
Exposure of the source spectrum
"""
return self._observed_spectrum.exposure
@property
def area_ratio(self) -> float:
"""
:return: ratio between source and background area
"""
if not (
(self._background_plugin is not None)
or (self._background_spectrum) is not None
):
log.error("No background exists!")
raise RuntimeError()
return self._area_ratio
@property
def exposure_ratio(self) -> float:
"""
:return: ratio between source and background exposure
"""
if not (
(self._background_plugin is not None)
or (self._background_spectrum) is not None
):
log.error("No background exists!")
raise RuntimeError()
return self._exposure_ratio
@property
def scale_factor(self) -> float:
"""
Ratio between the source and the background exposure and area
:return:
"""
if not (
(self._background_plugin is not None)
or (self._background_spectrum) is not None
):
log.error("No background exists!")
raise RuntimeError()
return self._total_scale_factor
@property
def background_exposure(self) -> float:
"""
Exposure of the background spectrum, if present
"""
return self._background_exposure
@property
def background_scale_factor(self) -> float:
"""
The background scale factor
:return:
"""
return self._background_scale_factor
@property
def background_spectrum(self) -> BinnedSpectrum:
if self._background_spectrum is None:
log.error("This SpectrumLike instance has no background")
raise RuntimeError()
return self._background_spectrum
@property
def background_plugin(self):
# type: () -> Optional[Union[SpectrumLike,XYLike] ]
return self._background_plugin
@property
def observed_spectrum(self) -> BinnedSpectrum:
return self._observed_spectrum
@classmethod
def _get_synthetic_plugin(
cls,
observation: BinnedSpectrum,
background,
source_function,
are_contiguous=False,
):
speclike_gen = cls("generator", observation, background, verbose=False)
pts = PointSource("fake", 0.0, 0.0, source_function)
model = Model(pts)
speclike_gen.set_model(model)
return speclike_gen
@staticmethod
def _build_fake_observation(
fake_data,
channel_set,
source_errors,
source_sys_errors,
is_poisson,
exposure,
scale_factor,
**kwargs,
) -> BinnedSpectrum:
"""
This is the fake observation builder for SpectrumLike which builds data
for a binned spectrum without dispersion. It must be overridden in child classes.
:param fake_data: series of values... they are ignored later
:param channel_set: a channel set
:param source_errors:
:param source_sys_errors:
:param is_poisson:
:return:
"""
observation = BinnedSpectrum(
fake_data,
exposure=exposure,
ebounds=channel_set.edges,
count_errors=source_errors,
sys_errors=source_sys_errors,
quality=None,
scale_factor=scale_factor,
is_poisson=is_poisson,
mission="fake_mission",
instrument="fake_instrument",
tstart=0.0,
tstop=exposure,
)
return observation
[docs]
@classmethod
def from_background(cls, name: str, spectrum_like, verbose: bool = True):
"""
Extract a SpectrumLike plugin from the background of another SpectrumLike (or subclass) instance
:param name: name of the extracted_plugin
:param spectrum_like: plugin with background to extract
:param verbose: if the plugin should be verbose
:return: SpectrumLike instance from the background
"""
log.debug("creating new spectrumlike from background")
background_only_spectrum = copy.deepcopy(
spectrum_like.background_spectrum
)
background_spectrum_like = SpectrumLike(
name,
observation=background_only_spectrum,
background=None,
verbose=verbose,
)
return background_spectrum_like
[docs]
@classmethod
def from_function(
cls,
name: str,
source_function,
energy_min,
energy_max,
source_errors=None,
source_sys_errors=None,
background_function=None,
background_errors=None,
background_sys_errors=None,
exposure=1.0,
scale_factor=1.0,
**kwargs,
):
"""
Construct a simulated spectrum from a given source function and (optional) background function. If source and/or background errors are not supplied, the likelihood is assumed to be Poisson.
:param name: simulkated data set name
:param source_function: astromodels function
:param energy_min: array of low energy bin edges
:param energy_max: array of high energy bin edges
:param source_errors: (optional) gaussian source errors
:param source_sys_errors: (optional) systematic source errors
:param background_function: (optional) astromodels background function
:param background_errors: (optional) gaussian background errors
:param background_sys_errors: (optional) background systematic errors
:param exposure: the exposure to assume
:param scale_factor: the scale factor between source exposure / bkg exposure
:return: simulated SpectrumLike plugin
"""
log.debug("creating new spectrumlike from function")
channel_set = ChannelSet.from_starts_and_stops(energy_min, energy_max)
# this is just for construction
fake_data = np.ones(len(energy_min))
if source_errors is None:
is_poisson = True
else:
if not len(source_errors) == len(energy_min):
log.error(
"source error array is not the same dimension as the energy array"
)
raise RuntimeError()
is_poisson = False
if source_sys_errors is not None:
if not len(source_sys_errors) == len(energy_min):
log.error(
"background systematic error array is not the same dimension as the energy array"
)
raise RuntimeError()
# call the class dependent observation builder
observation = cls._build_fake_observation(
fake_data,
channel_set,
source_errors,
source_sys_errors,
is_poisson,
exposure,
scale_factor,
**kwargs,
)
if background_function is not None:
fake_background = np.ones(len(energy_min))
if background_errors is None:
is_poisson = True
else:
if not len(background_errors) == len(energy_min):
log.error(
"background error array is not the same dimension as the energy array"
)
raise RuntimeError()
is_poisson = False
if background_sys_errors is not None:
if not len(background_sys_errors) == len(energy_min):
log.error(
"background systematic error array is not the same dimension as the energy array"
)
raise RuntimeError()
tmp_background = BinnedSpectrum(
fake_background,
exposure=1.0,
ebounds=channel_set.edges,
count_errors=background_errors,
sys_errors=background_sys_errors,
quality=None,
scale_factor=1.0,
is_poisson=is_poisson,
mission="fake_mission",
instrument="fake_instrument",
tstart=0.0,
tstop=1.0,
)
# now we have to generate the background counts
# we treat the background as a simple observation with no
# other background
background_gen = SpectrumLike(
"generator", tmp_background, None, verbose=False
)
pts_background = PointSource(
"fake_background", 0.0, 0.0, background_function
)
background_model = Model(pts_background)
background_gen.set_model(background_model)
sim_background = background_gen.get_simulated_dataset("fake")
background = sim_background._observed_spectrum
else:
background = None
generator = cls._get_synthetic_plugin(
observation,
background,
source_function,
) # type: SpectrumLike
return generator.get_simulated_dataset(name)
[docs]
def assign_to_source(self, source_name: str) -> None:
"""
Assign these data to the given source (instead of to the sum of all sources, which is the default)
:param source_name: name of the source (must be contained in the likelihood model)
:return: none
"""
if self._like_model is not None:
if source_name not in self._like_model.sources:
log.error(
"Source %s is not contained in "
"the likelihood model" % source_name
)
raise RuntimeError()
self._source_name = source_name
@property
def likelihood_model(self) -> Model:
if self._like_model is None:
log.error(f"plugin {self._name} does not have a likelihood model")
raise RuntimeError()
return self._like_model
[docs]
def get_pha_files(self) -> Dict[str, BinnedSpectrum]:
info: Dict[str, BinnedSpectrum] = {}
# we want to pass copies so that
# the user doesn't grab the instance
# and try to modify things. protection
info["pha"] = copy.copy(self._observed_spectrum)
if self._background_spectrum is not None:
info["bak"] = copy.copy(self._background_spectrum)
return info
[docs]
def set_active_measurements(self, *args, **kwargs) -> None:
"""
Set the measurements to be used during the analysis. Use as many ranges as you need, and you can specify
either energies or channels to be used.
NOTE to Xspec users: while XSpec uses integers and floats to distinguish between energies and channels
specifications, 3ML does not, as it would be error-prone when writing scripts. Read the following documentation
to know how to achieve the same functionality.
* Energy selections:
They are specified as 'emin-emax'. Energies are in keV. Example:
set_active_measurements('10-12.5','56.0-100.0')
which will set the energy range 10-12.5 keV and 56-100 keV to be
used in the analysis. Note that there is no difference in saying 10 or 10.0.
* Channel selections:
They are specified as 'c[channel min]-c[channel max]'. Example:
set_active_measurements('c10-c12','c56-c100')
This will set channels 10-12 and 56-100 as active channels to be used in the analysis
* Mixed channel and energy selections:
You can also specify mixed energy/channel selections, for example to go from 0.2 keV to channel 20 and from
channel 50 to 10 keV:
set_active_measurements('0.2-c10','c50-10')
* Use all measurements (i.e., reset to initial state):
Use 'all' to select all measurements, as in:
set_active_measurements('all')
Use 'reset' to return to native PHA quality from file, as in:
set_active_measurements('reset')
* Exclude measurements:
Excluding measurements work as selecting measurements, but with the "exclude" keyword set to the energies and/or
channels to be excluded. To exclude between channel 10 and 20 keV and 50 keV to channel 120 do:
set_active_measurements(exclude=["c10-20", "50-c120"])
* Select and exclude:
Call this method more than once if you need to select and exclude. For example, to select between 0.2 keV and
channel 10, but exclude channel 30-50 and energy , do:
set_active_measurements("0.2-c10",exclude=["c30-c50"])
* Using native PHA quality:
To simply add or exclude channels from the native PHA, one can use the use_quailty
option:
set_active_measurements(
"0.2-c10",exclude=["c30-c50"], use_quality=True)
This translates to including the channels from 0.2 keV - channel 10, exluding channels
30-50 and any channels flagged BAD in the PHA file will also be excluded.
:param args:
:param exclude: (list) exclude the provided channel/energy ranges
:param use_quality: (bool) use the native quality on the PHA file (default=False)
:return:
"""
# To implement this we will use an array of boolean index,
# which will filter
# out the non-used channels during the logLike
# Now build the new mask: values for which the mask is 0 will be masked
# We will build the high res mask even if we are
# already rebinned so that it can be saved
if self._rebinner is not None:
log.error(
"You cannot select active measurements if you have a rebinning active. "
"Remove it first with remove_rebinning"
)
raise RuntimeError()
if "use_quality" in kwargs:
use_quality = kwargs.pop("use_quality")
else:
use_quality = False
if use_quality:
# Start with quality mask. This means that channels
# marked good by quality will be used unless exluded in the arguments
# and channels marked bad by quality will be excluded unless included
# by the arguments
self._mask = self._observed_spectrum.quality.good
else:
# otherwise, we will start out with all channels deselected
# and turn the on/off by the arguments
self._mask = np.zeros(
self._observed_spectrum.n_channels, dtype=bool
)
if "all" in args:
# Just make sure than no further selections have been made.
if not (len(args) == 1):
log.error(
"If you specify 'all', you cannot specify more than one energy range."
)
raise RuntimeError()
# Convert the mask to all True (we use all channels)
self._mask[:] = True
elif "reset" in args:
# Convert the to native PHA masking specified in quality
if not (len(args) == 1):
log.error(
"If you specify 'reset', you cannot specify more than one energy range."
)
raise RuntimeError()
self._mask = self._observed_spectrum.quality.good
else:
for arg in args:
selections = dash_separated_string_to_tuple(arg)
# We need to find out if it is a channel or and energy being requested
idx = np.empty(2, dtype=int)
for i, s in enumerate(selections):
if s[0].lower() == "c":
if not (
int(s[1:]) <= self._observed_spectrum.n_channels
):
log.error(
f"%s is larger than the number of channels: %d"
% (
s,
self._observed_spectrum.n_channels,
)
)
raise RuntimeError()
idx[i] = int(s[1:])
else:
idx[i] = self._observed_spectrum.containing_bin(
float(s)
)
if not idx[0] < idx[1]:
log.error(
"The channel and energy selection (%s) are out of order and translates to %s-%s"
% (selections, idx[0], idx[1])
)
raise RuntimeError()
# we do the opposite of the exclude command!
self._mask[idx[0] : idx[1] + 1] = True
log.info(
"Range %s translates to channels %s-%s"
% (arg, idx[0], idx[1])
)
# If you are just excluding channels
if len(args) == 0:
self._mask[:] = True
if "exclude" in kwargs:
exclude = list(kwargs.pop("exclude"))
for arg in exclude:
selections = dash_separated_string_to_tuple(arg)
# We need to find out if it is a channel or and energy being requested
idx = np.empty(2, dtype=int)
for i, s in enumerate(selections):
if s[0].lower() == "c":
if not (
int(s[1:]) <= self._observed_spectrum.n_channels
):
log.error(
"%s is larger than the number of channels: %d"
% (
s,
self._observed_spectrum.n_channels,
)
)
raise RuntimeError()
idx[i] = int(s[1:])
else:
idx[i] = self._observed_spectrum.containing_bin(
float(s)
)
if not idx[0] < idx[1]:
log.error(
"The channel and energy selection (%s) are out of order and translate to %s-%s"
% (selections, idx[0], idx[1])
)
raise RuntimeError()
# we do the opposite of the exclude command!
self._mask[idx[0] : idx[1] + 1] = False
log.info(
"Range %s translates to excluding channels %s-%s"
% (arg, idx[0], idx[1])
)
log.info(
"Now using %s channels out of %s"
% (np.sum(self._mask), self._observed_spectrum.n_channels)
)
# Apply the mask
self._apply_mask_to_original_vectors()
# if the user did not specify use_quality, they may have selected channels which
# are marked BAD (5) in the native PHA file. We want to warn them in this case only (or maybe in all cases?)
if not use_quality:
number_of_native_good_channels = sum(
self._observed_spectrum.quality.good
)
number_of_user_good_channels = sum(self._mask)
if number_of_user_good_channels > number_of_native_good_channels:
# we have more good channels than specified in the PHA file
# so we need to figure out which channels these are where excluded
deselected_channels = []
for i in range(self._observed_spectrum.n_channels):
if self._observed_spectrum.quality.bad[i] and self._mask[i]:
deselected_channels.append(i)
log.warning(
"You have opted to use channels which are flagged BAD in the PHA file."
)
log.warning(
"These channels are: %s"
% (", ".join([str(ch) for ch in deselected_channels]))
)
def _apply_mask_to_original_vectors(self):
# Apply the mask
self._current_observed_counts = self._observed_counts[self._mask]
if self._observed_count_errors is not None:
self._current_observed_count_errors = self._observed_count_errors[
self._mask
]
if self._background_spectrum is not None:
self._current_background_counts = self._background_counts[
self._mask
]
self._current_scaled_background_counts = (
self._scaled_background_counts[self._mask]
)
if self._back_count_errors is not None:
self._current_back_count_errors = self._back_count_errors[
self._mask
]
@contextmanager
def _without_mask_nor_rebinner(self) -> None:
# Store mask and rebinner for later use
mask = self._mask
rebinner = self._rebinner
# Clean mask and rebinning
self.remove_rebinning()
self.set_active_measurements("all")
# Execute whathever
yield
# Restore mask and rebinner (if any)
self._mask = mask
if rebinner is not None:
# There was a rebinner, use it. Note that the rebinner applies the mask by itself
self._apply_rebinner(rebinner)
else:
# There was no rebinner, so we need to explicitly apply the mask
self._apply_mask_to_original_vectors()
[docs]
def get_simulated_dataset(
self,
new_name: Optional[str] = None,
store_model: Optional[bool] = True,
**kwargs,
) -> "SpectrumLike":
"""
Returns another Binned instance where data have been obtained by randomizing the current expectation from the
model, as well as from the background (depending on the respective noise models)
:param new_name: the base line name
:param store_model: to store the model configuration used to simulate the data set
:return: an BinnedSpectrum or child instance
"""
if self._like_model is None:
log.error("You need to set up a model before randomizing")
raise RuntimeError()
# Keep track of how many syntethic datasets we have generated
self._n_synthetic_datasets += 1
log.debug(f"now have {self._n_synthetic_datasets}")
# Generate a name for the new dataset if needed
if new_name is None:
new_name = "%s_sim_%i" % (self.name, self._n_synthetic_datasets)
# Generate randomized data depending on the different noise models
# We remove the mask temporarily because we need the various elements for all channels. We will restore it
# at the end
original_mask = np.array(self._mask, copy=True)
original_rebinner = self._rebinner
with self._without_mask_nor_rebinner():
# Get the source model for all channels (that's why we don't use the .folded_model property)
source_model_counts = (
self._evaluate_model()
* self.exposure
* self._nuisance_parameter.value
)
# sometimes the first channel has ZERO
# for its lower bound which can cause
# an inf or NaN for a given model
# we will set that to zero (better solution??)
# this should not affect most instruments as
# this is usually a crappy channel in the first
# place
if not np.isfinite(source_model_counts[0]):
source_model_counts[0] = 0
log.warning(
"simulated spectrum had infinite counts in first channel"
)
log.warning("setting to ZERO")
if not np.all(source_model_counts >= 0.0) and (
self._observation_noise_model == "poisson"
):
log.error("there are negative counts for this simulation")
for k, v in self._like_model.free_parameters.items():
log.error(f"{k} {v}")
raise RuntimeError()
if np.any(np.isnan(source_model_counts)):
log.error("there are NaN counts for this simulation")
for k, v in self._like_model.free_parameters.items():
log.error(f"{k} {v}")
raise RuntimeError()
# The likelihood evaluator keeps track of the proper likelihood needed to randomize
# quantities. It properly returns None if needed. This avoids multiple checks and dupilcate
# code for the MANY cases we can have. As new cases are added, this code will adapt.
randomized_source_counts = (
self._likelihood_evaluator.get_randomized_source_counts(
source_model_counts
)
)
randomized_source_count_err = (
self._likelihood_evaluator.get_randomized_source_errors()
)
randomized_background_counts = (
self._likelihood_evaluator.get_randomized_background_counts()
)
randomized_background_count_err = (
self._likelihood_evaluator.get_randomized_background_errors()
)
# create new source and background spectra
# the children of BinnedSpectra must properly override the new_spectrum
# member so as to build the appropriate spectrum type. All parameters of the current
# spectrum remain the same except for the rate and rate errors
# the profile likelihood automatically adjust the background spectrum to the
# same exposure and scale as the observation
# therefore, we must set the background simulation to have the exposure and scale
# of the observation
new_observation = self._observed_spectrum.clone(
new_counts=randomized_source_counts,
new_count_errors=randomized_source_count_err,
new_scale_factor=1.0,
)
if self._background_spectrum is not None:
new_background = self._background_spectrum.clone(
new_counts=randomized_background_counts,
new_count_errors=randomized_background_count_err,
new_exposure=self._observed_spectrum.exposure, # because it was adjusted
# new_scale_factor=1.0, # because it was adjusted
new_scale_factor=1.0 / self._total_scale_factor,
)
log.debug(
f"made {sum(randomized_background_counts)} bkg counts"
)
elif self._background_plugin is not None:
new_background = (
self._likelihood_evaluator.synthetic_background_plugin
)
else:
new_background = None
# Now create another instance of BinnedSpectrum with the randomized data we just generated
# notice that the _new member is a classmethod
# (we use verbose=False to avoid many messages when doing many simulations)
new_spectrum_plugin = self._new_plugin(
name=new_name,
observation=new_observation,
background=new_background,
verbose=False,
**kwargs,
)
# Apply the same selections as the current data set
if original_rebinner is not None:
# Apply rebinning, which also applies the mask
new_spectrum_plugin._apply_rebinner(original_rebinner)
else:
# Only apply the mask
new_spectrum_plugin._mask = original_mask
new_spectrum_plugin._apply_mask_to_original_vectors()
# We want to store the simulated parameters so that the user
# can recall them later
if store_model:
new_spectrum_plugin._simulation_storage = clone_model(
self._like_model
)
else:
new_spectrum_plugin._simulation_storage = None
# TODO: nuisance parameters
return new_spectrum_plugin
@classmethod
def _new_plugin(cls, *args, **kwargs):
"""
allows for constructing a new plugin of the appropriate
type in conjunction with the Spectrum.clone method.
It is used for example in get_simulated_dataset
new_background = self._background_spectrum.clone(new_counts=randomized_background_counts,
new_count_errors=randomized_background_count_err)
new_spectrum_plugin = self._new_plugin(name=new_name,
observation=new_observation,
background=new_background,
verbose=self._verbose,
**kwargs)
:param args:
:param kwargs:
:return:
"""
return cls(*args, **kwargs)
@property
def simulated_parameters(self) -> Model:
"""
Return the simulated dataset parameters
:return: a likelihood model copy
"""
if self._simulation_storage is None:
log.error("This is not a simulated data set")
raise RuntimeError()
return self._simulation_storage
[docs]
def rebin_on_background(self, min_number_of_counts: float) -> None:
"""
Rebin the spectrum guaranteeing the provided minimum number of counts in each background bin. This is usually
required for spectra with very few background counts to make the Poisson profile likelihood meaningful.
Of course this is not relevant if you treat the background as ideal, nor if the background spectrum has
Gaussian errors.
The observed spectrum will be rebinned in the same fashion as the background spectrum.
To neutralize this completely, use "remove_rebinning"
:param min_number_of_counts: the minimum number of counts in each bin
:return: none
"""
# NOTE: the rebinner takes care of the mask already
if self._background_spectrum is None:
log.error(
"This data has no background, cannot rebin on background!"
)
raise RuntimeError()
rebinner: Rebinner = Rebinner(
self._background_counts, min_number_of_counts, self._mask
)
if rebinner.n_bins < len(self._mask):
# only for the PHASpectrum subclass do we need to update the the grouping
if isinstance(self._observed_spectrum, PHASpectrum):
self._observed_spectrum.set_ogip_grouping(rebinner.grouping)
self._background_spectrum.set_ogip_grouping(rebinner.grouping)
self._apply_rebinner(rebinner)
else:
log.info("rebinning had no effect")
[docs]
def rebin_on_source(self, min_number_of_counts: int) -> None:
"""
Rebin the spectrum guaranteeing the provided minimum number of counts in each source bin.
To neutralize this completely, use "remove_rebinning"
:param min_number_of_counts: the minimum number of counts in each bin
:return: none
"""
# NOTE: the rebinner takes care of the mask already
rebinner = Rebinner(
self._observed_counts, min_number_of_counts, self._mask
)
if rebinner.n_bins < len(self._mask):
# only for the PHASpectrum subclass do we need to update the the grouping
if isinstance(self._observed_spectrum, PHASpectrum):
self._observed_spectrum.set_ogip_grouping(rebinner.grouping)
if self._background_spectrum is not None:
self._background_spectrum.set_ogip_grouping(
rebinner.grouping
)
self._apply_rebinner(rebinner)
else:
log.info("rebinning had no effect")
def _apply_rebinner(self, rebinner: Rebinner) -> None:
self._rebinner = rebinner
# Apply the rebinning to everything.
# NOTE: the output of the .rebin method are the vectors with the mask *already applied*
(self._current_observed_counts,) = self._rebinner.rebin(
self._observed_counts
)
if self._observed_count_errors is not None:
(
self._current_observed_count_errors,
) = self._rebinner.rebin_errors(self._observed_count_errors)
if self._background_spectrum is not None:
(
self._current_background_counts,
self._current_scaled_background_counts,
) = self._rebinner.rebin(
self._background_counts, self._scaled_background_counts
)
if self._back_count_errors is not None:
# NOTE: the output of the .rebin method are the vectors with the mask *already applied*
(
self._current_back_count_errors,
) = self._rebinner.rebin_errors(self._back_count_errors)
log.info("Now using %s bins" % self._rebinner.n_bins)
[docs]
def remove_rebinning(self) -> None:
"""
Remove the rebinning scheme set with rebin_on_background.
:return:
"""
# Restore original vectors with mask applied
self._apply_mask_to_original_vectors()
self._rebinner = None
def _get_expected_background_counts_scaled(
self, background_spectrum: BinnedSpectrum
) -> None:
"""
Get the background counts expected in the source interval and in the source region, based on the observed
background.
:return:
"""
# NOTE : this is called only once during construction!
# The scale factor is the ratio between the collection area of the source spectrum and the
# background spectrum. It is used for example for the typical aperture-photometry method used in
# X-ray astronomy, where the background region has a different size with respect to the source region
scale_factor = (
self._observed_spectrum.scale_factor
/ background_spectrum.scale_factor,
)
# The expected number of counts is the rate in the background file multiplied by its exposure, renormalized
# by the scale factor.
# (see http://heasarc.gsfc.nasa.gov/docs/asca/abc_backscal.html)
bkg_counts = (
background_spectrum.rates
* self._observed_spectrum.exposure
* scale_factor
)
return bkg_counts
@property
def current_observed_counts(self) -> np.ndarray:
return self._current_observed_counts
@property
def current_background_counts(self) -> Optional[np.ndarray]:
return self._current_background_counts
@property
def current_scaled_background_counts(self) -> Optional[np.ndarray]:
return self._current_scaled_background_counts
@property
def current_background_count_errors(self) -> Optional[np.ndarray]:
return self._current_back_count_errors
@property
def current_observed_count_errors(self) -> Optional[np.ndarray]:
return self._current_observed_count_errors
def _set_background_noise_model(self, new_model: str) -> None:
# Do not make differences between upper and lower cases
if new_model is not None:
new_model = new_model.lower()
if not (new_model in _known_noise_models):
log.error(
"Noise model %s not recognized. "
"Allowed models are: %s"
% (
new_model,
", ".join(_known_noise_models),
)
)
raise RuntimeError()
self._background_noise_model = new_model
# reset the likelihood
self._likelihood_evaluator = statistic_lookup[
self._observation_noise_model
][new_model](self)
log.warning(
"You are setting the background noise model to something that is not specified in the spectrum.\
Verify that this makes statistical sense."
)
def _get_background_noise_model(self) -> str:
return self._background_noise_model
background_noise_model = property(
_get_background_noise_model,
_set_background_noise_model,
doc="Sets/gets the noise model for the background spectrum",
)
def _set_observation_noise_model(self, new_model: str) -> None:
# Do not make differences between upper and lower cases
new_model = new_model.lower()
if not (new_model in _known_noise_models):
log.error(
"Noise model %s not recognized. "
"Allowed models are: %s"
% (
new_model,
", ".join(_known_noise_models),
)
)
raise RuntimeError()
self._observation_noise_model = new_model
# reset the likelihood
self._likelihood_evaluator = statistic_lookup[new_model][
self._background_noise_model
](self)
log.warning(
"You are setting the observation noise model to something that is not specified in the spectrum.\
Verify that this makes statistical sense."
)
def _get_observation_noise_model(self) -> str:
return self._observation_noise_model
observation_noise_model = property(
_get_observation_noise_model,
_set_observation_noise_model,
doc="Sets/gets the noise model for the background spectrum",
)
[docs]
def get_log_like(
self, precalc_fluxes: Optional[np.ndarray] = None
) -> float:
"""
Calls the likelihood from the pre-setup likelihood evaluator that "knows" of the currently set
noise models
:return:
"""
loglike, _ = self._likelihood_evaluator.get_current_value(
precalc_fluxes=precalc_fluxes
)
if self._exclude_from_fit: loglike*=0
return loglike
[docs]
def inner_fit(self) -> float:
return self.get_log_like()
[docs]
def set_model(self, likelihoodModel: Model) -> None:
"""
Set the model to be used in the joint minimization.
"""
# Store likelihood model
self._like_model = likelihoodModel
# We assume there are no extended sources, since we cannot handle them here
if not self._like_model.get_number_of_extended_sources() == 0:
log.error("SpectrumLike plugins do not support " "extended sources")
raise RuntimeError()
# check if we set a source name that the source is in the model
if self._source_name is not None:
if self._source_name not in self._like_model.sources:
log.error(
"Source %s is not contained in "
"the likelihood model" % self._source_name
)
raise RuntimeError()
# Get the differential flux function, and the integral function, with no dispersion,
# we simply integrate the model over the bins
differential_flux, integral = self._get_diff_flux_and_integral(
self._like_model, integrate_method=self._model_integrate_method
)
self._integral_flux = integral
def _evaluate_model(
self, precalc_fluxes: Optional[np.array] = None
) -> np.ndarray:
"""
Since there is no dispersion, we simply evaluate the model by integrating over the energy bins.
This can be overloaded to convolve the model with a response, for example
:return:
"""
if precalc_fluxes is not None:
return precalc_fluxes
elif self._has_contiguous_energies:
if self._predefined_energies is None:
return self._integral_flux(self._observed_spectrum.edges)
else:
return self._integral_flux()
else:
return np.array(
[
self._integral_flux(emin, emax)
for emin, emax in self._observed_spectrum.bin_stack
]
)
[docs]
def get_model(
self, precalc_fluxes: Optional[np.array] = None
) -> np.ndarray:
"""
The model integrated over the energy bins. Note that it only returns the model for the
currently active channels/measurements
:return: array of folded model
"""
if self._rebinner is not None:
(model,) = self._rebinner.rebin(
self._evaluate_model(precalc_fluxes=precalc_fluxes)
* self._observed_spectrum.exposure
)
else:
model = (
self._evaluate_model(precalc_fluxes=precalc_fluxes)[self._mask]
* self._observed_spectrum.exposure
)
return self._nuisance_parameter.value * model
def _evaluate_background_model(self) -> np.ndarray:
"""
Since there is no dispersion, we simply evaluate the model by integrating over the energy bins.
This can be overloaded to convolve the model with a response, for example
:return:
"""
if self._has_contiguous_energies:
if self._predefined_energies is None:
return self._background_integral_flux(
self._observed_spectrum.edges
)
else:
return self._background_integral_flux()
else:
return np.array(
[
self._background_integral_flux(emin, emax)
for emin, emax in self._observed_spectrum.bin_stack
]
)
[docs]
def get_background_model(self, without_mask: bool = False) -> np.ndarray:
"""
The background model integrated over the energy bins. Note that it only returns the model for the
currently active channels/measurements
:return: array of folded model
"""
if not without_mask:
if self._rebinner is not None:
(model,) = self._rebinner.rebin(
self._evaluate_background_model()
* self._background_exposure
)
else:
model = (
self._evaluate_background_model()[self._mask]
* self._background_exposure
)
else:
model = (
self._evaluate_background_model() * self._background_exposure
)
# TODO: should I use the constant here?
# return self._nuisance_parameter.value * model
return model
def _get_diff_flux_and_integral(
self, likelihood_model: Model, integrate_method: str = "simpson"
) -> Tuple[types.FunctionType, types.FunctionType]:
if not integrate_method in ["simpson", "trapz", "riemann"]:
log.error("Only simpson and trapz are valid integral_methods.")
raise RuntimeError()
if self._source_name is None:
log.debug(f"{self.name} is using all point sources")
n_point_sources = likelihood_model.get_number_of_point_sources()
# Make a function which will stack all point sources (OGIP do not support spatial dimension)
def differential_flux(energies):
fluxes = likelihood_model.get_point_source_fluxes(
0, energies, tag=self._tag, stokes=self._stokes
)
# If we have only one point source, this will never be executed
for i in range(1, n_point_sources):
fluxes += likelihood_model.get_point_source_fluxes(
i, energies, tag=self._tag, stokes=self._stokes
)
return fluxes
else:
# This SpectrumLike dataset refers to a specific source
# Note that we checked that self._source_name is in the model when the model was set
try:
def differential_flux(energies):
return likelihood_model.sources[self._source_name](
energies, tag=self._tag, stokes=self._stokes
)
log.debug(
f"{self.name} is using only the point source {self._source_name}"
)
except KeyError:
log.error(
"This SpectumLike plugin has been assigned to source %s, "
"which does not exist in the current model"
% self._source_name
)
raise KeyError()
# The following integrates the diffFlux function using Simpson's rule
# This assume that the intervals e1,e2 are all small, which is guaranteed
# for any reasonable response matrix, given that e1 and e2 are Monte-Carlo
# energies. It also assumes that the function is smooth in the interval
# e1 - e2 and twice-differentiable, again reasonable on small intervals for
# decent models. It might fail for models with too sharp features, smaller
# than the size of the monte carlo interval.
if integrate_method == "simpson":
# New way with simpson rule.
# Make sure to not calculate the model twice for the same energies
if self._has_contiguous_energies:
if self._predefined_energies is None:
def integral(e_edges):
# Make sure we do not calculate the flux two times at the same energy
# e_edges = np.append(e1, e2[-1])
e_m = (e_edges[1:] + e_edges[:-1]) / 2.0
diff_fluxes_edges = differential_flux(e_edges)
diff_fluxes_mid = differential_flux(e_m)
return _simps(
e_edges[:-1],
e_edges[1:],
diff_fluxes_edges,
diff_fluxes_mid,
)
else:
e_edges = np.array(self._predefined_energies)
ee1 = e_edges[:-1]
ee2 = e_edges[1:]
e_m = (ee1 + ee2) / 2.0
def integral():
diff_fluxes_edges = differential_flux(e_edges)
diff_fluxes_mid = differential_flux(e_m)
return _simps(
ee1, ee2, diff_fluxes_edges, diff_fluxes_mid
)
else:
def integral(e1, e2):
# single energy values given
return (
(e2 - e1)
/ 6.0
* (
differential_flux(e1)
+ 4 * differential_flux((e2 + e1) / 2.0)
+ differential_flux(e2)
)
)
elif integrate_method == "trapz":
if self._has_contiguous_energies:
if self._predefined_energies is None:
raise NotImplementedError("This is not yet here")
else:
e_edges = np.array(self._predefined_energies)
ee1 = e_edges[:-1]
ee2 = e_edges[1:]
def integral():
diff_fluxes_edges = differential_flux(e_edges)
return _trapz(
np.array(
[diff_fluxes_edges[:-1], diff_fluxes_edges[1:]]
).T,
np.array([ee1, ee2]).T,
)
else:
def integral(e1, e2):
# single energy values given
return _trapz(
np.array(
[differential_flux(e1), differential_flux(e2)]
),
np.array([e1, e2]),
)
elif integrate_method == "riemann":
if self._has_contiguous_energies:
if self._predefined_energies is None:
pass
else:
e_edges = np.array(self._predefined_energies)
ee1 = e_edges[:-1]
ee2 = e_edges[1:]
e_m = (ee1 + ee2) / 2.0
# energy width
de = ee2 - ee1
def integral():
return _rsum(differential_flux(e_m), de)
else:
def integral(e1, e2):
return differential_flux(0.5 * (e1 + e2)) * (e2 - e1)
return differential_flux, integral
[docs]
def use_effective_area_correction(
self,
min_value: Union[int, float] = 0.8,
max_value: Union[int, float] = 1.2,
use_gaussian_prior: bool = False,
mu: float = 1,
sigma: float = 0.1,
) -> None:
"""
Activate the use of the effective area correction, which is a multiplicative factor in front of the model which
might be used to mitigate the effect of intercalibration mismatch between different instruments.
NOTE: do not use this is you are using only one detector, as the multiplicative constant will be completely
degenerate with the normalization of the model.
NOTE2: always keep at least one multiplicative constant fixed to one (its default value), when using this
with other OGIPLike-type detectors
:param min_value: minimum allowed value (default: 0.8, corresponding to a 20% - effect)
:param max_value: maximum allowed value (default: 1.2, corresponding to a 20% + effect)
:param use_gaussian_prior: use a gaussian prior on the constant
:param mu: the center of the gaussian
:param sigma: the spread of the gaussian
:return:
"""
log.info(
f"{self._name} is using effective area correction (between {min_value} and {max_value})"
)
self._nuisance_parameter.free = True
self._nuisance_parameter.bounds = (min_value, max_value)
# Use a uniform prior by default
if use_gaussian_prior:
self._nuisance_parameter.prior = Truncated_gaussian(
mu=mu, sigma=sigma, lower_bound=min_value, upper_bound=max_value
)
else:
self._nuisance_parameter.set_uninformative_prior(Uniform_prior)
[docs]
def fix_effective_area_correction(
self, value: Union[int, float] = 1
) -> None:
"""
Fix the multiplicative factor (see use_effective_area_correction) to the provided value (default: 1)
:param value: new value (default: 1, i.e., no correction)
:return:
"""
self._nuisance_parameter.value = value
self._nuisance_parameter.fix = True
[docs]
def set_model_integrate_method(self, method: str) -> None:
"""
Change the integrate method for the model integration
:param method: (str) which method should be used (simpson or trapz)
"""
if not method in ["simpson", "trapz", "riemann"]:
log.error("Only simpson and trapz are valid intergate methods.")
raise RuntimeError()
self._model_integrate_method = method
log.info(f"{self._name} changing model integration method to {method}")
# if like_model already set, upadte the integral function
if self._like_model is not None:
differential_flux, integral = self._get_diff_flux_and_integral(
self._like_model, integrate_method=method
)
self._integral_flux = integral
def __set_model_integrate_method(self, value: str) -> None:
self.set_model_integrate_method(value)
def ___set_model_integrate_method(self, value: str) -> None:
self.__set_model_integrate_method(value)
def __get_model_integrate_method(self) -> str:
return self._model_integrate_method
def ___get_model_integrate_method(self) -> str:
self.__set_model_integrate_method()
model_integrate_method = property(
___get_model_integrate_method,
___set_model_integrate_method,
doc="""The method used to integrate the model across the response matrix """,
)
[docs]
def set_background_integrate_method(self, method: str) -> None:
"""
Change the integrate method for the background integration
:param method: (str) which method should be used (simpson or trapz)
"""
if not method in ["simpson", "trapz", "riemann"]:
log.error("Only simpson and trapz are valid intergate methods.")
raise RuntimeError()
self._background_integrate_method = method
log.info(
f"{self._name} changing background integration method to {method}"
)
# if background_plugin is set, update the integral function
if self._background_plugin is not None:
differential_flux, integral = self._get_diff_flux_and_integral(
self._background_plugin.likelihood_model,
integrate_method=method,
)
self._background_integral_flux = integral
def __set_background_integrate_method(self, value: str) -> None:
self.set_background_integrate_method(value)
def ___set_background_integrate_method(self, value: str) -> None:
self.__set_background_integrate_method(value)
def __get_background_integrate_method(self) -> str:
return self._background_integrate_method
def ___get_background_integrate_method(self) -> str:
self.__set_background_integrate_method()
background_integrate_method = property(
___get_background_integrate_method,
___set_background_integrate_method,
doc="""The method used to integrate the_background across the response matrix """,
)
@property
def mask(self) -> np.ndarray:
"""
The channel mask
:return:
"""
return self._mask
@property
def tstart(self) -> float:
return self._tstart
@property
def tstop(self) -> float:
return self._tstop
@property
def expected_model_rate(self) -> np.ndarray:
return self._evaluate_model() * self._nuisance_parameter.value
@property
def observed_counts(self) -> np.ndarray:
"""
:return: the observed counts
"""
return self._observed_counts
@property
def observed_count_errors(self) -> Optional[np.ndarray]:
"""
:return: the observed counts errors
"""
cnt_err = None
log.debug(
"self._observation_noise_model = %s" % self._observation_noise_model
)
log.debug(
"self._background_noise_model = %s" % self._background_noise_model
)
if self._observation_noise_model == "poisson":
cnt_err = np.sqrt(self._observed_counts)
else:
# if self._background_noise_model is None:
cnt_err = self._observed_count_errors
# elif self._background_noise_model is "gaussian":
# cnt_err = self._observed_count_errors
# calculate all the correct quantites
return cnt_err
@property
def background_counts(self) -> Optional[np.ndarray]:
"""
:return: the observed background counts
"""
background_counts = None
if self._observation_noise_model == "poisson":
if self._background_noise_model == "poisson":
background_counts = self._background_counts
# Gehrels weighting, a little bit better approximation when statistic is low
# (and inconsequential when statistic is high)
elif self._background_noise_model == "ideal":
background_counts = self._scaled_background_counts
elif self._background_noise_model == "gaussian":
background_counts = self._background_counts
elif self._background_noise_model is None:
background_counts = None
elif self._background_noise_model == "modeled":
# get the background counts from the background
# plugin.. NOT SCALED
background_counts = self.get_background_model(without_mask=True)
else:
raise RuntimeError("This is a bug")
else: # gaussian observation
if self._background_noise_model is None:
# Observed counts
background_counts = None
elif self._background_noise_model == "gaussian":
background_counts = self._background_counts
# calculate all the correct quantites
return background_counts
@property
def background_count_errors(self) -> Optional[np.ndarray]:
"""
:return: the observed background count errors
"""
background_errors = None
if self._observation_noise_model == "poisson":
if self._background_noise_model == "poisson":
# Gehrels weighting, a little bit better approximation when statistic is low
# (and inconsequential when statistic is high)
background_errors = 1 + np.sqrt(self._background_counts + 0.75)
elif self._background_noise_model == "ideal":
background_errors = np.zeros_like(
self._scaled_background_counts
)
elif self._background_noise_model == "gaussian":
background_errors = self._back_count_errors
elif self._background_noise_model is None:
return None
elif self._background_noise_model == "modeled":
# get the background count error from the background
# plugin.. NOT SCALED
background_errors = np.sqrt(
self.get_background_model(without_mask=True)
)
else:
raise RuntimeError("This is a bug")
else:
if self._background_noise_model is None:
background_errors = None
elif self._background_noise_model == "gaussian":
background_errors = self._back_count_errors
return background_errors
@property
def source_rate(self) -> np.ndarray:
"""
The source rate of the model. If there is background or a background background plugin present,
the source is background subtracted, but only for visual purposes. If no background is present,
then, this is just the observed rate.
:return: the source rate
"""
if (self._background_noise_model is not None) or (
self._background_plugin is not None
):
# since we compare to the model rate... background subtract but with proper propagation
src_rate = (
self.observed_counts
/ self._observed_spectrum.exposure
/ self._observed_spectrum.scale_factor
- (
self.background_counts
/ self._background_exposure
/ self._background_scale_factor
)
)
else:
# since we compare to the model rate... background subtract but with proper propagation
src_rate = self.observed_counts / self._observed_spectrum.exposure
return src_rate
@property
def source_rate_error(self) -> np.ndarray:
"""
The source rate error of the model. If there is background or a background background plugin present,
the source is background subtracted, but only for visual purposes. If no background is present,
then, this is just the observed rate.
:return: the source rate error
"""
if (self._background_noise_model is not None) or (
self._background_plugin is not None
):
src_rate_err = np.sqrt(
(
self.observed_count_errors
/ self._observed_spectrum.exposure
/ self._observed_spectrum.scale_factor
)
** 2
+ (
self.background_count_errors
/ self._background_exposure
/ self._background_scale_factor
)
** 2
)
else:
src_rate_err = (
self.observed_count_errors / self._observed_spectrum.exposure
)
return src_rate_err
@property
def quality(self) -> Quality:
return self._observed_spectrum.quality
@property
def energy_boundaries(self, mask: bool = True) -> Tuple[float]:
"""
Energy boundaries of channels currently in use (rebinned, if a rebinner is active)
:return: (energy_min, energy_max)
"""
energies = np.array(self._observed_spectrum.edges)
energy_min, energy_max = energies[:-1], energies[1:]
if self._rebinner is not None:
# Get the rebinned chans. NOTE: these are already masked
energy_min, energy_max = self._rebinner.get_new_start_and_stop(
energy_min, energy_max
)
else:
# Apply the mask
energy_min = energy_min[self._mask]
energy_max = energy_max[self._mask]
return energy_min, energy_max
@property
def n_data_points(self) -> int:
if self._rebinner is not None:
return self._rebinner.n_bins
else:
return int(self._mask.sum())
@property
def significance(self) -> float:
"""
:return: the significance of the data over background
"""
sig_obj = Significance(
Non=self._observed_spectrum.total_count,
Noff=self._background_spectrum.total_count
if self._background_spectrum is not None
else None,
alpha=self._total_scale_factor,
)
if self._background_spectrum is not None:
if (
self._observed_spectrum.is_poisson
and self._background_spectrum.is_poisson
):
# use simple li & ma
significance = sig_obj.li_and_ma()
elif (
self._observed_spectrum.is_poisson
and not self._background_spectrum.is_poisson
):
significance = (
sig_obj.li_and_ma_equivalent_for_gaussian_background(
self._background_spectrum.total_count_error
)
)
else:
raise NotImplementedError(
"We haven't put in other significances yet"
)
else:
log.warning(
"Significance with no background is not yet computed accurately"
)
significance = [np.NaN]
return significance[0]
@property
def significance_per_channel(self) -> np.ndarray:
"""
:return: the significance of the data over background per channel
"""
with np.errstate(divide="ignore", invalid="ignore"):
sig_obj = Significance(
Non=self._current_observed_counts,
Noff=self._current_background_counts,
alpha=self._total_scale_factor,
)
if (
self._observed_spectrum.is_poisson
and self._background_spectrum.is_poisson
):
# use simple li & ma
significance = sig_obj.li_and_ma()
elif (
self._observed_spectrum.is_poisson
and not self._background_spectrum.is_poisson
):
significance = (
sig_obj.li_and_ma_equivalent_for_gaussian_background(
self._current_back_count_errors
)
)
else:
raise NotImplementedError(
"We haven't put in other significances yet"
)
return significance
[docs]
def write_pha(self):
raise NotImplementedError("this is in progress")
# we just need to make a diagonal response and then follow the example in dispersion like
[docs]
def view_count_spectrum(
self,
plot_errors: bool = True,
show_bad_channels: bool = True,
show_warn_channels: bool = False,
significance_level: bool = None,
scale_background: bool = True,
) -> matplotlib.figure.Figure:
"""
View the count and background spectrum. Useful to check energy selections.
:param plot_errors: plot errors on the counts
:param show_bad_channels: (bool) show which channels are bad in the native PHA quality
:return:
"""
if sum(self._mask) == 0:
raise RuntimeError("There are no active channels selected to plot!")
# adding the rebinner: j. michael.
# In the future read colors from config file
# First plot the counts
# find out the type of observation
modeled_label = " "
if self._observation_noise_model == "poisson":
# Observed counts
observed_counts = copy.copy(self._current_observed_counts)
cnt_err = np.sqrt(observed_counts)
if self._background_noise_model == "poisson":
background_counts = copy.copy(self._current_background_counts)
background_errors = np.sqrt(background_counts)
elif self._background_noise_model == "ideal":
background_counts = copy.copy(
self._current_scaled_background_counts
)
background_errors = np.zeros_like(background_counts)
elif self._background_noise_model == "gaussian":
background_counts = copy.copy(self._current_background_counts)
background_errors = copy.copy(self._current_back_count_errors)
elif self._background_noise_model is None:
if self._background_plugin is None:
observed_counts = copy.copy(self._current_observed_counts)
background_counts = np.zeros(observed_counts.shape)
background_errors = np.zeros(observed_counts.shape)
else:
raise RuntimeError("This is a bug")
# we will show the modeled counts
background_counts = self.get_background_model()
background_errors = np.sqrt(background_counts)
modeled_label = "Modeled "
elif self._background_noise_model == "modeled":
background_counts = self.get_background_model()
background_errors = np.sqrt(background_counts)
modeled_label = "Modeled "
else:
raise RuntimeError("This is a bug")
# convert to rates, ugly, yes
# background_counts /= self._background_exposure
# background_errors /= self._background_exposure
background_rate = background_counts / self._background_exposure
background_rate_errors = (
background_errors / self._background_exposure
)
# Gaussian observation
else:
if self._background_noise_model is None:
observed_counts = copy.copy(self._current_observed_counts)
background_counts = np.zeros(
observed_counts.shape, dtype=np.int64
)
background_errors = np.zeros(
observed_counts.shape, dtype=np.int64
)
background_rate = np.zeros(observed_counts.shape)
background_rate_errors = np.zeros(observed_counts.shape)
cnt_err = copy.copy(self._current_observed_count_errors)
elif self._background_noise_model == "gaussian":
observed_counts = copy.copy(self._current_observed_counts)
background_counts = copy.copy(self._current_background_counts)
background_errors = copy.copy(self._current_back_count_errors)
background_rate = background_counts / self._background_exposure
background_rate_errors = (
background_errors / self._background_exposure
)
cnt_err = copy.copy(self._current_observed_count_errors)
# convert to rates, ugly, yes
observed_rates = (
observed_counts
/ self._observed_spectrum.exposure
/ self._observed_spectrum.scale_factor
)
rate_err = (
cnt_err
/ self._observed_spectrum.exposure
/ self._observed_spectrum.scale_factor
)
# observed_counts /= self._observed_spectrum.exposure
# cnt_err /= self._observed_spectrum.exposure
if scale_background and self._has_background:
background_rate /= self._background_scale_factor
background_rate_errors /= self._background_scale_factor
background_label = f"Scaled {modeled_label}Background"
else:
background_label = f"{modeled_label}Background"
# Make the plots
fig, ax = plt.subplots()
# Get the energy boundaries
energy_min, energy_max = self.energy_boundaries
energy_width = energy_max - energy_min
# plot counts and background for the currently selected data
channel_plot(
ax,
energy_min,
energy_max,
observed_rates,
color=threeML_config["plugins"]["ogip"]["data_plot"][
"counts_color"
],
lw=1.5,
alpha=1,
label="Total",
)
if not np.all(background_rate == 0) and self._has_background:
channel_plot(
ax,
energy_min,
energy_max,
background_rate,
color=threeML_config["plugins"]["ogip"]["data_plot"][
"background_color"
],
alpha=0.8,
label=background_label,
)
mean_chan = np.mean([energy_min, energy_max], axis=0)
# if asked, plot the errors
if plot_errors:
ax.errorbar(
mean_chan,
observed_rates / energy_width,
yerr=rate_err / energy_width,
fmt="",
# markersize=3,
linestyle="",
elinewidth=0.7,
alpha=0.9,
capsize=0,
# label=data._name,
color=threeML_config["plugins"]["ogip"]["data_plot"][
"counts_color"
],
)
if self._background_noise_model is not None:
ax.errorbar(
mean_chan,
background_rate / energy_width,
yerr=background_rate_errors / energy_width,
fmt="",
# markersize=3,
linestyle="",
elinewidth=0.7,
alpha=0.9,
capsize=0,
# label=data._name,
color=threeML_config["plugins"]["ogip"]["data_plot"][
"background_color"
],
)
# Now plot and fade the non-used channels
non_used_mask = ~self._mask
if True: # non_used_mask.sum() > 0:
# Get un-rebinned versions of all arrays, so we can directly apply the mask
energy_min_unrebinned, energy_max_unrebinned = (
np.array(self._observed_spectrum.starts),
np.array(self._observed_spectrum.stops),
)
energy_width_unrebinned = (
energy_max_unrebinned - energy_min_unrebinned
)
observed_rate_unrebinned = self._observed_counts / self.exposure
observed_rate_unrebinned_err = (
np.sqrt(self._observed_counts) / self.exposure
)
if non_used_mask.sum() > 0:
channel_plot(
ax,
energy_min_unrebinned[non_used_mask],
energy_max_unrebinned[non_used_mask],
observed_rate_unrebinned[non_used_mask],
color=threeML_config.plugins.ogip.data_plot.counts_color,
lw=1.5,
alpha=1,
)
if self._background_noise_model is not None:
if self._background_noise_model == "modeled":
background_rate_unrebinned = (
self.get_background_model(without_mask=True)
/ self._background_exposure
)
background_rate_unrebinned_err = (
np.sqrt(self.get_background_model(without_mask=True))
/ self._background_exposure
)
else:
background_rate_unrebinned = (
self._background_counts / self.background_exposure
)
background_rate_unrebinned_err = (
np.sqrt(self._background_counts)
/ self.background_exposure
)
if non_used_mask.sum() > 0:
channel_plot(
ax,
energy_min_unrebinned[non_used_mask],
energy_max_unrebinned[non_used_mask],
background_rate_unrebinned[non_used_mask],
color=threeML_config.plugins.ogip.data_plot.background_color,
alpha=0.8,
)
else:
background_rate_unrebinned = np.zeros_like(
observed_rate_unrebinned
)
background_rate_unrebinned_err = np.zeros_like(
observed_rate_unrebinned_err
)
if plot_errors:
mean_chan_unrebinned = np.mean(
[energy_min_unrebinned, energy_max_unrebinned], axis=0
)
ax.errorbar(
mean_chan_unrebinned[non_used_mask],
observed_rate_unrebinned[non_used_mask]
/ energy_width_unrebinned[non_used_mask],
yerr=observed_rate_unrebinned_err[non_used_mask]
/ energy_width_unrebinned[non_used_mask],
fmt="",
# markersize=3,
linestyle="",
elinewidth=0.7,
alpha=0.9,
capsize=0,
# label=data._name,
color=threeML_config.plugins.ogip.data_plot.counts_color,
)
if self._background_noise_model is not None:
ax.errorbar(
mean_chan_unrebinned[non_used_mask],
background_rate_unrebinned[non_used_mask]
/ energy_width_unrebinned[non_used_mask],
yerr=background_rate_unrebinned_err[non_used_mask]
/ energy_width_unrebinned[non_used_mask],
fmt="",
# markersize=3,
linestyle="",
elinewidth=0.7,
alpha=0.9,
capsize=0,
# label=data._name,
color=threeML_config.plugins.ogip.data_plot.background_color,
)
# make some nice top and bottom plot ranges
# tmp_bkg = background_rate_unrebinned / energy_width_unrebinned
# tmp_bkg = tmp_bkg[np.isfinite(tmp_bkg)]
# tmp_obs = observed_rate_unrebinned / energy_width_unrebinned
# tmp_obs = tmp_obs[np.isfinite(tmp_obs)]
tmp_bkg = background_rate / energy_width
tmp_bkg = tmp_bkg[np.isfinite(tmp_bkg)]
tmp_obs = observed_rates / energy_width
tmp_obs = tmp_obs[np.isfinite(tmp_obs)]
top = max([max(tmp_bkg), max(tmp_obs)]) * 1.5
bottom = (
min(
[
min(tmp_bkg),
min(tmp_obs),
]
)
* 0.8
)
# plot the deselected regions
disjoint_patch_plot(
ax,
energy_min_unrebinned,
energy_max_unrebinned,
top,
bottom,
~self._mask,
color=threeML_config.plugins.ogip.data_plot.masked_channels_color,
alpha=0.5,
)
# plot the bad quality channels if requested
if show_bad_channels:
if sum(self._observed_spectrum.quality.bad) > 0:
log.info("bad channels shown in red hatching\n")
disjoint_patch_plot(
ax,
energy_min_unrebinned,
energy_max_unrebinned,
top,
bottom,
self._observed_spectrum.quality.bad,
color="none",
edgecolor=threeML_config.plugins.ogip.data_plot.bad_channels_color,
hatch="/",
alpha=0.95,
)
if show_warn_channels:
if sum(self._observed_spectrum.quality.warn) > 0:
log.info("warned channels shown in purple hatching\n")
disjoint_patch_plot(
ax,
energy_min_unrebinned,
energy_max_unrebinned,
top,
bottom,
self._observed_spectrum.quality.bad,
color="none",
edgecolor=threeML_config.plugins.ogip.data_plot.warn_channels_color,
hatch="/",
alpha=0.95,
)
if significance_level is not None:
log.info(
"channels below the significance threshold shown in red\n"
)
with np.errstate(invalid="ignore"):
significance_mask = (
self.significance_per_channel < significance_level
)
disjoint_patch_plot(
ax,
energy_min_unrebinned,
energy_max_unrebinned,
top,
bottom,
significance_mask,
color="red",
alpha=0.3,
)
ax.set_xlabel("Energy (keV)")
ax.set_ylabel("Rate (counts s$^{-1}$ keV$^{-1}$)")
ax.set_xlim(
left=self._observed_spectrum.absolute_start,
right=self._observed_spectrum.absolute_stop,
)
ax.legend()
return fig
def _output(self):
# type: () -> pd.Series
obs = collections.OrderedDict()
obs["n. channels"] = self._observed_spectrum.n_channels
obs["total rate"] = self._observed_spectrum.total_rate
if not self._observed_spectrum.is_poisson:
obs["total rate error"] = self._observed_spectrum.total_rate_error
if self._background_spectrum is not None:
obs["total bkg. rate"] = self._background_spectrum.total_rate
if not self._background_spectrum.is_poisson:
obs[
"total bkg. rate error"
] = self._background_spectrum.total_rate_error
obs["bkg. exposure"] = self.background_exposure
obs["bkg. is poisson"] = self._background_spectrum.is_poisson
obs["exposure"] = self.exposure
obs["is poisson"] = self._observed_spectrum.is_poisson
if self._background_plugin is not None:
obs["background"] = (
"modeled from plugin %s" % self._background_plugin.name
)
obs["significance"] = self.significance
obs["src/bkg area ratio"] = self._area_ratio
obs["src/bkg exposure ratio"] = self._exposure_ratio
obs["src/bkg scale factor"] = self._total_scale_factor
elif self._background_spectrum is not None:
obs["background"] = "profiled"
obs["significance"] = self.significance
obs["src/bkg area ratio"] = self._area_ratio
obs["src/bkg exposure ratio"] = self._exposure_ratio
obs["src/bkg scale factor"] = self._total_scale_factor
# obs['response'] = self._observed_spectrum.response_file
return pd.Series(data=obs, index=list(obs.keys()))
[docs]
def get_number_of_data_points(self) -> int:
"""
returns the number of active data bins
:return:
"""
# the sum of the mask should be the number of data bins in use
return self._mask.sum()
[docs]
def display(self):
display(self._output().to_frame())
def __repr__(self):
return self._output().to_string()
def _construct_counts_arrays(
self,
min_rate: Union[int, float],
ratio_residuals: bool = False,
total_counts: bool = False,
) -> dict:
"""
Build new arrays before or after a fit of rebinned data/model
values. We keep this seperated from the plotting code because
it is cleaner and allows us to extract these quantites independently
:param min_rate:
:param ratio_residuals:
:param total_counts: Should this construct the total counts as "data". If not, the "data counts" are
observed-background and the model counts are only source counts. Otherwise "data counts" are observed
and model counts are source+background
:return:
"""
# energy_min, energy_max = self._rsp.ebounds[:-1], self._rsp.ebounds[1:]
energy_min = np.array(self._observed_spectrum.edges[:-1])
energy_max = np.array(self._observed_spectrum.edges[1:])
chan_width = energy_max - energy_min
# Source model
expected_model_rate = self.expected_model_rate
# Observed rate
observed_rate = self.observed_counts / self._observed_spectrum.exposure
observed_rate_err = (
self.observed_count_errors / self._observed_spectrum.exposure
)
# Background rate
if (self._background_noise_model is not None) or (
self._background_plugin is not None
):
background_rate = (
self.background_counts / self._background_exposure
) * self._area_ratio
background_rate_err = (
self.background_count_errors / self._background_exposure
) * self._area_ratio
else:
background_rate = np.zeros(len(observed_rate))
background_rate_err = np.zeros(len(observed_rate))
# Create a rebinner if either a min_rate has been given, or if the current data set has no rebinned on its own
# rebin on expected model rate
if (min_rate is not NO_REBIN) or (self._rebinner is None):
this_rebinner = Rebinner(expected_model_rate, min_rate, self._mask)
else:
# Use the rebinner already in the data
this_rebinner = self._rebinner
# get the rebinned counts
(
new_observed_rate,
new_model_rate,
new_background_rate,
) = this_rebinner.rebin(
observed_rate, expected_model_rate, background_rate
)
(new_observed_rate_err,) = this_rebinner.rebin_errors(observed_rate_err)
(new_background_rate_err,) = this_rebinner.rebin_errors(
background_rate_err
)
# adjust channels
new_energy_min, new_energy_max = this_rebinner.get_new_start_and_stop(
energy_min, energy_max
)
new_chan_width = new_energy_max - new_energy_min
# mean_energy = np.mean([new_energy_min, new_energy_max], axis=0)
# For each bin find the weighted average of the channel center
mean_energy = []
delta_energy = [[], []]
mean_energy_unrebinned = (energy_max + energy_min) / 2.0
for e_min, e_max in zip(new_energy_min, new_energy_max):
# Find all channels in this rebinned bin
idx = (mean_energy_unrebinned >= e_min) & (
mean_energy_unrebinned <= e_max
)
# Find the rates for these channels
r = observed_rate[idx]
if r.max() == 0 or min_rate < 0:
# All empty, cannot weight
this_mean_energy = (e_min + e_max) / 2.0
else:
# negative src rates cause the energy mean to
# go outside of the bounds. So we fix negative rates to
# zero when computing the mean
idx_negative = r < 0.0
r[idx_negative] = 0.0
# Do the weighted average of the mean energies
weights = r / np.sum(r)
this_mean_energy = np.average(
mean_energy_unrebinned[idx], weights=weights
)
# Compute "errors" for X (which aren't really errors, just to mark the size of the bin)
delta_energy[0].append(this_mean_energy - e_min)
delta_energy[1].append(e_max - this_mean_energy)
mean_energy.append(this_mean_energy)
# Residuals
# we need to get the rebinned counts
(rebinned_observed_counts,) = this_rebinner.rebin(self.observed_counts)
(rebinned_observed_count_errors,) = this_rebinner.rebin_errors(
self.observed_count_errors
)
# the rebinned counts expected from the model
rebinned_model_counts = (
new_model_rate * self._observed_spectrum.exposure
)
# and also the rebinned background
if self._background_noise_model is not None:
if False: # self._background_noise_model == "modeled":
(rebinned_background_counts,) = this_rebinner.rebin(
self.get_background_model()
)
(rebinned_background_errors,) = this_rebinner.rebin_errors(
np.sqrt(self.get_background_model())
)
else:
(rebinned_background_counts,) = this_rebinner.rebin(
self.background_counts
)
(rebinned_background_errors,) = this_rebinner.rebin_errors(
self.background_count_errors
)
else:
rebinned_background_counts = np.zeros_like(rebinned_observed_counts)
significance_calc = Significance(
rebinned_observed_counts,
rebinned_background_counts
+ rebinned_model_counts / self._total_scale_factor,
# min([self._total_scale_factor, 1.0])
self._total_scale_factor,
)
# Divide the various cases
# TODO check this: shoudn't it be obseved-background/model (for the old way) and
# observed/(model+background) (for the new way). Errors also wrong observed+background error
if ratio_residuals:
residuals = (
(rebinned_observed_counts - rebinned_model_counts)
/ rebinned_model_counts,
)
residual_errors = (
rebinned_observed_count_errors / rebinned_model_counts
)
else:
residual_errors = None
if self._observation_noise_model == "poisson":
if self._background_noise_model == "poisson":
# We use the Li-Ma formula to get the significance (sigma)
residuals = significance_calc.li_and_ma()
elif self._background_noise_model == "ideal":
residuals = significance_calc.known_background()
elif self._background_noise_model == "gaussian":
residuals = significance_calc.li_and_ma_equivalent_for_gaussian_background(
rebinned_background_errors
)
elif self._background_noise_model is None:
residuals = significance_calc.known_background()
elif self._background_noise_model == "modeled":
residuals = significance_calc.known_background()
else:
log.error("This is a bug!")
raise RuntimeError("This is a bug")
else:
if self._background_noise_model is None:
residuals = (
rebinned_observed_counts - rebinned_model_counts
) / rebinned_observed_count_errors
elif self._background_noise_model == "gaussian":
residuals = significance_calc.gaussian_background(
rebinned_observed_count_errors,
rebinned_background_errors,
)
else:
log.error("Not yet implemeted!")
raise NotImplementedError("Not yet implemented")
# construct a dict with all the new quantities
# so that we can extract them for plotting
rebinned_quantities = dict(
# Rebined
# observed
new_observed_rate=new_observed_rate,
new_observed_rate_err=new_observed_rate_err,
# background
new_background_rate=new_background_rate,
new_background_rate_err=new_background_rate_err,
# model
new_model_rate=new_model_rate,
# New echans
new_chan_width=new_chan_width,
new_energy_min=new_energy_min,
new_energy_max=new_energy_max,
mean_energy=mean_energy,
# Residuals
residuals=residuals,
residual_errors=residual_errors,
delta_energy=delta_energy,
# Unbinned model rate
expected_model_rate=expected_model_rate,
# Unbinned echans
energy_min=energy_min,
energy_max=energy_max,
chan_width=chan_width,
)
return rebinned_quantities
[docs]
def display_model(
self,
data_color: str = "k",
model_color: str = "r",
background_color: str = "b",
step: bool = True,
show_data: bool = True,
show_residuals: bool = True,
ratio_residuals: bool = False,
show_legend: bool = True,
min_rate: Union[int, float] = 1e-99,
model_label: Optional[str] = None,
model_kwargs: Optional[Dict[str, Any]] = None,
data_kwargs: Optional[Dict[str, Any]] = None,
background_label: Optional[str] = None,
background_kwargs: Optional[Dict[str, Any]] = None,
source_only: bool = True,
show_background: bool = False,
**kwargs,
) -> ResidualPlot:
"""
Plot the current model with or without the data and the residuals. Multiple models can be plotted by supplying
a previous axis to 'model_subplot'.
Example usage:
fig = data.display_model()
fig2 = data2.display_model(model_subplot=fig.axes)
:param data_color: the color of the data
:param model_color: the color of the model
:param step: (bool) create a step count histogram or interpolate the model
:param show_data: (bool) show_the data with the model
:param show_residuals: (bool) shoe the residuals
:param ratio_residuals: (bool) use model ratio instead of residuals
:param show_legend: (bool) show legend
:param min_rate: the minimum rate per bin
:param model_label: (optional) the label to use for the model default is plugin name
:param model_subplot: (optional) axis or list of axes to plot to
:param model_kwargs: plotting kwargs affecting the plotting of the model
:param data_kwargs: plotting kwargs affecting the plotting of the data and residuls
:return:
"""
# set up the default plotting
_default_model_kwargs = dict(color=model_color, alpha=1)
_default_background_kwargs = dict(
color=background_color, alpha=1, ls="--"
)
_sub_menu = threeML_config.plotting.residual_plot
_default_data_kwargs = dict(
color=data_color,
alpha=1,
fmt=_sub_menu.marker,
markersize=_sub_menu.size,
ls="",
elinewidth=_sub_menu.linewidth,
capsize=0,
)
# overwrite if these are in the confif
_kwargs_menu: BinnedSpectrumPlot = threeML_config.plugins.ogip.fit_plot
if _kwargs_menu.model_mpl_kwargs is not None:
for k, v in _kwargs_menu.model_mpl_kwargs.items():
_default_model_kwargs[k] = v
if _kwargs_menu.data_mpl_kwargs is not None:
for k, v in _kwargs_menu.data_mpl_kwargs.items():
_default_data_kwargs[k] = v
if _kwargs_menu.background_mpl_kwargs is not None:
for k, v in _kwargs_menu.background_mpl_kwargs.items():
_default_background_kwargs[k] = v
if model_kwargs is not None:
if not type(model_kwargs) == dict:
log.error("model_kwargs must be a dict")
raise RuntimeError()
for k, v in list(model_kwargs.items()):
if k in _default_model_kwargs:
_default_model_kwargs[k] = v
else:
_default_model_kwargs[k] = v
if data_kwargs is not None:
if not type(data_kwargs) == dict:
log.error("data_kwargs must be a dict")
raise RuntimeError()
for k, v in list(data_kwargs.items()):
if k in _default_data_kwargs:
_default_data_kwargs[k] = v
else:
_default_data_kwargs[k] = v
if background_kwargs is not None:
if not type(background_kwargs) == dict:
log.error("background_kwargs must be a dict")
raise RuntimeError()
for k, v in list(background_kwargs.items()):
if k in _default_background_kwargs:
_default_background_kwargs[k] = v
else:
_default_background_kwargs[k] = v
# since we define some defualts, lets not overwrite
# the users
_duplicates = (("ls", "linestyle"), ("lw", "linewidth"))
for d in _duplicates:
if (d[0] in _default_model_kwargs) and (
d[1] in _default_model_kwargs
):
_default_model_kwargs.pop(d[0])
if (d[0] in _default_data_kwargs) and (
d[1] in _default_data_kwargs
):
_default_data_kwargs.pop(d[0])
if (d[0] in _default_background_kwargs) and (
d[1] in _default_background_kwargs
):
_default_background_kwargs.pop(d[0])
if model_label is None:
model_label = "%s Model" % self._name
residual_plot = ResidualPlot(
show_residuals=show_residuals,
ratio_residuals=ratio_residuals,
**kwargs,
)
# compute the values for the plotting
rebinned_quantities = self._construct_counts_arrays(
min_rate, ratio_residuals
)
if source_only:
y_label = "Net rate\n(counts s$^{-1}$ keV$^{-1}$)"
weighted_data = old_div(
rebinned_quantities["new_observed_rate"]
- rebinned_quantities["new_background_rate"],
rebinned_quantities["new_chan_width"],
)
weighted_error = old_div(
np.sqrt(
rebinned_quantities["new_observed_rate_err"] ** 2
+ rebinned_quantities["new_background_rate_err"] ** 2
),
rebinned_quantities["new_chan_width"],
)
else:
y_label = "Observed rate\n(counts s$^{-1}$ keV$^{-1}$)"
weighted_data = old_div(
rebinned_quantities["new_observed_rate"],
rebinned_quantities["new_chan_width"],
)
weighted_error = old_div(
rebinned_quantities["new_observed_rate_err"],
rebinned_quantities["new_chan_width"],
)
# weighted_data = old_div(
# rebinned_quantities["new_rate"], rebinned_quantities["new_chan_width"]
# )
# weighted_error = old_div(
# rebinned_quantities["new_err"], rebinned_quantities["new_chan_width"]
# )
residual_plot.add_data(
rebinned_quantities["mean_energy"],
weighted_data,
rebinned_quantities["residuals"],
residual_yerr=rebinned_quantities["residual_errors"],
yerr=weighted_error,
xerr=rebinned_quantities["delta_energy"],
label=self._name,
show_data=show_data,
**_default_data_kwargs,
)
if show_background:
residual_plot.add_model_step(
rebinned_quantities["new_energy_min"],
rebinned_quantities["new_energy_max"],
rebinned_quantities["new_chan_width"],
rebinned_quantities["new_background_rate"],
label=background_label,
**_default_background_kwargs,
)
# a step historgram
if step:
if source_only:
# only source
eff_model = rebinned_quantities["new_model_rate"]
else:
eff_model = (
rebinned_quantities["new_model_rate"]
+ rebinned_quantities["new_background_rate"]
)
residual_plot.add_model_step(
rebinned_quantities["new_energy_min"],
rebinned_quantities["new_energy_max"],
rebinned_quantities["new_chan_width"],
eff_model,
label=model_label,
**_default_model_kwargs,
)
# residual_plot.add_model_step(
# rebinned_quantities["new_energy_min"],
# rebinned_quantities["new_energy_max"],
# rebinned_quantities["new_chan_width"],
# rebinned_quantities["new_model_rate"],
# label=model_label,
# **_default_model_kwargs
# )
else:
# We always plot the model un-rebinned here
# Mask the array so we don't plot the model where data have been excluded
# y = expected_model_rate / chan_width
y = np.ma.masked_where(
~self._mask,
old_div(
rebinned_quantities["expected_model_rate"],
rebinned_quantities["chan_width"],
),
)
x = np.mean(
[
rebinned_quantities["energy_min"],
rebinned_quantities["energy_max"],
],
axis=0,
)
residual_plot.add_model(
x, y, label=model_label, **_default_model_kwargs
)
return residual_plot.finalize(
xlabel="Energy\n(keV)",
ylabel=y_label,
xscale="log",
yscale="log",
show_legend=show_legend,
)
@nb.njit(fastmath=True, cache=True)
def _trapz(x, y):
return np.trapz(x, y)
@nb.njit(fastmath=True, cache=True)
def _simps(e1, e2, diff_fluxes_edges, diff_fluxes_mid):
return (
(e2 - e1)
/ 6.0
* (diff_fluxes_edges[:-1] + 4 * diff_fluxes_mid + diff_fluxes_edges[1:])
)
@nb.njit(fastmath=True, cache=True)
def _rsum(model_mid_points, de):
return np.multiply(model_mid_points, de)