__author__ = "grburgess"
import collections
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional
import warnings
import h5py
import numpy as np
import pandas as pd
from threeML.config.config import threeML_config
from threeML.config.config_utils import get_value_kwargs
from threeML.io.file_utils import sanitize_filename
from threeML.io.logging import setup_logger
from threeML.parallel.parallel_client import ParallelClient
from threeML.utils.progress_bar import trange
from threeML.utils.spectrum.binned_spectrum import Quality
from threeML.utils.time_interval import TimeIntervalSet
from threeML.utils.time_series.polynomial import (Polynomial, polyfit,
unbinned_polyfit)
log = setup_logger(__name__)
[docs]class ReducingNumberOfThreads(Warning):
pass
[docs]class ReducingNumberOfSteps(Warning):
pass
[docs]class OverLappingIntervals(RuntimeError):
pass
# find out how many splits we need to make
[docs]def ceildiv(a, b):
return -(-a // b)
@dataclass(frozen=True)
class _OutputContainer:
"""
A dummy contaier to extract information from the light curve
"""
instrument: str
telescope: str
tstart: Iterable[float]
telapse: Iterable[float]
channel: Iterable[int]
counts: Iterable[int]
rates: Iterable[float]
edges: Iterable[float]
quality: Quality
backfile: str
grouping: Iterable[int]
exposure: Iterable[float]
counts_error: Optional[Iterable[float]] = None
rate_error: Optional[Iterable[float]] = None
[docs]class TimeSeries(object):
def __init__(
self,
start_time: float,
stop_time: float,
n_channels: int,
native_quality=None,
first_channel: int = 1,
ra: float = None,
dec: float = None,
mission: str = None,
instrument: str = None,
verbose: bool = True,
edges=None,
):
"""
The EventList is a container for event data that is tagged in time
and in PHA/energy. It handles event selection,
temporal polynomial fitting, temporal binning, and exposure
calculations (in subclasses). Once events are selected
and/or polynomials are fit, the selections can be extracted via a
PHAContainer which is can be read by an OGIPLike
instance and translated into a PHA instance.
:param n_channels: Number of detector channels
:param start_time: start time of the event list
:param stop_time: stop time of the event list
:param first_channel: where detchans begin indexing
:param rsp_file: the response file corresponding to these events
:param arrival_times: list of event arrival times
:param energies: list of event energies or pha channels
:param native_quality: native pha quality flags
:param edges: The histogram boundaries if not specified by a response
:param mission:
:param instrument:
:param verbose:
:param ra:
:param dec:
"""
self._verbose: bool = verbose
self._n_channels: int = n_channels
self._first_channel: int = first_channel
self._native_quality = native_quality
# we haven't made selections yet
self._time_intervals = None
self._bkg_intervals = None
self._counts = None
self._exposure = None
self._poly_counts = None
self._poly_count_err = None
self._bkg_selected_counts = None
self._bkg_exposure = None
# ebounds for objects w/o a response
self._edges = edges
if native_quality is not None:
assert len(native_quality) == n_channels, (
"the native quality has length %d but you specified there were %d channels"
% (len(native_quality), n_channels)
)
self._start_time = start_time
self._stop_time = stop_time
# name the instrument if there is not one
if instrument is None:
log.warning("No instrument name is given. Setting to UNKNOWN")
self._instrument = "UNKNOWN"
else:
self._instrument = instrument
if mission is None:
log.warning("No mission name is given. Setting to UNKNOWN")
self._mission = "UNKNOWN"
else:
self._mission = mission
self._user_poly_order = -1
self._time_selection_exists = False
self._poly_fit_exists = False
self._fit_method_info = {"bin type": None, "fit method": None}
[docs] def set_active_time_intervals(self, *args):
raise RuntimeError("Must be implemented in subclass")
@property
def poly_fit_exists(self) -> bool:
return self._poly_fit_exists
@property
def n_channels(self) -> int:
return self._n_channels
@property
def bkg_intervals(self):
return self._bkg_intervals
@property
def polynomials(self):
""" Returns polynomial is they exist"""
if self._poly_fit_exists:
return self._polynomials
else:
RuntimeError("A polynomial fit has not been made.")
[docs] def get_poly_info(self) -> dict:
"""
Return a pandas panel frame with the polynomial coeffcients
and errors
Returns:
a DataFrame
"""
if self._poly_fit_exists:
coeff = []
err = []
for poly in self._polynomials:
coeff.append(poly.coefficients)
err.append(poly.error)
df_coeff = pd.DataFrame(coeff)
df_err = pd.DataFrame(err)
# print('Coefficients')
#
# display(df_coeff)
#
# print('Coefficient Error')
#
# display(df_err)
pan = {"coefficients": df_coeff, "error": df_err}
return pan
else:
log.error("A polynomial fit has not been made.")
RuntimeError()
[docs] def get_total_poly_count(self, start: float,
stop: float, mask=None) -> int:
"""
Get the total poly counts
:param start:
:param stop:
:return:
"""
if mask is None:
mask = np.ones_like(self._polynomials, dtype=bool)
total_counts = 0
for p in np.asarray(self._polynomials)[mask]:
total_counts += p.integral(start, stop)
return total_counts
[docs] def get_total_poly_error(self, start: float,
stop: float, mask=None)-> float:
"""
Get the total poly error
:param start:
:param stop:
:return:
"""
if mask is None:
mask = np.ones_like(self._polynomials, dtype=bool)
total_counts = 0
for p in np.asarray(self._polynomials)[mask]:
total_counts += p.integral_error(start, stop) ** 2
return np.sqrt(total_counts)
@property
def bins(self):
if self._temporal_binner is not None:
return self._temporal_binner
else:
raise RuntimeError("This EventList has no binning specified")
def __set_poly_order(self, value: int):
""" Set poly order only in allowed range and redo fit """
assert type(value) is int, "Polynomial order must be integer"
assert (
-1 <= value <= 4
), "Polynomial order must be 0-4 or -1 to have it determined"
self._user_poly_order = value
log.debug(f"poly order set to {value}")
if self._poly_fit_exists:
log.info(
f"Refitting background with new polynomial order "
"({value}) and existing selections"
)
if self._time_selection_exists:
log.debug("recomputing time selection")
self.set_polynomial_fit_interval(
*self._bkg_intervals.to_string().split(","),
unbinned=self._unbinned,
)
else:
RuntimeError("This is a bug. Should never get here")
def ___set_poly_order(self, value):
""" Indirect poly order setter """
self.__set_poly_order(value)
def __get_poly_order(self):
""" get the poly order """
return self._optimal_polynomial_grade
def ___get_poly_order(self):
""" Indirect poly order getter """
return self.__get_poly_order()
poly_order = property(
___get_poly_order,
___set_poly_order,
doc="Get or set the polynomial order"
)
@property
def time_intervals(self):
"""
the time intervals of the events
:return:
"""
return self._time_intervals
[docs] def exposure_over_interval(self, tmin, tmax) -> float:
""" calculate the exposure over a given interval """
raise RuntimeError("Must be implemented in sub class")
[docs] def counts_over_interval(self, start, stop) -> int:
"""
return the number of counts in the selected interval
:param start: start of interval
:param stop: stop of interval
:return:
"""
# this will be a boolean list and the sum will be the
# number of events
raise RuntimeError("Must be implemented in sub class")
[docs] def count_per_channel_over_interval(self, start, stop):
"""
:param start:
:param stop:
:return:
"""
raise RuntimeError("Must be implemented in sub class")
[docs] def set_background_interval(self, *time_intervals, **options):
"""Set the time interval for the background observation.
Multiple intervals can be input as separate arguments
Specified as 'tmin-tmax'. Intervals are in seconds. Example:
set_polynomial_fit_interval("-10.0-0.0","10.-15.")
:param time_intervals: intervals to fit on
:param options:
"""
fit_poly, options = get_value_kwargs("fit_poly",
bool,
threeML_config.time_series.fit.fit_poly,
**options)
self._select_background_time_interval(*time_intervals)
if fit_poly:
log.debug("Fit a polynominal to the background time intervals.")
self.fit_polynomial(**options)
log.debug("Fitting a polynominal to the background "
"time intervals done.")
else:
if self._poly_fit_exists:
# if we already did a poly fit and change the bkg interval
# now, without refitting the poly, we have to delete all the
# old fitting information!
log.info("Poly Fit exists and you want to change the "
"bkg time selection now without refitting "
"the poly. We will delete the old information "
"from the last poly fit!")
self._delete_polynominal_fit()
log.debug("Did not fit a polynominal to the background "
"time intervals.")
[docs] def fit_polynomial(self, **kwargs):
"""
Fit the polynominals to the selected time intervals
:param kwargs:
:returns:
"""
if self.bkg_intervals is None:
log.error("You first have to select the background intervals with "
"the set_background_interval method before you can "
"fit the background polynomials.")
raise RuntimeError()
# Find out if we want to binned or unbinned.
unbinned, kwargs = get_value_kwargs("unbinned",
bool,
threeML_config.time_series.fit.unbinned,
**kwargs)
bayes, kwargs = get_value_kwargs("bayes",
bool,
threeML_config.time_series.fit.bayes,
**kwargs)
if unbinned:
log.info("At the moment this unbinned polynominal fitting "
"is only correct if the dead time ratio is constant "
"in the selected background time intervals!")
if bayes:
self._fit_method_info["fit method"] = "bayes"
else:
self._fit_method_info["fit method"] = "mle"
# Fit the events with the given intervals
if unbinned:
self._unbinned = True # keep track!
self._unbinned_fit_polynomials(bayes=bayes)
else:
self._unbinned = False
self._fit_polynomials(bayes=bayes)
# we have a fit now
self._poly_fit_exists = True
log.info(
f"{self._fit_method_info['bin type']} "
f"{self._optimal_polynomial_grade}-order "
"polynomial fit with the "
f"{self._fit_method_info['fit method']} method"
)
# recalculate the selected counts
if self._time_selection_exists:
self.set_active_time_intervals(
*self._time_intervals.to_string().split(",")
)
def _select_background_time_interval(self, *time_intervals):
# we create some time intervals
bkg_intervals = TimeIntervalSet.from_strings(*time_intervals)
# adjust the selections to the data
new_intervals = []
self._bkg_selected_counts = []
self._bkg_exposure = 0.
for time_interval in bkg_intervals:
t1 = time_interval.start_time
t2 = time_interval.stop_time
if (self._stop_time <= t1) or (t2 <= self._start_time):
log.warning(
f"The time interval {t1}-{t2} is out side of the "
"arrival times and will be dropped"
)
else:
if t1 < self._start_time:
log.warning(
f"The time interval {t1}-{t2} started before the "
f"first arrival time ({self._start_time}), so we are"
f"changing the intervals to {self._start_time}-{t2}"
)
t1 = self._start_time # + 1
if t2 > self._stop_time:
log.warning(
f"The time interval {t1}-{t2} ended after the last "
f"arrival time ({self._stop_time}), so we are "
f"changing the intervals to {t1}-{self._stop_time}"
)
t2 = self._stop_time # - 1.
new_intervals.append(f"{t1}-{t2}")
self._bkg_selected_counts.append(
self.count_per_channel_over_interval(t1, t2)
)
self._bkg_exposure += self.exposure_over_interval(t1, t2)
# make new intervals after checks
bkg_intervals = TimeIntervalSet.from_strings(*new_intervals)
self._bkg_selected_counts = np.sum(self._bkg_selected_counts, axis=0)
# set the poly intervals as an attribute
self._bkg_intervals = bkg_intervals
def _delete_polynominal_fit(self):
"""
Delte all the information from previous poly fits
:returns:
"""
if not self._poly_fit_exists:
log.error("You can not delete the polynominal fit information "
"because no information is saved at the moment!")
raise AssertionError()
del self._unbinned
del self._polynomials
del self._optimal_polynomial_grade
self._poly_fit_exists = False
[docs] def set_polynomial_fit_interval(self, *time_intervals, **kwargs) -> None:
"""Set the time interval to fit the background.
Multiple intervals can be input as separate arguments
Specified as 'tmin-tmax'. Intervals are in seconds. Example:
set_polynomial_fit_interval("-10.0-0.0","10.-15.")
:param time_intervals: intervals to fit on
:param unbinned:
:param bayes:
:param kwargs:
"""
log.warning("set_polynomial_fit_interval will be deprecated in the "
"next release. Please use set_background_interval with "
"the same input.")
warnings.warn(DeprecationWarning())
# Find out if we want to binned or unbinned.
if "unbinned" in kwargs:
unbinned = kwargs.pop("unbinned")
assert type(
unbinned) == bool, "unbinned option must be True or False"
else:
# assuming unbinned
# could use config file here
# unbinned = threeML_config['ogip']['use-unbinned-poly-fitting']
unbinned = True
# check if we are doing a bayesian
# fit and record this info
if "bayes" in kwargs:
bayes = kwargs.pop("bayes")
else:
bayes = False
if bayes:
self._fit_method_info["fit method"] = "bayes"
else:
self._fit_method_info["fit method"] = "mle"
if unbinned:
log.info("At the moment this unbinned polynominal fitting "
"is only correct if the dead time ratio is constant "
"in the selected background time intervals!")
# we create some time intervals
bkg_intervals = TimeIntervalSet.from_strings(*time_intervals)
# adjust the selections to the data
new_intervals = []
self._bkg_selected_counts = []
self._bkg_exposure = 0.0
for time_interval in bkg_intervals:
t1 = time_interval.start_time
t2 = time_interval.stop_time
if (self._stop_time <= t1) or (t2 <= self._start_time):
log.warning(
"The time interval %f-%f is out side of the arrival times and will be dropped"
% (t1, t2)
)
else:
if t1 < self._start_time:
log.warning(
"The time interval %f-%f started before the first arrival time (%f), so we are changing the intervals to %f-%f"
% (t1, t2, self._start_time, self._start_time, t2)
)
t1 = self._start_time # + 1
if t2 > self._stop_time:
log.warning(
"The time interval %f-%f ended after the last arrival time (%f), so we are changing the intervals to %f-%f"
% (t1, t2, self._stop_time, t1, self._stop_time)
)
t2 = self._stop_time # - 1.
new_intervals.append("%f-%f" % (t1, t2))
self._bkg_selected_counts.append(
self.count_per_channel_over_interval(t1, t2)
)
self._bkg_exposure += self.exposure_over_interval(t1, t2)
# make new intervals after checks
bkg_intervals = TimeIntervalSet.from_strings(*new_intervals)
self._bkg_selected_counts = np.sum(self._bkg_selected_counts, axis=0)
# set the poly intervals as an attribute
self._bkg_intervals = bkg_intervals
# Fit the events with the given intervals
if unbinned:
self._unbinned = True # keep track!
self._unbinned_fit_polynomials(bayes=bayes)
else:
self._unbinned = False
self._fit_polynomials(bayes=bayes)
# we have a fit now
self._poly_fit_exists = True
log.info(
f"{self._fit_method_info['bin type']} "
f"{self._optimal_polynomial_grade}-order "
"polynomial fit with the "
f"{self._fit_method_info['fit method']} method"
)
# recalculate the selected counts
if self._time_selection_exists:
self.set_active_time_intervals(
*self._time_intervals.to_string().split(","))
def __repr__(self):
"""
Examine the currently selected info as well other things.
"""
return self._output().to_string()
def _output(self):
info_dict = collections.OrderedDict()
for i, interval in enumerate(self.time_intervals):
info_dict["active selection (%d)" % (i + 1)] = interval.__repr__()
info_dict["active deadtime"] = self._active_dead_time
if self._poly_fit_exists:
for i, interval in enumerate(self.bkg_intervals):
info_dict["polynomial selection (%d)" % (
i + 1)] = interval.__repr__()
info_dict["polynomial order"] = self._optimal_polynomial_grade
info_dict["polynomial fit type"] =\
self._fit_method_info["bin type"]
info_dict["polynomial fit method"] =\
self._fit_method_info["fit method"]
return pd.Series(info_dict, index=list(info_dict.keys()))
def _fit_global_and_determine_optimum_grade(self,
cnts,
bins,
exposure,
bayes=False):
"""
Provides the ability to find the optimum polynomial grade for
*binned* counts by fitting the total (all channels) to 0-4 order
polynomials and then comparing them via a likelihood ratio test.
:param cnts: counts per bin
:param bins: the bins used
:param exposure: exposure per bin
:param bayes:
:return: polynomial grade
"""
min_grade = 0
max_grade = 4
log_likelihoods = []
log.debug("attempting to find best poly with binned data")
if threeML_config["parallel"]["use_parallel"]:
def worker(grade):
polynomial, log_like = polyfit(
bins, cnts, grade, exposure, bayes=bayes)
return log_like
client = ParallelClient()
log_likelihoods = client.execute_with_progress_bar(
worker,
list(range(min_grade, max_grade + 1)),
name="Finding best polynomial Order"
)
else:
for grade in trange(min_grade,
max_grade + 1,
desc="Finding best polynomial Order"
):
polynomial, log_like = polyfit(
bins, cnts, grade, exposure, bayes=bayes)
log_likelihoods.append(log_like)
# Found the best one
delta_loglike = np.array(
[2 * (x[0] - x[1])
for x in zip(log_likelihoods[:-1], log_likelihoods[1:])]
)
log.debug(f"log likes {log_likelihoods}")
log.debug(f" delta loglikes {delta_loglike}")
delta_threshold = 9.0
mask = delta_loglike >= delta_threshold
if len(mask.nonzero()[0]) == 0:
# best grade is zero!
best_grade = 0
else:
best_grade = mask.nonzero()[0][-1] + 1
return best_grade
def _unbinned_fit_global_and_determine_optimum_grade(self,
events,
exposure,
bayes=False):
"""
Provides the ability to find the optimum polynomial grade for
*unbinned* events by fitting the total (all channels) to 0-2
order polynomials and then comparing them via a likelihood ratio test.
:param events: an event list
:param exposure: the exposure per event
:return: polynomial grade
"""
# Fit the sum of all the channels to determine the optimal polynomial
# grade
min_grade = 0
max_grade = 2
log_likelihoods = []
t_start = self._bkg_intervals.start_times
t_stop = self._bkg_intervals.stop_times
log.debug("attempting to find best fit poly with unbinned")
if threeML_config["parallel"]["use_parallel"]:
def worker(grade):
polynomial, log_like = unbinned_polyfit(
events, grade, t_start, t_stop, exposure, bayes=bayes
)
return log_like
client = ParallelClient()
log_likelihoods = client.execute_with_progress_bar(
worker,
list(range(min_grade, max_grade + 1)),
name="Finding best polynomial Order"
)
else:
for grade in trange(min_grade,
max_grade + 1,
desc="Finding best polynomial Order"):
polynomial, log_like = unbinned_polyfit(
events, grade, t_start, t_stop, exposure, bayes=bayes
)
log_likelihoods.append(log_like)
# Found the best one
delta_loglike = np.array(
[2 * (x[0] - x[1])
for x in zip(log_likelihoods[:-1], log_likelihoods[1:])]
)
log.debug(f"log likes {log_likelihoods}")
log.debug(f" delta loglikes {delta_loglike}")
delta_threshold = 9.0
mask = delta_loglike >= delta_threshold
if len(mask.nonzero()[0]) == 0:
# best grade is zero!
best_grade = 0
else:
best_grade = mask.nonzero()[0][-1] + 1
return best_grade
def _fit_polynomials(self, bayes=False):
raise NotImplementedError("this must be implemented in a subclass")
def _unbinned_fit_polynomials(self, bayes=False):
raise NotImplementedError("this must be implemented in a subclass")
[docs] def save_background(self, filename, overwrite=False):
"""
save the background to an HD5F
:param filename:
:return:
"""
# make the file name proper
filename = os.path.splitext(filename)
filename = "%s.h5" % filename[0]
filename_sanitized: Path = sanitize_filename(filename)
# Check that it does not exists
if filename_sanitized.exists():
if overwrite:
try:
filename_sanitized.unlink()
except:
log.error(
f"The file {filename_sanitized} already exists "
"and cannot be removed (maybe you do not have "
"permissions to do so?). "
)
raise IOError()
else:
log.error(f"The file {filename_sanitized} already exists!")
raise IOError()
with h5py.File(filename_sanitized, "w") as store:
# extract the polynomial information and save it
if self._poly_fit_exists:
coeff = np.empty(
(self._n_channels, self._optimal_polynomial_grade + 1))
err = np.empty(
(
self._n_channels,
self._optimal_polynomial_grade + 1,
self._optimal_polynomial_grade + 1,
)
)
for i, poly in enumerate(self._polynomials):
coeff[i, :] = poly.coefficients
err[i, ...] = poly.covariance_matrix
# df_coeff = pd.Series(coeff)
# df_err = pd.Series(err)
else:
log.error("the polynomials have not been fit yet")
raise RuntimeError()
store.create_dataset("coefficients", data=np.array(coeff))
store.create_dataset("covariance", data=np.array(err))
store.attrs["poly_order"] = self._optimal_polynomial_grade
store.attrs["poly_selections"] = list(
zip(
self._bkg_intervals.start_times,
self._bkg_intervals.stop_times,
)
)
store.attrs["unbinned"] = self._unbinned
store.attrs["fit_method"] = self._fit_method_info["fit method"]
log.info(f"Saved fitted background to {filename_sanitized}")
[docs] def restore_fit(self, filename):
filename_sanitized: Path = sanitize_filename(filename)
with h5py.File(filename_sanitized, "r") as store:
coefficients = store["coefficients"][()]
covariance = store["covariance"][()]
self._polynomials = []
# create new polynomials
for i in range(len(coefficients)):
coeff = np.array(coefficients[i])
# make sure we get the right order
# pandas stores the non-needed coeff
# as nans.
coeff = coeff[np.isfinite(coeff)]
cov = covariance[i]
self._polynomials.append(
Polynomial.from_previous_fit(coeff, cov))
metadata = store.attrs
self._optimal_polynomial_grade = metadata["poly_order"]
poly_selections = np.array(metadata["poly_selections"])
self._bkg_intervals = TimeIntervalSet.from_starts_and_stops(
poly_selections[:, 0], poly_selections[:, 1]
)
self._unbinned = metadata["unbinned"]
if self._unbinned:
self._fit_method_info["bin type"] = "unbinned"
else:
self._fit_method_info["bin type"] = "binned"
self._fit_method_info["fit method"] = metadata["fit_method"]
# go thru and count the counts!
log.debug("resest the poly form the file")
self._poly_fit_exists = True
# we must go thru and collect the polynomial exposure and counts
# so that they be extracted if needed
self._bkg_exposure = 0.0
self._bkg_selected_counts = []
for i, time_interval in enumerate(self._bkg_intervals):
t1 = time_interval.start_time
t2 = time_interval.stop_time
self._bkg_selected_counts.append(
self.count_per_channel_over_interval(t1, t2)
)
self._bkg_exposure += self.exposure_over_interval(t1, t2)
self._bkg_selected_counts = np.sum(self._bkg_selected_counts, axis=0)
if self._time_selection_exists:
self.set_active_time_intervals(
*self._time_intervals.to_string().split(","))
[docs] def view_lightcurve(self, start=-10, stop=20.0, dt=1.0, use_binner=False,
use_echans_start=0, use_echans_stop=-1):
raise NotImplementedError("must be implemented in subclass")