Source code for threeML.utils.time_series.binned_spectrum_series

from __future__ import division, print_function

from builtins import range, zip

import numpy as np
import matplotlib.pyplot as plt
from past.utils import old_div

from threeML.config.config import threeML_config
from threeML.io.logging import setup_logger, silence_console_log
from threeML.io.plotting.light_curve_plots import binned_light_curve_plot
from threeML.parallel.parallel_client import ParallelClient
from threeML.utils.progress_bar import tqdm
from threeML.utils.spectrum.binned_spectrum_set import BinnedSpectrumSet
from threeML.utils.time_interval import TimeIntervalSet
from threeML.utils.time_series.polynomial import polyfit
from threeML.utils.time_series.time_series import TimeSeries

log = setup_logger(__name__)


[docs] class BinnedSpectrumSeries(TimeSeries): def __init__( self, binned_spectrum_set, first_channel=1, ra=None, dec=None, mission=None, instrument=None, verbose=True, ): """ :param binned_spectrum_set: :param first_channel: :param rsp_file: :param ra: :param dec: :param mission: :param instrument: :param verbose: """ # pass up to TimeSeries super(BinnedSpectrumSeries, self).__init__( binned_spectrum_set.time_intervals.absolute_start, binned_spectrum_set.time_intervals.absolute_stop, binned_spectrum_set.n_channels, binned_spectrum_set.quality_per_bin[0], first_channel, ra, dec, mission, instrument, verbose, binned_spectrum_set._binned_spectrum_list[0].edges, ) self._binned_spectrum_set = binned_spectrum_set @property def bins(self): """ the time bins of the spectrum set :return: TimeIntervalSet """ return self._binned_spectrum_set.time_intervals @property def binned_spectrum_set(self): """ returns the spectrum set :return: binned_spectrum_set """ return self._binned_spectrum_set
[docs] def view_lightcurve(self, start: float = -10, stop: float = 20.0, dt: float = 1.0, use_binner: bool = False, use_echans_start: int = 0, use_echans_stop: int = -1, with_dead_time=True ) -> plt.Figure: # type: (float, float, float, bool) -> None """ :param start: :param stop: :param dt: :param use_binner: """ # validate echan mask input if not isinstance(use_echans_start, int): log.error(f"The use_echans_start variable must be a integer." f" Input is {use_echans_start}.") raise AssertionError() if not np.abs(use_echans_start) < self.n_channels: log.error(f"The use_echans_start variable must be" f"between {(-1)*(self.n_channels-1)} and {self.n_channels-1}." f" Input is {use_echans_start}.") raise AssertionError() if not isinstance(use_echans_stop, int): log.error(f"The use_echans_stop variable must be a integer." f" Input is {use_echans_stop}.") raise AssertionError() if not np.abs(use_echans_stop) < self.n_channels: log.error(f"The use_echans_stop variable must be" f"between {(-1)*(self.n_channels-1)} and {self.n_channels-1}." f" Input is {use_echans_start}.") raise AssertionError() if use_echans_start < 0: use_echans_start = self.n_channels+use_echans_start if use_echans_stop < 0: use_echans_stop = self.n_channels+use_echans_stop if not use_echans_stop >= use_echans_start: log.error(f"The use_echans_stop variable must be larger" f" or equal than the use_echans_start variable" f" Input is use_echans_start: {use_echans_start}" f" > use_echans_stop: {use_echans_stop}") raise AssertionError() # git a set of bins containing the intervals bins = self._binned_spectrum_set.time_intervals.containing_interval( start, stop ) # type: TimeIntervalSet cnts = [] width = [] width_dead = [] log.debug(f"viewing light curve with dead time: {with_dead_time}") for time_bin in bins: cnts.append( np.sum( self.count_per_channel_over_interval( time_bin.start_time, time_bin.stop_time )[use_echans_start:use_echans_stop+1] ) ) # use the actual exposure width_dead.append(self.exposure_over_interval( time_bin.start_time, time_bin.stop_time)) # just use the "defined edges" width.append(time_bin.duration) # now we want to get the estimated background from the polynomial fit if self.poly_fit_exists: bkg = [] for j, time_bin in enumerate(bins): tmpbkg = 0.0 for poly in self.polynomials[use_echans_start:use_echans_stop+1]: tmpbkg += poly.integral(time_bin.start_time, time_bin.stop_time) bkg.append(tmpbkg/width[j]) else: bkg = None # pass all this to the light curve plotter if self.time_intervals is not None: selection = self.time_intervals.bin_stack else: selection = None if self.bkg_intervals is not None: bkg_selection = self.bkg_intervals.bin_stack else: bkg_selection = None # plot the light curve fig = binned_light_curve_plot( time_bins=bins.bin_stack, cnts=np.array(cnts), width=np.array(width_dead), bkg=bkg, selection=selection, bkg_selections=bkg_selection, ) return fig
[docs] def counts_over_interval(self, start, stop): """ return the number of counts in the selected interval :param start: start of interval :param stop: stop of interval :return: """ # this will be a boolean list and the sum will be the # number of events bins = self._select_bins(start, stop) total_counts = 0 for idx in np.where(bins)[0]: # sum over channels because we just want the total counts total_counts += self._binned_spectrum_set[idx].counts.sum() return total_counts
[docs] def count_per_channel_over_interval(self, start, stop): """ return the number of counts in the selected interval :param start: start of interval :param stop: stop of interval :return: """ # this will be a boolean list and the sum will be the # number of events bins = self._select_bins(start, stop) total_counts = np.zeros(self._n_channels) for idx in np.where(bins)[0]: # don't sum over channels because we want the spectrum total_counts += self._binned_spectrum_set[idx].counts return total_counts
def _select_bins(self, start, stop): """ return an index of the selected bins :param start: start time :param stop: stop time :return: int indices """ return self._binned_spectrum_set.time_intervals.containing_interval( start, stop, as_mask=True ) def _adjust_to_true_intervals(self, time_intervals): """ adjusts time selections to those of the Binned spectrum set :param time_intervals: a time interval set :return: an adjusted time interval set """ # get all the starts and stops from these time intervals true_starts = np.array( self._binned_spectrum_set.time_intervals.start_times) true_stops = np.array( self._binned_spectrum_set.time_intervals.stop_times) new_starts = [] new_stops = [] # now go thru all the intervals for interval in time_intervals: # find where the suggest intervals hits the true interval # searchsorted is fast, but is not returing what we want # we want the actaul values of the bins closest to the input # idx = np.searchsorted(true_starts, interval.start_time,side) idx = (np.abs(true_starts - interval.start_time)).argmin() new_start = true_starts[idx] # idx = np.searchsorted(true_stops, interval.stop_time) idx = (np.abs(true_stops - interval.stop_time)).argmin() new_stop = true_stops[idx] new_starts.append(new_start) new_stops.append(new_stop) # alright, now we can make appropriate time intervals return TimeIntervalSet.from_starts_and_stops(new_starts, new_stops) def _fit_polynomials(self, bayes=False): """ fits a polynomial to all channels over the input time intervals :param fit_intervals: str input intervals :return: """ # mark that we have fit a poly now self._poly_fit_exists = True # we need to adjust the selection to the true intervals of # the time-binned spectra tmp_poly_intervals = self._bkg_intervals bkg_intervals = self._adjust_to_true_intervals(tmp_poly_intervals) self._bkg_intervals = bkg_intervals # now lets get all the counts, exposure and midpoints for the # selection selected_counts = [] selected_exposure = [] selected_midpoints = [] for selection in bkg_intervals: # get the mask of these bins mask = self._select_bins(selection.start_time, selection.stop_time) # the counts will be (time, channel) here, # so the mask is selecting time. # a sum along axis=0 is a sum in time, while axis=1 is a sum in energy selected_counts.extend( self._binned_spectrum_set.counts_per_bin[mask]) selected_exposure.extend( self._binned_spectrum_set.exposure_per_bin[mask]) selected_midpoints.extend( self._binned_spectrum_set.time_intervals.mid_points[mask] ) selected_counts = np.array(selected_counts) selected_midpoints = np.array(selected_midpoints) selected_exposure = np.array(selected_exposure) # Now we will find the the best poly order unless the use specified one # The total cnts (over channels) is binned if self._user_poly_order == -1: self._optimal_polynomial_grade = ( self._fit_global_and_determine_optimum_grade( selected_counts.sum(axis=1), selected_midpoints, selected_exposure, bayes=bayes, ) ) log.info( "Auto-determined polynomial order: %d" % self._optimal_polynomial_grade ) else: self._optimal_polynomial_grade = self._user_poly_order if threeML_config["parallel"]["use_parallel"]: def worker(counts): with silence_console_log(): polynomial, _ = polyfit( selected_midpoints, counts, self._optimal_polynomial_grade, selected_exposure, bayes=bayes, ) return polynomial client = ParallelClient() polynomials = client.execute_with_progress_bar( worker, selected_counts.T, name=f"Fitting {self._instrument} background") else: polynomials = [] # now fit the light curve of each channel # and save the estimated polynomial for counts in tqdm( selected_counts.T, desc=f"Fitting {self._instrument} background" ): with silence_console_log(): polynomial, _ = polyfit( selected_midpoints, counts, self._optimal_polynomial_grade, selected_exposure, bayes=bayes, ) polynomials.append(polynomial) self._polynomials = polynomials
[docs] def set_active_time_intervals(self, *args): """ Set the time interval(s) to be used during the analysis. Specified as 'tmin-tmax'. Intervals are in seconds. Example: set_active_time_intervals("0.0-10.0") which will set the time range 0-10. seconds. """ # mark that we now have a time selection self._time_selection_exists = True # lets build a time interval set from the selections # and then merge intersecting intervals time_intervals = TimeIntervalSet.from_strings(*args) time_intervals.merge_intersecting_intervals(in_place=True) # lets adjust the time intervals to the actual ones since they are prebinned time_intervals = self._adjust_to_true_intervals(time_intervals) # start out with no time bins selection all_idx = np.zeros( len(self._binned_spectrum_set.time_intervals), dtype=bool) # now we need to sum up the counts and total time total_time = 0 for interval in time_intervals: # the select bins method is called. # since we are sure that the interval bounds # are aligned with the true ones, we do not care if # it is inner or outer all_idx = np.logical_or( all_idx, self._select_bins( interval.start_time, interval.stop_time) ) total_time += interval.duration # sum along the time axis self._counts = self._binned_spectrum_set.counts_per_bin[all_idx].sum( axis=0) # the selected time intervals self._time_intervals = time_intervals tmp_counts = [] tmp_err = [] # Temporary list to hold the err counts per chan if self._poly_fit_exists: if not self._poly_fit_exists: raise RuntimeError( "A polynomial fit to the channels does not exist!") for chan in range(self._n_channels): total_counts = 0 counts_err = 0 for tmin, tmax in zip( self._time_intervals.start_times, self._time_intervals.stop_times ): # Now integrate the appropriate background polynomial total_counts += self._polynomials[chan].integral( tmin, tmax) counts_err += ( self._polynomials[chan].integral_error(tmin, tmax) ) ** 2 tmp_counts.append(total_counts) tmp_err.append(np.sqrt(counts_err)) self._poly_counts = np.array(tmp_counts) self._poly_count_err = np.array(tmp_err) self._exposure = self._binned_spectrum_set.exposure_per_bin[all_idx].sum( ) self._active_dead_time = total_time - self._exposure
[docs] def dead_time_over_interval(self, start, stop): """ computer the dead time over the interval """ mask = self._select_bins(start, stop) start = np.array(self.bins.starts)[mask][0] stop = np.array(self.bins.stops)[mask][-1] return (stop - start) - self.exposure_over_interval(start, stop)
[docs] def exposure_over_interval(self, start, stop): """ calculate the exposure over the given interval :param start: start time :param stop: stop time :return: """ mask = self._select_bins(start, stop) return self._binned_spectrum_set.exposure_per_bin[mask].sum()