Source code for threeML.classicMLE.likelihood_ratio_test

from builtins import object
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as stats

from astromodels import clone_model

from threeML.classicMLE.joint_likelihood import JointLikelihood
from threeML.classicMLE.joint_likelihood_set import JointLikelihoodSet
from threeML.data_list import DataList
from threeML.exceptions.custom_exceptions import custom_warnings
from threeML.plugins.OGIPLike import OGIPLike
from threeML.utils.OGIP.pha import PHAWrite


class LikelihoodRatioTest(object):
    def __init__(self, joint_likelihood_instance0, joint_likelihood_instance1):

        self._joint_likelihood_instance0 = (
            joint_likelihood_instance0
        )  # type: JointLikelihood
        self._joint_likelihood_instance1 = (
            joint_likelihood_instance1
        )  # type: JointLikelihood

        # Restore best fit and store the reference value for the likelihood
        self._joint_likelihood_instance0.restore_best_fit()
        self._joint_likelihood_instance1.restore_best_fit()

        self._reference_TS = 2 * (
            self._joint_likelihood_instance0.current_minimum
            - self._joint_likelihood_instance1.current_minimum
        )

        # Safety check that the user has provided the models in the right order
        if self._reference_TS < 0:

            custom_warnings.warn(
                "The reference TS is negative: either you specified the likelihood objects "
                "in the wrong order, or the fit for the alternative hyp. has failed. Since the "
                "two hyp. are nested, by definition the more complex hypothesis should give a "
                "better or equal fit with respect to the null hypothesis."
            )

        # Check that the dataset is the same
        if (
            self._joint_likelihood_instance1.data_list
            != self._joint_likelihood_instance0.data_list
        ):

            # Since this check might fail if the user loaded the same data twice, only issue a warning
            # instead of raising an exception.
            custom_warnings.warn(
                "The data lists for the null hyp. and for the alternative hyp. seem to be different."
                " If you loaded the same data twice and made the same data selections, disregard this "
                "message. Otherwise, consider the fact that the LRT is meaningless if the two data "
                "sets are not exactly the same. We will use the data loaded as part of the null "
                "hypothesis JointLikelihood object.",
                RuntimeWarning,
            )

        # For saving pha files
        self._save_pha = False

        self._data_container = []
    def get_simulated_data(self, id):

        # Generate a new data set for each plugin contained in the data list

        new_datas = []

        for dataset in list(self._joint_likelihood_instance0.data_list.values()):

            # Make sure that the active likelihood model is the null hypothesis
            # This is needed if the user has used the same DataList instance for both
            # JointLikelihood instances
            dataset.set_model(self._joint_likelihood_instance0.likelihood_model)

            new_data = dataset.get_simulated_dataset("%s_sim" % dataset.name)

            new_datas.append(new_data)

        new_data_list = DataList(*new_datas)

        if self._save_pha:

            self._data_container.append(new_data_list)

        return new_data_list
    def get_models(self, id):

        # Make a copy of the best fit models, so that we don't touch the original models during the fit, and we
        # also always restart from the best fit (instead of the last iteration)
        new_model0 = clone_model(self._joint_likelihood_instance0.likelihood_model)
        new_model1 = clone_model(self._joint_likelihood_instance1.likelihood_model)

        return new_model0, new_model1
    def by_mc(self, n_iterations=1000, continue_on_failure=False, save_pha=False):
        """
        Compute the Likelihood Ratio Test by generating Monte Carlo datasets and fitting the current models on them.
        The fraction of synthetic datasets which have a value for the TS larger than or equal to the observed one
        gives the null-hypothesis probability (i.e., the probability that the observed TS is obtained by chance
        from the null hypothesis)

        :param n_iterations: number of MC iterations to perform (default: 1000)
        :param continue_on_failure: whether to continue in the case a fit fails (False by default)
        :param save_pha: Saves pha files for reading into XSPEC as a cross check.
         Currently only supports OGIP data. This can become slow! (False by default)
        :return: tuple (null hyp. probability, TSs, frame with all results, frame with all likelihood values)
        """

        self._save_pha = save_pha

        # Create the joint likelihood set
        jl_set = JointLikelihoodSet(
            self.get_simulated_data,
            self.get_models,
            n_iterations,
            iteration_name="simulation",
        )

        # Use the same minimizer as in the first joint likelihood object
        jl_set.set_minimizer(self._joint_likelihood_instance0.minimizer_in_use)

        # Run the set
        data_frame, like_data_frame = jl_set.go(continue_on_failure=continue_on_failure)

        # Get the TS values
        TS_ = 2 * (
            like_data_frame["-log(likelihood)"][:, "model_0", "total"]
            - like_data_frame["-log(likelihood)"][:, "model_1", "total"]
        )  # type: pd.Series

        TS = pd.Series(TS_.values, name="TS")

        # Compute the null hyp. probability
        idx = TS >= self._reference_TS  # type: np.ndarray

        null_hyp_prob = np.sum(idx) / float(n_iterations)

        # Save these for later
        self._null_hyp_prob = null_hyp_prob
        self._TS_distribution = TS

        # Save the sims to phas if requested
        if self._save_pha:

            self._process_saved_data()

        return null_hyp_prob, TS, data_frame, like_data_frame
    def plot_TS_distribution(self, show_chi2=True, scale=1.0, **hist_kwargs):
        """
        Plot the distribution of TS values obtained from the Monte Carlo simulations.

        :param show_chi2: if True, overlay a chi2 distribution with degrees of freedom equal to the difference
         in the number of free parameters between the alternative and the null model (True by default)
        :param scale: scale factor applied to the chi2 probability density (default: 1.0)
        :param hist_kwargs: keyword arguments forwarded to matplotlib's hist
        :return: the matplotlib Figure
        """

        fig, ax = plt.subplots()

        counts, bins, _ = ax.hist(
            self._TS_distribution, density=True, label="monte carlo", **hist_kwargs
        )

        ax.axvline(self._reference_TS, color="r", ls="--", label="Ref. TS")

        if show_chi2:

            x_plot = np.linspace(bins[0], bins[-1], 100)

            # Get the difference in the number of free parameters
            dof = len(
                self._joint_likelihood_instance1.likelihood_model.free_parameters
            ) - len(self._joint_likelihood_instance0.likelihood_model.free_parameters)

            assert (
                dof >= 0
            ), "The difference in the number of parameters between the alternative and null models is negative!"

            chi2 = stats.chi2.pdf(x_plot, dof)

            if scale == 1.0:

                _scale = ""

            else:

                _scale = "%.1f" % scale

            label = r"$%s\chi^{2}_{%d}$" % (_scale, dof)

            ax.plot(x_plot, scale * chi2, label=label)

        ax.set_yscale("log")
        ax.set_xlabel("TS")
        ax.set_ylabel("Probability distribution")

        return fig
    @property
    def reference_TS(self):
        return self._reference_TS

    @property
    def TS_distribution(self):
        return self._TS_distribution

    @property
    def null_hypothesis_probability(self):
        return self._null_hyp_prob

    def _process_saved_data(self):
        """
        Saves data sets for each plugin to PHAs for OGIP data.

        :return:
        """

        for plugin in list(self._data_container[0].values()):

            assert isinstance(
                plugin, OGIPLike
            ), "Saving simulations is only supported for OGIP plugins currently"

        # The first entry is always a test by the JL Set class,
        # so we do not use it
        for key in list(self._data_container[0].keys()):

            per_plugin_list = []

            for data in self._data_container[1:]:

                per_plugin_list.append(data[key])

            # Now write them
            pha_writer = PHAWrite(*per_plugin_list)

            pha_writer.write("%s" % key, overwrite=True)
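
# -----------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal, hedged example of
# how the class above is typically driven. The names "jl_null" and
# "jl_alternative" are hypothetical placeholders for two JointLikelihood
# objects that have already been fit on the same DataList, with the null model
# nested inside the alternative model. The example is left commented out so
# that importing this module is unaffected.
#
#     lrt = LikelihoodRatioTest(jl_null, jl_alternative)
#
#     # Observed test statistic: 2 * (minimum -logL of the null - minimum -logL of the alternative)
#     print(lrt.reference_TS)
#
#     # Calibrate the null-hypothesis probability with Monte Carlo simulations
#     null_prob, TS, data_frame, like_data_frame = lrt.by_mc(n_iterations=1000)
#
#     # Compare the simulated TS distribution against the asymptotic chi2 expectation
#     fig = lrt.plot_TS_distribution(show_chi2=True, bins=30)
# -----------------------------------------------------------------------------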