Source code for threeML.utils.data_builders.fermi.lat_transient_builder

import collections
import os
import re
import shutil
import site
import subprocess
import uuid
from glob import glob

import pandas as pd
import yaml

from threeML.config import threeML_config
from threeML.io.file_utils import file_existing_and_readable
from threeML.io.logging import setup_logger

pd.reset_option("display.float_format")

log = setup_logger(__name__)

try:
    from GtBurst import IRFS
    from GtBurst.Configuration import Configuration
    from GtBurst.FuncFactory import Spectra

    from threeML import FermiLATLike

    irfs = IRFS.IRFS.keys()
    spectra = Spectra()

    # irfs.append('auto')

    configuration = Configuration()

    has_fermitools = True

except Exception as e:
    log.debug(f"Importing fermitools failed with {e}")
    has_fermitools = False
    irfs = None
    spectra = None

    if threeML_config.logging.startup_warnings:
        log.warning("No fermitools installed")


[docs] class LATLikelihoodParameter(object): def __init__( self, name, help_string, default_value=None, allowed_values=None, is_number=True, is_bool=False, ): """A container for the parameters that are needed by GtBurst. :param name: the parameter name :param help_string: the help string :param default_value: a default value if needed :param allowed_values: the values allowed for input :param is_number: if this is a number :param is_bool: if this is a bool :returns: :rtype: """ self._name = name self._allowed_values = allowed_values self._default_value = default_value self._is_number = is_number self._is_bool = is_bool self._help_string = help_string self._is_set = False # if there is a default value, lets go ahead and set it if default_value is not None: self.__set_value(default_value) def __get_value(self): # make sure that the value set is allowed if self._allowed_values is not None: assert self._current_value in set( self._allowed_values ), "The value of %s is not in %s" % (self._name, self._allowed_values) # construct the class out_string = "--%s" % self._name if self._is_number: out_string += " %f" % self._current_value elif self._is_bool: if not self._current_value: # we remove the string out_string = "" else: out_string += " '%s'" % self._current_value return out_string def __set_value(self, value): if self._allowed_values is not None: assert value in self._allowed_values, "The value %s of %s is not in %s" % ( value, self._name, self._allowed_values, ) self._current_value = value self._is_set = True value = property(__get_value, __set_value)
[docs] def get_disp_value(self): return self._current_value
@property def is_set(self): return self._is_set @property def name(self): return self._name
[docs] def display(self): print(self._help_string) if self._allowed_values is not None: print(self._allowed_values)
# make geetter setter _required_parameters = [ "outfile", "roi", "tstarts", "tstops", "irf", "galactic_model", "particle_model", ] _optional_parameters = [ "ra", "dec", "bin_file", "tsmin", "strategy", "thetamax", "spectralfiles", "liketype", "optimizeposition", "datarepository", "ltcube", "expomap", "ulphindex", "flemin", "flemax", "fgl_mode", "tsmap_spec", "filter_GTI", "likelihood_profile", "remove_fits_files", "log_bins", "bin_file", "source_model", ]
[docs] class TransientLATDataBuilder(object): def __init__(self, triggername, **init_values): """Build the command for GtBurst's likelihood analysis and produce the required files for the FermiLATLike plugin. :param triggername: the trigger name in YYMMDDXXX fermi format :returns: :rtype: """ assert ( has_fermitools ), "You do not have the fermitools installed and cannot run GtBurst" self._triggername = triggername # we create a hash of all the parameters # and add them to the class # this is a really ugly and long way to do this self._parameters = collections.OrderedDict() # set the name for this parameter name = "outfile" # add it to the hash as a parameter object # no value is set UNLESS there is a default self._parameters[name] = LATLikelihoodParameter( name=name, help_string="File for the results (will be overwritten)", is_number=False, ) # this keeps the user from erasing these objects accidentally super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) # and repeat name = "ra" self._parameters[name] = LATLikelihoodParameter( name=name, help_string="R.A. of the object (J2000)", is_number=True ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "dec" self._parameters[name] = LATLikelihoodParameter( name=name, help_string="Dec. of the object (J2000)", is_number=True ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "roi" self._parameters[name] = LATLikelihoodParameter( name=name, help_string="Radius of the Region Of Interest (ROI)", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "tstarts" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=None, help_string="Comma-separated list of start times (with respect to trigger)", is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "tstops" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=None, help_string="Comma-separated list of stop times (with respect to trigger)", is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "zmax" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=100.0, help_string="Zenith cut", is_number=True ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "emin" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=100.0, help_string="Minimum energy for the analysis", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "emax" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=100000.0, help_string="Maximum energy for the analysis", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "irf" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="p8_source", help_string="Instrument Function to be used (IRF)", is_number=False, allowed_values=irfs, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "galactic_model" self._parameters[name] = LATLikelihoodParameter( name=name, help_string="Galactic model for the likelihood", is_number=False, allowed_values=["template (fixed norm.)", "template", "none"], ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "particle_model" self._parameters[name] = LATLikelihoodParameter( name=name, help_string="Particle model", is_number=False, allowed_values=[ "isotr with pow spectrum", "isotr template", "none", "bkge", "auto", ], ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "source_model" self._parameters[name] = LATLikelihoodParameter( name=name, help_string="Source model", default_value="PowerLaw2", is_number=False, allowed_values=spectra.keys(), ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "tsmin" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=20.0, help_string="Minimum TS to consider a detection", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "strategy" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="time", help_string="Strategy for Zenith cut: events or time", is_number=False, allowed_values=["events", "time"], ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "thetamax" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=180.0, help_string="Theta cut", is_number=True ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "spectralfiles" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="no", help_string="Produce spectral files to be used in XSPEC?", allowed_values=["yes", "no"], is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "liketype" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="unbinned", help_string="Likelihood type (binned or unbinned)", allowed_values=["binned", "unbinned"], is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "bin_file" help_str = ( "A string containing a text file 'res.txt start end' will get the start and" " stop times from the columns 'start' and 'end' in the file res.txt.", ) self._parameters[name] = LATLikelihoodParameter( name=name, default_value=None, help_string=help_str, is_bool=False, is_number=False, ) ################################## name = "log_bins" help_str = "Use logarithmically-spaced bins. For example: '1.0 10000.0 30'" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=None, help_string=help_str, is_number=False, is_bool=False, ) ################################## name = "optimizeposition" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="no", help_string="Optimize position with gtfindsrc?", allowed_values=["no", "yes"], is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "datarepository" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=configuration.get("dataRepository"), help_string="Dir where data are stored", is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "ltcube" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="", help_string="Pre-computed livetime cube", is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "expomap" self._parameters[name] = LATLikelihoodParameter( name=name, default_value="", help_string="Pre-computed exposure map", is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "ulphindex" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=-2, help_string="Photon index for upper limits", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "flemin" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=100, help_string="Lower bound energy for flux/upper limit computation", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "flemax" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=10000, help_string="Upper bound energy for flux/upper limit computation", is_number=True, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "fgl_mode" help_str = ( "Set 'complete' to use all FGL sources, set 'fast' to use only " "bright sources" ) self._parameters[name] = LATLikelihoodParameter( name=name, default_value="fast", help_string=help_str, is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "tsmap_spec" help_str = ( "A TS map specification of the type half_size,n_side. For example: " "\n 0.5,8' makes a TS map 1 deg x 1 deg with 64 points" ) self._parameters[name] = LATLikelihoodParameter( name=name, default_value=None, help_string=help_str, is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "filter_GTI" self._parameters[name] = LATLikelihoodParameter( name=name, default_value=False, help_string="Automatically divide time intervals crossing GTIs", is_bool=True, is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "likelihood_profile" help_str = ( "Produce a text file containing the profile of the likelihood for a " "\n changing normalization" ) self._parameters[name] = LATLikelihoodParameter( name=name, default_value=False, help_string=help_str, is_bool=True, is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) ################################## name = "remove_fits_files" help_str = ( "Whether to remove the FITS files of every interval in order to " "save disk space" ) self._parameters[name] = LATLikelihoodParameter( name=name, default_value=False, help_string=help_str, is_bool=True, is_number=False, ) super(TransientLATDataBuilder, self).__setattr__(name, self._parameters[name]) # Now if there are keywords from a configuration to read, # lets do it self._process_keywords(**init_values) def _process_keywords(self, **kwargs): """Processes the keywords from a dictionary likely loaded from a yaml config. :returns: :rtype: """ for k, v in kwargs.items(): if k in self._parameters: self._parameters[k].value = v else: # add warning that there is something strange in the configuration pass def __setattr__(self, name, value): """Override this so that we cannot erase parameters.""" if (name in _required_parameters) or (name in _optional_parameters): raise AttributeError("%s is an immutable attribute." % name) else: super(TransientLATDataBuilder, self).__setattr__(name, value) def _get_command_string(self): """This builds the cmd string for the script.""" executable = os.path.join( "fermitools", "GtBurst", "scripts", "doTimeResolvedLike.py" ) cmd_str = "%s %s" % (executable, self._triggername) for k, v in self._parameters.items(): # only add on the parameters that are set if v.is_set: cmd_str += " %s" % v.value else: # but fail if we did not set the ones needed assert v.name not in _required_parameters, ( "%s is not set but is required" % v.name ) return cmd_str
[docs] def run(self, include_previous_intervals=False, recompute_intervals=False): """Run GtBurst to produce the files needed for the FermiLATLike plugin.""" assert ( has_fermitools ), "You do not have the fermitools installed and cannot run GtBurst" # This is not the cleanest way to do this, but at the moment I see # no way around it as I do not want to rewrite the fermitools cmd = self._get_command_string() # should not allow you to be missing args! # now we want to get the site package directory to find where the script is # located. This should be the first entry... might break in teh future! site_pkg = site.getsitepackages()[0] cmd = os.path.join(site_pkg, cmd) executable = cmd.split()[0] gtapp_mp_dir = os.path.join(site_pkg, "fermitools", "GtBurst", "gtapps_mp") executables = [ executable, os.path.join(gtapp_mp_dir, "gtdiffrsp_mp.py"), os.path.join(gtapp_mp_dir, "gtexpmap_mp.py"), os.path.join(gtapp_mp_dir, "gtltcube_mp.py"), os.path.join(gtapp_mp_dir, "gttsmap_mp.py"), ] try: for _e in executables: log.info("Changing permission to %s" % _e) os.chmod(_e, 0o755) except PermissionError: pass log.info("About to run the following command:\n%s" % cmd) # see what we already have intervals_before_run = glob("interval*-*") if recompute_intervals: if intervals_before_run: # ok, perhaps the user wants to recompute intervals in this folder. # We will move the old intervals to a tmp directory # so that they are not lost log.info( "You have choosen to recompute the time intervals in this folder" ) tmp_dir = "tmp_%s" % str(uuid.uuid4()) log.info("The older entries will be moved to %s" % tmp_dir) os.mkdir(tmp_dir) for interval in intervals_before_run: shutil.move(interval, tmp_dir) # now remove these intervals_before_run = [] if include_previous_intervals: intervals_before_run = [] # run this baby subprocess.call(cmd, shell=True) self.lat_observations = self._create_lat_observations_from_run( intervals_before_run ) return self.lat_observations
def _create_lat_observations_from_run(self, intervals_before_run): """After a run of gtburst, this collects the all the relevant files from each inteval and turns them into LAT observations. :rtype: """ # scroll thru the intervals that were created # place them in LAT observations # attach them to dictionary # dir = the interval lat_observations = [] # need a strategy to collect the intervals intervals = sorted(glob("interval*-*")) for interval in intervals: if interval in intervals_before_run: log.info( f"{interval} existed before this run,\n it will not be auto " "included in the list,\n but you can manually see grab the data." ) else: # check that either tstarts,tstops or log_bins are defined if ( self._parameters["tstarts"].value is not None or self._parameters["tstops"].value is not None ): tstart, tstop = [ float(x) for x in re.match( r"^interval(-?\d*\.\d*)-(-?\d*\.\d*)\/?$", interval ).groups() ] else: assert ( self._parameters["log_bins"].value is None ), "Choose either to use tstarts and tstops, or to use log_bins" event_file = os.path.join( interval, "gll_ft1_tr_bn%s_v00_filt.fit" % self._triggername ) if not file_existing_and_readable(event_file): log.info("The event file does not exist. Please examine!") ft2_file = os.path.join( interval, "gll_ft2_tr_bn%s_v00_filt.fit" % self._triggername ) if not file_existing_and_readable(ft2_file): log.info("The ft2 file does not exist. Please examine!") log.info("we will grab the data file for you.") base_ft2_file = os.path.join( "%s" % self.datarepository.get_disp_value(), "bn%s" % self._triggername, "gll_ft2_tr_bn%s_v00.fit" % self._triggername, ) if not file_existing_and_readable(base_ft2_file): log.error("Cannot find any FT2 files!") raise AssertionError() shutil.copy(base_ft2_file, interval) ft2_file = os.path.join( interval, "gll_ft2_tr_bn%s_v00.fit" % self._triggername ) log.info("copied %s to %s" % (base_ft2_file, ft2_file)) exposure_map = os.path.join( interval, "gll_ft1_tr_bn%s_v00_filt_expomap.fit" % self._triggername ) if not file_existing_and_readable(exposure_map): log.info("The exposure map does not exist. Please examine!") livetime_cube = os.path.join( interval, "gll_ft1_tr_bn%s_v00_filt_ltcube.fit" % self._triggername ) if not file_existing_and_readable(livetime_cube): log.info("The livetime_cube does not exist. Please examine!") # optional bin_file parameter # if self._parameters['bin_file'].value is not None: # liketype matches # assert self._parameters['liketype'] == 'binned', 'liketype must be # binned to use bin_file parameter # %s'%self._parameters['bin_file'].value # value carries a few arguments, take first value- path # bin_file_path = self._parameters['bin_file'].value.split()[0] # bin_file = os.path.join(interval, bin_file_path) # if not file_existing_and_readable(bin_file): # print('The bin_file at %s does not exist. Please examine! # '%bin_file) # now create a LAT observation object this_obs = LATObservation( event_file, ft2_file, exposure_map, livetime_cube, tstart, tstop, self._parameters["liketype"].get_disp_value(), self._triggername, ) # @, bin_file) lat_observations.append(this_obs) return lat_observations
[docs] def to_LATLike(self): _lat_like_plugins = [] for _lat_ob in self.lat_observations: _lat_like_plugins.append(_lat_ob.to_LATLike()) return _lat_like_plugins
[docs] def display(self, get=False): """Display the current set parameters.""" out = collections.OrderedDict() for k, v in self._parameters.items(): if v.is_set: out[k] = v.get_disp_value() df = pd.Series(out) print(df) if get: return df
def __repr__(self): return self.display(get=True).to_string()
[docs] def save_configuration(self, filename): """Save the current configuration to a yaml file for use later. Suggested extension is .yml. :param filename: the yaml file name to save to :returns: :rtype: """ # create a temporary dict to hold # the set values data = {} for k, v in self._parameters.items(): if v.is_set: data[k] = v.get_disp_value() with open(filename, "w") as outfile: yaml.dump(data, outfile, default_flow_style=False)
[docs] @classmethod def from_saved_configuration(cls, triggername, config_file): """Load a saved yaml configuration for the given trigger name. :param triggername: Trigger name of the source in YYMMDDXXX :param config_file: the saved yaml configuration to use :returns: :rtype: """ with open(config_file, "r") as stream: loaded_config = yaml.safe_load(stream) return cls(triggername, **loaded_config)
[docs] class LATObservation(object): def __init__( self, event_file, ft2_file, exposure_map, livetime_cube, tstart, tstop, liketype, triggername, ): # , bin_file): """A container to formalize the storage of Fermi LAT observation files. :param event_file: :param ft2_file: :param exposure_map: :param livetime_cube: :param tstart: :param tstop: :param liketype: :param triggername: :returns: :rtype: """ self._event_file = event_file self._ft2_file = ft2_file self._exposure_map = exposure_map self._livetime_cube = livetime_cube self._tstart = tstart self._tstop = tstop self._liketype = liketype self._triggername = triggername # self._bin_file = bin_file @property def event_file(self): return self._event_file @property def ft2_file(self): return self._ft2_file @property def exposure_map(self): return self._exposure_map @property def livetime_cube(self): return self._livetime_cube @property def tstart(self): return self._tstart @property def tstop(self): return self._tstop @property def liketype(self): return self._liketype # @property # def bin_file(self): # return self._bin_file @property def triggername(self): return self._triggername def __repr__(self): output = collections.OrderedDict() output["time interval"] = "%.3f-%.3f" % (self._tstart, self._tstop) output["event_file"] = self._event_file output["ft2_file"] = self._ft2_file output["exposure_map"] = self._exposure_map output["livetime_cube"] = self._livetime_cube output["triggername"] = self._triggername output["liketype"] = self._liketype # output['bin_file'] = self._bin_file df = pd.Series(output) return df.to_string()
[docs] def to_LATLike(self): _fermi_lat_like = FermiLATLike( name=("LAT%dX%d" % (self._tstart, self._tstop)).replace("-", "n"), event_file=self._event_file, ft2_file=self._ft2_file, livetime_cube_file=self._livetime_cube, kind=self._liketype, exposure_map_file=self._exposure_map, source_maps=None, binned_expo_map=None, ) # ,bin_file = self._bin_file) # source_name=self._triggername) return _fermi_lat_like