# Source code for threeML.utils.time_series.polynomial

```
from typing import Iterable, List, Optional, Tuple, Union

import numpy as np
from astromodels import (Constant, Cubic, Gaussian, Line, Log_normal, Model,

from threeML.bayesian.bayesian_analysis import BayesianAnalysis
from threeML.classicMLE.joint_likelihood import JointLikelihood
from threeML.config.config import threeML_config
from threeML.data_list import DataList
from threeML.io.logging import setup_logger, silence_console_log
from threeML.minimizer.grid_minimizer import AllFitFailed
from threeML.minimizer.minimization import (CannotComputeCovariance,
GlobalMinimization,
LocalMinimization)
from threeML.plugins.UnbinnedPoissonLike import (EventObservation,
UnbinnedPoissonLike)
from threeML.plugins.XYLike import XYLike

log = setup_logger(__name__)

# we include the line twice to mimic a constant

[docs]class Polynomial(object):
def __init__(self, coefficients: Iterable[float], is_integral: bool = False):
"""
A polynomial

:param coefficients: array of poly coefficients
:param is_integral: if this polynomial is an
"""
self._coefficients: Iterable[float] = coefficients
self._degree: int = len(coefficients) - 1

log.debug(f"creating polynomial of degree {self._degree}")
log.debug(f"with coefficients {self._coefficients}")

self._i_plus_1: np.ndarray = np.array(
list(range(1, self._degree + 1 + 1)), dtype=float)

self._cov_matrix: np.ndarray = np.zeros(
(self._degree + 1, self._degree + 1))

# we can fix some things for speed
# we only need to set the coeff for the
# integral polynomial
if not is_integral:

log.debug("This is NOT and intergral polynomial")

integral_coeff = [0]

integral_coeff.extend(
[
self._coefficients[i - 1] / float(i)
for i in range(1, self._degree + 1 + 1)
]
)

self._integral_polynomial: "Polynomial" = Polynomial(
integral_coeff, is_integral=True)

[docs]    @classmethod
def from_previous_fit(cls, coefficients, covariance) -> "Polynomial":

log.debug("restoring polynomial from previous fit")

poly = Polynomial(coefficients=coefficients)
poly._cov_matrix = covariance

return poly

@property
def degree(self) -> int:
"""
the polynomial degree
:return:
"""
return self._degree

@property
def error(self):
"""
the error on the polynomial coefficients
:return:
"""
return np.sqrt(self._cov_matrix.diagonal())

def __get_coefficient(self):
""" gets the coefficients"""

return np.array(self._coefficients)

def ___get_coefficient(self):
""" Indirect coefficient getter """

return self.__get_coefficient()

def __set_coefficient(self, val):
""" sets the coefficients"""

self._coefficients = val

integral_coeff = [0]

integral_coeff.extend(
[
self._coefficients[i - 1] / float(i)
for i in range(1, self._degree + 1 + 1)
]
)

self._integral_polynomial = Polynomial(
integral_coeff, is_integral=True)

def ___set_coefficient(self, val):
""" Indirect coefficient setter """

return self.__set_coefficient(val)

coefficients = property(
___get_coefficient,
___set_coefficient,
doc="""Gets or sets the coefficients of the polynomial.""",
)

def __repr__(self):
return f"({self._coefficients})"

def __call__(self, x):

result = 0
for coefficient in self._coefficients[::-1]:
result = result * x + coefficient
return result

[docs]    def set_covariace_matrix(self, matrix) -> None:

self._cov_matrix = matrix

@property
def covariance_matrix(self) -> np.ndarray:
return self._cov_matrix

[docs]    def integral(self, xmin, xmax) -> float:
"""
Evaluate the integral of the polynomial between xmin and xmax

"""

return self._integral_polynomial(xmax) - self._integral_polynomial(xmin)

def _eval_basis(self, x):

return (1.0 / self._i_plus_1) * np.power(x, self._i_plus_1)

[docs]    def integral_error(self, xmin, xmax) -> float:
"""
computes the integral error of an interval
:param xmin: start of the interval
:param xmax: stop of the interval
:return: interval error
"""
c = self._eval_basis(xmax) - self._eval_basis(xmin)
tmp = c.dot(self._cov_matrix)
err2 = tmp.dot(c)

return np.sqrt(err2)

[docs]def polyfit(x: Iterable[float], y: Iterable[float], grade: int, exposure: Iterable[float], bayes: bool = False) -> Tuple[Polynomial, float]:
"""
function to fit a polynomial to data.
not a member to allow parallel computation

:param x: the x coord of the data
:param y: teh y coord of the data
:param expousure: the exposure of the interval
:param bayes: to do a bayesian fit or not

"""

# Check that we have enough counts to perform the fit, otherwise
# return a "zero polynomial"

if threeML_config.time_series.default_fit_method is not None:

bayes = threeML_config.time_series.default_fit_method
log.debug("using a default poly fit method")

if n_non_zero == 0:

log.debug("no counts, return 0")

# No data, nothing to do!

# create 3ML plugins and fit them with 3ML!
# should eventuallly allow better config

# seelct the model based on the grade

ps = PointSource("_dummy", 0, 0, spectral_shape=shape)

model = Model(ps)

avg = np.mean(y/exposure)

log.debug(f"starting polyfit with avg norm {avg}")

with silence_console_log():

xy = XYLike("series", x=x, y=y, exposure=exposure,
poisson_data=True, quiet=True)

if not bayes:

# make sure the model is positive

for i, (k, v) in enumerate(model.free_parameters.items()):

if i == 0:

v.bounds = (0, None)

v.value = avg

else:

v.value = 0.0

# we actually use a line here
# because a constant is returns a
# single number

shape.b = 0
shape.b.fix = True

jl: JointLikelihood = JointLikelihood(model, DataList(xy))

jl.set_minimizer("minuit")

# if the fit falis, retry and then just accept

try:

jl.fit(quiet=True)

log.debug("1st fit failed")

try:

jl.fit(quiet=True)

log.debug("all MLE fits failed")

pass

coeff = [v.value for _, v in model.free_parameters.items()]

log.debug(f"got coeff: {coeff}")

final_polynomial = Polynomial(coeff)

try:
final_polynomial.set_covariace_matrix(
jl.results.covariance_matrix)

except:

log.exception(f"Fit failed in channel")
raise FitFailed()

min_log_likelihood = xy.get_log_like()

else:

# set smart priors

for i, (k, v) in enumerate(model.free_parameters.items()):

if i == 0:

v.bounds = (0, None)
v.prior = Log_normal(
mu=np.log(avg), sigma=np.max([np.log(avg/2), 1]))
v.value = 1

else:

v.prior = Gaussian(mu=0, sigma=2)
v.value = 1e-2

# we actually use a line here
# because a constant is returns a
# single number

shape.b = 0
shape.b.fix = True

ba: BayesianAnalysis = BayesianAnalysis(model, DataList(xy))

ba.set_sampler("emcee")

ba.sampler.setup(n_iterations=500, n_burn_in=200, n_walkers=20)

ba.sample(quiet=True)

ba.restore_median_fit()

coeff = [v.value for _, v in model.free_parameters.items()]

log.debug(f"got coeff: {coeff}")

final_polynomial = Polynomial(coeff)

final_polynomial.set_covariace_matrix(
ba.results.estimate_covariance_matrix())

min_log_likelihood = xy.get_log_like()

log.debug(f"-min loglike: {-min_log_likelihood}")

return final_polynomial, -min_log_likelihood

[docs]def unbinned_polyfit(events: Iterable[float], grade: int, t_start: float, t_stop: float, exposure: float, bayes: bool) -> Tuple[Polynomial, float]:
"""
function to fit a polynomial to unbinned event data.
not a member to allow parallel computation

:param events: the events to fit
:param t_start: the start time to fit over
:param t_stop: the end time to fit over
:param expousure: the exposure of the interval
:param bayes: to do a bayesian fit or not

"""

log.debug(f"have {len(events)} events with {exposure} exposure")

# create 3ML plugins and fit them with 3ML!
# should eventuallly allow better config

# select the model based on the grade

if threeML_config.time_series.default_fit_method is not None:

bayes = threeML_config.time_series.default_fit_method
log.debug("using a default poly fit method")

if len(events) == 0:

log.debug("no events! returning zero")

return Polynomial([0] * (grade + 1)), 0

with silence_console_log():

ps = PointSource("dummy", 0, 0, spectral_shape=shape)

model = Model(ps)

observation = EventObservation(events, exposure, t_start, t_stop)

xy = UnbinnedPoissonLike("series", observation=observation)

if not bayes:

# make sure the model is positive

for i, (k, v) in enumerate(model.free_parameters.items()):

if i == 0:

v.bounds = (0, None)

v.value = 10

else:

v.value = 0.0

# we actually use a line here
# because a constant is returns a
# single number

shape.b = 0
shape.b.fix = True

jl: JointLikelihood = JointLikelihood(model, DataList(xy))

grid_minimizer = GlobalMinimization("grid")

local_minimizer = LocalMinimization("minuit")

my_grid = {
model.dummy.spectrum.main.shape.a: np.logspace(0, 3, 10)}

grid_minimizer.setup(
second_minimization=local_minimizer, grid=my_grid)

jl.set_minimizer(grid_minimizer)

# if the fit falis, retry and then just accept

try:

jl.fit(quiet=True)

try:

jl.fit(quiet=True)

log.debug("all MLE fits failed, returning zero")

coeff = [v.value for _, v in model.free_parameters.items()]

log.debug(f"got coeff: {coeff}")

final_polynomial = Polynomial(coeff)

final_polynomial.set_covariace_matrix(jl.results.covariance_matrix)

min_log_likelihood = xy.get_log_like()

else:

# set smart priors

for i, (k, v) in enumerate(model.free_parameters.items()):

if i == 0:

v.bounds = (0, None)
v.prior = Log_normal(mu=np.log(5), sigma=np.log(5))
v.value = 1

else:

v.prior = Gaussian(mu=0, sigma=.5)
v.value = 0.1

# we actually use a line here
# because a constant is returns a
# single number

shape.b = 0
shape.b.fix = True

ba: BayesianAnalysis = BayesianAnalysis(model, DataList(xy))

ba.set_sampler("emcee")

ba.sampler.setup(n_iterations=500, n_burn_in=200, n_walkers=20)

ba.sample(quiet=True)

ba.restore_median_fit()

coeff = [v.value for _, v in model.free_parameters.items()]

log.debug(f"got coeff: {coeff}")

final_polynomial = Polynomial(coeff)

final_polynomial.set_covariace_matrix(
ba.results.estimate_covariance_matrix())

min_log_likelihood = xy.get_log_like()

log.debug(f"-min loglike: {-min_log_likelihood}")

return final_polynomial, -min_log_likelihood
```