import collections
import os
import re
import shutil
import site
import subprocess
import uuid
from glob import glob
import pandas as pd
import yaml
from threeML.config import threeML_config
from threeML.io.file_utils import file_existing_and_readable
from threeML.io.logging import setup_logger
pd.reset_option("display.float_format")
log = setup_logger(__name__)
try:
from GtBurst import IRFS
from GtBurst.Configuration import Configuration
from GtBurst.FuncFactory import Spectra
from threeML import FermiLATLike
irfs = IRFS.IRFS.keys()
spectra = Spectra()
# irfs.append('auto')
configuration = Configuration()
has_fermitools = True
except Exception as e:
log.debug(f"Importing fermitools failed with {e}")
has_fermitools = False
irfs = None
spectra = None
if threeML_config.logging.startup_warnings:
log.warning("No fermitools installed")
[docs]
class LATLikelihoodParameter(object):
def __init__(
self,
name,
help_string,
default_value=None,
allowed_values=None,
is_number=True,
is_bool=False,
):
"""A container for the parameters that are needed by GtBurst.
:param name: the parameter name
:param help_string: the help string
:param default_value: a default value if needed
:param allowed_values: the values allowed for input
:param is_number: if this is a number
:param is_bool: if this is a bool
:returns:
:rtype:
"""
self._name = name
self._allowed_values = allowed_values
self._default_value = default_value
self._is_number = is_number
self._is_bool = is_bool
self._help_string = help_string
self._is_set = False
# if there is a default value, lets go ahead and set it
if default_value is not None:
self.__set_value(default_value)
def __get_value(self):
# make sure that the value set is allowed
if self._allowed_values is not None:
assert self._current_value in set(
self._allowed_values
), "The value of %s is not in %s" % (self._name, self._allowed_values)
# construct the class
out_string = "--%s" % self._name
if self._is_number:
out_string += " %f" % self._current_value
elif self._is_bool:
if not self._current_value:
# we remove the string
out_string = ""
else:
out_string += " '%s'" % self._current_value
return out_string
def __set_value(self, value):
if self._allowed_values is not None:
assert value in self._allowed_values, "The value %s of %s is not in %s" % (
value,
self._name,
self._allowed_values,
)
self._current_value = value
self._is_set = True
value = property(__get_value, __set_value)
[docs]
def get_disp_value(self):
return self._current_value
@property
def is_set(self):
return self._is_set
@property
def name(self):
return self._name
[docs]
def display(self):
print(self._help_string)
if self._allowed_values is not None:
print(self._allowed_values)
# make geetter setter
_required_parameters = [
"outfile",
"roi",
"tstarts",
"tstops",
"irf",
"galactic_model",
"particle_model",
]
_optional_parameters = [
"ra",
"dec",
"bin_file",
"tsmin",
"strategy",
"thetamax",
"spectralfiles",
"liketype",
"optimizeposition",
"datarepository",
"ltcube",
"expomap",
"ulphindex",
"flemin",
"flemax",
"fgl_mode",
"tsmap_spec",
"filter_GTI",
"likelihood_profile",
"remove_fits_files",
"log_bins",
"bin_file",
"source_model",
]
[docs]
class TransientLATDataBuilder(object):
def __init__(self, triggername, **init_values):
"""Build the command for GtBurst's likelihood analysis and produce the
required files for the FermiLATLike plugin.
:param triggername: the trigger name in YYMMDDXXX fermi format
:returns:
:rtype:
"""
assert (
has_fermitools
), "You do not have the fermitools installed and cannot run GtBurst"
self._triggername = triggername
# we create a hash of all the parameters
# and add them to the class
# this is a really ugly and long way to do this
self._parameters = collections.OrderedDict()
# set the name for this parameter
name = "outfile"
# add it to the hash as a parameter object
# no value is set UNLESS there is a default
self._parameters[name] = LATLikelihoodParameter(
name=name,
help_string="File for the results (will be overwritten)",
is_number=False,
)
# this keeps the user from erasing these objects accidentally
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
# and repeat
name = "ra"
self._parameters[name] = LATLikelihoodParameter(
name=name, help_string="R.A. of the object (J2000)", is_number=True
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "dec"
self._parameters[name] = LATLikelihoodParameter(
name=name, help_string="Dec. of the object (J2000)", is_number=True
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "roi"
self._parameters[name] = LATLikelihoodParameter(
name=name,
help_string="Radius of the Region Of Interest (ROI)",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "tstarts"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=None,
help_string="Comma-separated list of start times (with respect to trigger)",
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "tstops"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=None,
help_string="Comma-separated list of stop times (with respect to trigger)",
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "zmax"
self._parameters[name] = LATLikelihoodParameter(
name=name, default_value=100.0, help_string="Zenith cut", is_number=True
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "emin"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=100.0,
help_string="Minimum energy for the analysis",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "emax"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=100000.0,
help_string="Maximum energy for the analysis",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "irf"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="p8_source",
help_string="Instrument Function to be used (IRF)",
is_number=False,
allowed_values=irfs,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "galactic_model"
self._parameters[name] = LATLikelihoodParameter(
name=name,
help_string="Galactic model for the likelihood",
is_number=False,
allowed_values=["template (fixed norm.)", "template", "none"],
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "particle_model"
self._parameters[name] = LATLikelihoodParameter(
name=name,
help_string="Particle model",
is_number=False,
allowed_values=[
"isotr with pow spectrum",
"isotr template",
"none",
"bkge",
"auto",
],
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "source_model"
self._parameters[name] = LATLikelihoodParameter(
name=name,
help_string="Source model",
default_value="PowerLaw2",
is_number=False,
allowed_values=spectra.keys(),
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "tsmin"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=20.0,
help_string="Minimum TS to consider a detection",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "strategy"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="time",
help_string="Strategy for Zenith cut: events or time",
is_number=False,
allowed_values=["events", "time"],
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "thetamax"
self._parameters[name] = LATLikelihoodParameter(
name=name, default_value=180.0, help_string="Theta cut", is_number=True
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "spectralfiles"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="no",
help_string="Produce spectral files to be used in XSPEC?",
allowed_values=["yes", "no"],
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "liketype"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="unbinned",
help_string="Likelihood type (binned or unbinned)",
allowed_values=["binned", "unbinned"],
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "bin_file"
help_str = (
"A string containing a text file 'res.txt start end' will get the start and"
" stop times from the columns 'start' and 'end' in the file res.txt.",
)
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=None,
help_string=help_str,
is_bool=False,
is_number=False,
)
##################################
name = "log_bins"
help_str = "Use logarithmically-spaced bins. For example: '1.0 10000.0 30'"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=None,
help_string=help_str,
is_number=False,
is_bool=False,
)
##################################
name = "optimizeposition"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="no",
help_string="Optimize position with gtfindsrc?",
allowed_values=["no", "yes"],
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "datarepository"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=configuration.get("dataRepository"),
help_string="Dir where data are stored",
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "ltcube"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="",
help_string="Pre-computed livetime cube",
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "expomap"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="",
help_string="Pre-computed exposure map",
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "ulphindex"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=-2,
help_string="Photon index for upper limits",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "flemin"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=100,
help_string="Lower bound energy for flux/upper limit computation",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "flemax"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=10000,
help_string="Upper bound energy for flux/upper limit computation",
is_number=True,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "fgl_mode"
help_str = (
"Set 'complete' to use all FGL sources, set 'fast' to use only "
"bright sources"
)
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value="fast",
help_string=help_str,
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "tsmap_spec"
help_str = (
"A TS map specification of the type half_size,n_side. For example: "
"\n 0.5,8' makes a TS map 1 deg x 1 deg with 64 points"
)
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=None,
help_string=help_str,
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "filter_GTI"
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=False,
help_string="Automatically divide time intervals crossing GTIs",
is_bool=True,
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "likelihood_profile"
help_str = (
"Produce a text file containing the profile of the likelihood for a "
"\n changing normalization"
)
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=False,
help_string=help_str,
is_bool=True,
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
##################################
name = "remove_fits_files"
help_str = (
"Whether to remove the FITS files of every interval in order to "
"save disk space"
)
self._parameters[name] = LATLikelihoodParameter(
name=name,
default_value=False,
help_string=help_str,
is_bool=True,
is_number=False,
)
super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name])
# Now if there are keywords from a configuration to read,
# lets do it
self._process_keywords(**init_values)
def _process_keywords(self, **kwargs):
"""Processes the keywords from a dictionary likely loaded from a yaml
config.
:returns:
:rtype:
"""
for k, v in kwargs.items():
if k in self._parameters:
self._parameters[k].value = v
else:
# add warning that there is something strange in the configuration
pass
def __setattr__(self, name, value):
"""Override this so that we cannot erase parameters."""
if (name in _required_parameters) or (name in _optional_parameters):
raise AttributeError("%s is an immutable attribute." % name)
else:
super(TransientLATDataBuilder, self).__setattr__(name, value)
def _get_command_string(self):
"""This builds the cmd string for the script."""
executable = os.path.join(
"fermitools", "GtBurst", "scripts", "doTimeResolvedLike.py"
)
cmd_str = "%s %s" % (executable, self._triggername)
for k, v in self._parameters.items():
# only add on the parameters that are set
if v.is_set:
cmd_str += " %s" % v.value
else:
# but fail if we did not set the ones needed
assert v.name not in _required_parameters, (
"%s is not set but is required" % v.name
)
return cmd_str
[docs]
def run(self, include_previous_intervals=False, recompute_intervals=False):
"""Run GtBurst to produce the files needed for the FermiLATLike
plugin."""
assert (
has_fermitools
), "You do not have the fermitools installed and cannot run GtBurst"
# This is not the cleanest way to do this, but at the moment I see
# no way around it as I do not want to rewrite the fermitools
cmd = self._get_command_string() # should not allow you to be missing args!
# now we want to get the site package directory to find where the script is
# located. This should be the first entry... might break in teh future!
site_pkg = site.getsitepackages()[0]
cmd = os.path.join(site_pkg, cmd)
executable = cmd.split()[0]
gtapp_mp_dir = os.path.join(site_pkg, "fermitools", "GtBurst", "gtapps_mp")
executables = [
executable,
os.path.join(gtapp_mp_dir, "gtdiffrsp_mp.py"),
os.path.join(gtapp_mp_dir, "gtexpmap_mp.py"),
os.path.join(gtapp_mp_dir, "gtltcube_mp.py"),
os.path.join(gtapp_mp_dir, "gttsmap_mp.py"),
]
try:
for _e in executables:
log.info("Changing permission to %s" % _e)
os.chmod(_e, 0o755)
except PermissionError:
pass
log.info("About to run the following command:\n%s" % cmd)
# see what we already have
intervals_before_run = glob("interval*-*")
if recompute_intervals:
if intervals_before_run:
# ok, perhaps the user wants to recompute intervals in this folder.
# We will move the old intervals to a tmp directory
# so that they are not lost
log.info(
"You have choosen to recompute the time intervals in this folder"
)
tmp_dir = "tmp_%s" % str(uuid.uuid4())
log.info("The older entries will be moved to %s" % tmp_dir)
os.mkdir(tmp_dir)
for interval in intervals_before_run:
shutil.move(interval, tmp_dir)
# now remove these
intervals_before_run = []
if include_previous_intervals:
intervals_before_run = []
# run this baby
subprocess.call(cmd, shell=True)
self.lat_observations = self._create_lat_observations_from_run(
intervals_before_run
)
return self.lat_observations
def _create_lat_observations_from_run(self, intervals_before_run):
"""After a run of gtburst, this collects the all the relevant files
from each inteval and turns them into LAT observations.
:rtype:
"""
# scroll thru the intervals that were created
# place them in LAT observations
# attach them to dictionary
# dir = the interval
lat_observations = []
# need a strategy to collect the intervals
intervals = sorted(glob("interval*-*"))
for interval in intervals:
if interval in intervals_before_run:
log.info(
f"{interval} existed before this run,\n it will not be auto "
"included in the list,\n but you can manually see grab the data."
)
else:
# check that either tstarts,tstops or log_bins are defined
if (
self._parameters["tstarts"].value is not None
or self._parameters["tstops"].value is not None
):
tstart, tstop = [
float(x)
for x in re.match(
r"^interval(-?\d*\.\d*)-(-?\d*\.\d*)\/?$", interval
).groups()
]
else:
assert (
self._parameters["log_bins"].value is None
), "Choose either to use tstarts and tstops, or to use log_bins"
event_file = os.path.join(
interval, "gll_ft1_tr_bn%s_v00_filt.fit" % self._triggername
)
if not file_existing_and_readable(event_file):
log.info("The event file does not exist. Please examine!")
ft2_file = os.path.join(
interval, "gll_ft2_tr_bn%s_v00_filt.fit" % self._triggername
)
if not file_existing_and_readable(ft2_file):
log.info("The ft2 file does not exist. Please examine!")
log.info("we will grab the data file for you.")
base_ft2_file = os.path.join(
"%s" % self.datarepository.get_disp_value(),
"bn%s" % self._triggername,
"gll_ft2_tr_bn%s_v00.fit" % self._triggername,
)
if not file_existing_and_readable(base_ft2_file):
log.error("Cannot find any FT2 files!")
raise AssertionError()
shutil.copy(base_ft2_file, interval)
ft2_file = os.path.join(
interval, "gll_ft2_tr_bn%s_v00.fit" % self._triggername
)
log.info("copied %s to %s" % (base_ft2_file, ft2_file))
exposure_map = os.path.join(
interval, "gll_ft1_tr_bn%s_v00_filt_expomap.fit" % self._triggername
)
if not file_existing_and_readable(exposure_map):
log.info("The exposure map does not exist. Please examine!")
livetime_cube = os.path.join(
interval, "gll_ft1_tr_bn%s_v00_filt_ltcube.fit" % self._triggername
)
if not file_existing_and_readable(livetime_cube):
log.info("The livetime_cube does not exist. Please examine!")
# optional bin_file parameter
# if self._parameters['bin_file'].value is not None:
# liketype matches
# assert self._parameters['liketype'] == 'binned', 'liketype must be
# binned to use bin_file parameter
# %s'%self._parameters['bin_file'].value
# value carries a few arguments, take first value- path
# bin_file_path = self._parameters['bin_file'].value.split()[0]
# bin_file = os.path.join(interval, bin_file_path)
# if not file_existing_and_readable(bin_file):
# print('The bin_file at %s does not exist. Please examine!
# '%bin_file)
# now create a LAT observation object
this_obs = LATObservation(
event_file,
ft2_file,
exposure_map,
livetime_cube,
tstart,
tstop,
self._parameters["liketype"].get_disp_value(),
self._triggername,
) # @, bin_file)
lat_observations.append(this_obs)
return lat_observations
[docs]
def to_LATLike(self):
_lat_like_plugins = []
for _lat_ob in self.lat_observations:
_lat_like_plugins.append(_lat_ob.to_LATLike())
return _lat_like_plugins
[docs]
def display(self, get=False):
"""Display the current set parameters."""
out = collections.OrderedDict()
for k, v in self._parameters.items():
if v.is_set:
out[k] = v.get_disp_value()
df = pd.Series(out)
print(df)
if get:
return df
def __repr__(self):
return self.display(get=True).to_string()
[docs]
def save_configuration(self, filename):
"""Save the current configuration to a yaml file for use later.
Suggested extension is .yml.
:param filename: the yaml file name to save to
:returns:
:rtype:
"""
# create a temporary dict to hold
# the set values
data = {}
for k, v in self._parameters.items():
if v.is_set:
data[k] = v.get_disp_value()
with open(filename, "w") as outfile:
yaml.dump(data, outfile, default_flow_style=False)
[docs]
@classmethod
def from_saved_configuration(cls, triggername, config_file):
"""Load a saved yaml configuration for the given trigger name.
:param triggername: Trigger name of the source in YYMMDDXXX
:param config_file: the saved yaml configuration to use
:returns:
:rtype:
"""
with open(config_file, "r") as stream:
loaded_config = yaml.safe_load(stream)
return cls(triggername, **loaded_config)
[docs]
class LATObservation(object):
def __init__(
self,
event_file,
ft2_file,
exposure_map,
livetime_cube,
tstart,
tstop,
liketype,
triggername,
): # , bin_file):
"""A container to formalize the storage of Fermi LAT observation files.
:param event_file:
:param ft2_file:
:param exposure_map:
:param livetime_cube:
:param tstart:
:param tstop:
:param liketype:
:param triggername:
:returns:
:rtype:
"""
self._event_file = event_file
self._ft2_file = ft2_file
self._exposure_map = exposure_map
self._livetime_cube = livetime_cube
self._tstart = tstart
self._tstop = tstop
self._liketype = liketype
self._triggername = triggername
# self._bin_file = bin_file
@property
def event_file(self):
return self._event_file
@property
def ft2_file(self):
return self._ft2_file
@property
def exposure_map(self):
return self._exposure_map
@property
def livetime_cube(self):
return self._livetime_cube
@property
def tstart(self):
return self._tstart
@property
def tstop(self):
return self._tstop
@property
def liketype(self):
return self._liketype
# @property
# def bin_file(self):
# return self._bin_file
@property
def triggername(self):
return self._triggername
def __repr__(self):
output = collections.OrderedDict()
output["time interval"] = "%.3f-%.3f" % (self._tstart, self._tstop)
output["event_file"] = self._event_file
output["ft2_file"] = self._ft2_file
output["exposure_map"] = self._exposure_map
output["livetime_cube"] = self._livetime_cube
output["triggername"] = self._triggername
output["liketype"] = self._liketype
# output['bin_file'] = self._bin_file
df = pd.Series(output)
return df.to_string()
[docs]
def to_LATLike(self):
_fermi_lat_like = FermiLATLike(
name=("LAT%dX%d" % (self._tstart, self._tstop)).replace("-", "n"),
event_file=self._event_file,
ft2_file=self._ft2_file,
livetime_cube_file=self._livetime_cube,
kind=self._liketype,
exposure_map_file=self._exposure_map,
source_maps=None,
binned_expo_map=None,
)
# ,bin_file = self._bin_file)
# source_name=self._triggername)
return _fermi_lat_like