from __future__ import division
import collections
import os
from typing import Any, Dict, List, Optional, Tuple, Union
import astromodels
import astropy.io.fits as fits
import numpy as np
import yaml
from astromodels import Model, Parameter
from astromodels.core import parameter_transformation
from astropy import units as u
from astropy.stats import circmean
from past.utils import old_div
from threeML.config.config import threeML_config
from threeML.config.plotting_structure import FermiSpectrumPlot
from threeML.exceptions.custom_exceptions import custom_warnings
from threeML.io.dict_with_pretty_print import DictWithPrettyPrint
from threeML.io.file_utils import sanitize_filename
from threeML.io.logging import setup_logger
from threeML.io.package_data import get_path_of_data_file
from threeML.io.plotting.cmap_cycle import cmap_intervals
from threeML.io.plotting.data_residual_plot import ResidualPlot
from threeML.plugin_prototype import PluginPrototype
from threeML.utils.power_of_two_utils import is_power_of_2
from threeML.utils.statistics.gammaln import logfactorial
from threeML.utils.statistics.stats_tools import Significance
from threeML.utils.unique_deterministic_tag import get_unique_deterministic_tag
log = setup_logger(__name__)
from threeML.io.logging import setup_logger
log = setup_logger(__name__)
__instrument_name = "Fermi LAT (with fermipy)"
#########################################################
# NOTE:
# Fermipy does NOT support Unbinned Likelihood analysis
#########################################################
# A lookup map for the correspondence between IRFS and evclass
# See https://fermi.gsfc.nasa.gov/ssc/data/analysis/documentation/Cicerone/Cicerone_Data/LAT_DP.html#PhotonClassification
evclass_irf = {
8: "P8R3_TRANSIENT020E_V3",
16: "P8R3_TRANSIENT020_V3",
32: "P8R3_TRANSIENT010E_V3",
64: "P8R3_TRANSIENT010_V3",
128: "P8R3_SOURCE_V3",
256: "P8R3_CLEAN_V3",
512: "P8R3_ULTRACLEAN_V3",
1024: "P8R3_ULTRACLEANVETO_V3",
2048: "P8R3_SOURCEVETO_V3",
65536: "P8R3_TRANSIENT015S_V3",
}
def _get_unique_tag_from_configuration(configuration):
keys_for_hash = (
("data", ("evfile", "scfile")),
("binning", ("roiwidth", "binsz", "binsperdec")),
(
"selection",
(
"emin",
"emax",
"zmax",
"evclass",
"evtype",
"filter",
"ra",
"dec",
),
),
)
string_to_hash = []
for section, keys in keys_for_hash:
if not section in configuration:
log.critical(
"Configuration lacks section %s, which is required" % section
)
for key in keys:
if not key in configuration[section]:
log.critical(
"Section %s in configuration lacks key %s, which is required"
% key
)
string_to_hash.append("%s" % configuration[section][key])
return get_unique_deterministic_tag(",".join(string_to_hash))
def _get_fermipy_instance(configuration, likelihood_model):
"""
Generate a 'model' configuration section for fermipy starting from a likelihood model from astromodels
:param configuration: a dictionary containing the configuration for fermipy
:param likelihood_model: the input likelihood model from astromodels
:type likelihood_model: astromodels.Model
:return: a dictionary with the 'model' section of the fermipy configuration
"""
# Generate a new 'model' section in the configuration which reflects the model
# provided as input
# Get center and radius of ROI
ra_center = float(configuration["selection"]["ra"])
dec_center = float(configuration["selection"]["dec"])
roi_width = float(configuration["binning"]["roiwidth"])
roi_radius = old_div(roi_width, np.sqrt(2.0))
# Get IRFS
irfs = evclass_irf[int(configuration["selection"]["evclass"])]
log.info(f"Using IRFs {irfs}")
if "gtlike" in configuration and "irfs" in configuration["gtlike"]:
if irfs.upper() != configuration["gtlike"]["irfs"].upper():
log.critical(
"Evclass points to IRFS %s, while you specified %s in the "
"configuration" % (irfs, configuration["gtlike"]["irfs"])
)
else:
if not "gtlike" in configuration:
configuration["gtlike"] = {}
configuration["gtlike"]["irfs"] = irfs
# The fermipy model is just a dictionary. It corresponds to the 'model' section
# of the configuration file (http://fermipy.readthedocs.io/en/latest/config.html#model)
fermipy_model = {}
# Find Galactic and Isotropic templates appropriate for this IRFS
# (information on the ROI is used to cut the Galactic template, which speeds up the
# analysis a lot)
# NOTE: these are going to be absolute paths
galactic_template = str(
sanitize_filename(
findGalacticTemplate(irfs, ra_center, dec_center, roi_radius), True, # noqa: F821
)
)
isotropic_template = str(
sanitize_filename(findIsotropicTemplate(irfs), True)) # noqa: F821
# Add them to the fermipy model
fermipy_model["galdiff"] = galactic_template
fermipy_model["isodiff"] = isotropic_template
# Now iterate over all sources contained in the likelihood model
sources = []
# point sources
for point_source in list(
likelihood_model.point_sources.values()
): # type: astromodels.PointSource
this_source = {
"Index": 2.56233,
"Scale": 572.78,
"Prefactor": 2.4090e-12,
}
this_source["name"] = point_source.name
this_source["ra"] = point_source.position.ra.value
this_source["dec"] = point_source.position.dec.value
# The spectrum used here is unconsequential, as it will be substituted by a FileFunction
# later on. So I will just use PowerLaw for everything
this_source["SpectrumType"] = "PowerLaw"
sources.append(this_source)
# extended sources
for extended_source in list(
likelihood_model.extended_sources.values()
): # type: astromodels.ExtendedSource
this_source = {
"Index": 2.56233,
"Scale": 572.78,
"Prefactor": 2.4090e-12,
}
this_source["name"] = extended_source.name
# The spectrum used here is unconsequential, as it will be substituted by a FileFunction
# later on. So I will just use PowerLaw for everything
this_source["SpectrumType"] = "PowerLaw"
theShape = extended_source.spatial_shape
if theShape.name == "Disk_on_sphere":
this_source["SpatialModel"] = "RadialDisk"
this_source["ra"] = theShape.lon0.value
this_source["dec"] = theShape.lat0.value
this_source["SpatialWidth"] = theShape.radius.value
elif theShape.name == "Gaussian_on_sphere":
this_source["SpatialModel"] = "RadialGaussian"
this_source["ra"] = theShape.lon0.value
this_source["dec"] = theShape.lat0.value
# fermipy/fermi tools expect 68% containment radius = 1.36 sigma
this_source["SpatialWidth"] = 1.36 * theShape.sigma.value
elif theShape.name == "SpatialTemplate_2D":
try:
(ra_min, ra_max), (dec_min, dec_max) = theShape.get_boundaries()
this_source["ra"] = circmean([ra_min, ra_max] * u.deg).value
this_source["dec"] = circmean([dec_min, dec_max] * u.deg).value
except:
log.critical(
f"Source {extended_source.name} does not have a template file set; must call read_file first()"
)
this_source["SpatialModel"] = "SpatialMap"
this_source["Spatial_Filename"] = theShape._fitsfile
else:
log.critical(
f"Extended source {extended_source.name}: shape {theShape.name} not yet implemented for FermipyLike"
)
sources.append(this_source)
# Add all sources to the model
fermipy_model["sources"] = sources
# Now we can finally instance the GTAnalysis instance
configuration["model"] = fermipy_model
gta = GTAnalysis(configuration) # noqa: F821
# This will take a long time if it's the first time we run with this model
gta.setup()
# Substitute all spectra for point sources with FileSpectrum, so that we will be able to control
# them from 3ML
energies_keV = None
for point_source in list(
likelihood_model.point_sources.values()
): # type: astromodels.PointSource
# This will substitute the current spectrum with a FileFunction with the same shape and flux
gta.set_source_spectrum(
point_source.name, "FileFunction", update_source=False
)
# Get the energies at which to evaluate this source
this_log_energies, _flux = gta.get_source_dnde(point_source.name)
this_energies_keV = (
10**this_log_energies * 1e3
) # fermipy energies are in GeV, we need keV
if energies_keV is None:
energies_keV = this_energies_keV
else:
# This is to make sure that all sources are evaluated at the same energies
if not np.all(energies_keV == this_energies_keV):
log.critical(
"All sources should be evaluated at the same energies."
)
dnde = point_source(energies_keV) # ph / (cm2 s keV)
dnde_per_MeV = np.maximum(dnde * 1000.0, 1e-300) # ph / (cm2 s MeV)
gta.set_source_dnde(point_source.name, dnde_per_MeV, False)
# Same for extended source
for extended_source in list(
likelihood_model.extended_sources.values()
): # type: astromodels.ExtendedSource
# This will substitute the current spectrum with a FileFunction with the same shape and flux
gta.set_source_spectrum(
extended_source.name, "FileFunction", update_source=False
)
# Get the energies at which to evaluate this source
this_log_energies, _flux = gta.get_source_dnde(extended_source.name)
this_energies_keV = (
10**this_log_energies * 1e3
) # fermipy energies are in GeV, we need keV
if energies_keV is None:
energies_keV = this_energies_keV
else:
# This is to make sure that all sources are evaluated at the same energies
if not np.all(energies_keV == this_energies_keV):
log.critical(
"All sources should be evaluated at the same energies."
)
dnde = extended_source.get_spatially_integrated_flux(
energies_keV
) # ph / (cm2 s keV)
dnde_per_MeV = np.maximum(dnde * 1000.0, 1e-300) # ph / (cm2 s MeV)
gta.set_source_dnde(extended_source.name, dnde_per_MeV, False)
return gta, energies_keV
def _expensive_imports_hook():
from fermipy.gtanalysis import GTAnalysis
from GtBurst.LikelihoodComponent import (
findGalacticTemplate,
findIsotropicTemplate,
)
globals()["GTAnalysis"] = GTAnalysis
globals()["findGalacticTemplate"] = findGalacticTemplate
globals()["findIsotropicTemplate"] = findIsotropicTemplate
[docs]
class FermipyLike(PluginPrototype):
"""
Plugin for the data of the Fermi Large Area Telescope, based on fermipy (http://fermipy.readthedocs.io/)
"""
def __new__(cls, *args, **kwargs):
instance = object.__new__(cls)
# we do not catch here
_expensive_imports_hook()
return instance
def __init__(self, name, fermipy_config):
"""
:param name: a name for this instance
:param fermipy_config: either a path to a YAML configuration file or a dictionary containing the configuration
(see http://fermipy.readthedocs.io/)
"""
# There are no nuisance parameters
nuisance_parameters = {}
super(FermipyLike, self).__init__(
name, nuisance_parameters=nuisance_parameters
)
# Check whether the provided configuration is a file
if not isinstance(fermipy_config, dict):
# Assume this is a file name
configuration_file = sanitize_filename(fermipy_config)
if not os.path.exists(fermipy_config):
log.critical(
"Configuration file %s does not exist" % configuration_file
)
# Read the configuration
with open(configuration_file) as f:
self._configuration = yaml.load(f, Loader=yaml.SafeLoader)
else:
# Configuration is a dictionary. Nothing to do
self._configuration = fermipy_config
# If the user provided a 'model' key, issue a warning, as the model will be defined
# later on and will overwrite the one contained in 'model'
if "model" in self._configuration:
custom_warnings.warn(
"The provided configuration contains a 'model' section, which is useless as it "
"will be overridden"
)
self._configuration.pop("model")
if "fileio" in self._configuration:
custom_warnings.warn(
"The provided configuration contains a 'fileio' section, which will be "
"overwritten"
)
self._configuration.pop("fileio")
# Now check that the data exists
# As minimum there must be a evfile and a scfile
if not "evfile" in self._configuration["data"]:
log.critical("You must provide a evfile in the data section")
if not "scfile" in self._configuration["data"]:
log.critical("You must provide a scfile in the data section")
for datum in self._configuration["data"]:
# Sanitize file name, as fermipy is not very good at handling relative paths or env. variables
filename = str(
sanitize_filename(self._configuration["data"][datum], True)
)
self._configuration["data"][datum] = filename
if not os.path.exists(self._configuration["data"][datum]):
log.critical("File %s (%s) not found" % (filename, datum))
# Prepare the 'fileio' part
# Save all output in a directory with a unique name which depends on the configuration,
# so that the same configuration will write in the same directory and fermipy will
# know that it doesn't need to recompute things
self._unique_id = "__%s" % _get_unique_tag_from_configuration(
self._configuration
)
self._configuration["fileio"] = {"outdir": self._unique_id}
# Ensure that there is a complete definition of a Region Of Interest (ROI)
if not (
("ra" in self._configuration["selection"])
and ("dec" in self._configuration["selection"])
):
log.critical(
"You have to provide 'ra' and 'dec' in the 'selection' section of the configuration. Source name "
"resolution, as well as Galactic coordinates, are not currently supported"
)
# This is empty at the beginning, will be instanced in the set_model method
self._gta = None
# observation_duration = self._configuration["selection"]["tmax"] - self._configuration["selection"]["tmin"]
self.set_inner_minimization(True)
[docs]
@staticmethod
def get_basic_config(
evfile,
scfile,
ra,
dec,
emin=100.0,
emax=100000.0,
zmax=100.0,
evclass=128,
evtype=3,
filter="DATA_QUAL>0 && LAT_CONFIG==1",
fermipy_verbosity=2,
fermitools_chatter=2,
):
from fermipy.config import ConfigManager
# Get default config from fermipy
basic_config = ConfigManager.load(
get_path_of_data_file("fermipy_basic_config.yml")
) # type: dict
evfile = str(sanitize_filename(evfile))
scfile = str(sanitize_filename(scfile))
if not os.path.exists(evfile):
log.critical("The provided evfile %s does not exist" % evfile)
if not os.path.exists(scfile):
log.critical("The provided scfile %s does not exist" % scfile)
basic_config["data"]["evfile"] = evfile
basic_config["data"]["scfile"] = scfile
ra = float(ra)
dec = float(dec)
if not ((0 <= ra) and (ra <= 360)):
log.critical(
"The provided R.A. (%s) is not valid. Should be 0 <= ra <= 360.0"
% ra
)
if not ((-90 <= dec) and (dec <= 90)):
log.critical(
"The provided Dec (%s) is not valid. Should be -90 <= dec <= 90.0"
% dec
)
basic_config["selection"]["ra"] = ra
basic_config["selection"]["dec"] = dec
emin = float(emin)
emax = float(emax)
basic_config["selection"]["emin"] = emin
basic_config["selection"]["emax"] = emax
zmax = float(zmax)
if not ((0.0 <= zmax) and (zmax <= 180.0)):
log.critical(
"The provided Zenith angle cut (zmax = %s) is not valid. "
"Should be 0 <= zmax <= 180.0" % zmax
)
basic_config["selection"]["zmax"] = zmax
with fits.open(scfile) as ft2_:
tmin = float(ft2_[0].header["TSTART"])
tmax = float(ft2_[0].header["TSTOP"])
basic_config["selection"]["tmin"] = tmin
basic_config["selection"]["tmax"] = tmax
evclass = int(evclass)
if not is_power_of_2(evclass):
log.critical("The provided evclass is not a power of 2.")
basic_config["selection"]["evclass"] = evclass
evtype = int(evtype)
basic_config["selection"]["evtype"] = evtype
basic_config["selection"]["filter"] = filter
basic_config["logging"]["verbosity"] = fermipy_verbosity
# (In fermipy convention, 0 = critical only, 1 also errors, 2 also warnings, 3 also info, 4 also debug)
basic_config["logging"][
"chatter"
] = fermitools_chatter # 0 = no screen output. 2 = some output, 4 = lot of output.
return DictWithPrettyPrint(basic_config)
@property
def configuration(self):
"""
Returns the loaded configuration
:return: a dictionary containing the active configuration
"""
return self._configuration
@property
def gta(self):
if self._gta is None:
log.warning(
"You have to perform a fit or a bayesian analysis before accessing the "
"gta object. Returning None"
)
return self._gta
[docs]
def get_observation_duration(self):
event_file = self._configuration["data"]["evfile"]
tmin = self._configuration["selection"]["tmin"]
tmax = self._configuration["selection"]["tmax"]
with fits.open(event_file) as file:
gti_start = file["GTI"].data.START
gti_stop = file["GTI"].data.STOP
myfilter = (gti_stop > tmin) * (gti_start < tmax)
gti_sum = (gti_stop[myfilter] - gti_start[myfilter]).sum()
observation_duration = (
gti_sum
- (tmin - gti_start[myfilter][0])
- (gti_stop[myfilter][-1] - tmax)
)
log.info(f"FermipyLike - GTI SUM...:{observation_duration}")
return observation_duration
[docs]
def set_model(self, likelihood_model_instance):
"""
Set the model to be used in the joint minimization. Must be a LikelihoodModel instance.
"""
# This will take a long time if it's the first time we run, as it will select the data,
# produce livetime cube, expomap, source maps and so on
self._likelihood_model = likelihood_model_instance
self._gta, self._pts_energies = _get_fermipy_instance(
self._configuration, likelihood_model_instance
)
self._update_model_in_fermipy(update_dictionary=True, force_update=True)
# Build the list of the nuisance parameters
new_nuisance_parameters = self._set_nuisance_parameters()
self.update_nuisance_parameters(new_nuisance_parameters)
def _update_model_in_fermipy(
self, update_dictionary=False, delta=0.0, force_update=False
):
# Substitute all spectra for point sources with FileSpectrum, so that we will be able to control
# them from 3ML
for point_source in list(
self._likelihood_model.point_sources.values()
): # type: astromodels.PointSource
# Update this source only if it has free parameters (to gain time)
if not (point_source.has_free_parameters or force_update):
continue
# Update source position if free
if force_update or (
point_source.position.ra.free or point_source.position.dec.free
):
model_pos = point_source.position.sky_coord
fermipy_pos = self._gta.roi.get_source_by_name(
point_source.name
).skydir
if model_pos.separation(fermipy_pos).to("degree").value > delta:
# modeled after how this is done in fermipy
# (cf https://fermipy.readthedocs.io/en/latest/_modules/fermipy/sourcefind.html#SourceFind.localize)
temp_source = self._gta.delete_source(point_source.name)
temp_source.set_position(model_pos)
self._gta.add_source(
point_source.name, temp_source, free=False
)
self._gta.free_source(point_source.name, False)
self._gta.set_source_spectrum(
point_source.name,
"FileFunction",
update_source=update_dictionary,
)
# Now set the spectrum of this source to the right one
dnde = point_source(self._pts_energies) # ph / (cm2 s keV)
dnde_MeV = np.maximum(dnde * 1000.0, 1e-300) # ph / (cm2 s MeV)
# NOTE: I use update_source=False because it makes things 100x faster and I verified that
# it does not change the result.
# (HF: Not sure who wrote the above but I think sometimes we do want to update fermipy dictionaries.)
self._gta.set_source_dnde(
point_source.name, dnde_MeV, update_source=update_dictionary
)
# Same for extended source
for extended_source in list(
self._likelihood_model.extended_sources.values()
): # type: astromodels.ExtendedSource
# Update this source only if it has free parameters (to gain time)
if not (extended_source.has_free_parameters or force_update):
continue
theShape = extended_source.spatial_shape
if theShape.has_free_parameters or force_update:
fermipySource = self._gta.roi.get_source_by_name(
extended_source.name
)
fermipyPars = [
fermipySource["ra"],
fermipySource["dec"],
fermipySource["SpatialWidth"],
]
if theShape.name == "Disk_on_sphere":
amPars = [
theShape.lon0.value,
theShape.lat0.value,
theShape.radius.value,
]
if not np.allclose(fermipyPars, amPars, 1e-10):
temp_source = self._gta.delete_source(
extended_source.name
)
temp_source.set_spatial_model(
"RadialDisk",
{
"ra": theShape.lon0.value,
"dec": theShape.lat0.value,
"SpatialWidth": theShape.radius.value,
},
)
# from fermipy: FIXME: Issue with source map cache with source is initialized as fixed.
self._gta.add_source(
extended_source.name, temp_source, free=True
)
self._gta.free_source(extended_source.name, free=False)
self._gta.set_source_spectrum(
extended_source.name,
"FileFunction",
update_source=update_dictionary,
)
elif theShape.name == "Gaussian_on_sphere":
amPars = [
theShape.lon0.value,
theShape.lat0.value,
1.36 * theShape.sigma.value,
]
if not np.allclose(fermipyPars, amPars, 1e-10):
temp_source = self._gta.delete_source(
extended_source.name
)
temp_source.set_spatial_model(
"RadialGaussian",
{
"ra": theShape.lon0.value,
"dec": theShape.lat0.value,
"SpatialWidth": 1.36 * theShape.sigma.value,
},
)
# from fermipy: FIXME: Issue with source map cache with source is initialized as fixed.
self._gta.add_source(
extended_source.name, temp_source, free=True
)
self._gta.free_source(extended_source.name, free=False)
self._gta.set_source_spectrum(
extended_source.name,
"FileFunction",
update_source=update_dictionary,
)
elif theShape.name == "SpatialTemplate_2D":
# for now, assume we're not updating the fits file
pass
else:
# eventually, implement other shapes here.
pass
# Now set the spectrum of this source to the right one
dnde = extended_source.get_spatially_integrated_flux(
self._pts_energies
) # ph / (cm2 s keV)
dnde_MeV = np.maximum(dnde * 1000.0, 1e-300) # ph / (cm2 s MeV)
# NOTE: I use update_source=False because it makes things 100x faster and I verified that
# it does not change the result.
# (HF: Not sure who wrote the above but I think sometimes we do want to update fermipy dictionaries.)
self._gta.set_source_dnde(
extended_source.name, dnde_MeV, update_source=update_dictionary
)
[docs]
def get_log_like(self):
"""
Return the value of the log-likelihood with the current values for the
parameters stored in the ModelManager instance
"""
# Update all sources on the fermipy side
self._update_model_in_fermipy()
# update nuisance parameters
if self._fit_nuisance_params:
for parameter in self.nuisance_parameters:
self.set_nuisance_parameter_value(
parameter, self.nuisance_parameters[parameter].value
)
# self.like.syncSrcParams()
# Get value of the log likelihood
try:
value = self._gta.like.logLike.value()
except:
raise
return value - logfactorial(int(self._gta.like.total_nobs()))
[docs]
def inner_fit(self):
"""
This is used for the profile likelihood. Keeping fixed all parameters in the
LikelihoodModel, this method minimize the logLike over the remaining nuisance
parameters, i.e., the parameters belonging only to the model for this
particular detector. If there are no nuisance parameters, simply return the
logLike value.
"""
return self.get_log_like()
[docs]
def set_inner_minimization(self, flag: bool) -> None:
"""
Turn on the minimization of the internal Fermi
parameters
:param flag: turing on and off the minimization of the Fermipy internal parameters
:type flag: bool
:returns:
"""
self._fit_nuisance_params: bool = bool(flag)
for parameter in self.nuisance_parameters:
self.nuisance_parameters[parameter].free = self._fit_nuisance_params
[docs]
def get_number_of_data_points(self):
"""
Return the number of spatial/energy bins
:return: number of bins
"""
num = len(self._gta.components)
if self._gta.projtype == "WCS":
num = (
num
* self._gta._enumbins
* int(self._gta.npix[0])
* int(self._gta.npix[1])
)
if self._gta.projtype == "HPX":
num = num * np.sum(self.geom.npix)
return num
def _set_nuisance_parameters(self):
# Get the list of the sources
sources = list(self.gta.roi.get_sources())
sources = [s.name for s in sources if "diff" in s.name]
bg_param_names = []
nuisance_parameters = collections.OrderedDict()
for src_name in sources:
if self._fit_nuisance_params:
self.gta.free_norm(src_name)
pars = self.gta.get_free_source_params(src_name)
for par in pars:
thisName = f"{self.name}_{src_name}_{par}"
bg_param_names.append(thisName)
thePar = self.gta._get_param(src_name, par)
value = thePar["value"] * thePar["scale"]
nuisance_parameters[thisName] = Parameter(
thisName,
value,
min_value=thePar["min"],
max_value=thePar["max"],
delta=0.01 * value,
transformation=parameter_transformation.get_transformation(
"log10"
),
)
nuisance_parameters[thisName].free = self._fit_nuisance_params
log.debug(
f"Added nuisance parameter {nuisance_parameters[thisName]}"
)
return nuisance_parameters
def _split_nuisance_parameter(self, param_name):
tokens = param_name.split("_")
pname = tokens[-1]
src_name = "_".join(tokens[1:-1])
return src_name, pname
[docs]
def set_nuisance_parameter_value(self, paramName, value):
srcName, parName = self._split_nuisance_parameter(paramName)
self.gta.set_parameter(
srcName, parName, value, scale=1, update_source=False
)
[docs]
def display_model(
self,
data_color: str = "k",
model_cmap: str = threeML_config.plugins.fermipy.fit_plot.model_cmap.value,
model_color: str = threeML_config.plugins.fermipy.fit_plot.model_color,
total_model_color: str = threeML_config.plugins.fermipy.fit_plot.total_model_color,
background_color: str = "b",
show_data: bool = True,
primary_sources: Optional[Union[str, List[str]]] = None,
show_background_sources: bool = threeML_config.plugins.fermipy.fit_plot.show_background_sources,
shade_fixed_sources: bool = threeML_config.plugins.fermipy.fit_plot.shade_fixed_sources,
shade_secondary_source: bool = threeML_config.plugins.fermipy.fit_plot.shade_secondary_sources,
fixed_sources_color: str = threeML_config.plugins.fermipy.fit_plot.fixed_sources_color,
secondary_sources_color: str = threeML_config.plugins.fermipy.fit_plot.secondary_sources_color,
show_residuals: bool = True,
ratio_residuals: bool = False,
show_legend: bool = True,
model_label: Optional[str] = None,
model_kwargs: Optional[Dict[str, Any]] = None,
data_kwargs: Optional[Dict[str, Any]] = None,
background_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
) -> ResidualPlot:
"""
Plot the current model with or without the data and the residuals. Multiple models can be plotted by supplying
a previous axis to 'model_subplot'.
Example usage:
fig = data.display_model()
fig2 = data2.display_model(model_subplot=fig.axes)
:param data_color: the color of the data
:param model_color: the color of the model
:param show_data: (bool) show_the data with the model
:param show_residuals: (bool) shoe the residuals
:param ratio_residuals: (bool) use model ratio instead of residuals
:param show_legend: (bool) show legend
:param model_label: (optional) the label to use for the model default is plugin name
:param model_kwargs: plotting kwargs affecting the plotting of the model
:param data_kwargs: plotting kwargs affecting the plotting of the data and residuls
:param background_kwargs: plotting kwargs affecting the plotting of the background
:return:
"""
# The model color is set to red by default...
# It should be set to none or all the free sources will have the same color
model_color=None
log.debug(f'model_color : {model_color}')
# set up the default plotting
_default_model_kwargs = dict(alpha=1)
_default_background_kwargs = dict(
color=background_color, alpha=1, ls="--"
)
_sub_menu = threeML_config.plotting.residual_plot
_default_data_kwargs = dict(
color=data_color,
alpha=1,
fmt=_sub_menu.marker,
markersize=_sub_menu.size,
ls="",
elinewidth=2, # _sub_menu.linewidth,
capsize=0,
)
# overwrite if these are in the confif
_kwargs_menu: FermiSpectrumPlot = (
threeML_config.plugins.fermipy.fit_plot
)
if _kwargs_menu.model_mpl_kwargs is not None:
for k, v in _kwargs_menu.model_mpl_kwargs.items():
_default_model_kwargs[k] = v
if _kwargs_menu.data_mpl_kwargs is not None:
for k, v in _kwargs_menu.data_mpl_kwargs.items():
_default_data_kwargs[k] = v
if _kwargs_menu.background_mpl_kwargs is not None:
for k, v in _kwargs_menu.background_mpl_kwargs.items():
_default_background_kwargs[k] = v
if model_kwargs is not None:
assert type(model_kwargs) == dict, "model_kwargs must be a dict"
for k, v in list(model_kwargs.items()):
if k in _default_model_kwargs:
_default_model_kwargs[k] = v
else:
_default_model_kwargs[k] = v
if data_kwargs is not None:
assert type(data_kwargs) == dict, "data_kwargs must be a dict"
for k, v in list(data_kwargs.items()):
if k in _default_data_kwargs:
_default_data_kwargs[k] = v
else:
_default_data_kwargs[k] = v
if background_kwargs is not None:
assert (
type(background_kwargs) == dict
), "background_kwargs must be a dict"
for k, v in list(background_kwargs.items()):
if k in _default_background_kwargs:
_default_background_kwargs[k] = v
else:
_default_background_kwargs[k] = v
# since we define some defualts, lets not overwrite
# the users
_duplicates = (("ls", "linestyle"), ("lw", "linewidth"))
for d in _duplicates:
if (d[0] in _default_model_kwargs) and (
d[1] in _default_model_kwargs
):
_default_model_kwargs.pop(d[0])
if (d[0] in _default_data_kwargs) and (
d[1] in _default_data_kwargs
):
_default_data_kwargs.pop(d[0])
if (d[0] in _default_background_kwargs) and (
d[1] in _default_background_kwargs
):
_default_background_kwargs.pop(d[0])
if model_label is None:
model_label = "%s Model" % self._name
residual_plot = ResidualPlot(
show_residuals=show_residuals,
ratio_residuals=ratio_residuals,
**kwargs,
)
e1 = self._gta.energies[:-1] * 1000.0 # this has to be in keV
e2 = self._gta.energies[1:] * 1000.0 # this has to be in keV
log.debug(f"ENERGIES [MeV]: {self._gta.energies}")
ec = (e1 + e2) / 2.0
de = (e2 - e1) / 2.0
observation_duration = self.get_observation_duration()
conversion_factor = de * observation_duration
# now lets figure out the free and fixed sources
primary_source_names = []
if primary_sources is not None:
primary_source_names = np.atleast_1d(primary_sources)
primary_sources = []
fixed_sources = []
free_sources = []
for name in self._gta.like.sourceNames():
if name in primary_source_names:
primary_sources.append(name)
else:
if name in self._likelihood_model.sources:
this_source: astromodels.sources.Source = (
self._likelihood_model.sources[name]
)
if this_source.has_free_parameters:
free_sources.append(name)
else:
fixed_sources.append(name)
elif name == "galdiff":
# Diffuse emission models should be always displayed with the other sources
# if self._nuisance_parameters["LAT_galdiff_Prefactor"].free:
free_sources.append(name)
#else:
# fixed_sources.append(name)
elif name == "isodiff":
# Diffuse emission models should be always displayed with the other sources
#if self._nuisance_parameters["LAT_isodiff_Normalization"].free:
free_sources.append(name)
#else:
# fixed_sources.append(name)
log.debug(f"fixed_sources: {fixed_sources} ")
log.debug(f"free_sources: {free_sources} ")
log.debug(f"primary_sources: {primary_sources} ")
if not show_background_sources:
if primary_sources is None:
msg = "no primary_sources set! Cannot compute net rate"
log.error(msg)
raise RuntimeError(msg)
n_model_colors = 0
if not shade_fixed_sources and show_background_sources:
n_model_colors += len(fixed_sources)
if not shade_secondary_source and show_background_sources:
n_model_colors += len(free_sources)
if primary_sources is not None:
n_model_colors += len(primary_sources)
log.debug(f"there are {n_model_colors} colors to be used")
if model_color is not None:
model_colors = [model_color] * n_model_colors
else:
model_colors = cmap_intervals(n_model_colors, model_cmap)
sum_model = np.zeros_like(
self._gta.model_counts_spectrum(self._gta.like.sourceNames()[0])[0]
)
sum_backgrounds = np.zeros_like(sum_model)
color_itr = 0
for source_name in fixed_sources:
source_counts = self._gta.model_counts_spectrum(source_name)[0]
log.debug(f"{source_name}: source_counts= {source_counts.sum()}")
sum_model += source_counts
if not show_background_sources:
sum_backgrounds = sum_backgrounds + source_counts
else:
if shade_fixed_sources:
color = fixed_sources_color
label = None
else:
color = model_colors[color_itr]
color_itr += 1
label = source_name
residual_plot.add_model(
ec,
source_counts / conversion_factor,
label=label,
color=color,
**_default_model_kwargs,
)
for source_name in free_sources:
source_counts = self._gta.model_counts_spectrum(source_name)[0]
log.debug(f"{source_name}: source_counts= {source_counts.sum()}")
sum_model += source_counts
if not show_background_sources:
sum_backgrounds = sum_backgrounds + source_counts
else:
if shade_secondary_source:
color = secondary_sources_color
label = None
else:
color = model_colors[color_itr]
color_itr += 1
label = source_name
residual_plot.add_model(
ec,
source_counts / conversion_factor,
label=label,
color=color,
**_default_model_kwargs,
)
if primary_sources is not None:
for source_name in primary_sources:
source_counts = self._gta.model_counts_spectrum(source_name)[0]
log.debug(f"{source_name}: source_counts= {source_counts.sum()}")
sum_model += source_counts
# sum_backgrounds = sum_backgrounds + source_counts
color = model_colors[color_itr]
color_itr += 1
label = source_name
residual_plot.add_model(
ec,
source_counts / conversion_factor,
label=label,
color=color,
**_default_model_kwargs,
)
# sub.plot(ec, self._gta.like._srcCnts(source_name), label=source_name)
log.debug(f"sum_model={sum_model.sum()}")
log.debug(f"sum_backgrounds={sum_backgrounds.sum()}")
residual_plot.add_model(
ec,
sum_model / conversion_factor,
label="Total Model",
color=total_model_color,
**_default_model_kwargs,
)
# sub.plot(ec, sum_model, label="Total Model")
# now get the counts
y = self._gta._roi_data["counts"]
y_err = np.sqrt(y)
log.debug(f"counts={y.sum()}")
significance_calc = Significance(Non=y, Noff=sum_model)
if ratio_residuals:
resid = old_div((y - sum_model), sum_model)
resid_err = old_div(y_err, sum_model)
else:
# resid = significance_calc.li_and_ma()
resid = significance_calc.known_background()
resid_err = np.ones_like(resid)
y = y / conversion_factor
y_err = y_err / conversion_factor
residual_plot.add_data(
ec[y > 0],
y[y > 0],
resid[y > 0],
residual_yerr=resid_err[y > 0],
yerr=y_err[y > 0],
xerr=de[y > 0],
label=self._name,
show_data=show_data,
**_default_data_kwargs,
)
if show_background_sources:
y_label = "Rate\n(counts s$^{-1}$ keV$^{-1}$)"
else:
y_label = "Net Rate\n(counts s$^{-1}$ keV$^{-1}$)"
return residual_plot.finalize(
xlabel="Energy\n(keV)",
ylabel=y_label,
xscale="log",
yscale="log",
show_legend=show_legend,
)