import os
import os.path
import re
import numpy
from astromodels.core.model import Model
from astromodels.functions import (
Cutoff_powerlaw,
Disk_on_sphere,
Gaussian_on_sphere,
Log_parabola,
Powerlaw,
SpatialTemplate_2D,
Super_cutoff_powerlaw,
)
from astromodels.sources import ExtendedSource, PointSource
from astromodels.utils.angular_distance import angular_distance
from astropy import units as u
from astropy.stats import circmean
from threeML.io.logging import setup_logger
log = setup_logger(__name__)
_trigger_name_match = re.compile(r"^GRB\d{9}$")
def _gbm_and_lle_valid_source_check(source):
"""Checks if source name is valid for both GBM and LLE data.
:param source: source name
:return: bool
"""
warn_string = (
"The trigger %s is not valid. Must be in the form GRB080916009" % source
)
match = _trigger_name_match.match(source)
if match is None:
log.warning(warn_string)
answer = False
else:
answer = True
return answer
#########
def _sanitize_fgl_name(fgl_name):
swap = (
fgl_name.replace(" ", "_").replace("+", "p").replace("-", "m").replace(".", "d")
)
if swap[0] in ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]:
swap = "x%s" % swap
return swap
def _get_point_source_from_fgl(fgl_name, catalog_entry, fix=False):
"""Translate a spectrum from the nFGL into an astromodels point source."""
name = _sanitize_fgl_name(fgl_name)
spectrum_type = catalog_entry["spectrum_type"]
ra = float(catalog_entry["ra"])
dec = float(catalog_entry["dec"])
log.debug(f"source parameters for {fgl_name}")
log.debug(catalog_entry)
if spectrum_type == "PowerLaw":
this_spectrum = Powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "pl_index" in catalog_entry:
this_spectrum.index = float(catalog_entry["pl_index"]) * -1
this_spectrum.K = float(catalog_entry["pl_flux_density"]) / (
u.cm**2 * u.s * u.MeV
)
else:
this_spectrum.index = float(catalog_entry["dnde_index"]) * -1
this_spectrum.K = float(catalog_entry["dnde"]) / (u.cm**2 * u.s * u.MeV)
this_spectrum.index.fix = fix
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif spectrum_type == "LogParabola":
this_spectrum = Log_parabola()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
if "pivot_energy_catalog" in catalog_entry:
this_spectrum.piv = float(catalog_entry["pivot_energy_catalog"]) * u.MeV
else:
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "lp_index" in catalog_entry:
this_spectrum.alpha = float(catalog_entry["lp_index"]) * -1
this_spectrum.beta = float(catalog_entry["lp_beta"])
this_spectrum.K = float(catalog_entry["lp_flux_density"]) / (
u.cm**2 * u.s * u.MeV
)
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
else:
K = float(catalog_entry["flux_density"])
this_spectrum.K.bounds = (K / 1000.0, K * 1000)
this_spectrum.alpha = float(catalog_entry["spectral_index"]) * -1
this_spectrum.beta = float(catalog_entry["beta"])
this_spectrum.K = K / (u.cm**2 * u.s * u.MeV)
this_spectrum.alpha.fix = fix
this_spectrum.beta.fix = fix
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif spectrum_type == "PLExpCutoff":
this_spectrum = Cutoff_powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "plec_index" in catalog_entry:
this_spectrum.index = float(catalog_entry["plec_index"]) * -1
this_spectrum.K = float(catalog_entry["plec_flux_density"]) / (
u.cm**2 * u.s * u.MeV
)
else:
this_spectrum.index = float(catalog_entry["dnde_index"]) * -1
this_spectrum.K = float(catalog_entry["dnde"]) / (u.cm**2 * u.s * u.MeV)
this_spectrum.xc = float(catalog_entry["cutoff"]) * u.MeV
this_spectrum.index.fix = fix
this_spectrum.K.fix = fix
this_spectrum.xc.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif (spectrum_type in ["PLSuperExpCutoff", "PLSuperExpCutoff2"]) and (
"plec_exp_factor_s" not in catalog_entry.keys()
):
# This is the new definition, from the 4FGL catalog.
# Note that in version 19 of the 4FGL, cutoff spectra are designated as
# PLSuperExpCutoff rather than PLSuperExpCutoff2 as in version, but the same
# parametrization is used.
this_spectrum = Super_cutoff_powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
if "plec_exp_factor" in catalog_entry:
# OLD parameterization 4FGL which is in fermipy:
a = float(catalog_entry["plec_exp_factor"])
E0 = float(catalog_entry["pivot_energy"])
b = float(catalog_entry["plec_exp_index"])
K = float(catalog_entry["plec_flux_density"])
i = float(catalog_entry["plec_index"])
else:
a = float(catalog_entry["expfactor"])
E0 = float(catalog_entry["pivot_energy"])
b = float(catalog_entry["exp_index"])
K = float(catalog_entry["dnde"])
i = float(catalog_entry["spectral_index"])
conv = numpy.exp(a * E0**b)
this_spectrum.index = -i
this_spectrum.gamma = b
this_spectrum.piv = E0 * u.MeV
this_spectrum.K = conv * K / (u.cm**2 * u.s * u.MeV)
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc = a ** (-1.0 / b) * u.MeV
this_spectrum.K.fix = fix
this_spectrum.xc.fix = fix
this_spectrum.index.fix = fix
this_spectrum.gamma.fix = fix
elif (spectrum_type == "PLSuperExpCutoff4") or (
(spectrum_type == "PLSuperExpCutoff")
and ("plec_index_s" in catalog_entry.keys())
):
# new parameterization 4FGL-DR3. Still listed as PLSuperExpCutoff in VO.
this_spectrum = Super_cutoff_powerlaw()
this_source = PointSource(name, ra=ra, dec=dec, spectral_shape=this_spectrum)
if "plec_index_s" in catalog_entry.keys():
d = float(catalog_entry["plec_exp_factor_s"])
b = float(catalog_entry["plec_exp_index"])
Gs = float(catalog_entry["plec_index_s"])
K = float(catalog_entry["plec_flux_density"])
E0 = float(catalog_entry["pivot_energy"]) * u.MeV
conv = numpy.exp(d / b**2)
elif "plec_expfactors" in catalog_entry.keys():
d = float(catalog_entry["plec_expfactors"])
b = float(catalog_entry["plec_exp_index"])
Gs = float(catalog_entry["plec_indexs"])
K = float(catalog_entry["plec_flux_density"])
E0 = float(catalog_entry["pivot_energy"]) * u.MeV
conv = numpy.exp(d / b**2)
elif "expfactor" in catalog_entry.keys():
d = float(catalog_entry["expfactor"])
b = float(catalog_entry["exp_index"])
Gs = float(catalog_entry["spectral_index"])
K = float(catalog_entry["dnde"])
E0 = float(catalog_entry["pivot_energy_catalog"]) * u.MeV
wrong_E0 = float(catalog_entry["pivot_energy"]) * u.MeV
conv = 1
else:
raise NotImplementedError(
"Spectrum type %s is not a valid 4FGL type" % spectrum_type
)
this_spectrum.index = d / b - Gs
# this_spectrum.gamma = d/b
this_spectrum.gamma = b
this_spectrum.piv = E0
this_spectrum.xc = E0 * (b**2 / d) ** (1 / b)
if "expfactor" in catalog_entry.keys():
conv = 1.0 / this_spectrum(wrong_E0)
this_spectrum.K = conv * K / (u.cm**2 * u.s * u.MeV)
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc.fix = fix
this_spectrum.index.fix = fix
this_spectrum.gamma.fix = fix
else:
raise NotImplementedError(
"Spectrum type %s is not a valid 4FGL type" % spectrum_type
)
return this_source
def _get_extended_source_from_fgl(fgl_name, catalog_entry, fix=False):
"""Translate a spectrum from the nFGL into an astromodels extended
source."""
name = _sanitize_fgl_name(fgl_name)
spectrum_type = catalog_entry["spectrum_type"]
ra = float(catalog_entry["ra"])
dec = float(catalog_entry["dec"])
theShape = catalog_entry["spatial_function"]
if theShape == "":
theShape = catalog_entry["spatialmodel"]
if theShape == "RadialDisk":
this_shape = Disk_on_sphere()
elif theShape == "RadialGaussian":
this_shape = Gaussian_on_sphere()
elif theShape == "SpatialMap":
the_file = catalog_entry["spatial_filename"]
if isinstance(the_file, bytes):
the_file = the_file.decode("ascii")
the_file = os.path.expandvars(the_file)
if os.path.exists(the_file):
the_template = the_file
else:
the_dir = os.path.join(
os.path.expandvars(catalog_entry["extdir"]), "Templates"
)
the_template = os.path.join(the_dir, the_file)
this_shape = SpatialTemplate_2D(fits_file=the_template)
else:
log.error(f"Spatial_Function {theShape} not implemented yet")
raise NotImplementedError()
if spectrum_type == "PowerLaw":
this_spectrum = Powerlaw()
this_source = ExtendedSource(
name, spatial_shape=this_shape, spectral_shape=this_spectrum
)
this_spectrum.index = float(catalog_entry["pl_index"]) * -1
this_spectrum.index.fix = fix
this_spectrum.K = float(catalog_entry["pl_flux_density"]) / (
u.cm**2 * u.s * u.MeV
)
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
elif spectrum_type == "LogParabola":
this_spectrum = Log_parabola()
this_source = ExtendedSource(
name, spatial_shape=this_shape, spectral_shape=this_spectrum
)
if "pivot_energy_catalog" in catalog_entry:
this_spectrum.piv = float(catalog_entry["pivot_energy_catalog"]) * u.MeV
else:
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
if "lp_index" in catalog_entry:
this_spectrum.alpha = float(catalog_entry["lp_index"]) * -1
this_spectrum.beta = float(catalog_entry["lp_beta"])
this_spectrum.K = float(catalog_entry["lp_flux_density"]) / (
u.cm**2 * u.s * u.MeV
)
else:
K = float(catalog_entry["flux_density"])
this_spectrum.K.bounds = (K / 1000.0, K * 1000)
this_spectrum.alpha = float(catalog_entry["spectral_index"]) * -1
this_spectrum.beta = float(catalog_entry["beta"])
this_spectrum.K = K / (u.cm**2 * u.s * u.MeV)
this_spectrum.alpha.fix = fix
this_spectrum.beta.fix = fix
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
elif spectrum_type == "PLExpCutoff":
this_spectrum = Cutoff_powerlaw()
this_source = ExtendedSource(
name, spatial_shape=this_shape, spectral_shape=this_spectrum
)
this_spectrum.index = float(catalog_entry["plec_index"]) * -1
this_spectrum.index.fix = fix
this_spectrum.piv = float(catalog_entry["pivot_energy"]) * u.MeV
this_spectrum.K = float(catalog_entry["plec_flux_density"]) / (
u.cm**2 * u.s * u.MeV
)
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc = float(catalog_entry["cutoff"]) * u.MeV
this_spectrum.xc.fix = fix
elif spectrum_type in ["PLSuperExpCutoff", "PLSuperExpCutoff2"]:
# This is the new definition, from the 4FGL catalog.
# Note that in version 19 of the 4FGL, cutoff spectra are designated as
# PLSuperExpCutoff rather than PLSuperExpCutoff2 as in version, but the same
# parametrization is used.
this_spectrum = Super_cutoff_powerlaw()
this_source = ExtendedSource(
name, spatial_shape=this_shape, spectral_shape=this_spectrum
)
# new parameterization 4FGLDR3:
if "plec_index_s" in catalog_entry.keys():
d = float(catalog_entry["plec_exp_factor_s"])
E0 = float(catalog_entry["pivot_energy"]) * u.MeV
b = float(catalog_entry["plec_exp_index"])
Gs = float(catalog_entry["plec_index_s"])
conv = numpy.exp(d / b**2)
this_spectrum.index = d / b - Gs
this_spectrum.index.fix = fix
this_spectrum.gamma = d / b
this_spectrum.gamma.fix = fix
this_spectrum.piv = E0
this_spectrum.K = (
conv
* float(catalog_entry["plec_flux_density"])
/ (u.cm**2 * u.s * u.MeV)
)
this_spectrum.xc = E0
else:
# OLD parameterization 4FGL which is in fermipy:
a = float(catalog_entry["plec_exp_factor"])
E0 = float(catalog_entry["pivot_energy"])
b = float(catalog_entry["plec_exp_index"])
conv = numpy.exp(a * E0**b)
this_spectrum.index = float(catalog_entry["plec_index"]) * -1
this_spectrum.index.fix = fix
this_spectrum.gamma = b
this_spectrum.gamma.fix = fix
this_spectrum.piv = E0 * u.MeV
this_spectrum.K = (
conv
* float(catalog_entry["plec_flux_density"])
/ (u.cm**2 * u.s * u.MeV)
)
this_spectrum.xc = a ** (-1.0 / b) * u.MeV
this_spectrum.K.fix = fix
this_spectrum.K.bounds = (
this_spectrum.K.value / 1000.0,
this_spectrum.K.value * 1000,
)
this_spectrum.xc.fix = fix
else:
log.error("Spectrum type %s is not a valid 4FGL type" % spectrum_type)
raise NotImplementedError()
try:
theRadius = catalog_entry["model_semimajor"]
except Exception:
theRadius = catalog_entry["spatialwidth"]
if theShape == "RadialDisk":
this_shape.lon0 = ra * u.degree
this_shape.lon0.fix = True
this_shape.lat0 = dec * u.degree
this_shape.lat0.fix = True
this_shape.radius = theRadius * u.degree
this_shape.radius.fix = True
this_shape.radius.bounds = (0, theRadius) * u.degree
elif theShape == "RadialGaussian":
# factor 1/1.36 is the conversion from 68% containment radius to sigma
# Max of sigma/radius is used for get_boundaries().
this_shape.lon0 = ra * u.degree
this_shape.lon0.fix = True
this_shape.lat0 = dec * u.degree
this_shape.lat0.fix = True
this_shape.sigma = theRadius / 1.36 * u.degree
this_shape.sigma.fix = True
this_shape.sigma.bounds = (0, theRadius / 1.36) * u.degree
return this_source
[docs]
class ModelFromFGL(Model):
def __init__(self, ra_center, dec_center, *sources):
self._ra_center = float(ra_center)
self._dec_center = float(dec_center)
super(ModelFromFGL, self).__init__(*sources)
[docs]
def free_point_sources_within_radius(self, radius, normalization_only=True):
"""Free the parameters for the point sources within the given radius of
the center of the search cone.
:param radius: radius in degree
:param normalization_only: if True, frees only the normalization
of the source (default: True)
:return: none
"""
self._free_or_fix_ps(True, radius, normalization_only)
[docs]
def fix_point_sources_within_radius(self, radius, normalization_only=True):
"""Fixes the parameters for the point sources within the given radius
of the center of the search cone.
:param radius: radius in degree
:param normalization_only: if True, fixes only the normalization
of the source (default: True)
:return: none
"""
self._free_or_fix_ps(False, radius, normalization_only)
def _free_or_fix_ps(self, free, radius, normalization_only):
for src_name in self.point_sources:
src = self.point_sources[src_name]
this_d = angular_distance(
self._ra_center,
self._dec_center,
src.position.ra.value,
src.position.dec.value,
)
if this_d <= radius:
if normalization_only:
src.spectrum.main.shape.K.free = free
else:
for par in src.spectrum.main.shape.parameters:
if par == "piv": # don't free pivot energy
continue
src.spectrum.main.shape.parameters[par].free = free
[docs]
def free_extended_sources_within_radius(self, radius, normalization_only=True):
"""Free the parameters for the point sources within the given radius of
the center of the search cone.
:param radius: radius in degree
:param normalization_only: if True, frees only the normalization
of the source (default: True)
:return: none
"""
self._free_or_fix_ext(True, radius, normalization_only)
[docs]
def fix_extended_sources_within_radius(self, radius, normalization_only=True):
"""Fixes the parameters for the point sources within the given radius
of the center of the search cone.
:param radius: radius in degree
:param normalization_only: if True, fixes only the normalization
of the source (default: True)
:return: none
"""
self._free_or_fix_ext(False, radius, normalization_only)
def _free_or_fix_ext(self, free, radius, normalization_only):
for src_name in self.extended_sources:
src = self.extended_sources[src_name]
try:
ra, dec = src.spatial_shape.lon0.value, src.spatial_shape.lat0.value
except Exception:
(ra_min, ra_max), (dec_min, dec_max) = (
src.spatial_shape.get_boundaries()
)
ra = circmean([ra_min, ra_max] * u.deg).value
dec = circmean([dec_min, dec_max] * u.deg).value
this_d = angular_distance(
self._ra_center,
self._dec_center,
ra,
dec,
)
if this_d <= radius:
if normalization_only:
src.spectrum.main.shape.K.free = free
else:
for par in src.spectrum.main.shape.parameters:
if par == "piv": # don't free pivot energy
continue
src.spectrum.main.shape.parameters[par].free = free