Source code for threeML.classicMLE.joint_likelihood_set

import warnings
from builtins import object, range

import numpy as np
import pandas as pd
from astromodels import Model

from threeML.analysis_results import AnalysisResultsSet
from threeML.classicMLE.joint_likelihood import JointLikelihood
from threeML.config.config import threeML_config
from threeML.data_list import DataList
from threeML.io.logging import setup_logger, silence_console_log
from threeML.minimizer.minimization import (LocalMinimization, _Minimization,
                                            _minimizers)
from threeML.parallel.parallel_client import ParallelClient
from threeML.utils.progress_bar import trange

log = setup_logger(__name__)

class JointLikelihoodSet(object):
    def __init__(
        self,
        data_getter,
        model_getter,
        n_iterations,
        iteration_name="interval",
        preprocessor=None,
    ):

        # Store the data and model getter

        self._data_getter = data_getter

        # Now get the first model(s) and see whether there is one or more models.
        # Then, we make a wrapper if it returns only one model, so that we will not need to specialize
        # the worker, as it will be able to assume that self._model_getter always returns a list of models
        # (of maybe one element)

        model_or_models = model_getter(0)

        try:

            n_models = len(model_or_models)

        except TypeError:

            # Only one instance, let's check that it is actually a model

            if not isinstance(model_or_models, Model):

                log.error(
                    "The model getter function should return a model or a list of models"
                )

                raise RuntimeError()

            # Save that

            self._n_models = 1

            # Wrap the function so that self._model_getter will return a list of one element

            self._model_getter = lambda id: [model_getter(id)]

        else:

            # More than one model

            # Check that all models are instances of Model

            for this_model in model_or_models:

                if not isinstance(this_model, Model):

                    log.error(
                        "The model getter function should return a model or a list of models"
                    )

                    raise RuntimeError()

            # No need for a wrapper in this case

            self._model_getter = model_getter

            # Save the number of models

            self._n_models = n_models

        # Set up some attributes we will need

        self._n_iterations = n_iterations

        # This is used only to print error messages

        self._iteration_name = iteration_name

        # Default minimizer is minuit

        self._minimization = LocalMinimization("minuit")

        # By default, crash if a fit fails

        self._continue_on_failure = False

        # By default do not compute the covariance matrix

        self._compute_covariance = False

        self._all_results = None

        self._preprocessor = preprocessor
    def set_minimizer(self, minimizer):

        if isinstance(minimizer, _Minimization):

            self._minimization = minimizer

        else:

            if not minimizer.upper() in _minimizers:

                log.error(
                    "Minimizer %s is not available on this system. "
                    "Available minimizers: %s"
                    % (minimizer, ",".join(list(_minimizers.keys())))
                )

                raise RuntimeError()

            # The string can only specify a local minimization. This will return an error if that is not the case.
            # In order to set up a global optimization the user needs to use the GlobalMinimization factory directly

            self._minimization = LocalMinimization(minimizer)
    def worker(self, interval):

        # Get the dataset for this interval

        this_data = self._data_getter(interval)  # type: DataList

        # Get the model for this interval

        this_models = self._model_getter(interval)

        # Apply preprocessor (if any)

        if self._preprocessor is not None:

            self._preprocessor(this_models, this_data)

        n_models = len(this_models)

        # Fit all models and collect the results

        parameters_frames = []
        like_frames = []
        analysis_results = []

        for this_model in this_models:

            # Prepare a joint likelihood and fit it

            with warnings.catch_warnings():

                warnings.simplefilter("ignore", RuntimeWarning)

                jl = JointLikelihood(this_model, this_data)

                this_parameter_frame, this_like_frame = self._fitter(jl)

            # Append results

            parameters_frames.append(this_parameter_frame)
            like_frames.append(this_like_frame)
            analysis_results.append(jl.results)

        # Now merge the results in one data frame for the parameters and one for the likelihood values

        if n_models > 1:

            # Prepare the keys so that the first model will be indexed with model_0, the second with model_1
            # and so on

            keys = ["model_%i" % x for x in range(n_models)]

            # Concatenate all results in one frame for parameters and one for likelihood

            frame_with_parameters = pd.concat(parameters_frames, keys=keys)
            frame_with_like = pd.concat(like_frames, keys=keys)

        else:

            frame_with_parameters = parameters_frames[0]
            frame_with_like = like_frames[0]

        return frame_with_parameters, frame_with_like, analysis_results
    def _fitter(self, jl):

        # Set the minimizer

        jl.set_minimizer(self._minimization)

        try:

            model_results, logl_results = jl.fit(
                quiet=True, compute_covariance=self._compute_covariance
            )

        except Exception:

            log.exception("**** FIT FAILED! ****")

            if self._continue_on_failure:

                # Return empty data frames

                return pd.DataFrame(), pd.DataFrame()

            else:

                raise

        return model_results, logl_results
    def go(
        self,
        continue_on_failure=True,
        compute_covariance=False,
        verbose=False,
        **options_for_parallel_computation
    ):

        # Generate the data frame which will contain all results

        self._continue_on_failure = continue_on_failure
        self._compute_covariance = compute_covariance

        # Let's iterate, perform the fit and fill the data frame

        if threeML_config["parallel"]["use_parallel"]:

            # Parallel computation

            with silence_console_log(and_progress_bars=False):

                client = ParallelClient(**options_for_parallel_computation)

                results = client.execute_with_progress_bar(
                    self.worker, list(range(self._n_iterations))
                )

        else:

            # Serial computation

            results = []

            with silence_console_log(and_progress_bars=False):

                for i in trange(self._n_iterations, desc="Goodness of fit computation"):

                    results.append(self.worker(i))

        assert len(results) == self._n_iterations, (
            "Something went wrong, I have %s results "
            "for %s intervals" % (len(results), self._n_iterations)
        )

        # Store the results in the data frames

        parameter_frames = pd.concat(
            [x[0] for x in results], keys=list(range(self._n_iterations))
        )
        like_frames = pd.concat(
            [x[1] for x in results], keys=list(range(self._n_iterations))
        )

        # Store a list with all results (this is a list of lists, each list contains the results for the
        # different iterations for the same model)

        self._all_results = []

        for i in range(self._n_models):

            this_model_results = [x[2][i] for x in results]

            self._all_results.append(AnalysisResultsSet(this_model_results))

        return parameter_frames, like_frames
    @property
    def results(self):
        """
        Returns a results set for each model. If there is more than one model, it will return a list of
        AnalysisResultsSet instances, otherwise it will return one AnalysisResultsSet instance.

        :return:
        """

        if len(self._all_results) == 1:

            return self._all_results[0]

        else:

            return self._all_results
    def write_to(self, filenames, overwrite=False):
        """
        Write the results to one file per model. If you need more control, get the results using the
        .results property, then write each results set by itself.

        :param filenames: list of filenames, one per model, or a single filename (if there is only one
                          model per interval)
        :param overwrite: overwrite existing files
        :return: None
        """

        # Trick to make filenames always a list

        filenames = list(np.array(filenames, ndmin=1))

        # Check that we have the right number of file names

        assert len(filenames) == self._n_models

        # Now write one file for each model

        for i in range(self._n_models):

            this_results = self._all_results[i]

            this_results.write_to(filenames[i], overwrite=overwrite)
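
A minimal usage sketch (not part of this module): it assumes two hypothetical callables, my_data_getter and my_model_getter, which return respectively a DataList and an astromodels Model for a given interval index; the number of intervals and the output file name are likewise illustrative.

# Hypothetical getters: one DataList and one Model per interval index
def my_data_getter(interval):
    # Build and return the DataList of plugins for this interval
    raise NotImplementedError()

def my_model_getter(interval):
    # Build and return the astromodels Model for this interval
    raise NotImplementedError()

jl_set = JointLikelihoodSet(
    my_data_getter, my_model_getter, n_iterations=10, iteration_name="interval"
)

# A string selects a local minimizer; pass a LocalMinimization or
# GlobalMinimization instance for finer control
jl_set.set_minimizer("minuit")

# Run all fits; returns one frame of parameters and one of likelihood values
parameter_frames, like_frames = jl_set.go(compute_covariance=True)

# One AnalysisResultsSet per model; here we assume a single model per interval
jl_set.write_to(["interval_results.fits"], overwrite=True)
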
class JointLikelihoodSetAnalyzer(object):
    """
    A class to help in offline re-analysis of the results obtained with the JointLikelihoodSet class
    """

    def __init__(self, get_data, get_model, data_frame, like_data_frame):

        self._get_data = get_data
        self._get_model = get_model
        self._data_frame = data_frame
        self._like_data_frame = like_data_frame
    def restore_best_fit_model(self, interval):

        # Get the sub-frame containing the results for the requested interval

        sub_frame = self._data_frame.loc[interval]

        # Get the model for this interval

        this_model = self._get_model(interval)

        # Get the data for this interval

        this_data = self._get_data(interval)

        # Instance a useless joint likelihood object so that plugins have the chance to add
        # nuisance parameters to the model

        _ = JointLikelihood(this_model, this_data)

        # Restore the best fit parameters

        for parameter in this_model.free_parameters:

            this_model[parameter].value = sub_frame["value"][parameter]

        return this_model, this_data
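
A short offline re-analysis sketch, under the same assumptions as the example above (hypothetical my_data_getter / my_model_getter callables, and the frames returned by a previous JointLikelihoodSet.go() call):

analyzer = JointLikelihoodSetAnalyzer(
    my_data_getter, my_model_getter, parameter_frames, like_frames
)

# Re-build the model for interval 0 with its best-fit parameter values restored
best_fit_model, best_fit_data = analyzer.restore_best_fit_model(0)
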