# Source code for threeML.utils.data_builders.fermi.lat_transient_builder

import collections
import os
import re
import shutil
import site
import subprocess
import uuid
from glob import glob

import pandas as pd
import yaml

from threeML.config import threeML_config
from threeML.io.file_utils import file_existing_and_readable
from threeML.io.logging import setup_logger

# Restore pandas' default float formatting so parameter tables display
# with full precision (another module may have changed the option).
pd.reset_option('display.float_format')

# module-level logger, consistent with the rest of threeML
log = setup_logger(__name__)


# Optional dependency: the Fermi Science Tools (fermitools) ship GtBurst.
# If GtBurst is importable we expose the valid IRF names, the GtBurst
# spectral-model factory and the GtBurst configuration object; otherwise
# this module still imports cleanly, `has_fermitools` is False and
# TransientLATDataBuilder.run() will refuse to execute.
try: 
    from GtBurst import IRFS
    from GtBurst.Configuration import Configuration
    from GtBurst.FuncFactory import Spectra

    from threeML import FermiLATLike

    # valid values for the 'irf' analysis parameter
    irfs = IRFS.IRFS.keys()
    # factory of GtBurst spectral models; its keys() are the valid
    # values for the 'source_model' parameter
    spectra = Spectra()

    #irfs.append('auto')

    configuration = Configuration()

    has_fermitools = True

except (ImportError):

    has_fermitools = False

    if threeML_config.logging.startup_warnings:

        log.warning('No fermitools installed')
        




class LATLikelihoodParameter(object):
    """
    A container for a single parameter needed by GtBurst.

    The parameter knows how to validate its value against an optional
    set of allowed values and how to render itself as a command-line
    option string (via the ``value`` property getter).
    """

    def __init__(self, name, help_string, default_value=None, allowed_values=None, is_number=True, is_bool=False):
        """
        A container for the parameters that are needed by GtBurst

        :param name: the parameter name
        :param help_string: the help string
        :param default_value: a default value if needed
        :param allowed_values: the values allowed for input
        :param is_number: if this is a number
        :param is_bool: if this is a bool
        :returns:
        :rtype:
        """

        self._name = name
        self._allowed_values = allowed_values
        self._default_value = default_value
        self._is_number = is_number
        self._is_bool = is_bool
        self._help_string = help_string
        self._is_set = False

        # a provided default immediately counts as "set"
        if default_value is not None:

            self.__set_value(default_value)

    def __get_value(self):

        # re-validate on read, in case the stored value was injected directly
        if self._allowed_values is not None:

            assert self._current_value in set(self._allowed_values), 'The value of %s is not in %s' % (self._name, self._allowed_values )

        # build the command-line fragment for this parameter
        flag = '--%s' % self._name

        if self._is_number:

            return flag + ' %f' % self._current_value

        if self._is_bool:

            # a false boolean removes the option from the command line entirely
            return flag if self._current_value else ''

        # everything else is passed as a quoted string
        return flag + " '%s'" % self._current_value

    def __set_value(self, value):

        # reject values outside the allowed set up front
        if self._allowed_values is not None:

            assert value in self._allowed_values, 'The value %s of %s is not in %s' % (value, self._name, self._allowed_values)

        self._current_value = value
        self._is_set = True

    # reading ``value`` yields the CLI fragment; writing it validates and stores
    value = property(__get_value, __set_value)

    def get_disp_value(self):
        """Return the raw stored value (not the CLI fragment)."""

        return self._current_value

    @property
    def is_set(self):
        """True once a value (or a default) has been assigned."""
        return self._is_set

    @property
    def name(self):
        """The parameter name as used on the GtBurst command line."""
        return self._name

    def display(self):
        """Print the help string and, if any, the allowed values."""

        print(self._help_string)

        if self._allowed_values is not None:

            print(self._allowed_values)
# make geetter setter _required_parameters = [ 'outfile', 'roi', 'tstarts', 'tstops', 'irf', 'galactic_model', 'particle_model', ] _optional_parameters = [ 'ra', 'dec', 'bin_file', 'tsmin', 'strategy', 'thetamax', 'spectralfiles', 'liketype', 'optimizeposition', 'datarepository', 'ltcube', 'expomap', 'ulphindex', 'flemin', 'flemax', 'fgl_mode', 'tsmap_spec', 'filter_GTI', 'likelihood_profile', 'remove_fits_files','log_bins', 'bin_file','source_model' ]
class TransientLATDataBuilder(object):
    """
    Build the command for GtBurst's time-resolved likelihood analysis
    (doTimeResolvedLike.py) and collect the files it produces so that
    they can be fed to the FermiLATLike plugin.
    """

    def __init__(self, triggername, **init_values):
        """
        Build the command for GtBurst's likelihood analysis
        and produce the required files for the FermiLATLike plugin

        :param triggername: the trigger name in YYMMDDXXX fermi format
        :param init_values: initial values for any of the analysis parameters
        :returns:
        :rtype:
        """

        self._triggername = triggername

        # Every analysis parameter is stored in this ordered hash; each one
        # is also exposed as an attribute of the builder (see _add_parameter),
        # and __setattr__ below keeps the user from erasing those attributes.
        self._parameters = collections.OrderedDict()

        self._add_parameter('outfile',
                            "File for the results (will be overwritten)",
                            is_number=False)

        self._add_parameter('ra',
                            "R.A. of the object (J2000)",
                            is_number=True)

        self._add_parameter('dec',
                            "Dec. of the object (J2000)",
                            is_number=True)

        self._add_parameter('roi',
                            "Radius of the Region Of Interest (ROI)",
                            is_number=True)

        self._add_parameter('tstarts',
                            "Comma-separated list of start times (with respect to trigger)",
                            default_value=None,
                            is_number=False)

        self._add_parameter('tstops',
                            "Comma-separated list of stop times (with respect to trigger)",
                            default_value=None,
                            is_number=False)

        self._add_parameter('zmax',
                            "Zenith cut",
                            default_value=100.,
                            is_number=True)

        self._add_parameter('emin',
                            "Minimum energy for the analysis",
                            default_value=100.,
                            is_number=True)

        self._add_parameter('emax',
                            "Maximum energy for the analysis",
                            default_value=100000.,
                            is_number=True)

        self._add_parameter('irf',
                            "Instrument Function to be used (IRF)",
                            default_value='p8_source',
                            is_number=False,
                            allowed_values=irfs)

        self._add_parameter('galactic_model',
                            "Galactic model for the likelihood",
                            is_number=False,
                            allowed_values=['template (fixed norm.)', 'template', 'none'])

        self._add_parameter('particle_model',
                            "Particle model",
                            is_number=False,
                            allowed_values=['isotr with pow spectrum', 'isotr template', 'none', 'bkge', 'auto'])

        self._add_parameter('source_model',
                            "Source model",
                            default_value='PowerLaw2',
                            is_number=False,
                            allowed_values=spectra.keys())

        self._add_parameter('tsmin',
                            "Minimum TS to consider a detection",
                            default_value=20.,
                            is_number=True)

        self._add_parameter('strategy',
                            "Strategy for Zenith cut: events or time",
                            default_value='time',
                            is_number=False,
                            allowed_values=['events', 'time'])

        self._add_parameter('thetamax',
                            "Theta cut",
                            default_value=180.,
                            is_number=True)

        self._add_parameter('spectralfiles',
                            "Produce spectral files to be used in XSPEC?",
                            default_value='no',
                            is_number=False,
                            allowed_values=['yes', 'no'])

        self._add_parameter('liketype',
                            "Likelihood type (binned or unbinned)",
                            default_value='unbinned',
                            is_number=False,
                            allowed_values=['binned', 'unbinned'])

        # fix: 'bin_file' and 'log_bins' were previously not exposed as
        # attributes like every other parameter; _add_parameter now treats
        # them uniformly (backward compatible - the attributes simply did
        # not exist before)
        self._add_parameter('bin_file',
                            "A string containing a text file 'res.txt start end' will get the start and stop times from the columns 'start' and 'end' in the file res.txt.",
                            default_value=None,
                            is_number=False,
                            is_bool=False)

        self._add_parameter('log_bins',
                            "Use logarithmically-spaced bins. For example: '1.0 10000.0 30'",
                            default_value=None,
                            is_number=False,
                            is_bool=False)

        self._add_parameter('optimizeposition',
                            "Optimize position with gtfindsrc?",
                            default_value='no',
                            is_number=False,
                            allowed_values=['no', 'yes'])

        self._add_parameter('datarepository',
                            "Dir where data are stored",
                            default_value=configuration.get('dataRepository'),
                            is_number=False)

        self._add_parameter('ltcube',
                            "Pre-computed livetime cube",
                            default_value='',
                            is_number=False)

        self._add_parameter('expomap',
                            "Pre-computed exposure map",
                            default_value='',
                            is_number=False)

        self._add_parameter('ulphindex',
                            "Photon index for upper limits",
                            default_value=-2,
                            is_number=True)

        self._add_parameter('flemin',
                            "Lower bound energy for flux/upper limit computation",
                            default_value=100,
                            is_number=True)

        self._add_parameter('flemax',
                            "Upper bound energy for flux/upper limit computation",
                            default_value=10000,
                            is_number=True)

        self._add_parameter('fgl_mode',
                            "Set 'complete' to use all FGL sources, set 'fast' to use only bright sources",
                            default_value='fast',
                            is_number=False)

        self._add_parameter('tsmap_spec',
                            "A TS map specification of the type half_size,n_side. For example: \n 0.5,8' makes a TS map 1 deg x 1 deg with 64 points",
                            default_value=None,
                            is_number=False)

        self._add_parameter('filter_GTI',
                            "Automatically divide time intervals crossing GTIs",
                            default_value=False,
                            is_bool=True,
                            is_number=False)

        self._add_parameter('likelihood_profile',
                            "Produce a text file containing the profile of the likelihood for a \n changing normalization ",
                            default_value=False,
                            is_bool=True,
                            is_number=False)

        self._add_parameter('remove_fits_files',
                            "Whether to remove the FITS files of every interval in order to save disk space",
                            default_value=False,
                            is_bool=True,
                            is_number=False)

        # Now if there are keywords from a configuration to read, lets do it
        self._process_keywords(**init_values)

    def _add_parameter(self, name, help_string, **kwargs):
        """
        Create a LATLikelihoodParameter, register it in the parameter hash
        and expose it as an attribute of the builder.

        :param name: the parameter name
        :param help_string: the help string shown by display()
        :param kwargs: forwarded to LATLikelihoodParameter
            (default_value, allowed_values, is_number, is_bool)
        """

        parameter = LATLikelihoodParameter(name=name, help_string=help_string, **kwargs)

        self._parameters[name] = parameter

        # bypass our own __setattr__ guard so the parameter object can be
        # attached, while users cannot accidentally erase it afterwards
        super(TransientLATDataBuilder, self).__setattr__(name, parameter)

    def _process_keywords(self, **kwargs):
        """
        Set parameter values from a keyword dictionary, likely loaded
        from a yaml config.

        :returns:
        :rtype:
        """

        for k, v in kwargs.items():

            if k in self._parameters:

                self._parameters[k].value = v

            else:

                # fix: previously unknown keys were silently ignored (the
                # original left a TODO comment here); now we warn
                log.warning('Parameter %s is not a valid parameter and will be ignored' % k)

    def __setattr__(self, name, value):
        """
        Override this so that we cannot erase parameters
        """

        if (name in _required_parameters) or (name in _optional_parameters):

            raise AttributeError("%s is an immutable attribute." % name)

        else:

            super(TransientLATDataBuilder, self).__setattr__(name, value)

    def _get_command_string(self):
        """
        Build the doTimeResolvedLike.py command-line string from all
        parameters that currently have a value.

        :raises AssertionError: if a required parameter is not set
        :returns: the command string (relative path; run() prepends site-packages)
        """

        executable = os.path.join('fermitools', 'GtBurst', 'scripts', 'doTimeResolvedLike.py')

        cmd_str = '%s %s' % (executable, self._triggername)

        for k, v in self._parameters.items():

            # only add on the parameters that are set
            if v.is_set:

                cmd_str += ' %s' % v.value

            else:

                # but fail if we did not set the ones needed
                assert v.name not in _required_parameters, '%s is not set but is required' % v.name

        return cmd_str
[docs] def run(self, include_previous_intervals=False, recompute_intervals=False): """ run GtBurst to produce the files needed for the FermiLATLike plugin """ assert has_fermitools, 'You do not have the fermitools installed and cannot run GtBurst' # This is not the cleanest way to do this, but at the moment I see # no way around it as I do not want to rewrite the fermitools cmd = self._get_command_string() # should not allow you to be missing args! # now we want to get the site package directory to find where the script is # located. This should be the first entry... might break in teh future! site_pkg = site.getsitepackages()[0] cmd = os.path.join(site_pkg, cmd) executable = cmd.split()[0] gtapp_mp_dir = os.path.join(site_pkg,'fermitools', 'GtBurst', 'gtapps_mp') executables = [ executable, os.path.join(gtapp_mp_dir, 'gtdiffrsp_mp.py'), os.path.join(gtapp_mp_dir, 'gtexpmap_mp.py'), os.path.join(gtapp_mp_dir, 'gtltcube_mp.py'), os.path.join(gtapp_mp_dir, 'gttsmap_mp.py'), ] for _e in executables: print ("Changing permission to %s" % _e) os.chmod(_e, 0o755) log.info('About to run the following command:\n%s' % cmd) # see what we already have intervals_before_run = glob('interval*-*') if recompute_intervals: if intervals_before_run: # ok, perhaps the user wants to recompute intervals in this folder. # We will move the old intervals to a tmp directory # so that they are not lost log.info('You have choosen to recompute the time intervals in this folder') tmp_dir = 'tmp_%s' % str(uuid.uuid4()) log.info('The older entries will be moved to %s' % tmp_dir) os.mkdir(tmp_dir) for interval in intervals_before_run: shutil.move(interval, tmp_dir) # now remove these intervals_before_run = [] if include_previous_intervals: intervals_before_run = [] # run this baby subprocess.call(cmd, shell=True) self.lat_observations = self._create_lat_observations_from_run(intervals_before_run) return self.lat_observations
def _create_lat_observations_from_run(self, intervals_before_run): """ After a run of gtburst, this collects the all the relevant files from each inteval and turns them into LAT observations. :rtype: """ # scroll thru the intervals that were created # place them in LAT observations # attach them to dictionary # dir = the interval lat_observations = [] # need a strategy to collect the intervals intervals = sorted(glob('interval*-*')) for interval in intervals: if interval in intervals_before_run: log.info( '%s existed before this run,\n it will not be auto included in the list,\n but you can manually see grab the data.' % interval ) else: # check that either tstarts,tstops or log_bins are defined if self._parameters['tstarts'].value is not None or self._parameters['tstops'].value is not None: tstart, tstop = [ float(x) for x in re.match('^interval(-?\d*\.\d*)-(-?\d*\.\d*)\/?$', interval).groups()] else: assert self._parameters['log_bins'].value is None, 'Choose either to use tstarts and tstops, or to use log_bins' event_file = os.path.join(interval, 'gll_ft1_tr_bn%s_v00_filt.fit' % self._triggername) if not file_existing_and_readable(event_file): log.info('The event file does not exist. Please examine!') ft2_file = os.path.join(interval, 'gll_ft2_tr_bn%s_v00_filt.fit' % self._triggername) if not file_existing_and_readable(ft2_file): log.info('The ft2 file does not exist. 
Please examine!') log.info('we will grab the data file for you.') base_ft2_file = os.path.join('%s' % self.datarepository.get_disp_value(), 'bn%s' % self._triggername, 'gll_ft2_tr_bn%s_v00.fit' % self._triggername) if not file_existing_and_readable(base_ft2_file): log.error('Cannot find any FT2 files!') raise AssertionError() shutil.copy(base_ft2_file, interval) ft2_file = os.path.join(interval, 'gll_ft2_tr_bn%s_v00.fit' % self._triggername) log.info('copied %s to %s' % (base_ft2_file, ft2_file)) exposure_map = os.path.join(interval, 'gll_ft1_tr_bn%s_v00_filt_expomap.fit' % self._triggername) if not file_existing_and_readable(exposure_map): log.info('The exposure map does not exist. Please examine!') livetime_cube = os.path.join(interval, 'gll_ft1_tr_bn%s_v00_filt_ltcube.fit' % self._triggername) if not file_existing_and_readable(livetime_cube): log.info('The livetime_cube does not exist. Please examine!') # optional bin_file parameter #if self._parameters['bin_file'].value is not None: # liketype matches # assert self._parameters['liketype'] == 'binned', 'liketype must be binned to use bin_file parameter %s'%self._parameters['bin_file'].value # value carries a few arguments, take first value- path # bin_file_path = self._parameters['bin_file'].value.split()[0] # bin_file = os.path.join(interval, bin_file_path) # if not file_existing_and_readable(bin_file): # print('The bin_file at %s does not exist. Please examine!'%bin_file) # now create a LAT observation object this_obs = LATObservation(event_file, ft2_file, exposure_map, livetime_cube, tstart, tstop, self._parameters['liketype'].get_disp_value(), self._triggername)#@, bin_file) lat_observations.append(this_obs) return lat_observations
[docs] def to_LATLike(self): _lat_like_plugins=[] for _lat_ob in self.lat_observations: _lat_like_plugins.append(_lat_ob.to_LATLike()) return _lat_like_plugins
[docs] def display(self, get=False): """ Display the current set parameters """ out = collections.OrderedDict() for k, v in self._parameters.items(): if v.is_set: out[k] = v.get_disp_value() df = pd.Series(out) print(df) if get: return df
def __repr__(self): return self.display(get=True).to_string()
[docs] def save_configuration(self, filename): """ Save the current configuration to a yaml file for use later. Suggested extension is .yml :param filename: the yaml file name to save to :returns: :rtype: """ # create a temporary dict to hold # the set values data = {} for k, v in self._parameters.items(): if v.is_set: data[k] = v.get_disp_value() with open(filename, 'w') as outfile: yaml.dump(data, outfile, default_flow_style=False)
[docs] @classmethod def from_saved_configuration(cls, triggername, config_file): """ Load a saved yaml configuration for the given trigger name :param triggername: Trigger name of the source in YYMMDDXXX :param config_file: the saved yaml configuration to use :returns: :rtype: """ with open(config_file, 'r') as stream: loaded_config = yaml.safe_load(stream) return cls(triggername, **loaded_config)
class LATObservation(object):
    """
    A container to formalize the storage of Fermi LAT observation files
    for a single time interval produced by GtBurst.
    """

    def __init__(self, event_file, ft2_file, exposure_map, livetime_cube, tstart, tstop, liketype, triggername):
        """
        A container to formalize the storage of Fermi LAT observation files

        :param event_file: filtered FT1 event file
        :param ft2_file: spacecraft (FT2) file
        :param exposure_map: exposure map file
        :param livetime_cube: livetime cube file
        :param tstart: interval start time (relative to trigger)
        :param tstop: interval stop time (relative to trigger)
        :param liketype: 'binned' or 'unbinned'
        :param triggername: the trigger name in YYMMDDXXX fermi format
        :returns:
        :rtype:
        """

        self._event_file = event_file
        self._ft2_file = ft2_file
        self._exposure_map = exposure_map
        self._livetime_cube = livetime_cube
        self._tstart = tstart
        self._tstop = tstop
        self._liketype = liketype
        self._triggername = triggername

    @property
    def event_file(self):
        return self._event_file

    @property
    def ft2_file(self):
        return self._ft2_file

    @property
    def exposure_map(self):
        return self._exposure_map

    @property
    def livetime_cube(self):
        return self._livetime_cube

    @property
    def tstart(self):
        return self._tstart

    @property
    def tstop(self):
        return self._tstop

    @property
    def liketype(self):
        return self._liketype

    @property
    def triggername(self):
        return self._triggername

    def __repr__(self):

        # present the stored files and metadata as a pandas table
        summary = collections.OrderedDict()

        summary['time interval'] = '%.3f-%.3f' % (self._tstart, self._tstop)
        summary['event_file'] = self._event_file
        summary['ft2_file'] = self._ft2_file
        summary['exposure_map'] = self._exposure_map
        summary['livetime_cube'] = self._livetime_cube
        summary['triggername'] = self._triggername
        summary['liketype'] = self._liketype

        return pd.Series(summary).to_string()

    def to_LATLike(self):
        """
        Build a FermiLATLike plugin from this observation's files.

        :returns: a FermiLATLike plugin instance
        """

        plugin_name = ('LAT%dX%d' % (self._tstart, self._tstop)).replace('-', 'n')

        return FermiLATLike(name=plugin_name,
                            event_file=self._event_file,
                            ft2_file=self._ft2_file,
                            livetime_cube_file=self._livetime_cube,
                            kind=self._liketype,
                            exposure_map_file=self._exposure_map,
                            source_maps=None,
                            binned_expo_map=None)