Source code for threeML.test.test_analysis_results
from __future__ import division
from __future__ import print_function
from builtins import zip
from past.utils import old_div
import pytest
import os
import numpy as np
import astropy.units as u
from threeML.plugins.XYLike import XYLike
from threeML import Model, DataList, JointLikelihood, PointSource
from threeML import BayesianAnalysis, Uniform_prior, Log_uniform_prior
from threeML.analysis_results import (
    MLEResults,
    load_analysis_results,
    load_analysis_results_hdf,
    convert_fits_analysis_result_to_hdf,
    AnalysisResultsSet,
)
from astromodels import Line, Gaussian, Powerlaw
_cache = {}
# These are the same simulated data we use in the test of the XY plugin
x = np.linspace(0, 10, 50)
poiss_sig = [
    44, 43, 38, 25, 51, 37, 46, 47, 55, 36,
    40, 32, 46, 37, 44, 42, 50, 48, 52, 47,
    39, 55, 80, 93, 123, 135, 96, 74, 43, 49,
    43, 51, 27, 32, 35, 42, 43, 49, 38, 43,
    59, 54, 50, 40, 50, 57, 55, 47, 38, 64,
]
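
# The fixtures used by the tests below (xy_fitted_joint_likelihood and
# xy_completed_bayesian_analysis) come from conftest.py and are not shown on
# this page. Below is a minimal sketch of how such a fit could be set up from
# the data above, assuming the same Line + Gaussian composite model as the XY
# plugin test; this is illustrative, not the actual fixture code:


def _sketch_xy_fit():
    # Composite model: a line plus a Gaussian bump
    fitfun = Line() + Gaussian()

    # Build an XYLike plugin around the Poisson-distributed data above
    xy = XYLike("fake", x, poiss_sig, poisson_data=True)

    # Wrap the composite function in a point source and fit it
    src = PointSource("fake", ra=0.0, dec=0.0, spectral_shape=fitfun)
    jl = JointLikelihood(Model(src), DataList(xy))
    jl.fit()

    return jl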
def _results_are_same(res1, res2, bayes=False):
    # Check that they are the same
    if not bayes:
        # Check the covariance matrix
        assert np.allclose(res1.covariance_matrix, res2.covariance_matrix)
    else:
        # Check the samples
        assert np.allclose(res1.samples, res2.samples)

    frame1 = res1.get_data_frame()
    frame2 = res2.get_data_frame()

    # Remove the units (which cannot be checked with np.allclose)
    unit1 = frame1.pop("unit")
    unit2 = frame2.pop("unit")

    assert np.allclose(frame1.values, frame2.values, rtol=0.15)
    assert np.all(unit1 == unit2)

    # Now check the values of the statistics
    s1 = res1.optimal_statistic_values
    s2 = res2.optimal_statistic_values

    assert np.allclose(s1.values, s2.values)
def test_analysis_results_input_output(xy_fitted_joint_likelihood):
    jl, _, _ = xy_fitted_joint_likelihood  # type: JointLikelihood, None, None
    jl.restore_best_fit()

    ar = jl.results  # type: MLEResults

    temp_file = "__test_mle.fits"
    ar.write_to(temp_file, overwrite=True)

    ar_reloaded = load_analysis_results(temp_file)
    os.remove(temp_file)

    _results_are_same(ar, ar_reloaded)
def test_analysis_results_input_output_hdf(xy_fitted_joint_likelihood):
    jl, _, _ = xy_fitted_joint_likelihood  # type: JointLikelihood, None, None
    jl.restore_best_fit()

    ar = jl.results  # type: MLEResults

    temp_file = "__test_mle.h5"
    ar.write_to(temp_file, overwrite=True, as_hdf=True)

    ar_reloaded = load_analysis_results_hdf(temp_file)
    os.remove(temp_file)

    _results_are_same(ar, ar_reloaded)
def test_analysis_set_input_output(xy_fitted_joint_likelihood):
    # Collect the same analysis results twice, just to check that we can
    # save them to a file as a set of results
    jl, _, _ = xy_fitted_joint_likelihood  # type: JointLikelihood, None, None
    jl.restore_best_fit()

    ar = jl.results  # type: MLEResults
    ar2 = jl.results

    analysis_set = AnalysisResultsSet([ar, ar2])
    analysis_set.set_bins("testing", [-1, 1], [3, 5], unit="s")

    temp_file = "_analysis_set_test"
    analysis_set.write_to(temp_file, overwrite=True)

    analysis_set_reloaded = load_analysis_results(temp_file)
    os.remove(temp_file)

    # Test that they are the same
    assert len(analysis_set_reloaded) == len(analysis_set)

    for res1, res2 in zip(analysis_set, analysis_set_reloaded):
        _results_are_same(res1, res2)
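
# Note on set_bins above: the two lists give one bin per result in the set,
# as (lower, upper) bounds, so the first result is tagged with the interval
# [-1, 3] s and the second with [1, 5] s. A reloaded set can also be indexed
# directly, e.g. (illustrative):
#
#     first_result = analysis_set_reloaded[0]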
def test_conversion_fits2hdf(xy_fitted_joint_likelihood):
    jl, _, _ = xy_fitted_joint_likelihood  # type: JointLikelihood, None, None
    jl.restore_best_fit()

    ar = jl.results  # type: MLEResults
    ar2 = jl.results

    analysis_set = AnalysisResultsSet([ar, ar2])
    analysis_set.set_bins("testing", [-1, 1], [3, 5], unit="s")

    temp_file = "_analysis_set_test.fits"
    analysis_set.write_to(temp_file, overwrite=True)

    convert_fits_analysis_result_to_hdf(temp_file)

    analysis_set_reloaded = load_analysis_results_hdf("_analysis_set_test.h5")

    # Test that they are the same
    assert len(analysis_set_reloaded) == len(analysis_set)

    for res1, res2 in zip(analysis_set, analysis_set_reloaded):
        _results_are_same(res1, res2)

    # Clean up the temporary files
    os.remove(temp_file)
    os.remove("_analysis_set_test.h5")
def test_analysis_set_input_output_hdf(xy_fitted_joint_likelihood):
    # Collect the same analysis results twice, just to check that we can
    # save them to a file as a set of results
    jl, _, _ = xy_fitted_joint_likelihood  # type: JointLikelihood, None, None
    jl.restore_best_fit()

    ar = jl.results  # type: MLEResults
    ar2 = jl.results

    analysis_set = AnalysisResultsSet([ar, ar2])
    analysis_set.set_bins("testing", [-1, 1], [3, 5], unit="s")

    temp_file = "_analysis_set_test_hdf"
    analysis_set.write_to(temp_file, overwrite=True, as_hdf=True)

    analysis_set_reloaded = load_analysis_results_hdf(temp_file)
    os.remove(temp_file)

    # Test that they are the same
    assert len(analysis_set_reloaded) == len(analysis_set)

    for res1, res2 in zip(analysis_set, analysis_set_reloaded):
        _results_are_same(res1, res2)
def test_error_propagation(xy_fitted_joint_likelihood):
    jl, _, _ = xy_fitted_joint_likelihood  # type: JointLikelihood, None, None
    jl.restore_best_fit()

    ar = jl.results  # type: MLEResults

    # The results can be used to propagate errors non-linearly through
    # analytical functions (see also the sketch after this test)
    p1 = ar.get_variates("fake.spectrum.main.composite.b_1")
    p2 = ar.get_variates("fake.spectrum.main.composite.a_1")

    # Test the printing
    print(p1)
    print(p2)

    res = p1 + p2
    assert old_div(abs(res.value - (p1.value + p2.value)), (p1.value + p2.value)) < 0.01

    # The ratio of a variate with itself must have zero error
    res = old_div(p1, p1)
    low_b, hi_b = res.equal_tail_interval()
    assert low_b == 1
    assert hi_b == 1

    # Now with a function
    fitfun = ar.optimized_model.fake.spectrum.main.shape

    arguments = {}

    for par in list(fitfun.parameters.values()):
        if par.free:
            this_name = par.name
            this_variate = ar.get_variates(par.path)

            # Do not use more than 1000 values (more would needlessly slow
            # down the computation)
            if len(this_variate) > 1000:
                this_variate = np.random.choice(this_variate, size=1000)

            arguments[this_name] = this_variate

    # Prepare the error propagator function
    pp = ar.propagate(
        ar.optimized_model.fake.spectrum.main.shape.evaluate_at, **arguments
    )

    new_variate = pp(5.0)
    assert abs(new_variate.median - 130.0) < 20

    low_b, hi_b = new_variate.equal_tail_interval()
    assert abs(low_b - 120) < 20
    assert abs(hi_b - 140) < 20
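
# A minimal sketch of the error-propagation pattern exercised above, assuming
# an MLEResults instance `ar` from any fit (the parameter paths are those of
# the fixture model and are illustrative):
#
#     a = ar.get_variates("fake.spectrum.main.composite.a_1")
#     b = ar.get_variates("fake.spectrum.main.composite.b_1")
#
#     ratio = a / b  # arithmetic on variates propagates the full distributions
#     lo, hi = ratio.equal_tail_interval()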
def test_bayesian_input_output(xy_completed_bayesian_analysis):
    bs, _ = xy_completed_bayesian_analysis

    rb1 = bs.results

    temp_file = "_test_bayes.fits"
    rb1.write_to(temp_file, overwrite=True)

    rb2 = load_analysis_results(temp_file)
    os.remove(temp_file)

    _results_are_same(rb1, rb2, bayes=True)
def test_corner_plotting(xy_completed_bayesian_analysis):
    bs, _ = xy_completed_bayesian_analysis
    ar = bs.results

    # Plot all free parameters, then only a subset
    ar.corner_plot()
    ar.corner_plot(components=list(ar._free_parameters.keys())[:2])
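
# corner_plot with no arguments plots all free parameters; passing a
# `components` list (as above) restricts the plot to that subset.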
def test_one_free_parameter_input_output():
    fluxUnit = 1.0 / (u.TeV * u.cm ** 2 * u.s)

    temp_file = "__test_mle.fits"

    spectrum = Powerlaw()
    source = PointSource("tst", ra=100, dec=20, spectral_shape=spectrum)
    model = Model(source)

    spectrum.piv = 7 * u.TeV
    spectrum.index = -2.3
    spectrum.K = 1e-15 * fluxUnit
    spectrum.piv.fix = True

    # Two free parameters (one with units)
    spectrum.index.fix = False
    spectrum.K.fix = False
    cov_matrix = np.diag([0.001] * 2)

    ar = MLEResults(model, cov_matrix, {"1": 1})
    ar.write_to(temp_file, overwrite=True)
    ar_reloaded = load_analysis_results(temp_file)
    os.remove(temp_file)
    _results_are_same(ar, ar_reloaded)

    # One free parameter, with units
    spectrum.index.fix = True
    spectrum.K.fix = False
    cov_matrix = np.diag([0.001] * 1)

    ar = MLEResults(model, cov_matrix, {"1": 1})
    ar.write_to(temp_file, overwrite=True)
    ar_reloaded = load_analysis_results(temp_file)
    os.remove(temp_file)
    _results_are_same(ar, ar_reloaded)

    # One free parameter, without units
    spectrum.index.fix = False
    spectrum.K.fix = True
    cov_matrix = np.diag([0.001] * 1)

    ar = MLEResults(model, cov_matrix, {"1": 1})
    ar.write_to(temp_file, overwrite=True)
    ar_reloaded = load_analysis_results(temp_file)
    os.remove(temp_file)
    _results_are_same(ar, ar_reloaded)
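
# Note: the MLEResults constructor used above takes the model (whose current
# parameter values are taken as the best fit), the covariance matrix of the
# free parameters, and a dictionary mapping each dataset to its statistic
# value ({"1": 1} is just a placeholder for the test).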