Source code for threeML.utils.bayesian_blocks

# Author: Giacomo Vianello (giacomov@stanford.edu)


import sys

from threeML.utils.progress_bar import tqdm
import numexpr
import numpy as np


from threeML.io.logging import setup_logger

logger = setup_logger(__name__)

__all__ = ["bayesian_blocks", "bayesian_blocks_not_unique"]


def bayesian_blocks_not_unique(tt, ttstart, ttstop, p0):

    # Verify that the input array is one-dimensional
    tt = np.asarray(tt, dtype=float)

    assert tt.ndim == 1

    # Now create the array of unique times
    unique_t = np.unique(tt)

    t = tt
    tstart = ttstart
    tstop = ttstop

    # Create initial cell edges (Voronoi tessellation) using the unique time stamps
    edges = np.concatenate([[tstart], 0.5 * (unique_t[1:] + unique_t[:-1]), [tstop]])

    # The last block length is 0 by definition
    block_length = tstop - edges

    if np.sum((block_length <= 0)) > 1:
        raise RuntimeError(
            "Events appear to be out of order! Check for ordering, or duplicated events."
        )

    N = unique_t.shape[0]

    # Arrays to store the best configuration
    best = np.zeros(N, dtype=float)
    last = np.zeros(N, dtype=int)

    # Pre-computed priors (for speed)
    # eq. 21 from Scargle 2012
    priors = 4 - np.log(73.53 * p0 * np.power(np.arange(1, N + 1), -0.478))

    # Count how many events are in each Voronoi cell
    x, _ = np.histogram(t, edges)

    # Speed tricks: resolve once and for all the functions which will be used
    # in the loop
    cumsum = np.cumsum
    log = np.log
    argmax = np.argmax
    numexpr_evaluate = numexpr.evaluate
    arange = np.arange

    # Decide the step for reporting progress
    incr = max(int(float(N) / 100.0 * 10), 1)

    logger.debug("Finding blocks...")

    # This is where the computation happens. Following Scargle et al. 2012.
    # This loop has been optimized for speed:
    # * the expression for the fitness function has been rewritten to
    #   avoid multiple log computations, and to avoid power computations
    # * the use of scipy.weave and numexpr has been evaluated. The latter
    #   gives a big gain (~40%) if used for the fitness function. No other
    #   gain is obtained by using it anywhere else

    # Set numexpr precision to low (more than enough for us), which is
    # faster than high
    oldaccuracy = numexpr.set_vml_accuracy_mode("low")
    numexpr.set_num_threads(1)
    numexpr.set_vml_num_threads(1)

    for R in tqdm(range(N)):
        br = block_length[R + 1]
        T_k = block_length[: R + 1] - br

        # N_k: number of elements in each block.
        # For unbinned, unique events this could be simplified to
        # N_k = arange(R + 1, 0, -1) (one element per block), but here
        # cells may contain duplicated events, so use the histogram counts:
        N_k = cumsum(x[: R + 1][::-1])[::-1]

        # Evaluate the fitness function.
        # This is the slowest part, which is sped up by using numexpr.
        # It provides a ~40% gain in execution speed.
        fit_vec = numexpr_evaluate(
            """N_k * log(N_k / T_k)""",
            optimization="aggressive",
            local_dict={"N_k": N_k, "T_k": T_k},
        )

        p = priors[R]

        A_R = fit_vec - p

        A_R[1:] += best[:R]

        i_max = argmax(A_R)

        last[R] = i_max

        best[R] = A_R[i_max]

    numexpr.set_vml_accuracy_mode(oldaccuracy)

    logger.debug("Done\n")

    # Now find the blocks
    change_points = np.zeros(N, dtype=int)
    i_cp = N
    ind = N

    while True:
        i_cp -= 1
        change_points[i_cp] = ind

        if ind == 0:
            break

        ind = last[ind - 1]

    change_points = change_points[i_cp:]

    finalEdges = edges[change_points]

    return np.asarray(finalEdges)
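A minimal usage sketch (the event list and the p0 value below are made-up
illustrations, not part of the source): the function takes the raw arrival
times, which may contain duplicated timestamps, the interval boundaries, and
the false positive probability, and returns the edges of the blocks.

import numpy as np

from threeML.utils.bayesian_blocks import bayesian_blocks_not_unique

# Hypothetical event list: two clusters of activity, with duplicated timestamps
events = np.array([0.1, 0.5, 0.5, 0.6, 4.0, 4.1, 4.1, 4.2, 9.5])

# p0 = 0.01 is an arbitrary illustrative choice; smaller values penalize
# change points more strongly and therefore produce fewer blocks
edges = bayesian_blocks_not_unique(events, ttstart=0.0, ttstop=10.0, p0=0.01)

# The first and last edges are ttstart and ttstop; any interior edges are
# the detected change points between blocks of constant rate
print(edges)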
def bayesian_blocks(tt, ttstart, ttstop, p0, bkg_integral_distribution=None):
    """
    Divide a series of events, characterized by their arrival times, into
    blocks of perceptibly constant count rate. If the background integral
    distribution is given, divide the series into blocks where the difference
    with respect to the background is perceptibly constant.

    :param tt: arrival times of the events
    :param ttstart: the start of the interval
    :param ttstop: the stop of the interval
    :param p0: the false positive probability. This is used to decide the
        penalization on the likelihood, so this parameter affects the number
        of blocks
    :param bkg_integral_distribution: (default: None) if given, the algorithm
        accounts for the presence of the background and finds changes in rate
        with respect to the background
    :return: the np.array containing the edges of the blocks
    """

    # Verify that the input array is one-dimensional
    tt = np.asarray(tt, dtype=float)

    assert tt.ndim == 1

    if bkg_integral_distribution is not None:

        # Transform the inhomogeneous Poisson process into a homogeneous one
        # with rate 1, by changing the time axis according to the background
        # rate
        logger.debug(
            "Transforming the inhomogeneous Poisson process to a homogeneous one with rate 1..."
        )

        t = np.array(bkg_integral_distribution(tt))

        logger.debug("done")

        # Now compute the start and stop time in the new system
        tstart = bkg_integral_distribution(ttstart)
        tstop = bkg_integral_distribution(ttstop)

    else:

        t = tt
        tstart = ttstart
        tstop = ttstop

    # Create initial cell edges (Voronoi tessellation)
    edges = np.concatenate([[t[0]], 0.5 * (t[1:] + t[:-1]), [t[-1]]])

    # Create the edges also in the original time system
    edges_ = np.concatenate([[tt[0]], 0.5 * (tt[1:] + tt[:-1]), [tt[-1]]])

    # Create a lookup table to be able to transform back from the transformed
    # system to the original one
    lookup_table = {key: value for (key, value) in zip(edges, edges_)}

    # The last block length is 0 by definition
    block_length = tstop - edges

    if np.sum((block_length <= 0)) > 1:
        raise RuntimeError(
            "Events appear to be out of order! Check for ordering, or duplicated events."
        )

    N = t.shape[0]

    # Arrays to store the best configuration
    best = np.zeros(N, dtype=float)
    last = np.zeros(N, dtype=int)

    # eq. 21 from Scargle 2012
    prior = 4 - np.log(73.53 * p0 * (N ** -0.478))

    logger.debug("Finding blocks...")

    # This is where the computation happens. Following Scargle et al. 2012.
    # This loop has been optimized for speed:
    # * the expression for the fitness function has been rewritten to
    #   avoid multiple log computations, and to avoid power computations
    # * the use of scipy.weave and numexpr has been evaluated. The latter
    #   gives a big gain (~40%) if used for the fitness function. No other
    #   gain is obtained by using it anywhere else

    # Set numexpr precision to low (more than enough for us), which is
    # faster than high
    oldaccuracy = numexpr.set_vml_accuracy_mode("low")
    numexpr.set_num_threads(1)
    numexpr.set_vml_num_threads(1)

    # Speed tricks: resolve once and for all the functions which will be used
    # in the loop
    numexpr_evaluate = numexpr.evaluate
    numexpr_re_evaluate = numexpr.re_evaluate

    # Pre-compute this
    aranges = np.arange(N + 1, 0, -1)

    for R in range(N):
        br = block_length[R + 1]

        # T_k looks unused, but it is actually used inside the numexpr
        # expression below
        T_k = block_length[: R + 1] - br

        # N_k: number of elements in each block.
        # This expression has been simplified for the case of unbinned
        # events (i.e., one element in each block). It was:
        # N_k = cumsum(x[:R + 1][::-1])[::-1]
        # Now it is (aranges has been pre-computed above):
        N_k = aranges[N - R:]

        # Evaluate the fitness function.
        # This is the slowest part, which is sped up by using numexpr
        # (a ~40% gain in execution speed). The first time we need to
        # "compile" the expression in numexpr; all the other times we
        # can reuse it
        if R == 0:

            fit_vec = numexpr_evaluate(
                """N_k * log(N_k / T_k)""",
                optimization="aggressive",
                local_dict={"N_k": N_k, "T_k": T_k},
            )

        else:

            fit_vec = numexpr_re_evaluate(local_dict={"N_k": N_k, "T_k": T_k})

        A_R = fit_vec - prior  # type: np.ndarray

        A_R[1:] += best[:R]

        i_max = A_R.argmax()

        last[R] = i_max

        best[R] = A_R[i_max]

    numexpr.set_vml_accuracy_mode(oldaccuracy)

    logger.debug("Done\n")

    # Now peel off and find the blocks (see the algorithm in Scargle et al.)
    change_points = np.zeros(N, dtype=int)
    i_cp = N
    ind = N

    while True:
        i_cp -= 1
        change_points[i_cp] = ind

        if ind == 0:
            break

        ind = last[ind - 1]

    change_points = change_points[i_cp:]

    edg = edges[change_points]

    # Transform the found edges back into the original time system
    if bkg_integral_distribution is not None:

        final_edges = [lookup_table[x] for x in edg]

    else:

        final_edges = edg

    # Now fix the first and last edge so that they are ttstart and ttstop
    final_edges[0] = ttstart
    final_edges[-1] = ttstop

    return np.asarray(final_edges)
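A sketch of the background-aware mode (the constant 10 counts/s background, the
simulated event list, and the helper name background_integral are assumptions
for illustration, not part of the source): for a background rate b(t),
bkg_integral_distribution should return its integral B(t) from 0 to t, which
for a constant rate of 10 counts/s is simply B(t) = 10 t.

import numpy as np

from threeML.utils.bayesian_blocks import bayesian_blocks


def background_integral(t):
    # Integral distribution of a hypothetical constant background of
    # 10 counts/s; this maps event times into a system where the
    # background becomes a homogeneous Poisson process with rate 1
    return 10.0 * np.asarray(t)


# Illustrative simulation: uniform background events plus a "flare"
# between t = 40 and t = 45
background_events = np.random.uniform(0.0, 100.0, 1000)
flare_events = np.random.uniform(40.0, 45.0, 300)
events = np.sort(np.concatenate([background_events, flare_events]))

edges = bayesian_blocks(
    events, 0.0, 100.0, p0=1e-3, bkg_integral_distribution=background_integral
)

# The edges are returned in the original time system, with the first and
# last fixed to ttstart and ttstop; the flare should stand out as its own
# block(s) above the background
print(edges)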
# To be run with a profiler
if __name__ == "__main__":

    tt = np.random.uniform(0, 1000, int(sys.argv[1]))
    tt.sort()

    with open("sim.txt", "w+") as f:

        for t in tt:
            f.write("%s\n" % t)

    res = bayesian_blocks(tt, 0, 1000, 1e-3, None)

    print(res)
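When run as a script, the module expects the number of events to simulate as
its first command-line argument, for example python -m cProfile
bayesian_blocks.py 100000 (the file name here assumes the module has been
saved locally under that name); it writes the simulated arrival times to
sim.txt and prints the resulting block edges.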