Source code for threeML.utils.binner

import numba as nb
import numpy as np
from threeML.config.config import threeML_config
from threeML.io.logging import setup_logger
from threeML.utils.bayesian_blocks import (bayesian_blocks,
                                           bayesian_blocks_not_unique)
from threeML.utils.numba_utils import VectorFloat64, VectorInt64
from threeML.utils.progress_bar import tqdm
from threeML.utils.statistics.stats_tools import Significance
from threeML.utils.time_interval import TimeIntervalSet

log = setup_logger(__name__)


class NotEnoughData(RuntimeError):
    pass
class Rebinner:
    """
    A class to rebin vectors keeping a minimum value per bin. It supports
    arrays with a mask, so that elements excluded through the mask will not
    be considered for the rebinning
    """

    def __init__(self, vector_to_rebin_on, min_value_per_bin, mask=None):

        # Basic check that it is possible to do what we have been requested to do

        total = np.sum(vector_to_rebin_on)

        if total < min_value_per_bin:

            log.error(
                "Vector total is %s, cannot rebin at %s per bin"
                % (total, min_value_per_bin)
            )

            raise NotEnoughData()

        # Check if we have a mask; if not, prepare one that includes all elements

        if mask is not None:

            mask = np.array(mask, bool)

            assert mask.shape[0] == len(vector_to_rebin_on), (
                "The provided mask must have the same number of "
                "elements as the vector to rebin on"
            )

        else:

            mask = np.ones_like(vector_to_rebin_on, dtype=bool)

        self._mask = mask

        # Rebin taking the mask into account

        self._starts = []
        self._stops = []
        self._grouping = np.zeros_like(vector_to_rebin_on)

        n = 0
        bin_open = False
        n_grouped_bins = 0

        for index, b in enumerate(vector_to_rebin_on):

            if not mask[index]:

                # This element is excluded by the mask

                if not bin_open:

                    # Do nothing
                    continue

                else:

                    # We need to close the bin here
                    self._stops.append(index)
                    n = 0
                    bin_open = False

                    # If we have grouped more than one bin

                    if n_grouped_bins > 1:

                        # group all these bins
                        self._grouping[index - n_grouped_bins + 1 : index] = -1
                        self._grouping[index] = 1

                    # reset the number of bins in this group
                    n_grouped_bins = 0

            else:

                # This element is included by the mask

                if not bin_open:

                    # Open a new bin
                    bin_open = True

                    self._starts.append(index)
                    n = 0

                # Add the current value to the open bin

                n += b
                n_grouped_bins += 1

                # If we are beyond the requested value, close the bin

                if n >= min_value_per_bin:

                    self._stops.append(index + 1)
                    n = 0
                    bin_open = False

                    # If we have grouped more than one bin

                    if n_grouped_bins > 1:

                        # group all these bins
                        self._grouping[index - n_grouped_bins + 1 : index] = -1
                        self._grouping[index] = 1

                    # reset the number of bins in this group
                    n_grouped_bins = 0

        # At the end of the loop, see if we left a bin open; if we did, close it

        if bin_open:

            self._stops.append(len(vector_to_rebin_on))

        assert len(self._starts) == len(self._stops), (
            "This is a bug: the starts and stops of the bins are not in "
            "equal number"
        )

        self._min_value_per_bin = min_value_per_bin

        self._n_bins = len(self._starts)

        self._starts = np.array(self._starts)
        self._stops = np.array(self._stops)

        log.debug(
            f"Vector was rebinned from {len(vector_to_rebin_on)} to {self._n_bins}"
        )

    @property
    def n_bins(self):
        """
        Returns the number of bins defined

        :return:
        """

        return self._n_bins

    @property
    def grouping(self):

        return self._grouping
    def rebin(self, *vectors):

        rebinned_vectors = []

        for vector in vectors:

            assert len(vector) == len(self._mask), (
                "The vector to rebin must have the same number of elements as the "
                "original (not-rebinned) vector"
            )

            # Dispatch to the numba rebinner appropriate for the dtype

            if vector.dtype == np.int64:

                rebinned_vectors.append(
                    _rebin_vector_int(
                        vector, self._starts, self._stops, self._mask, self._n_bins
                    )
                )

            else:

                rebinned_vectors.append(
                    _rebin_vector_float(
                        vector, self._starts, self._stops, self._mask, self._n_bins
                    )
                )

        return rebinned_vectors
    def rebin_errors(self, *vectors):
        """
        Rebin errors by summing them in quadrature, i.e., each new error is
        the square root of the sum of the squares of the grouped errors

        Args:
            *vectors:

        Returns:
            array of rebinned errors

        """

        rebinned_vectors = []

        for vector in vectors:  # type: np.ndarray

            assert len(vector) == len(self._mask), (
                "The vector to rebin must have the same number of elements as the "
                "original (not-rebinned) vector"
            )

            rebinned_vector = []

            for low_bound, hi_bound in zip(self._starts, self._stops):

                rebinned_vector.append(
                    np.sqrt(np.sum(vector[low_bound:hi_bound] ** 2))
                )

            rebinned_vectors.append(np.array(rebinned_vector))

        return rebinned_vectors
    def get_new_start_and_stop(self, old_start, old_stop):

        assert len(old_start) == len(self._mask) and len(old_stop) == len(self._mask)

        new_start = np.zeros(len(self._starts))
        new_stop = np.zeros(len(self._starts))

        for i, (low_bound, hi_bound) in enumerate(zip(self._starts, self._stops)):

            new_start[i] = old_start[low_bound]
            new_stop[i] = old_stop[hi_bound - 1]

        return new_start, new_stop
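
# Example usage of Rebinner (a minimal sketch; the `counts` array below is
# hypothetical, not part of this module):
#
#   >>> counts = np.array([1, 3, 0, 2, 10, 4, 1], dtype=np.int64)
#   >>> rebinner = Rebinner(counts, min_value_per_bin=5)
#   >>> (new_counts,) = rebinner.rebin(counts)          # -> [6, 10, 5]
#   >>> (new_errors,) = rebinner.rebin_errors(np.sqrt(counts.astype(float)))
#
# rebin_errors sums in quadrature, so each new error is
# sqrt(sum(sigma_i ** 2)) over the grouped channels.
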
class TemporalBinner(TimeIntervalSet):
    """
    An extension of TimeIntervalSet that includes binning capabilities
    """
    @classmethod
    def bin_by_significance(
        cls,
        arrival_times,
        background_getter,
        background_error_getter=None,
        sigma_level=10,
        min_counts=1,
        tstart=None,
        tstop=None,
    ):
        """
        Bin the data to a given significance level for a given background method.
        If a background error function is given, it is assumed that the error
        distribution is gaussian. Otherwise, the error distribution is assumed
        to be Poisson.

        :param arrival_times: the arrival times of the events
        :param background_getter: function of a start and stop time that returns background counts
        :param background_error_getter: function of a start and stop time that returns background count errors
        :param sigma_level: the sigma level of the intervals
        :param min_counts: the minimum counts per bin
        :param tstart: optional start time of the interval
        :param tstop: optional stop time of the interval

        :return:
        """

        if tstart is None:

            tstart = arrival_times.min()

        else:

            tstart = float(tstart)

        if tstop is None:

            tstop = arrival_times.max()

        else:

            tstop = float(tstop)

        starts = []
        stops = []

        # Switching to a fast search
        # Idea inspired by Damien Begue

        # these factors change the time steps
        # in the fast search. should experiment

        if sigma_level > 25:

            increase_factor = 0.5
            decrease_factor = 0.5

        else:

            increase_factor = 0.25
            decrease_factor = 0.25

        current_start = arrival_times[0]

        # first we need to see if the interval provided has enough counts

        _, counts = TemporalBinner._select_events(
            arrival_times, current_start, arrival_times[-1]
        )

        # if it does not, the flag for the big loop never gets set

        end_all_search = not TemporalBinner._check_exceeds_sigma_interval(
            current_start,
            arrival_times[-1],
            counts,
            sigma_level,
            background_getter,
            background_error_getter,
        )

        # We will start the search at the mid point of the whole interval

        mid_point = 0.5 * (arrival_times[-1] + current_start)

        current_stop = mid_point

        # initialize the fast search flag

        end_fast_search = False

        # resolve once for functions used in the loop

        searchsorted = np.searchsorted

        # this is the main loop
        # as long as we have not reached the end of the interval
        # the loop will run

        if threeML_config.interface.progress_bars:

            pbar = tqdm(total=arrival_times.shape[0], desc="Binning by significance")

        while not end_all_search:

            # start of the fast search
            # we reset the flag for the interval
            # having been decreased in the last pass

            decreased_interval = False

            while not end_fast_search:

                # we calculate the sigma of the current region

                _, counts = TemporalBinner._select_events(
                    arrival_times, current_start, current_stop
                )

                sigma_exceeded = TemporalBinner._check_exceeds_sigma_interval(
                    current_start,
                    current_stop,
                    counts,
                    sigma_level,
                    background_getter,
                    background_error_getter,
                )

                time_step = abs(current_stop - current_start)

                # if we do not exceed the sigma
                # we need to increase the time interval

                if not sigma_exceeded:

                    # however, if in the last pass we had to decrease
                    # the interval, it means we have found where
                    # we need to start the slow search

                    if decreased_interval:

                        # mark where we are in the list
                        start_idx = searchsorted(arrival_times, current_stop)

                        # end the fast search
                        end_fast_search = True

                    # otherwise we increase the interval

                    else:

                        # unless we would increase it too far

                        if (
                            current_stop + time_step * increase_factor
                        ) >= arrival_times[-1]:

                            # mark where we are in the interval
                            start_idx = searchsorted(arrival_times, current_stop)

                            # then we also want to go ahead and get out of the fast search
                            end_fast_search = True

                        else:

                            # increase the interval
                            current_stop += time_step * increase_factor

                # if we did exceed the sigma level we will need to step
                # back in time to find where it was NOT exceeded

                else:

                    # decrease the interval
                    current_stop -= time_step * decrease_factor

                    # inform the loop that we have been back stepping
                    decreased_interval = True

            # Now we are ready for the slow forward search
            # where we count up all the photons

            # we have already counted up the photons to this point

            total_counts = counts

            # start searching from where the fast search ended

            if threeML_config.interface.progress_bars:

                pbar.update(counts)

            for time in arrival_times[start_idx:]:

                total_counts += 1

                if threeML_config.interface.progress_bars:

                    pbar.update(1)

                if total_counts < min_counts:

                    continue

                else:

                    # first use the background function to know the number of background counts
                    bkg = background_getter(current_start, time)

                    sig = Significance(total_counts, bkg)

                    if background_error_getter is not None:

                        bkg_error = background_error_getter(current_start, time)

                        sigma = sig.li_and_ma_equivalent_for_gaussian_background(
                            bkg_error
                        )[0]

                    else:

                        sigma = sig.li_and_ma()[0]

                    # now test if we have enough sigma

                    if sigma >= sigma_level:

                        # if we succeeded we want to mark the time bins

                        stops.append(time)
                        starts.append(current_start)

                        # set up the next fast search
                        # by looking past this interval

                        current_start = time
                        current_stop = 0.5 * (arrival_times[-1] + time)

                        end_fast_search = False

                        # get out of the for loop
                        break

            # if we never exceeded the sigma level by the
            # end of the search, we never will

            if end_fast_search:

                # so let's kill the main search
                end_all_search = True

        if not starts:

            log.error(
                "The requested sigma level could not be achieved in the interval. Try decreasing it."
            )

        else:

            return cls.from_starts_and_stops(starts, stops)
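
    # Example usage (a minimal sketch; `arrival_times` and the constant-rate
    # background below are hypothetical, not part of this module):
    #
    #   >>> arrival_times = np.sort(np.random.uniform(0.0, 10.0, size=5000))
    #   >>> bins = TemporalBinner.bin_by_significance(
    #   ...     arrival_times,
    #   ...     background_getter=lambda t1, t2: 100.0 * (t2 - t1),
    #   ...     sigma_level=5,
    #   ... )
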
    @classmethod
    def bin_by_constant(cls, arrival_times, dt):
        """
        Create bins with a constant dt

        :param arrival_times: the arrival times of the events
        :param dt: temporal spacing of the bins
        :return: a TemporalBinner with the constant bins
        """

        tmp = np.arange(arrival_times[0], arrival_times[-1], dt)

        starts = tmp
        stops = tmp + dt

        return cls.from_starts_and_stops(starts, stops)
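
    # Example usage (a minimal sketch with hypothetical arrival times):
    #
    #   >>> arrival_times = np.array([0.1, 0.4, 1.2, 2.7, 3.9])
    #   >>> bins = TemporalBinner.bin_by_constant(arrival_times, dt=1.0)
    #   # bins cover [0.1, 1.1), [1.1, 2.1), ... up to the last arrival time
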
    @classmethod
    def bin_by_bayesian_blocks(cls, arrival_times, p0, bkg_integral_distribution=None):
        """
        Divide a series of events characterized by their arrival time in blocks
        of perceptibly constant count rate. If the background integral
        distribution is given, divide the series in blocks where the difference
        with respect to the background is perceptibly constant.

        :param arrival_times: An iterable (list, numpy.array...) containing the
               arrival time of the events.
               NOTE: the input array MUST be time-ordered, and without duplicated
               entries. To ensure this, you may execute the following code:

               tt_array = numpy.asarray(self._arrival_times)
               tt_array = numpy.unique(tt_array)
               tt_array.sort()

               before running the algorithm.
        :param p0: The probability of finding a variation (i.e., creating a new
               block) when there is none. In other words, the probability of a
               Type I error, i.e., rejecting the null-hypothesis when it is true.
               All found variations will have a post-trial significance larger
               than p0.
        :param bkg_integral_distribution: the integral distribution for the
               background counts. It must be a function of the form f(x), which
               must return the integral number of counts expected from the
               background component between time 0 and x.
        """

        try:

            final_edges = bayesian_blocks(
                arrival_times,
                arrival_times[0],
                arrival_times[-1],
                p0,
                bkg_integral_distribution,
            )

        except Exception as e:

            if "duplicate" in str(e):

                log.warning(
                    "There were possible duplicate time tags in the data. "
                    "We will try to run a different algorithm"
                )

                final_edges = bayesian_blocks_not_unique(
                    arrival_times, arrival_times[0], arrival_times[-1], p0
                )

            else:

                log.error(str(e))

                raise

        starts = np.asarray(final_edges)[:-1]
        stops = np.asarray(final_edges)[1:]

        return cls.from_starts_and_stops(starts, stops)
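
    # Example usage (a minimal sketch; `arrival_times_raw` is hypothetical and
    # is de-duplicated and sorted as the docstring requires, since np.unique
    # returns a sorted array):
    #
    #   >>> tt = np.unique(np.asarray(arrival_times_raw))
    #   >>> bins = TemporalBinner.bin_by_bayesian_blocks(tt, p0=0.01)
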
    @classmethod
    def bin_by_custom(cls, starts, stops):
        """
        Convenience function to make custom bins. This form keeps the
        introduction of custom bins uniform with the other binning methods

        :param starts: start times of the bins
        :param stops: stop times of the bins
        :return:
        """

        return cls.from_starts_and_stops(starts, stops)
    @staticmethod
    def _check_exceeds_sigma_interval(
        start,
        stop,
        counts,
        sigma_level,
        background_getter,
        background_error_getter=None,
    ):
        """
        See if an interval exceeds a given sigma level

        :param start:
        :param stop:
        :param counts:
        :param sigma_level:
        :param background_getter:
        :param background_error_getter:
        :return:
        """

        bkg = background_getter(start, stop)

        sig = Significance(counts, bkg)

        if background_error_getter is not None:

            bkg_error = background_error_getter(start, stop)

            sigma = sig.li_and_ma_equivalent_for_gaussian_background(bkg_error)[0]

        else:

            sigma = sig.li_and_ma()[0]

        # now test if we have enough sigma

        return sigma >= sigma_level

    @staticmethod
    def _select_events(arrival_times, start, stop):
        """
        Get the selection index and total counts of the events over an interval

        :param arrival_times:
        :param start:
        :param stop:
        :return:
        """

        lt_idx = start <= arrival_times
        gt_idx = arrival_times <= stop

        idx = np.logical_and(lt_idx, gt_idx)

        return idx, arrival_times[idx].shape[0]
#####


@nb.njit(fastmath=True)
def _rebin_vector_float(vector, start, stop, mask, N):
    """
    faster rebinner using numba
    """

    rebinned_vector = VectorFloat64(0)

    for n in range(N):

        rebinned_vector.append(np.sum(vector[start[n] : stop[n]]))

    arr = rebinned_vector.arr

    # sanity check: the rebinned total must match the sum of the masked input
    test = np.abs((np.sum(arr) + 1e-100) / (np.sum(vector[mask]) + 1e-100) - 1)

    assert test < 1e-4

    return arr


@nb.njit(fastmath=True)
def _rebin_vector_int(vector, start, stop, mask, N):
    """
    faster rebinner using numba
    """

    rebinned_vector = VectorInt64(0)

    for n in range(N):

        rebinned_vector.append(np.sum(vector[start[n] : stop[n]]))

    arr = rebinned_vector.arr

    # sanity check: the rebinned total must match the sum of the masked input
    test = np.abs((np.sum(arr) + 1e-100) / (np.sum(vector[mask]) + 1e-100) - 1)

    assert test < 1e-4

    return arr
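
# Quick self-check of the numba rebinners (a hypothetical sketch, not part of
# the module): the rebinned sums must match the masked input sum, which is
# exactly what the internal assertion enforces.
#
#   >>> v = np.arange(10, dtype=np.float64)
#   >>> mask = np.ones(10, dtype=bool)
#   >>> _rebin_vector_float(v, np.array([0, 5]), np.array([5, 10]), mask, 2)
#   array([10., 35.])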