Source code for threeML.io.calculate_flux

__author__ = "grburgess"

import collections

import numpy as np
import pandas as pd
from threeML.io.logging import setup_logger
from threeML.utils.fitted_objects.fitted_point_sources import (
    FittedPointSourceSpectralHandler,
)
from threeML.utils.progress_bar import tqdm

log = setup_logger(__name__)


def _setup_analysis_dictionaries(
    analysis_results,
    energy_range,
    energy_unit,
    flux_unit,
    use_components,
    components_to_use,
    confidence_level,
    equal_tailed,
    differential,
    sources_to_use,
    include_extended,
):
    """
    helper function to pull out analysis details that are common to flux and plotting functions


    :param analysis_results:
    :param energy_range:
    :param energy_unit:
    :param flux_unit:
    :param use_components:
    :param components_to_use:
    :param confidence_level:
    :param fraction_of_samples:
    :param differential:
    :param sources_to_use:
    :param include_extended:
    :return:
    """

    bayesian_analyses = collections.OrderedDict()
    mle_analyses = collections.OrderedDict()

    # first we split up the bayesian and mle analysis

    mle_sources = collections.OrderedDict()
    bayes_sources = collections.OrderedDict()

    for analysis in analysis_results:

        items = (
            list(analysis.optimized_model.point_sources.items())
            if not include_extended
            else list(analysis.optimized_model.sources.items())
        )

        for source_name, source in items:

            if source_name in sources_to_use or not sources_to_use:

                if analysis.analysis_type == "MLE":

                    # keep track of duplicate sources

                    mle_sources.setdefault(source_name, []).append(1)

                    if len(mle_sources[source_name]) > 1:
                        name = "%s_%d" % (
                            source_name,
                            len(mle_sources[source_name]),
                        )

                    else:

                        name = source_name

                    try:

                        comps = [
                            c.name
                            for c in source.spectrum.main.composite.functions
                        ]

                    except:

                        try:

                            comps = [
                                c for c in source.components
                            ]

                        except:

                            comps = []

                    # duplicate components
                    comps = [
                        "%s_n%i" % (s, suffix) if num > 1 else s
                        for s, num in list(collections.Counter(comps).items())
                        for suffix in range(1, num + 1)
                    ]

                    mle_analyses[name] = {
                        "source": source_name,
                        "analysis": analysis,
                        "component_names": comps,
                    }

                else:

                    bayes_sources.setdefault(source_name, []).append(1)

                    # keep track of duplicate sources

                    if len(bayes_sources[source_name]) > 1:
                        name = "%s_%d" % (
                            source_name,
                            len(bayes_sources[source_name]),
                        )

                    else:

                        name = source_name

                    try:

                        comps = [
                            c.name
                            for c in source.spectrum.main.composite.functions
                        ]

                    except:

                        try:

                            comps = [
                                c for c in source.components
                            ]

                        except:

                            comps = []

                    # duplicate components
                    comps = [
                        "%s_n%i" % (s, suffix) if num > 1 else s
                        for s, num in list(collections.Counter(comps).items())
                        for suffix in range(1, num + 1)
                    ]

                    bayesian_analyses[name] = {
                        "source": source_name,
                        "analysis": analysis,
                        "component_names": comps,
                    }

    # keep track of the number of sources we will use

    num_sources_to_use = 0

    # go through the MLE analysis and build up some fitted sources

    if mle_analyses:

        for key in tqdm(
            list(mle_analyses.keys()), desc="processing MLE analyses"
        ):

            # if we want to use this source
            if (
                not use_components
                or ("total" in components_to_use)
                or (not mle_analyses[key]["component_names"])
            ):
                mle_analyses[key][
                    "fitted point source"
                ] = FittedPointSourceSpectralHandler(
                    mle_analyses[key]["analysis"],
                    mle_analyses[key]["source"],
                    energy_range,
                    energy_unit,
                    flux_unit,
                    confidence_level,
                    equal_tailed=equal_tailed,
                    is_differential_flux=differential,
                )

                num_sources_to_use += 1

            # see if there are any components to use

            if use_components:

                num_components_to_use = 0

                component_dict = {}

                for component in mle_analyses[key]["component_names"]:

                    # if we want to plot all the components

                    if not components_to_use:

                        component_dict[
                            component
                        ] = FittedPointSourceSpectralHandler(
                            mle_analyses[key]["analysis"],
                            mle_analyses[key]["source"],
                            energy_range,
                            energy_unit,
                            flux_unit,
                            confidence_level,
                            equal_tailed,
                            component=component,
                            is_differential_flux=differential,
                        )

                        num_components_to_use += 1

                    else:

                        # otherwise pick off only the ones of interest

                        if component in components_to_use:
                            component_dict[
                                component
                            ] = FittedPointSourceSpectralHandler(
                                mle_analyses[key]["analysis"],
                                mle_analyses[key]["source"],
                                energy_range,
                                energy_unit,
                                flux_unit,
                                confidence_level,
                                equal_tailed,
                                component=component,
                                is_differential_flux=differential,
                            )

                            num_components_to_use += 1

                # save these to the dict

                mle_analyses[key]["components"] = component_dict

            # keep track of how many components we need to plot

            if use_components:

                num_sources_to_use += num_components_to_use

                if "total" in components_to_use:
                    num_sources_to_use += 1

            # else:
            #
            #     num_sources_to_use += 1

    # repeat for the bayes analyses

    if bayesian_analyses:

        for key in tqdm(
            list(bayesian_analyses.keys()), desc="processing Bayesian analyses"
        ):

            # if we have a source to use

            if (
                not use_components
                or ("total" in components_to_use)
                or (not bayesian_analyses[key]["component_names"])
            ):
                bayesian_analyses[key][
                    "fitted point source"
                ] = FittedPointSourceSpectralHandler(
                    bayesian_analyses[key]["analysis"],
                    bayesian_analyses[key]["source"],
                    energy_range,
                    energy_unit,
                    flux_unit,
                    confidence_level,
                    equal_tailed,
                    is_differential_flux=differential,
                )

                num_sources_to_use += 1

            # if we want to use components

            if use_components:

                num_components_to_use = 0

                component_dict = {}

                for component in bayesian_analyses[key]["component_names"]:

                    # extracting all components

                    if not components_to_use:
                        component_dict[
                            component
                        ] = FittedPointSourceSpectralHandler(
                            bayesian_analyses[key]["analysis"],
                            bayesian_analyses[key]["source"],
                            energy_range,
                            energy_unit,
                            flux_unit,
                            confidence_level,
                            equal_tailed,
                            component=component,
                            is_differential_flux=differential,
                        )

                        num_components_to_use += 1

                    # or just some of them

                    if component in components_to_use:
                        component_dict[
                            component
                        ] = FittedPointSourceSpectralHandler(
                            bayesian_analyses[key]["analysis"],
                            bayesian_analyses[key]["source"],
                            energy_range,
                            energy_unit,
                            flux_unit,
                            confidence_level,
                            equal_tailed,
                            component=component,
                            is_differential_flux=differential,
                        )

                        num_components_to_use += 1

                bayesian_analyses[key]["components"] = component_dict

            # keep track of everything we added on

            if use_components and num_components_to_use > 0:

                num_sources_to_use += num_components_to_use

                if "total" in components_to_use:
                    num_sources_to_use += 1
            #
            # else:
            #
            #     num_sources_to_use += 1

        # we may have the same source in a bayesian and mle analysis.
        # we want to plot them, but make sure to label them differently.
        # so let's keep track of them

    duplicate_keys = []

    for key in list(mle_analyses.keys()):

        if key in list(bayesian_analyses.keys()):
            duplicate_keys.append(key)

    return mle_analyses, bayesian_analyses, num_sources_to_use, duplicate_keys


def _collect_sums_into_dictionaries(
    analyses, use_components, components_to_use
):
    """

    :param analyses:
    :param use_components:
    :param components_to_use:
    :return:
    """

    total_analysis = []

    component_sum_dict = collections.OrderedDict()

    num_sources_to_use = 0

    for key in list(analyses.keys()):

        # we won't assume to plot the total until the end

        use_total = False

        if use_components:

            # append all the components we want to sum to their
            # own key

            if (not list(analyses[key]["components"].keys())) or (
                "total" in components_to_use
            ):
                use_total = True

            for component in list(analyses[key]["components"].keys()):
                component_sum_dict.setdefault(component, []).append(
                    analyses[key]["components"][component]
                )

        else:

            use_total = True

        if use_total:
            # append the total spectrum

            total_analysis.append(analyses[key]["fitted point source"])

    if use_components:

        for key, values in list(component_sum_dict.items()):
            num_sources_to_use += len(values)

    num_sources_to_use += len(total_analysis)

    return total_analysis, component_sum_dict, num_sources_to_use


def _append_best_fit_and_errors(
    samples, _defaults, label, fluxes, p_errors, n_errors, labels
):
    if _defaults["best_fit"] == "average":

        best_fit = samples.average[0, 0]

    else:

        best_fit = samples.median[0, 0]

    positive_error = samples.upper_error[0, 0]

    negative_error = samples.lower_error[0, 0]

    fluxes.append(best_fit)
    p_errors.append(positive_error)
    n_errors.append(negative_error)
    labels.append(label)


def _compute_output(analyses, _defaults, out):
    fluxes = []
    p_errors = []
    n_errors = []
    labels = []

    # go thru the mle analysis and get the fluxes
    for key in list(analyses.keys()):

        # we won't assume to plot the total until the end

        get_total = False

        if _defaults["use_components"]:

            # if this source has no components or none that we wish to plot
            # then we will get the total flux after this

            if (not list(analyses[key]["components"].keys())) or (
                "total" in _defaults["components_to_use"]
            ):
                get_total = True

            for component in list(analyses[key]["components"].keys()):
                # extract the information and plot it

                samples = analyses[key]["components"][component]

                label = f"{key}: {component}"

                _append_best_fit_and_errors(
                    samples,
                    _defaults,
                    label,
                    fluxes,
                    p_errors,
                    n_errors,
                    labels,
                )

        else:

            get_total = True

        if get_total:
            # it ends up that we need to plot the total spectrum
            # which is just a repeat of the process
            samples = analyses[key]["fitted point source"]

            label = f"{key}: total"

            _append_best_fit_and_errors(
                samples, _defaults, label, fluxes, p_errors, n_errors, labels
            )

    if fluxes:
        # now make a data frame

        mle_df = pd.DataFrame(
            {"flux": fluxes, "low bound": n_errors, "hi bound": p_errors},
            index=labels,
        )
        mle_df = mle_df[["flux", "low bound", "hi bound"]]
        mle_df = mle_df[["flux", "low bound", "hi bound"]]
        out.append(mle_df)

        # display(mle_df)

    else:

        out.append(None)


def _compute_output_with_components(
    _defaults, component_sum_dict, total_analysis, out
):

    fluxes = []
    n_errors = []
    p_errors = []
    labels = []

    if _defaults["use_components"] and list(component_sum_dict.keys()):

        # we have components to calculate

        for component, values in list(component_sum_dict.items()):

            summed_analysis = sum(values)

            if _defaults["best_fit"] == "average":

                best_fit = summed_analysis.average[0, 0]

            else:

                best_fit = summed_analysis.median[0, 0]

            positive_error = summed_analysis.upper_error[0, 0]

            negative_error = summed_analysis.lower_error[0, 0]

            label = component

            fluxes.append(best_fit)
            p_errors.append(positive_error)
            n_errors.append(negative_error)
            labels.append(label)

    if total_analysis:

        summed_analysis = sum(total_analysis)

        if _defaults["best_fit"] == "average":

            best_fit = summed_analysis.average[0, 0]

        else:

            best_fit = summed_analysis.median[0, 0]

        positive_error = summed_analysis.upper_error[0, 0]

        negative_error = summed_analysis.lower_error[0, 0]

        label = "total"

        fluxes.append(best_fit)
        p_errors.append(positive_error)
        n_errors.append(negative_error)
        labels.append(label)

    if fluxes:
        # now make a data frame

        df = pd.DataFrame(
            {"flux": fluxes, "low bound": n_errors, "hi bound": p_errors},
            index=labels,
        )
        df = df[["flux", "low bound", "hi bound"]]
        out.append(df)

        # display(df)

    else:

        out.append(None)


[docs] def calculate_point_source_flux(*args, **kwargs): log.error( "The use of calculate_point_source_flux is deprecated. Please use the .get_point_source_flux()" " method of the JointLikelihood.results or the BayesianAnalysis.results member. For example:" " jl.results.get_point_source_flux()." ) return _calculate_point_source_flux(*args, **kwargs)
def _calculate_point_source_flux(ene_min, ene_max, *analyses, **kwargs): """ :param ene_min: lower energy bound for the flux :param ene_max: upper energy bound for the flux :param analyses: fitted JointLikelihood or BayesianAnalysis objects :param sources_to_use: (optional) list of PointSource string names to plot from the analysis :param energy_unit: (optional) astropy energy unit in string form (can also be frequency) :param flux_unit: (optional) astropy flux unit in string form :param ene_min: (optional) minimum energy to plot :param ene_max: (optional) maximum energy to plot :param use_components: (optional) True or False to plot the spectral components :param components_to_use: (optional) list of string names of the components to plot: including 'total' will also plot the total spectrum :param include_extended: (optional) if True, plot extended source spectra (spatially integrated) as well. :return: mle_dataframe, bayes_dataframe """ _defaults = { "confidence_level": 0.68, "equal_tailed": True, "best_fit": "median", "energy_unit": "keV", "flux_unit": "erg/(s cm2)", "ene_min": ene_min, "ene_max": ene_max, "use_components": False, "components_to_use": [], "sources_to_use": [], "sum_sources": False, "include_extended": False, } for key, value in list(kwargs.items()): if key in _defaults: _defaults[key] = value # set up the integral limits energy_range = np.array([_defaults["ene_min"], _defaults["ene_max"]]) mle_analyses, bayesian_analyses, _, _ = _setup_analysis_dictionaries( analyses, energy_range, _defaults["energy_unit"], _defaults["flux_unit"], _defaults["use_components"], _defaults["components_to_use"], _defaults["confidence_level"], _defaults["equal_tailed"], differential=False, sources_to_use=_defaults["sources_to_use"], include_extended=_defaults["include_extended"], ) out = [] if not _defaults["sum_sources"]: # Process the MLE analyses _compute_output(mle_analyses, _defaults, out) # now do the bayesian side _compute_output(bayesian_analyses, _defaults, out) else: # instead we now sum the fluxes # we keep bayes and mle apart ( total_analysis_mle, component_sum_dict_mle, _, ) = _collect_sums_into_dictionaries( mle_analyses, _defaults["use_components"], _defaults["components_to_use"], ) _compute_output_with_components( _defaults, component_sum_dict_mle, total_analysis_mle, out ) # now do the bayesian side ( total_analysis_bayes, component_sum_dict_bayes, _, ) = _collect_sums_into_dictionaries( bayesian_analyses, _defaults["use_components"], _defaults["components_to_use"], ) _compute_output_with_components( _defaults, component_sum_dict_bayes, total_analysis_bayes, out ) return out