Source code for threeML.utils.time_series.polynomial

from __future__ import division
from builtins import zip
from builtins import range
from past.utils import old_div
from builtins import object
import numpy as np
import scipy.optimize as opt
import warnings
from threeML.utils.differentiation import get_hessian, ParameterOnBoundary
from threeML.exceptions.custom_exceptions import custom_warnings
from threeML.config.config import threeML_config


class CannotComputeCovariance(RuntimeWarning):
    pass


class Polynomial(object):
    def __init__(self, coefficients, is_integral=False):
        """
        :param coefficients: array of polynomial coefficients, lowest order first
        :param is_integral: if True, this polynomial is the integral of another
        polynomial, so the integral polynomial is not built again
        """

        self._coefficients = coefficients
        self._degree = len(coefficients) - 1

        # powers i + 1 of the integral basis, used in _eval_basis
        self._i_plus_1 = np.array(
            list(range(1, self._degree + 1 + 1)), dtype=float
        )

        self._cov_matrix = np.zeros((self._degree + 1, self._degree + 1))

        # we can fix some things for speed
        # we only need to set the coefficients for the
        # integral polynomial
        if not is_integral:

            integral_coeff = [0]

            integral_coeff.extend(
                [
                    self._coefficients[i - 1] / float(i)
                    for i in range(1, self._degree + 1 + 1)
                ]
            )

            self._integral_polynomial = Polynomial(integral_coeff, is_integral=True)

    @classmethod
    def from_previous_fit(cls, coefficients, covariance):

        poly = Polynomial(coefficients=coefficients)
        poly._cov_matrix = covariance

        return poly

    @property
    def degree(self):
        """
        the polynomial degree
        :return:
        """
        return self._degree

    @property
    def error(self):
        """
        the error on the polynomial coefficients
        :return:
        """
        return np.sqrt(self._cov_matrix.diagonal())

    def __get_coefficient(self):
        """ gets the coefficients """
        return self._coefficients

    def ___get_coefficient(self):
        """ Indirect coefficient getter """
        return self.__get_coefficient()

    def __set_coefficient(self, val):
        """ sets the coefficients and rebuilds the integral polynomial """

        self._coefficients = val

        integral_coeff = [0]

        integral_coeff.extend(
            [
                self._coefficients[i - 1] / float(i)
                for i in range(1, self._degree + 1 + 1)
            ]
        )

        self._integral_polynomial = Polynomial(integral_coeff, is_integral=True)

    def ___set_coefficient(self, val):
        """ Indirect coefficient setter """
        return self.__set_coefficient(val)

    coefficients = property(
        ___get_coefficient,
        ___set_coefficient,
        doc="""Gets or sets the coefficients of the polynomial.""",
    )

    def __call__(self, x):

        # Horner's method: accumulate from the highest-order coefficient down
        result = 0

        for coefficient in self._coefficients[::-1]:
            result = result * x + coefficient

        return result

    def compute_covariance_matrix(self, function, best_fit_parameters):
        """
        Compute the covariance matrix of this fit
        :param function: the loglike for the fit
        :param best_fit_parameters: the best fit parameters
        :return:
        """

        minima = np.zeros_like(best_fit_parameters) - 100
        maxima = np.zeros_like(best_fit_parameters) + 100

        try:

            hessian_matrix = get_hessian(function, best_fit_parameters, minima, maxima)

        except ParameterOnBoundary:

            custom_warnings.warn(
                "One or more of the parameters are at their boundaries. Cannot compute covariance and"
                " errors",
                CannotComputeCovariance,
            )

            n_dim = len(best_fit_parameters)

            self._cov_matrix = np.zeros((n_dim, n_dim)) * np.nan

            return

        # Invert the Hessian to get the covariance matrix

        try:

            covariance_matrix = np.linalg.inv(hessian_matrix)

            self._cov_matrix = covariance_matrix

        except np.linalg.LinAlgError:

            custom_warnings.warn(
                "Cannot invert Hessian matrix, looks like the matrix is singular",
                CannotComputeCovariance,
            )

            n_dim = len(best_fit_parameters)

            self._cov_matrix = np.zeros((n_dim, n_dim)) * np.nan

    @property
    def covariance_matrix(self):
        return self._cov_matrix

    def integral(self, xmin, xmax):
        """
        Evaluate the integral of the polynomial between xmin and xmax
        """
        return self._integral_polynomial(xmax) - self._integral_polynomial(xmin)

    def _eval_basis(self, x):
        return (1.0 / self._i_plus_1) * np.power(x, self._i_plus_1)

    def integral_error(self, xmin, xmax):
        """
        computes the error on the integral over an interval, propagating
        the fit covariance matrix

        :param xmin: start of the interval
        :param xmax: stop of the interval
        :return: error on the integral
        """
        c = self._eval_basis(xmax) - self._eval_basis(xmin)
        tmp = c.dot(self._cov_matrix)
        err2 = tmp.dot(c)

        return np.sqrt(err2)


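# A minimal usage sketch (not part of the original module; all values are
# illustrative): evaluate a quadratic, its definite integral, and the error
# propagated from a made-up covariance matrix of a previous fit.


def _example_polynomial_usage():

    p = Polynomial([1.0, 0.5, 0.1])  # 1 + 0.5 x + 0.1 x^2, lowest order first

    assert np.isclose(p(2.0), 2.4)  # Horner evaluation
    assert np.isclose(p.integral(0.0, 10.0), 10.0 + 25.0 + 100.0 / 3.0)

    # attach a hypothetical covariance matrix from an earlier fit
    cov = np.diag([0.1, 0.01, 0.001])
    p2 = Polynomial.from_previous_fit([1.0, 0.5, 0.1], cov)

    # 1-sigma error on the integral over [0, 10] via error propagation
    return p2.integral_error(0.0, 10.0)

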
class PolyLogLikelihood(object):
    def __init__(self, model, exposure):

        self._model = model
        self._parameters = model.coefficients
        self._exposure = exposure

        # build the covariance call
        self._build_cov_call()

    def _evaluate_logM(self, M):
        # Evaluate the logarithm with protection for negative or small
        # numbers, using a smooth linear extrapolation (better than just a
        # sharp cutoff)

        tiny = np.float64(np.finfo(M[0]).tiny)

        non_tiny_mask = M > 2.0 * tiny

        tiny_mask = np.logical_not(non_tiny_mask)

        if tiny_mask.sum() > 0:

            logM = np.zeros(len(M))
            logM[tiny_mask] = old_div(np.abs(M[tiny_mask]), tiny) + np.log(tiny) - 1
            logM[non_tiny_mask] = np.log(M[non_tiny_mask])

        else:

            logM = np.log(M)

        return logM

    def _fix_precision(self, v):
        """
        Round extremely small numbers inside v to the smallest usable
        number of the type corresponding to v. This is to avoid warnings
        and errors like underflows or overflows in math operations.
        """
        tiny = np.float64(np.finfo(v[0]).tiny)
        zero_mask = np.abs(v) <= tiny  # type: np.ndarray
        if zero_mask.sum() > 0:
            v[zero_mask] = np.sign(v[zero_mask]) * tiny

        return v, tiny

    def _build_cov_call(self):

        raise NotImplementedError("must be built in subclass")


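# A small sketch (not part of the original module) of what _evaluate_logM
# does near the smallest representable float: below 2 * tiny the log is
# replaced by |M| / tiny + log(tiny) - 1, the first-order Taylor expansion
# of log(M) around M = tiny, which agrees with log in value and slope at
# M = tiny and stays finite for M <= 0.


def _example_protected_log():

    tiny = np.float64(np.finfo(np.float64).tiny)

    def linear_branch(m):
        return np.abs(m) / tiny + np.log(tiny) - 1.0

    # tangent to log(M) at M = tiny ...
    assert np.isclose(linear_branch(tiny), np.log(tiny))

    # ... and well defined where the plain log is not
    return linear_branch(np.float64(0.0))

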
class PolyBinnedLogLikelihood(PolyLogLikelihood):
    """
    Implements a Poisson likelihood (i.e., the Cash statistic). Mind that this
    is not the Castor statistic (Cstat); the difference between the two is a
    constant given a dataset. I kept Cash instead of Castor to make the
    comparison with ROOT during tests easier, since ROOT implements the Cash
    statistic.
    """

    def __init__(self, x, y, model, exposure):

        self._bin_centers = x
        self._counts = y
        self._non_zero_mask = self._counts > 0

        super(PolyBinnedLogLikelihood, self).__init__(model, exposure)

    def _build_cov_call(self):
        def cov_call(*parameters):

            self._model.coefficients = parameters
            M = self._model(self._bin_centers) * self._exposure
            M_fixed, tiny = self._fix_precision(M)

            # Replace negative values for the model (impossible in the Poisson context)
            # with zero
            negative_mask = M < 0
            if negative_mask.sum() > 0:
                M[negative_mask] = 0.0

            # Poisson loglikelihood statistic (Cash) is:
            # L = Sum ( M_i - D_i * log(M_i))

            logM = self._evaluate_logM(M)

            # Evaluate v_i = D_i * log(M_i): if D_i = 0 then the product is zero
            # whatever value log(M_i) has. Thus, initialize the whole vector v = {v_i}
            # to zero, then overwrite the elements corresponding to D_i > 0

            d_times_logM = np.zeros(len(self._counts))

            d_times_logM[self._non_zero_mask] = (
                self._counts[self._non_zero_mask] * logM[self._non_zero_mask]
            )

            log_likelihood = np.sum(M_fixed - d_times_logM)

            return log_likelihood

        self.cov_call = cov_call

    def __call__(self, parameters):
        """
        Evaluate the Cash statistic for the given set of parameters
        """

        # Compute the values for the model given this set of parameters
        # model is in counts

        self._model.coefficients = parameters
        M = self._model(self._bin_centers) * self._exposure
        M_fixed, tiny = self._fix_precision(M)

        # Replace negative values for the model (impossible in the Poisson context)
        # with zero
        negative_mask = M < 0
        if negative_mask.sum() > 0:
            M[negative_mask] = 0.0

        # Poisson loglikelihood statistic (Cash) is:
        # L = Sum ( M_i - D_i * log(M_i))

        logM = self._evaluate_logM(M)

        # Evaluate v_i = D_i * log(M_i): if D_i = 0 then the product is zero
        # whatever value log(M_i) has. Thus, initialize the whole vector v = {v_i}
        # to zero, then overwrite the elements corresponding to D_i > 0

        d_times_logM = np.zeros(len(self._counts))

        d_times_logM[self._non_zero_mask] = (
            self._counts[self._non_zero_mask] * logM[self._non_zero_mask]
        )

        log_likelihood = np.sum(M_fixed - d_times_logM)

        return log_likelihood


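# A usage sketch (not part of the original module; the data are made up):
# evaluate the Cash statistic for toy binned counts under a constant-rate
# model. opt.minimize, as used in polyfit below, would minimize this callable.


def _example_binned_likelihood():

    x = np.linspace(0.5, 9.5, 10)  # bin centers
    y = np.random.poisson(5.0, size=10).astype(float)  # toy counts

    loglike = PolyBinnedLogLikelihood(x, y, Polynomial([5.0]), exposure=1.0)

    # Cash statistic (-log L up to a constant) at the guessed amplitude
    return loglike([5.0])

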
class PolyUnbinnedLogLikelihood(PolyLogLikelihood):
    """
    Implements an unbinned Poisson likelihood. Mind that this is not the
    Castor statistic (Cstat); the difference between the two is a constant
    given a dataset. I kept Cash instead of Castor to make the comparison
    with ROOT during tests easier, since ROOT implements the Cash statistic.
    """

    def __init__(self, events, model, t_start, t_stop, exposure):

        self._events = events
        self._t_start = t_start  # list of interval starts
        self._t_stop = t_stop  # list of interval stops

        super(PolyUnbinnedLogLikelihood, self).__init__(model, exposure)

    def _build_cov_call(self):
        def cov_call(*parameters):

            # Compute the values for the model given this set of parameters
            self._model.coefficients = parameters

            # Integrate the polynomial (or in the future, model) over the given intervals

            n_expected_counts = 0.0

            for start, stop in zip(self._t_start, self._t_stop):
                n_expected_counts += self._model.integral(start, stop)

            # Now evaluate the model at the event times and multiply by the exposure

            M = self._model(self._events) * self._exposure

            # Replace negative values for the model (impossible in the Poisson context)
            # with zero
            negative_mask = M < 0

            if negative_mask.sum() > 0:
                M[negative_mask] = 0.0

            # Poisson loglikelihood statistic is:
            # logL = -Nexp + Sum ( log M_i )

            logM = self._evaluate_logM(M)

            log_likelihood = -n_expected_counts + logM.sum()

            return -log_likelihood

        self.cov_call = cov_call

    def __call__(self, parameters):
        """
        Evaluate the negative unbinned Poisson log-likelihood for the given
        set of parameters
        """

        # Compute the values for the model given this set of parameters
        self._model.coefficients = parameters

        # Integrate the polynomial (or in the future, model) over the given intervals

        n_expected_counts = 0.0

        for start, stop in zip(self._t_start, self._t_stop):
            n_expected_counts += self._model.integral(start, stop)

        # Now evaluate the model at the event times and multiply by the exposure

        M = self._model(self._events) * self._exposure

        # Replace negative values for the model (impossible in the Poisson context)
        # with zero
        negative_mask = M < 0

        if negative_mask.sum() > 0:
            M[negative_mask] = 0.0

        # Poisson loglikelihood statistic is:
        # logL = -Nexp + Sum ( log M_i )

        logM = self._evaluate_logM(M)

        log_likelihood = -n_expected_counts + logM.sum()

        return -log_likelihood


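# A usage sketch (not part of the original module; the data are made up):
# evaluate the negative unbinned Poisson log-likelihood for toy event times
# over a single interval, with a constant-rate model.


def _example_unbinned_likelihood():

    events = np.sort(np.random.uniform(0.0, 10.0, size=50))

    loglike = PolyUnbinnedLogLikelihood(
        events, Polynomial([5.0]), t_start=[0.0], t_stop=[10.0], exposure=10.0
    )

    # -logL = Nexp - Sum(log M_i); Nexp here is the integral of 5.0 over [0, 10]
    return loglike([5.0])

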
def polyfit(x, y, grade, exposure):
    """
    Fit a polynomial to binned event data. Not a class member, to allow
    parallel computation.
    """

    # Check that we have enough counts to perform the fit, otherwise
    # return a "zero polynomial"
    non_zero_mask = y > 0
    n_non_zero = non_zero_mask.sum()

    if n_non_zero == 0:
        # No data, nothing to do!
        return Polynomial([0.0]), 0.0

    # Compute an initial guess for the polynomial parameters,
    # with a least-squares fit (with weight=1) using SVD (extremely robust):
    # (note that np.polyfit returns the coefficients starting from the maximum
    # grade, thus we need to reverse the order)

    with warnings.catch_warnings():

        warnings.simplefilter("ignore")

        initial_guess = np.polyfit(x, y, grade)

    initial_guess = initial_guess[::-1]

    polynomial = Polynomial(initial_guess)

    # Check that the solution found is meaningful (i.e., definite positive
    # in the interval of interest)
    M = polynomial(x)

    negative_mask = M < 0

    if negative_mask.sum() > 0:
        # Least-squares fit failed to converge to a meaningful solution
        # Reset the initial guess to a reasonable value
        initial_guess[0] = np.mean(y)
        meanx = np.mean(x)
        initial_guess = [
            old_div(abs(i[1]), pow(meanx, i[0])) for i in enumerate(initial_guess)
        ]

    # Improve the solution using a logLikelihood statistic (Cash statistic)
    log_likelihood = PolyBinnedLogLikelihood(x, y, polynomial, exposure)

    # Check that we have enough non-empty bins to fit this grade of polynomial,
    # otherwise lower the grade
    dof = n_non_zero - (grade + 1)

    if dof <= 2:
        # Fit is poorly or ill-conditioned, have to reduce the number of parameters
        while dof < 2 and len(initial_guess) > 1:
            initial_guess = initial_guess[:-1]
            polynomial = Polynomial(initial_guess)
            log_likelihood = PolyBinnedLogLikelihood(x, y, polynomial, exposure)
            dof = n_non_zero - len(initial_guess)

    # Try to improve the fit with the log-likelihood

    final_estimate = opt.minimize(
        log_likelihood,
        initial_guess,
        method=threeML_config["event list"]["binned fit method"],
        options=threeML_config["event list"]["binned fit options"],
    )["x"]

    final_estimate = np.atleast_1d(final_estimate)

    # Get the value for cstat at the minimum
    min_log_likelihood = log_likelihood(final_estimate)

    # Update the polynomial with the fitted parameters,
    # and the relative covariance matrix

    final_polynomial = Polynomial(final_estimate)

    final_polynomial.compute_covariance_matrix(log_likelihood.cov_call, final_estimate)

    return final_polynomial, min_log_likelihood


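# A usage sketch for polyfit (not part of the original module; the data are
# made up): fit a first-order background polynomial to toy binned counts and
# propagate the fit uncertainty to an integral. Assumes a configured threeML
# installation, since polyfit reads the minimizer settings from threeML_config.


def _example_polyfit():

    x = np.linspace(0.5, 19.5, 20)  # bin centers
    y = np.random.poisson(10.0 + 0.5 * x).astype(float)  # toy counts

    best_polynomial, min_cash = polyfit(x, y, grade=1, exposure=1.0)

    counts_at_5 = best_polynomial(5.0)  # model counts at x = 5
    integral_err = best_polynomial.integral_error(0.0, 20.0)

    return counts_at_5, integral_err

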
def unbinned_polyfit(events, grade, t_start, t_stop, exposure, initial_amplitude=1):
    """
    Fit a polynomial to unbinned event data. Not a class member, to allow
    parallel computation.
    """

    # if there are no events then there is nothing to fit
    if len(events) == 0:
        return Polynomial([0]), 0

    # first do a simple amplitude fit over a coarse grid

    with warnings.catch_warnings():

        warnings.simplefilter("ignore")

        search_grid = np.logspace(-2, 4, 10)

        initial_guess = np.zeros(grade + 1)

        polynomial = Polynomial(initial_guess)

        log_likelihood = PolyUnbinnedLogLikelihood(
            events, polynomial, t_start, t_stop, exposure
        )

        like_grid = []
        for amp in search_grid:

            initial_guess[0] = amp
            like_grid.append(log_likelihood(initial_guess))

        initial_guess[0] = search_grid[np.argmin(like_grid)]

    # Improve the solution

    # Check that we have enough events to fit this grade of polynomial,
    # otherwise lower the grade
    dof = len(events) - (grade + 1)

    if dof <= 2:
        # Fit is poorly or ill-conditioned, have to reduce the number of parameters
        while dof < 1 and len(initial_guess) > 1:
            initial_guess = initial_guess[:-1]
            polynomial = Polynomial(initial_guess)
            log_likelihood = PolyUnbinnedLogLikelihood(
                events, polynomial, t_start, t_stop, exposure
            )
            dof = len(events) - len(initial_guess)

    final_estimate = opt.minimize(
        log_likelihood,
        initial_guess,
        method=threeML_config["event list"]["unbinned fit method"],
        options=threeML_config["event list"]["unbinned fit options"],
    )["x"]

    final_estimate = np.atleast_1d(final_estimate)

    min_log_likelihood = log_likelihood(final_estimate)

    # Update the polynomial with the fitted parameters,
    # and the relative covariance matrix

    final_polynomial = Polynomial(final_estimate)

    final_polynomial.compute_covariance_matrix(log_likelihood.cov_call, final_estimate)

    return final_polynomial, min_log_likelihood


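# A usage sketch for unbinned_polyfit (not part of the original module; the
# event times are made up): fit a constant background rate directly to event
# arrival times over one interval, then integrate the fitted model.


def _example_unbinned_polyfit():

    events = np.sort(np.random.uniform(0.0, 100.0, size=200))

    best_polynomial, min_neg_loglike = unbinned_polyfit(
        events, grade=0, t_start=[0.0], t_stop=[100.0], exposure=100.0
    )

    # expected background counts in [0, 100] and the propagated error
    return (
        best_polynomial.integral(0.0, 100.0),
        best_polynomial.integral_error(0.0, 100.0),
    )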