Source code for threeML.utils.time_series.polynomial


from typing import Iterable, List, Optional, Tuple, Union

import numpy as np
from astromodels import (Constant, Cubic, Gaussian, Line, Log_normal, Model,
                         PointSource, Quadratic, Quartic)

from threeML.bayesian.bayesian_analysis import BayesianAnalysis
from threeML.classicMLE.joint_likelihood import JointLikelihood
from threeML.config.config import threeML_config
from threeML.config.config_utils import get_value
from threeML.data_list import DataList
from threeML.exceptions.custom_exceptions import BadCovariance#, FitFailed
from threeML.minimizer.minimization import FitFailed
from threeML.io.logging import setup_logger, silence_console_log
from threeML.minimizer.grid_minimizer import AllFitFailed
from threeML.minimizer.minimization import (CannotComputeCovariance,
                                            GlobalMinimization,
                                            LocalMinimization)
from threeML.plugins.UnbinnedPoissonLike import (EventObservation,
                                                 UnbinnedPoissonLike)
from threeML.plugins.XYLike import XYLike

log = setup_logger(__name__)

# we include the line twice to mimic a constant: grade 0 (constant background)
# also maps to Line; its slope is later fixed to zero by the fitting routines
# when grade == 0, so the tuple index equals the polynomial grade
_grade_model_lookup = (Line, Line, Quadratic, Cubic, Quartic)


class Polynomial(object):
    def __init__(self, coefficients: Iterable[float], is_integral: bool = False):
        """
        A polynomial with an attached covariance matrix, used to model
        time-series backgrounds.

        :param coefficients: array of polynomial coefficients, lowest order
            first (c0 + c1*x + c2*x^2 + ...)
        :param is_integral: if True, this instance is itself the antiderivative
            of another polynomial (internal use: no integral polynomial is
            attached, to stop the recursion)
        """

        self._coefficients: Iterable[float] = coefficients
        self._degree: int = len(coefficients) - 1

        log.debug(f"creating polynomial of degree {self._degree}")
        log.debug(f"with coefficients {self._coefficients}")

        # exponents 1..degree+1, used to evaluate the integral basis
        # in _eval_basis / integral_error
        self._i_plus_1: np.ndarray = np.array(
            list(range(1, self._degree + 1 + 1)), dtype=float)

        # covariance starts at zero; it is filled in after a fit via
        # set_covariace_matrix / from_previous_fit
        self._cov_matrix: np.ndarray = np.zeros(
            (self._degree + 1, self._degree + 1))

        # we can fix some things for speed:
        # we only need to build the integral polynomial when this instance
        # is NOT itself an integral polynomial
        if not is_integral:

            log.debug("This is NOT an integral polynomial")

            self._integral_polynomial: "Polynomial" = (
                self._build_integral_polynomial())

    def _build_integral_polynomial(self) -> "Polynomial":
        """Antiderivative of this polynomial with zero constant term."""

        integral_coeff = [0]

        integral_coeff.extend(
            [
                self._coefficients[i - 1] / float(i)
                for i in range(1, self._degree + 1 + 1)
            ]
        )

        return Polynomial(integral_coeff, is_integral=True)

    @classmethod
    def from_previous_fit(cls, coefficients, covariance) -> "Polynomial":
        """
        Rebuild a polynomial (with its covariance) stored from a previous fit.

        :param coefficients: polynomial coefficients, lowest order first
        :param covariance: covariance matrix of the coefficients
        """

        log.debug("restoring polynomial from previous fit")

        # use cls so subclasses restore to their own type
        poly = cls(coefficients=coefficients)
        poly._cov_matrix = covariance

        return poly

    @property
    def degree(self) -> int:
        """
        the polynomial degree
        :return:
        """
        return self._degree

    @property
    def error(self):
        """
        the error on the polynomial coefficients
        :return:
        """
        return np.sqrt(self._cov_matrix.diagonal())

    @property
    def coefficients(self):
        """Gets or sets the coefficients of the polynomial."""

        return np.array(self._coefficients)

    @coefficients.setter
    def coefficients(self, val):

        self._coefficients = val

        # NOTE: the degree is deliberately NOT recomputed here (matching the
        # historical behavior); callers must supply the same number of
        # coefficients the polynomial was built with
        self._integral_polynomial = self._build_integral_polynomial()

    def __repr__(self):

        return f"({self._coefficients})"

    def __call__(self, x):
        # Horner's method, highest order coefficient first
        result = 0
        for coefficient in self._coefficients[::-1]:
            result = result * x + coefficient
        return result

    def set_covariace_matrix(self, matrix) -> None:
        # name kept (with its historical misspelling) for backward
        # compatibility with existing callers
        self._cov_matrix = matrix

    def set_covariance_matrix(self, matrix) -> None:
        """Correctly spelled alias for set_covariace_matrix."""
        self._cov_matrix = matrix

    @property
    def covariance_matrix(self) -> np.ndarray:
        return self._cov_matrix

    def integral(self, xmin, xmax) -> float:
        """
        Evaluate the integral of the polynomial between xmin and xmax
        """

        return self._integral_polynomial(xmax) - self._integral_polynomial(xmin)

    def _eval_basis(self, x):
        # basis of the integral polynomial: x^(i+1) / (i+1) for each
        # coefficient index i, used to propagate the covariance
        return (1.0 / self._i_plus_1) * np.power(x, self._i_plus_1)

    def integral_error(self, xmin, xmax) -> float:
        """
        computes the integral error of an interval
        :param xmin: start of the interval
        :param xmax: stop of the interval
        :return: interval error
        """
        c = self._eval_basis(xmax) - self._eval_basis(xmin)
        tmp = c.dot(self._cov_matrix)
        err2 = tmp.dot(c)
        return np.sqrt(err2)
def polyfit(x: Iterable[float], y: Iterable[float], grade: int,
            exposure: Iterable[float],
            bayes: Optional[bool] = False) -> Tuple[Polynomial, float]:
    """
    function to fit a polynomial to binned event data.
    not a member to allow parallel computation

    :param x: the x coord of the data
    :param y: the y coord of the data
    :param grade: the polynomial order or grade
    :param exposure: the exposure of the interval
    :param bayes: to do a bayesian fit or not
    :return: tuple of (fitted Polynomial, -min log likelihood)
    """

    # Check that we have enough counts to perform the fit, otherwise
    # return a "zero polynomial"
    log.debug(f"starting polyfit with grade {grade} ")

    bayes = get_value("bayes", bayes, bool,
                      threeML_config.time_series.fit.bayes)

    # strip NaN bins from all three arrays in lockstep
    nan_mask = np.isnan(y)

    y = y[~nan_mask]
    x = x[~nan_mask]
    exposure = exposure[~nan_mask]

    non_zero_mask = y > 0
    n_non_zero = non_zero_mask.sum()

    if n_non_zero == 0:

        log.debug("no counts, return 0")

        # No data, nothing to do!
        return Polynomial([0.0] * (grade + 1)), 0.0

    # create 3ML plugins and fit them with 3ML!
    # should eventually allow better config

    # select the model based on the grade
    shape = _grade_model_lookup[grade]()

    ps = PointSource("_dummy", 0, 0, spectral_shape=shape)

    model = Model(ps)

    # average rate, used to seed the normalization parameter
    avg = np.mean(y / exposure)

    log.debug(f"starting polyfit with avg norm {avg}")

    with silence_console_log():

        xy = XYLike("series", x=x, y=y, exposure=exposure,
                    poisson_data=True, quiet=True)

        if not bayes:

            # make sure the model is positive
            for i, (k, v) in enumerate(model.free_parameters.items()):

                if i == 0:
                    v.bounds = (0, None)
                    v.value = avg

                else:
                    v.value = 0.0

            # we actually use a line here
            # because a constant returns a
            # single number

            if grade == 0:
                shape.b = 0
                shape.b.fix = True

            jl: JointLikelihood = JointLikelihood(model, DataList(xy))

            jl.set_minimizer("minuit")

            # if the fit fails, retry once and then just accept the
            # parameter values wherever the minimizer left them
            try:

                jl.fit(quiet=True)

            except (FitFailed, BadCovariance, AllFitFailed,
                    CannotComputeCovariance):

                log.debug("1st fit failed")

                try:

                    jl.fit(quiet=True)

                except (FitFailed, BadCovariance, AllFitFailed,
                        CannotComputeCovariance):

                    log.debug("all MLE fits failed")

            coeff = [v.value for _, v in model.free_parameters.items()]

            log.debug(f"got coeff: {coeff}")

            final_polynomial = Polynomial(coeff)

            try:
                final_polynomial.set_covariace_matrix(
                    jl.results.covariance_matrix)

            # catch Exception (not bare except) so KeyboardInterrupt /
            # SystemExit are not converted into FitFailed; chain the cause
            except Exception as e:
                log.exception("Fit failed in channel")
                raise FitFailed() from e

            min_log_likelihood = xy.get_log_like()

        else:

            # set smart priors

            for i, (k, v) in enumerate(model.free_parameters.items()):

                if i == 0:
                    v.bounds = (0, None)
                    v.prior = Log_normal(
                        mu=np.log(avg), sigma=np.max([np.log(avg / 2), 1]))
                    v.value = 1

                else:
                    v.prior = Gaussian(mu=0, sigma=2)
                    v.value = 1e-2

            # we actually use a line here
            # because a constant returns a
            # single number

            if grade == 0:
                shape.b = 0
                shape.b.fix = True

            ba: BayesianAnalysis = BayesianAnalysis(model, DataList(xy))

            ba.set_sampler("emcee")

            ba.sampler.setup(n_iterations=500, n_burn_in=200, n_walkers=20)

            ba.sample(quiet=True)

            ba.restore_median_fit()

            coeff = [v.value for _, v in model.free_parameters.items()]

            log.debug(f"got coeff: {coeff}")

            final_polynomial = Polynomial(coeff)

            final_polynomial.set_covariace_matrix(
                ba.results.estimate_covariance_matrix())

            min_log_likelihood = xy.get_log_like()

    log.debug(f"-min loglike: {-min_log_likelihood}")

    return final_polynomial, -min_log_likelihood
def unbinned_polyfit(events: Iterable[float], grade: int,
                     t_start: Iterable[float], t_stop: Iterable[float],
                     exposure: float, bayes: bool) -> Tuple[Polynomial, float]:
    """
    function to fit a polynomial to unbinned event data.
    not a member to allow parallel computation

    :param events: the events to fit
    :param grade: the polynomial order or grade
    :param t_start: the start time to fit over
    :param t_stop: the end time to fit over
    :param exposure: the exposure of the interval
    :param bayes: to do a bayesian fit or not
    :return: tuple of (fitted Polynomial, -min log likelihood)
    """

    log.debug(f"starting unbinned_polyfit with grade {grade}")
    log.debug(f"have {len(events)} events with {exposure} exposure")

    # create 3ML plugins and fit them with 3ML!
    # select the model based on the grade

    bayes = get_value("bayes", bayes, bool,
                      threeML_config.time_series.fit.bayes)

    # no events: nothing to fit, return a zero polynomial of the
    # requested grade
    if len(events) == 0:

        log.debug("no events! returning zero")

        return Polynomial([0] * (grade + 1)), 0

    shape = _grade_model_lookup[grade]()

    with silence_console_log():

        ps = PointSource("dummy", 0, 0, spectral_shape=shape)

        model = Model(ps)

        observation = EventObservation(events, exposure, t_start, t_stop,
                                       for_timeseries=True)

        xy = UnbinnedPoissonLike("series", observation=observation)

        if not bayes:

            # make sure the model is positive

            for i, (k, v) in enumerate(model.free_parameters.items()):

                if i == 0:

                    v.bounds = (0, None)

                    v.value = 10

                else:

                    v.value = 0.0

            # we actually use a line here
            # because a constant is returns a
            # single number

            if grade == 0:

                shape.b = 0

                shape.b.fix = True

            jl: JointLikelihood = JointLikelihood(model, DataList(xy))

            # grid search over the normalization, refined locally with
            # minuit at each grid point
            grid_minimizer = GlobalMinimization("grid")

            local_minimizer = LocalMinimization("minuit")

            my_grid = {
                model.dummy.spectrum.main.shape.a: np.logspace(0, 3, 10)}

            grid_minimizer.setup(
                second_minimization=local_minimizer, grid=my_grid)

            jl.set_minimizer(grid_minimizer)

            # if the fit falis, retry and then just accept

            try:

                jl.fit(quiet=True)

            except (FitFailed, BadCovariance, AllFitFailed,
                    CannotComputeCovariance):

                try:

                    jl.fit(quiet=True)

                except (FitFailed, BadCovariance, AllFitFailed,
                        CannotComputeCovariance):

                    # both attempts failed: give up and return a zero
                    # polynomial rather than raising
                    log.debug("all MLE fits failed, returning zero")

                    return Polynomial([0] * (grade + 1)), 0

            coeff = [v.value for _, v in model.free_parameters.items()]

            log.debug(f"got coeff: {coeff}")

            final_polynomial = Polynomial(coeff)

            final_polynomial.set_covariace_matrix(jl.results.covariance_matrix)

            min_log_likelihood = xy.get_log_like()

        else:

            # set smart priors

            for i, (k, v) in enumerate(model.free_parameters.items()):

                if i == 0:

                    v.bounds = (0, None)
                    v.prior = Log_normal(mu=np.log(5), sigma=np.log(5))
                    v.value = 1

                else:

                    v.prior = Gaussian(mu=0, sigma=.5)
                    v.value = 0.1

            # we actually use a line here
            # because a constant is returns a
            # single number

            if grade == 0:

                shape.b = 0

                shape.b.fix = True

            ba: BayesianAnalysis = BayesianAnalysis(model, DataList(xy))

            ba.set_sampler("emcee")

            ba.sampler.setup(n_iterations=500, n_burn_in=200, n_walkers=20)

            ba.sample(quiet=True)

            # parameters are set to the posterior medians before the
            # coefficients are read back out
            ba.restore_median_fit()

            coeff = [v.value for _, v in model.free_parameters.items()]

            log.debug(f"got coeff: {coeff}")

            final_polynomial = Polynomial(coeff)

            final_polynomial.set_covariace_matrix(
                ba.results.estimate_covariance_matrix())

            min_log_likelihood = xy.get_log_like()

    log.debug(f"-min loglike: {-min_log_likelihood}")

    return final_polynomial, -min_log_likelihood