from __future__ import division
import re
from builtins import map, str
import numpy
from astromodels import *
from astromodels.utils.angular_distance import angular_distance
from astropy.stats import circmean
from astropy import units as u
from threeML.config.config import threeML_config
from threeML.exceptions.custom_exceptions import custom_warnings
from threeML.io.dict_with_pretty_print import DictWithPrettyPrint
from threeML.io.logging import setup_logger
from pkg_resources import resource_filename
import os.path, os
log = setup_logger(__name__)
_trigger_name_match = re.compile("^GRB\d{9}$")
def _gbm_and_lle_valid_source_check(source):
"""
checks if source name is valid for both GBM and LLE data
:param source: source name
:return: bool
"""
warn_string = (
"The trigger %s is not valid. Must be in the form GRB080916009" % source
)
match = _trigger_name_match.match(source)
if match is None:
log.warning(warn_string)
answer = False
else:
answer = True
return answer
#########
def _sanitize_fgl_name(fgl_name):
swap = (
fgl_name.replace(" ", "_").replace("+", "p").replace("-", "m").replace(".", "d")
)
if swap[0] in ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]:
swap = "x%s" % swap
return swap
def _get_point_source_from_fgl(fgl_name, catalog_entry, fix=False):
"""
Translate a spectrum from the nFGL into an astromodels point source
"""
name = _sanitize_fgl_name(fgl_name)
spectrum_type = catalog_entry["spectrum_type"]
ra = float(catalog_entry["ra"])
dec = float(catalog_entry["dec"])
log.debug(f"source parameters for {fgl_name}")
log.debug(catalog_entry)
if spectrum_type == "PowerLaw":
this_spectrum = Powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "pl_index" in catalog_entry:
this_spectrum.index = float(catalog_entry["pl_index"]) * -1
this_spectrum.K = float(catalog_entry["pl_flux_density"]) / (
u.cm ** 2 * u.s * u.MeV
)
else:
this_spectrum.index = float(catalog_entry["dnde_index"]) * -1
this_spectrum.K = float(catalog_entry["dnde"]) / (
u.cm ** 2 * u.s * u.MeV
)
this_spectrum.index.fix = fix
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif spectrum_type == "LogParabola":
this_spectrum = Log_parabola()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
if "pivot_energy_catalog" in catalog_entry:
this_spectrum.piv = float(catalog_entry["pivot_energy_catalog"]) * u.MeV
else:
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "lp_index" in catalog_entry:
this_spectrum.alpha = float(catalog_entry["lp_index"]) * -1
this_spectrum.beta = float(catalog_entry["lp_beta"])
this_spectrum.K = float(catalog_entry["lp_flux_density"]) / (
u.cm ** 2 * u.s * u.MeV
)
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
else:
K = float(catalog_entry["flux_density"])
this_spectrum.K.bounds = (K / 1000.0, K * 1000)
this_spectrum.alpha = float(catalog_entry["spectral_index"]) * -1
this_spectrum.beta = float(catalog_entry["beta"])
this_spectrum.K = K / (u.cm ** 2 * u.s * u.MeV)
this_spectrum.alpha.fix = fix
this_spectrum.beta.fix = fix
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif spectrum_type == "PLExpCutoff":
this_spectrum = Cutoff_powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "plec_index" in catalog_entry:
this_spectrum.index = float(catalog_entry["plec_index"]) * -1
this_spectrum.K = float(catalog_entry["plec_flux_density"]) / (
u.cm ** 2 * u.s * u.MeV
)
else:
this_spectrum.index = float(catalog_entry["dnde_index"]) * -1
this_spectrum.K = float(catalog_entry["dnde"]) / (
u.cm ** 2 * u.s * u.MeV
)
this_spectrum.xc = float(catalog_entry["cutoff"]) * u.MeV
this_spectrum.index.fix = fix
this_spectrum.K.fix = fix
this_spectrum.xc.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif (spectrum_type in ["PLSuperExpCutoff", "PLSuperExpCutoff2"] ) and ("plec_exp_factor_s" not in catalog_entry.keys()):
# This is the new definition, from the 4FGL catalog.
# Note that in version 19 of the 4FGL, cutoff spectra are designated as PLSuperExpCutoff
# rather than PLSuperExpCutoff2 as in version , but the same parametrization is used.
this_spectrum = Super_cutoff_powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
if "plec_exp_factor" in catalog_entry:
# OLD parameterization 4FGL which is in fermipy:
a = float(catalog_entry["plec_exp_factor"])
E0 = float(catalog_entry["pivot_energy"])
b = float(catalog_entry["plec_exp_index"])
K = float(catalog_entry["plec_flux_density"])
i = float(catalog_entry["plec_index"])
else:
a = float(catalog_entry["expfactor"])
E0 = float(catalog_entry["pivot_energy"])
b = float(catalog_entry["exp_index"])
K = float(catalog_entry["dnde"])
i = float(catalog_entry["spectral_index"])
conv = numpy.exp(a * E0 ** b)
this_spectrum.index = -i
this_spectrum.gamma = b
this_spectrum.piv = E0 * u.MeV
this_spectrum.K = ( conv * K / (u.cm ** 2 * u.s * u.MeV))
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc = a ** (-1.0 / b ) * u.MeV
this_spectrum.K.fix = fix
this_spectrum.xc.fix = fix
this_spectrum.index.fix = fix
this_spectrum.gamma.fix = fix
elif (spectrum_type == "PLSuperExpCutoff4") or ((spectrum_type == "PLSuperExpCutoff") and ("plec_index_s" in catalog_entry.keys() ) ):
# new parameterization 4FGL-DR3. Still listed as PLSuperExpCutoff in VO.
this_spectrum = Super_cutoff_powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
if "plec_index_s" in catalog_entry.keys():
d = float(catalog_entry["plec_exp_factor_s"])
b = float(catalog_entry["plec_exp_index"])
Gs = float(catalog_entry["plec_index_s"])
K = float(catalog_entry["plec_flux_density"])
E0 = float(catalog_entry["pivot_energy"]) * u.MeV
conv = numpy.exp(d/b ** 2)
elif "plec_expfactors" in catalog_entry.keys():
d = float(catalog_entry["plec_expfactors"])
b = float(catalog_entry["plec_exp_index"])
Gs = float(catalog_entry["plec_indexs"])
K = float(catalog_entry["plec_flux_density"])
E0 = float(catalog_entry["pivot_energy"]) * u.MeV
conv = numpy.exp(d/b ** 2)
elif "expfactor" in catalog_entry.keys():
d = float(catalog_entry["expfactor"])
b = float(catalog_entry["exp_index"])
Gs = float(catalog_entry["spectral_index"])
K = float(catalog_entry["dnde"])
E0 = float(catalog_entry["pivot_energy_catalog"]) * u.MeV
wrong_E0 = float(catalog_entry["pivot_energy"]) * u.MeV
conv = 1
else:
raise NotImplementedError(
"Spectrum type %s is not a valid 4FGL type" % spectrum_type
)
this_spectrum.index = d/b - Gs
#this_spectrum.gamma = d/b
this_spectrum.gamma = b
this_spectrum.piv = E0
this_spectrum.xc = E0*(b**2/d)**(1/b)
if "expfactor" in catalog_entry.keys():
conv = 1.0 / this_spectrum(wrong_E0)
this_spectrum.K = ( conv * K / (u.cm ** 2 * u.s * u.MeV) )
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc.fix = fix
this_spectrum.index.fix = fix
this_spectrum.gamma.fix = fix
else:
raise NotImplementedError(
"Spectrum type %s is not a valid 4FGL type" % spectrum_type
)
return this_source
def _get_extended_source_from_fgl(fgl_name, catalog_entry, fix=False):
"""
Translate a spectrum from the nFGL into an astromodels extended source
"""
name = _sanitize_fgl_name(fgl_name)
spectrum_type = catalog_entry["spectrum_type"]
ra = float(catalog_entry["ra"])
dec = float(catalog_entry["dec"])
theShape = catalog_entry["spatial_function"]
if theShape == "":
theShape = catalog_entry["spatialmodel"]
if theShape == "RadialDisk":
this_shape = Disk_on_sphere()
elif theShape == "RadialGaussian":
this_shape = Gaussian_on_sphere()
elif theShape == "SpatialMap":
the_file = catalog_entry["spatial_filename"]
if isinstance(the_file, bytes):
the_file = the_file.decode("ascii")
if "FERMIPY_DATA_DIR" not in os.environ:
os.environ["FERMIPY_DATA_DIR"] = resource_dir("fermipy", "data")
the_file = os.path.expandvars(the_file)
if os.path.exists( the_file ):
the_template = the_file
else:
the_dir = os.path.join(os.path.expandvars(catalog_entry["extdir"]), "Templates")
the_template = os.path.join(the_dir, the_file)
this_shape = SpatialTemplate_2D(fits_file=the_template)
else:
log.error(f"Spatial_Function {theShape} not implemented yet" )
raise NotImplementedError()
if spectrum_type == "PowerLaw":
this_spectrum = Powerlaw()
this_source = ExtendedSource(name, spatial_shape = this_shape, spectral_shape=this_spectrum)
this_spectrum.index = float(catalog_entry["pl_index"]) * -1
this_spectrum.index.fix = fix
this_spectrum.K = float(catalog_entry["pl_flux_density"]) / (
u.cm ** 2 * u.s * u.MeV
)
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
elif spectrum_type == "LogParabola":
this_spectrum = Log_parabola()
this_source = ExtendedSource(name, spatial_shape = this_shape, spectral_shape=this_spectrum)
if "pivot_energy_catalog" in catalog_entry:
this_spectrum.piv = float(catalog_entry["pivot_energy_catalog"]) * u.MeV
else:
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "lp_index" in catalog_entry:
this_spectrum.alpha = float(catalog_entry["lp_index"]) * -1
this_spectrum.beta = float(catalog_entry["lp_beta"])
this_spectrum.K = float(catalog_entry["lp_flux_density"]) / (
u.cm ** 2 * u.s * u.MeV
)
else:
K = float(catalog_entry["flux_density"])
this_spectrum.K.bounds = (K / 1000.0, K * 1000)
this_spectrum.alpha = float(catalog_entry["spectral_index"]) * -1
this_spectrum.beta = float(catalog_entry["beta"])
this_spectrum.K = K / (u.cm ** 2 * u.s * u.MeV)
this_spectrum.alpha.fix = fix
this_spectrum.beta.fix = fix
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif spectrum_type == "PLExpCutoff":
this_spectrum = Cutoff_powerlaw()
this_source = ExtendedSource(name, spatial_shape = this_shape, spectral_shape=this_spectrum)
this_spectrum.index = float(catalog_entry["plec_index"]) * -1
this_spectrum.index.fix = fix
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
this_spectrum.K = float(catalog_entry["plec_flux_density"]) / (
u.cm ** 2 * u.s * u.MeV
)
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc = float(catalog_entry["cutoff"]) * u.MeV
this_spectrum.xc.fix = fix
elif spectrum_type in ["PLSuperExpCutoff", "PLSuperExpCutoff2"]:
# This is the new definition, from the 4FGL catalog.
# Note that in version 19 of the 4FGL, cutoff spectra are designated as PLSuperExpCutoff
# rather than PLSuperExpCutoff2 as in version , but the same parametrization is used.
this_spectrum = Super_cutoff_powerlaw()
this_source = ExtendedSource(name, spatial_shape = this_shape, spectral_shape=this_spectrum)
# new parameterization 4FGLDR3:
if ('plec_index_s' in catalog_entry.keys()):
d = float(catalog_entry["plec_exp_factor_s"])
E0 = float(catalog_entry["pivot_energy"]) * u.MeV
b = float(catalog_entry["plec_exp_index"])
Gs = float(catalog_entry["plec_index_s"])
conv = numpy.exp(d/b ** 2)
this_spectrum.index = d/b - Gs
this_spectrum.index.fix = fix
this_spectrum.gamma = d/b
this_spectrum.gamma.fix = fix
this_spectrum.piv = E0
this_spectrum.K = (
conv * float(catalog_entry["plec_flux_density"]) / (u.cm ** 2 * u.s * u.MeV)
)
this_spectrum.xc = E0
else:
# OLD parameterization 4FGL which is in fermipy:
a = float(catalog_entry["plec_exp_factor"])
E0 = float(catalog_entry["pivot_energy"])
b = float(catalog_entry["plec_exp_index"])
conv = numpy.exp(a * E0 ** b)
this_spectrum.index = float(catalog_entry["plec_index"]) * -1
this_spectrum.index.fix = fix
this_spectrum.gamma = b
this_spectrum.gamma.fix = fix
this_spectrum.piv = E0 * u.MeV
this_spectrum.K = (
conv * float(catalog_entry["plec_flux_density"]) / (u.cm ** 2 * u.s * u.MeV)
)
this_spectrum.xc = a ** (-1.0 / b ) * u.MeV
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc.fix = fix
else:
log.error( "Spectrum type %s is not a valid 4FGL type" % spectrum_type )
raise NotImplementedError()
try:
theRadius = catalog_entry["model_semimajor"]
except:
theRadius = catalog_entry["spatialwidth"]
if theShape == "RadialDisk":
this_shape.lon0 = ra * u.degree
this_shape.lon0.fix = True
this_shape.lat0 = dec * u.degree
this_shape.lat0.fix = True
this_shape.radius = theRadius * u.degree
this_shape.radius.fix = True
this_shape.radius.bounds = (0, theRadius) * u.degree
elif theShape == "RadialGaussian":
#factor 1/1.36 is the conversion from 68% containment radius to sigma
#Max of sigma/radius is used for get_boundaries().
this_shape.lon0 = ra * u.degree
this_shape.lon0.fix = True
this_shape.lat0 = dec * u.degree
this_shape.lat0.fix = True
this_shape.sigma = theRadius / 1.36 * u.degree
this_shape.sigma.fix = True
this_shape.sigma.bounds = (0, theRadius / 1.36) * u.degree
return this_source
[docs]
class ModelFromFGL(Model):
def __init__(self, ra_center, dec_center, *sources):
self._ra_center = float(ra_center)
self._dec_center = float(dec_center)
super(ModelFromFGL, self).__init__(*sources)
[docs]
def free_point_sources_within_radius(self, radius, normalization_only=True):
"""
Free the parameters for the point sources within the given radius of the center of the search cone
:param radius: radius in degree
:param normalization_only: if True, frees only the normalization of the source (default: True)
:return: none
"""
self._free_or_fix_ps(True, radius, normalization_only)
[docs]
def fix_point_sources_within_radius(self, radius, normalization_only=True):
"""
Fixes the parameters for the point sources within the given radius of the center of the search cone
:param radius: radius in degree
:param normalization_only: if True, fixes only the normalization of the source (default: True)
:return: none
"""
self._free_or_fix_ps(False, radius, normalization_only)
def _free_or_fix_ps(self, free, radius, normalization_only):
for src_name in self.point_sources:
src = self.point_sources[src_name]
this_d = angular_distance(
self._ra_center,
self._dec_center,
src.position.ra.value,
src.position.dec.value,
)
if this_d <= radius:
if normalization_only:
src.spectrum.main.shape.K.free = free
else:
for par in src.spectrum.main.shape.parameters:
if par == "piv": #don't free pivot energy
continue
src.spectrum.main.shape.parameters[par].free = free
[docs]
def free_extended_sources_within_radius(self, radius, normalization_only=True):
"""
Free the parameters for the point sources within the given radius of the center of the search cone
:param radius: radius in degree
:param normalization_only: if True, frees only the normalization of the source (default: True)
:return: none
"""
self._free_or_fix_ext(True, radius, normalization_only)
[docs]
def fix_extended_sources_within_radius(self, radius, normalization_only=True):
"""
Fixes the parameters for the point sources within the given radius of the center of the search cone
:param radius: radius in degree
:param normalization_only: if True, fixes only the normalization of the source (default: True)
:return: none
"""
self._free_or_fix_ext(False, radius, normalization_only)
def _free_or_fix_ext(self, free, radius, normalization_only):
for src_name in self.extended_sources:
src = self.extended_sources[src_name]
try:
ra, dec = src.spatial_shape.lon0.value, src.spatial_shape.lat0.value
except:
(ra_min, ra_max), (dec_min, dec_max) = src.spatial_shape.get_boundaries()
ra = circmean( [ra_min, ra_max]*u.deg ).value
dec = circmean( [dec_min, dec_max]*u.deg ).value
this_d = angular_distance(
self._ra_center,
self._dec_center,
ra,
dec,
)
if this_d <= radius:
if normalization_only:
src.spectrum.main.shape.K.free = free
else:
for par in src.spectrum.main.shape.parameters:
if par == "piv": #don't free pivot energy
continue
src.spectrum.main.shape.parameters[par].free = free