from __future__ import division, print_function
from builtins import range, zip
from past.utils import old_div
__author__ = "grburgess"
import collections
import copy
import os
import numpy as np
import pandas as pd
from pandas import HDFStore
import matplotlib.pyplot as plt
from threeML.utils.progress_bar import tqdm, trange
from threeML.config.config import threeML_config
from threeML.exceptions.custom_exceptions import custom_warnings
from threeML.io.file_utils import sanitize_filename
from threeML.io.logging import setup_logger, silence_console_log
from threeML.io.plotting.light_curve_plots import binned_light_curve_plot
from threeML.io.rich_display import display
from threeML.parallel.parallel_client import ParallelClient
from threeML.utils.binner import TemporalBinner
from threeML.utils.time_interval import TimeIntervalSet
from threeML.utils.time_series.polynomial import polyfit, unbinned_polyfit
from threeML.utils.time_series.time_series import TimeSeries
log = setup_logger(__name__)
[docs]class ReducingNumberOfThreads(Warning):
pass
[docs]class ReducingNumberOfSteps(Warning):
pass
[docs]class OverLappingIntervals(RuntimeError):
pass
# find out how many splits we need to make
[docs]def ceildiv(a, b):
return -(-a // b)
[docs]class EventList(TimeSeries):
def __init__(
self,
arrival_times,
measurement,
n_channels,
start_time=None,
stop_time=None,
native_quality=None,
first_channel=0,
ra=None,
dec=None,
mission=None,
instrument=None,
verbose=True,
edges=None,
):
"""
The EventList is a container for event data that is tagged in time and in PHA/energy. It handles event selection,
temporal polynomial fitting, temporal binning, and exposure calculations (in subclasses). Once events are selected
and/or polynomials are fit, the selections can be extracted via a PHAContainer which is can be read by an OGIPLike
instance and translated into a PHA instance.
:param n_channels: Number of detector channels
:param start_time: start time of the event list
:param stop_time: stop time of the event list
:param first_channel: where detchans begin indexing
:param rsp_file: the response file corresponding to these events
:param arrival_times: list of event arrival times
:param measurement: list of event energies or pha channels
:param native_quality: native pha quality flags
:param edges: The histogram boundaries if not specified by a response
:param mission:
:param instrument:
:param verbose:
:param ra:
:param dec:
"""
# pass up to TimeSeries
super(EventList, self).__init__(
start_time,
stop_time,
n_channels,
native_quality,
first_channel,
ra,
dec,
mission,
instrument,
verbose,
edges,
)
self._arrival_times = np.asarray(arrival_times)
self._measurement = np.asarray(measurement)
self._temporal_binner = None
assert (
self._arrival_times.shape[0] == self._measurement.shape[0]
), "Arrival time (%d) and energies (%d) have different shapes" % (
self._arrival_times.shape[0],
self._measurement.shape[0],
)
@property
def n_events(self):
return self._arrival_times.shape[0]
@property
def arrival_times(self):
return self._arrival_times
@property
def measurement(self):
return self._measurement
@property
def bins(self):
if self._temporal_binner is not None:
return self._temporal_binner
else:
raise RuntimeError("This EventList has no binning specified")
[docs] def bin_by_significance(self, start, stop, sigma, mask=None, min_counts=1):
"""
Interface to the temporal binner's significance binning model
:param start: start of the interval to bin on
:param stop: stop of the interval ot bin on
:param sigma: sigma-level of the bins
:param mask: (bool) use the energy mask to decide on ,significance
:param min_counts: minimum number of counts per bin
:return:
"""
if mask is not None:
# create phas to check
phas = np.arange(self._first_channel, self._n_channels)[mask]
this_mask = np.zeros_like(self._arrival_times, dtype=bool)
for channel in phas:
this_mask = np.logical_or(
this_mask, self._measurement == channel)
events = self._arrival_times[this_mask]
else:
events = copy.copy(self._arrival_times)
events = events[np.logical_and(events <= stop, events >= start)]
def tmp_bkg_getter(a, b): return self.get_total_poly_count(a, b, mask)
def tmp_err_getter(a, b): return self.get_total_poly_error(a, b, mask)
# self._temporal_binner.bin_by_significance(tmp_bkg_getter,
# background_error_getter=tmp_err_getter,
# sigma_level=sigma,
# min_counts=min_counts)
self._temporal_binner = TemporalBinner.bin_by_significance(
events,
tmp_bkg_getter,
background_error_getter=tmp_err_getter,
sigma_level=sigma,
min_counts=min_counts,
)
[docs] def bin_by_constant(self, start, stop, dt=1):
"""
Interface to the temporal binner's constant binning mode
:param start: start time of the bins
:param stop: stop time of the bins
:param dt: temporal spacing of the bins
:return:
"""
events = self._arrival_times[
np.logical_and(self._arrival_times >= start,
self._arrival_times <= stop)
]
self._temporal_binner = TemporalBinner.bin_by_constant(events, dt)
[docs] def bin_by_custom(self, start, stop):
"""
Interface to temporal binner's custom bin mode
:param start: start times of the bins
:param stop: stop times of the bins
:return:
"""
self._temporal_binner = TemporalBinner.bin_by_custom(start, stop)
# self._temporal_binner.bin_by_custom(start, stop)
[docs] def bin_by_bayesian_blocks(self, start, stop, p0, use_background=False):
events = self._arrival_times[
np.logical_and(self._arrival_times >= start,
self._arrival_times <= stop)
]
# self._temporal_binner = TemporalBinner(events)
if use_background:
def integral_background(
t): return self.get_total_poly_count(start, t)
self._temporal_binner = TemporalBinner.bin_by_bayesian_blocks(
events, p0, bkg_integral_distribution=integral_background
)
else:
self._temporal_binner = TemporalBinner.bin_by_bayesian_blocks(
events, p0)
[docs] def view_lightcurve(self,
start: float = -10,
stop: float = 20.0,
dt: float = 1.0,
use_binner: bool = False,
use_echans_start: int = 0,
use_echans_stop: int = -1
) -> plt.Figure:
# type: (float, float, float, bool) -> None
"""
:param start:
:param stop:
:param dt:
:param use_binner:
"""
# validate echan mask input
if not isinstance(use_echans_start, int):
log.error(f"The use_echans_start variable must be a integer."
f" Input is {use_echans_start}.")
raise AssertionError()
if not (use_echans_start > (-1)*(self.n_channels)-1 and
use_echans_start < (self.n_channels)):
log.error(f"The use_echans_start variable must be"
f"between {(-1)*(self.n_channels)} and {self.n_channels-1}."
f" Input is {use_echans_start}.")
raise AssertionError()
if not isinstance(use_echans_stop, int):
log.error(f"The use_echans_stop variable must be a integer."
f" Input is {use_echans_stop}.")
raise AssertionError()
if not (use_echans_stop > (-1)*(self.n_channels)-1 and
use_echans_stop < (self.n_channels)):
log.error(f"The use_echans_stop variable must be"
f"between {(-1)*(self.n_channels)} and {self.n_channels-1}."
f" Input is {use_echans_stop}.")
raise AssertionError()
if use_echans_start < 0:
use_echans_start = self.n_channels+use_echans_start
if use_echans_stop < 0:
use_echans_stop = self.n_channels+use_echans_stop
if not use_echans_stop >= use_echans_start:
log.error(f"The use_echans_stop variable must be larger"
f" or equal than the use_echans_start variable"
f" Input is use_echans_start: {use_echans_start}"
f" > use_echans_stop: {use_echans_stop}")
raise AssertionError()
# get echan bins
echan_bins = np.arange(use_echans_start, use_echans_stop+2, 1)-0.5
if use_binner:
# we will use the binner object to bin the
# light curve and ignore the normal linear binning
bins = self.bins.time_edges
# perhaps we want to look a little before or after the binner
if start < bins[0]:
pre_bins = np.arange(start, bins[0], dt).tolist()[:-1]
pre_bins.extend(bins)
bins = pre_bins
if stop > bins[-1]:
post_bins = np.arange(bins[-1], stop, dt)
bins.extend(post_bins[1:])
else:
# otherwise, just use regular linear binning
bins = np.arange(start, stop + dt, dt)
cnts, bins, _ = np.histogram2d(self.arrival_times, self.measurement,
bins=(bins, echan_bins))
cnts = np.sum(cnts, axis=1)
time_bins = np.array([[bins[i], bins[i + 1]]
for i in range(len(bins) - 1)])
# now we want to get the estimated background from the polynomial fit
if self.poly_fit_exists:
# we will store the bkg rate for each time bin
bkg = []
for j, tb in enumerate(time_bins):
# zero out the bkg
tmpbkg = 0.0
# sum up the counts over this interval
for poly in self.polynomials[use_echans_start:use_echans_stop+1]:
tmpbkg += poly.integral(tb[0], tb[1])
# capture the bkg *rate*
# Divide the background counts by the time intervall
# We do not use the dead time corrected exposure here
# because the integration is done over the full time bin
# and not the dead time corrected exposure
bkg.append(old_div(tmpbkg, tb[1]-tb[0]))
else:
bkg = None
width = []
for j, tb in enumerate(time_bins):
# capture the exposure
this_width = self.exposure_over_interval(tb[0], tb[1])
width.append(this_width)
width = np.array(width)
# pass all this to the light curve plotter
if self.time_intervals is not None:
selection = self.time_intervals.bin_stack
else:
selection = None
if self.bkg_intervals is not None:
bkg_selection = self.bkg_intervals.bin_stack
else:
bkg_selection = None
return binned_light_curve_plot(
time_bins=time_bins,
cnts=cnts,
width=width,
bkg=bkg,
selection=selection,
bkg_selections=bkg_selection,
)
[docs] def counts_over_interval(self, start, stop):
"""
return the number of counts in the selected interval
:param start: start of interval
:param stop: stop of interval
:return:
"""
# this will be a boolean list and the sum will be the
# number of events
return self._select_events(start, stop).sum()
[docs] def count_per_channel_over_interval(self, start, stop):
channels = list(
range(self._first_channel, self._n_channels + self._first_channel)
)
counts_per_channel = np.zeros(len(channels))
selection = self._select_events(start, stop)
for i, channel in enumerate(channels):
channel_mask = self._measurement[selection] == channel
counts_per_channel[i] += channel_mask.sum()
return counts_per_channel
def _select_events(self, start, stop):
"""
return an index of the selected events
:param start: start time
:param stop: stop time
:return:
"""
return np.logical_and(start <= self._arrival_times, self._arrival_times <= stop)
def _fit_polynomials(self, bayes=False):
"""
Binned fit to each channel. Sets the polynomial array that will be used to compute
counts over an interval
:return:
"""
self._poly_fit_exists = True
# Select all the events that are in the background regions
# and make a mask
all_bkg_masks = []
for selection in self._bkg_intervals:
all_bkg_masks.append(
np.logical_and(
self._arrival_times >= selection.start_time,
self._arrival_times <= selection.stop_time,
)
)
poly_mask = all_bkg_masks[0]
# If there are multiple masks:
if len(all_bkg_masks) > 1:
for mask in all_bkg_masks[1:]:
poly_mask = np.logical_or(poly_mask, mask)
# Select the all the events in the poly selections
# We only need to do this once
total_poly_events = self._arrival_times[poly_mask]
# For the channel energies we will need to down select again.
# We can go ahead and do this to avoid repeated computations
total_poly_energies = self._measurement[poly_mask]
# This calculation removes the unselected portion of the light curve
# so that we are not fitting zero counts. It will be used in the channel calculations
# as well
bin_width = 1.0 # seconds
these_bins = np.arange(self._start_time, self._stop_time, bin_width)
cnts, bins = np.histogram(total_poly_events, bins=these_bins)
# Find the mean time of the bins and calculate the exposure in each bin
mean_time = []
exposure_per_bin = []
for i in range(len(bins) - 1):
m = np.mean((bins[i], bins[i + 1]))
mean_time.append(m)
exposure_per_bin.append(
self.exposure_over_interval(bins[i], bins[i + 1]))
mean_time = np.array(mean_time)
exposure_per_bin = np.array(exposure_per_bin)
# Remove bins with zero counts
all_non_zero_mask = []
for selection in self._bkg_intervals:
all_non_zero_mask.append(
np.logical_and(
mean_time >= selection.start_time, mean_time <= selection.stop_time
)
)
non_zero_mask = all_non_zero_mask[0]
if len(all_non_zero_mask) > 1:
for mask in all_non_zero_mask[1:]:
non_zero_mask = np.logical_or(mask, non_zero_mask)
# Now we will find the the best poly order unless the use specified one
# The total cnts (over channels) is binned to .1 sec intervals
if self._user_poly_order == -1:
self._optimal_polynomial_grade = (
self._fit_global_and_determine_optimum_grade(
cnts[non_zero_mask],
mean_time[non_zero_mask],
exposure_per_bin[non_zero_mask],
bayes=bayes
)
)
log.info(
"Auto-determined polynomial order: %d" % self._optimal_polynomial_grade
)
else:
self._optimal_polynomial_grade = self._user_poly_order
channels = list(
range(self._first_channel, self._n_channels + self._first_channel)
)
if threeML_config["parallel"]["use_parallel"]:
def worker(channel):
channel_mask = total_poly_energies == channel
# Mask background events and current channel
# poly_chan_mask = np.logical_and(poly_mask, channel_mask)
# Select the masked events
current_events = total_poly_events[channel_mask]
cnts, bins = np.histogram(current_events, bins=these_bins)
polynomial, _ = polyfit(
mean_time[non_zero_mask],
cnts[non_zero_mask],
self._optimal_polynomial_grade,
exposure_per_bin[non_zero_mask],
bayes=bayes
)
return polynomial
client = ParallelClient()
polynomials = client.execute_with_progress_bar(
worker, channels, name=f"Fitting {self._instrument} background")
else:
polynomials = []
for channel in tqdm(channels, desc=f"Fitting {self._instrument} background"):
channel_mask = total_poly_energies == channel
# Mask background events and current channel
# poly_chan_mask = np.logical_and(poly_mask, channel_mask)
# Select the masked events
current_events = total_poly_events[channel_mask]
# now bin the selected channel counts
cnts, bins = np.histogram(current_events, bins=these_bins)
# Put data to fit in an x vector and y vector
polynomial, _ = polyfit(
mean_time[non_zero_mask],
cnts[non_zero_mask],
self._optimal_polynomial_grade,
exposure_per_bin[non_zero_mask],
bayes=bayes
)
polynomials.append(polynomial)
# We are now ready to return the polynomials
self._polynomials = polynomials
def _unbinned_fit_polynomials(self, bayes=False):
self._poly_fit_exists = True
# Select all the events that are in the background regions
# and make a mask
all_bkg_masks = []
total_duration = 0.0
poly_exposure = 0
for selection in self._bkg_intervals:
total_duration += selection.duration
poly_exposure += self.exposure_over_interval(
selection.start_time, selection.stop_time
)
all_bkg_masks.append(
np.logical_and(
self._arrival_times >= selection.start_time,
self._arrival_times <= selection.stop_time,
)
)
poly_mask = all_bkg_masks[0]
# If there are multiple masks:
if len(all_bkg_masks) > 1:
for mask in all_bkg_masks[1:]:
poly_mask = np.logical_or(poly_mask, mask)
# Select the all the events in the poly selections
# We only need to do this once
total_poly_events = self._arrival_times[poly_mask]
# For the channel energies we will need to down select again.
# We can go ahead and do this to avoid repeated computations
total_poly_energies = self._measurement[poly_mask]
# Now we will find the the best poly order unless the use specified one
# The total cnts (over channels) is binned to .1 sec intervals
if self._user_poly_order == -1:
self._optimal_polynomial_grade = (
self._unbinned_fit_global_and_determine_optimum_grade(
total_poly_events, poly_exposure, bayes=bayes
)
)
log.info(
"Auto-determined polynomial order: "
f"{self._optimal_polynomial_grade}"
)
else:
self._optimal_polynomial_grade = self._user_poly_order
channels = list(
range(self._first_channel, self._n_channels + self._first_channel)
)
# Check whether we are parallelizing or not
t_start = self._bkg_intervals.start_times
t_stop = self._bkg_intervals.stop_times
if threeML_config["parallel"]["use_parallel"]:
def worker(channel):
channel_mask = total_poly_energies == channel
# Mask background events and current channel
# poly_chan_mask = np.logical_and(poly_mask, channel_mask)
# Select the masked events
current_events = total_poly_events[channel_mask]
polynomial, _ = unbinned_polyfit(
current_events,
self._optimal_polynomial_grade,
t_start,
t_stop,
poly_exposure,
bayes=bayes
)
return polynomial
client = ParallelClient()
polynomials = client.execute_with_progress_bar(
worker, channels, name=f"Fitting {self._instrument} background")
else:
polynomials = []
for channel in tqdm(channels, desc=f"Fitting {self._instrument} background"):
channel_mask = total_poly_energies == channel
# Mask background events and current channel
# poly_chan_mask = np.logical_and(poly_mask, channel_mask)
# Select the masked events
current_events = total_poly_events[channel_mask]
polynomial, _ = unbinned_polyfit(
current_events,
self._optimal_polynomial_grade,
t_start,
t_stop,
poly_exposure,
bayes=bayes
)
polynomials.append(polynomial)
# We are now ready to return the polynomials
self._polynomials = polynomials
[docs] def set_active_time_intervals(self, *args):
"""Set the time interval(s) to be used during the analysis.
Specified as 'tmin-tmax'. Intervals are in seconds. Example:
set_active_time_intervals("0.0-10.0")
which will set the energy range 0-10. seconds.
"""
self._time_selection_exists = True
interval_masks = []
time_intervals = TimeIntervalSet.from_strings(*args)
time_intervals.merge_intersecting_intervals(in_place=True)
for interval in time_intervals:
tmin = interval.start_time
tmax = interval.stop_time
mask = self._select_events(tmin, tmax)
interval_masks.append(mask)
self._time_intervals = time_intervals
time_mask = interval_masks[0]
if len(interval_masks) > 1:
for mask in interval_masks[1:]:
time_mask = np.logical_or(time_mask, mask)
# calulate exposure and deadtime
exposure = 0
dead_time = 0
for interval in time_intervals:
tmin = interval.start_time
tmax = interval.stop_time
this_exposure = self.exposure_over_interval(tmin, tmax)
# check that the exposure is not larger than the total time
if this_exposure > (tmax-tmin):
log.error("The exposure in the active time bin is larger "
"than the total active time. "
"Something must be wrong!")
raise RuntimeError()
exposure += this_exposure
dead_time += (tmax-tmin)-this_exposure
self._exposure = exposure
self._active_dead_time = dead_time
tmp_counts = [] # Temporary list to hold the total counts per chan
for chan in range(self._first_channel, self._n_channels + self._first_channel):
channel_mask = self._measurement == chan
counts_mask = np.logical_and(channel_mask, time_mask)
total_counts = len(self._arrival_times[counts_mask])
tmp_counts.append(total_counts)
self._counts = np.array(tmp_counts)
tmp_counts = []
tmp_err = [] # Temporary list to hold the err counts per chan
if self._poly_fit_exists:
if not self._poly_fit_exists:
raise RuntimeError(
"A polynomial fit to the channels does not exist!")
for chan in range(self._n_channels):
total_counts = 0
counts_err = 0
for tmin, tmax in zip(
self._time_intervals.start_times, self._time_intervals.stop_times
):
# Now integrate the appropriate background polynomial
total_counts += self._polynomials[chan].integral(
tmin, tmax)
counts_err += (
self._polynomials[chan].integral_error(tmin, tmax)
) ** 2
tmp_counts.append(total_counts)
tmp_err.append(np.sqrt(counts_err))
self._poly_counts = np.array(tmp_counts)
self._poly_count_err = np.array(tmp_err)
# apply the dead time correction to the background counts
# and errors
corr = self._exposure/(self._active_dead_time+self._exposure)
self._poly_counts *= corr
self._poly_count_err *= corr
[docs]class EventListWithDeadTime(EventList):
def __init__(
self,
arrival_times,
measurement,
n_channels,
start_time=None,
stop_time=None,
dead_time=None,
first_channel=0,
quality=None,
ra=None,
dec=None,
mission=None,
instrument=None,
verbose=True,
edges=None,
):
"""
An EventList where the exposure is calculated via and array of dead times per event. Summing these dead times over an
interval => live time = interval - dead time
:param n_channels: Number of detector channels
:param start_time: start time of the event list
:param stop_time: stop time of the event list
:param dead_time: an array of deadtime per event
:param first_channel: where detchans begin indexing
:param quality: native pha quality flags
:param rsp_file: the response file corresponding to these events
:param arrival_times: list of event arrival times
:param measurement: list of event energies or pha channels
:param edges: The histogram boundaries if not specified by a response
:param mission: mission name
:param instrument: instrument name
:param verbose: verbose level
:param ra:
:param dec:
"""
super(EventListWithDeadTime, self).__init__(
arrival_times,
measurement,
n_channels,
start_time,
stop_time,
quality,
first_channel,
ra,
dec,
mission,
instrument,
verbose,
edges,
)
if dead_time is not None:
self._dead_time = np.asarray(dead_time)
assert (
self._arrival_times.shape[0] == self._dead_time.shape[0]
), "Arrival time (%d) and Dead Time (%d) have different shapes" % (
self._arrival_times.shape[0],
self._dead_time.shape[0],
)
else:
self._dead_time = None
[docs] def exposure_over_interval(self, start, stop):
"""
calculate the exposure over the given interval
:param start: start time
:param stop: stop time
:return:
"""
mask = self._select_events(start, stop)
if self._dead_time is not None:
interval_deadtime = (self._dead_time[mask]).sum()
else:
interval_deadtime = 0
return (stop - start) - interval_deadtime
[docs]class EventListWithDeadTimeFraction(EventList):
def __init__(
self,
arrival_times,
measurement,
n_channels,
start_time=None,
stop_time=None,
dead_time_fraction=None,
first_channel=0,
quality=None,
ra=None,
dec=None,
mission=None,
instrument=None,
verbose=True,
edges=None,
):
"""
An EventList where the exposure is calculated via and array dead time fractions per event .
Summing these dead times over an
interval => live time = interval - dead time
:param n_channels: Number of detector channels
:param start_time: start time of the event list
:param stop_time: stop time of the event list
:param dead_time: an array of deadtime fraction
:param first_channel: where detchans begin indexing
:param quality: native pha quality flags
:param rsp_file: the response file corresponding to these events
:param arrival_times: list of event arrival times
:param measurement: list of event energies or pha channels
:param edges: The histogram boundaries if not specified by a response
:param mission: mission name
:param instrument: instrument name
:param verbose: verbose level
:param ra:
:param dec:
"""
super(EventListWithDeadTimeFraction, self).__init__(
arrival_times,
measurement,
n_channels,
start_time,
stop_time,
quality,
first_channel,
ra,
dec,
mission,
instrument,
verbose,
edges,
)
if dead_time_fraction is not None:
self._dead_time_fraction = np.asarray(dead_time_fraction)
assert (
self._arrival_times.shape[0] == self._dead_time_fraction.shape[0]
), "Arrival time (%d) and Dead Time (%d) have different shapes" % (
self._arrival_times.shape[0],
self._dead_time_fraction.shape[0],
)
else:
self._dead_time_fraction = None
[docs] def exposure_over_interval(self, start, stop):
"""
calculate the exposure over the given interval
:param start: start time
:param stop: stop time
:return:
"""
mask = self._select_events(start, stop)
interval = stop - start
if self._dead_time_fraction is not None:
interval_deadtime = (
self._dead_time_fraction[mask]).mean() * interval
else:
interval_deadtime = 0
return interval - interval_deadtime
[docs]class EventListWithLiveTime(EventList):
def __init__(
self,
arrival_times,
measurement,
n_channels,
live_time,
live_time_starts,
live_time_stops,
start_time=None,
stop_time=None,
quality=None,
first_channel=0,
rsp_file=None,
ra=None,
dec=None,
mission=None,
instrument=None,
verbose=True,
edges=None,
):
"""
An EventList where the exposure is calculated via and array of livetimes per interval.
:param arrival_times: list of event arrival times
:param measurement: list of event energies or pha channels
:param live_time: array of livetime fractions
:param live_time_starts: start of livetime fraction bins
:param live_time_stops: stop of livetime fraction bins
:param mission: mission name
:param instrument: instrument name
:param n_channels: Number of detector channels
:param start_time: start time of the event list
:param stop_time: stop time of the event list
:param quality: native pha quality flags
:param first_channel: where detchans begin indexing
:param edges: The histogram boundaries if not specified by a response
:param rsp_file: the response file corresponding to these events
:param verbose:
:param ra:
:param dec:
"""
assert len(live_time) == len(
live_time_starts
), "Live time fraction (%d) and live time start (%d) have different shapes" % (
len(live_time),
len(live_time_starts),
)
assert len(live_time) == len(
live_time_stops
), "Live time fraction (%d) and live time stop (%d) have different shapes" % (
len(live_time),
len(live_time_stops),
)
super(EventListWithLiveTime, self).__init__(
arrival_times,
measurement,
n_channels,
start_time,
stop_time,
quality,
first_channel,
ra,
dec,
mission,
instrument,
verbose,
)
self._live_time = np.asarray(live_time)
self._live_time_starts = np.asarray(live_time_starts)
self._live_time_stops = np.asarray(live_time_stops)
[docs] def exposure_over_interval(self, start, stop):
"""
:param start: start time of interval
:param stop: stop time of interval
:return: exposure
"""
# First see if the interval is completely contained inside a
# livetime interval. In this case we only compute that.
# Note that this is explicitly on the open boundary of the
# intervals because the closed boundary is covered in the
# next step
inside_idx = np.logical_and(
self._live_time_starts < start, stop < self._live_time_stops
)
# see if it contains elements
if self._live_time[inside_idx].size > 0:
# we want to take a fraction of the live time covered
dt = self._live_time_stops[inside_idx] - \
self._live_time_starts[inside_idx]
fraction = old_div((stop - start), dt)
total_livetime = self._live_time[inside_idx] * fraction
else:
# First we get the live time of bins that are fully contained in the given interval
# We now go for the closed interval because it is possible to have overlap with other intervals
# when a closed interval exists... but not when there is only an open interval
full_inclusion_idx = np.logical_and(
start <= self._live_time_starts, stop >= self._live_time_stops
)
full_inclusion_livetime = self._live_time[full_inclusion_idx].sum()
# Now we get the fractional parts on the left and right
# Get the fractional part of the left bin
left_remainder_idx = np.logical_and(
start <= self._live_time_stops, self._live_time_starts <= start
)
dt = (
self._live_time_stops[left_remainder_idx]
- self._live_time_starts[left_remainder_idx]
)
# we want the distance to the stop of this bin
distance_from_next_bin = self._live_time_stops[left_remainder_idx] - start
fraction = old_div(distance_from_next_bin, dt)
left_fractional_livetime = self._live_time[left_remainder_idx] * fraction
# Get the fractional part of the right bin
right_remainder_idx = np.logical_and(
self._live_time_starts <= stop, stop <= self._live_time_stops
)
dt = (
self._live_time_stops[right_remainder_idx]
- self._live_time_starts[right_remainder_idx]
)
# we want the distance from the last full bin
distance_from_next_bin = stop - \
self._live_time_starts[right_remainder_idx]
fraction = old_div(distance_from_next_bin, dt)
right_fractional_livetime = self._live_time[right_remainder_idx] * fraction
# sum up all the live time
total_livetime = (
full_inclusion_livetime
+ left_fractional_livetime
+ right_fractional_livetime
)
# the sum at the end converts all the arrays to floats
return total_livetime.sum()