__author__ = "grburgess"
import collections
import numpy as np
import pandas as pd
from threeML.io.logging import setup_logger
from threeML.utils.fitted_objects.fitted_point_sources import (
FittedPointSourceSpectralHandler,
)
from threeML.utils.progress_bar import tqdm
log = setup_logger(__name__)
def _setup_analysis_dictionaries(
analysis_results,
energy_range,
energy_unit,
flux_unit,
use_components,
components_to_use,
confidence_level,
equal_tailed,
differential,
sources_to_use,
include_extended,
):
"""
helper function to pull out analysis details that are common to flux and plotting functions
:param analysis_results:
:param energy_range:
:param energy_unit:
:param flux_unit:
:param use_components:
:param components_to_use:
:param confidence_level:
:param fraction_of_samples:
:param differential:
:param sources_to_use:
:param include_extended:
:return:
"""
bayesian_analyses = collections.OrderedDict()
mle_analyses = collections.OrderedDict()
# first we split up the bayesian and mle analysis
mle_sources = collections.OrderedDict()
bayes_sources = collections.OrderedDict()
for analysis in analysis_results:
items = (
list(analysis.optimized_model.point_sources.items())
if not include_extended
else list(analysis.optimized_model.sources.items())
)
for source_name, source in items:
if source_name in sources_to_use or not sources_to_use:
if analysis.analysis_type == "MLE":
# keep track of duplicate sources
mle_sources.setdefault(source_name, []).append(1)
if len(mle_sources[source_name]) > 1:
name = "%s_%d" % (
source_name,
len(mle_sources[source_name]),
)
else:
name = source_name
try:
comps = [
c.name
for c in source.spectrum.main.composite.functions
]
except:
try:
comps = [
c for c in source.components
]
except:
comps = []
# duplicate components
comps = [
"%s_n%i" % (s, suffix) if num > 1 else s
for s, num in list(collections.Counter(comps).items())
for suffix in range(1, num + 1)
]
mle_analyses[name] = {
"source": source_name,
"analysis": analysis,
"component_names": comps,
}
else:
bayes_sources.setdefault(source_name, []).append(1)
# keep track of duplicate sources
if len(bayes_sources[source_name]) > 1:
name = "%s_%d" % (
source_name,
len(bayes_sources[source_name]),
)
else:
name = source_name
try:
comps = [
c.name
for c in source.spectrum.main.composite.functions
]
except:
try:
comps = [
c for c in source.components
]
except:
comps = []
# duplicate components
comps = [
"%s_n%i" % (s, suffix) if num > 1 else s
for s, num in list(collections.Counter(comps).items())
for suffix in range(1, num + 1)
]
bayesian_analyses[name] = {
"source": source_name,
"analysis": analysis,
"component_names": comps,
}
# keep track of the number of sources we will use
num_sources_to_use = 0
# go through the MLE analysis and build up some fitted sources
if mle_analyses:
for key in tqdm(
list(mle_analyses.keys()), desc="processing MLE analyses"
):
# if we want to use this source
if (
not use_components
or ("total" in components_to_use)
or (not mle_analyses[key]["component_names"])
):
mle_analyses[key][
"fitted point source"
] = FittedPointSourceSpectralHandler(
mle_analyses[key]["analysis"],
mle_analyses[key]["source"],
energy_range,
energy_unit,
flux_unit,
confidence_level,
equal_tailed=equal_tailed,
is_differential_flux=differential,
)
num_sources_to_use += 1
# see if there are any components to use
if use_components:
num_components_to_use = 0
component_dict = {}
for component in mle_analyses[key]["component_names"]:
# if we want to plot all the components
if not components_to_use:
component_dict[
component
] = FittedPointSourceSpectralHandler(
mle_analyses[key]["analysis"],
mle_analyses[key]["source"],
energy_range,
energy_unit,
flux_unit,
confidence_level,
equal_tailed,
component=component,
is_differential_flux=differential,
)
num_components_to_use += 1
else:
# otherwise pick off only the ones of interest
if component in components_to_use:
component_dict[
component
] = FittedPointSourceSpectralHandler(
mle_analyses[key]["analysis"],
mle_analyses[key]["source"],
energy_range,
energy_unit,
flux_unit,
confidence_level,
equal_tailed,
component=component,
is_differential_flux=differential,
)
num_components_to_use += 1
# save these to the dict
mle_analyses[key]["components"] = component_dict
# keep track of how many components we need to plot
if use_components:
num_sources_to_use += num_components_to_use
if "total" in components_to_use:
num_sources_to_use += 1
# else:
#
# num_sources_to_use += 1
# repeat for the bayes analyses
if bayesian_analyses:
for key in tqdm(
list(bayesian_analyses.keys()), desc="processing Bayesian analyses"
):
# if we have a source to use
if (
not use_components
or ("total" in components_to_use)
or (not bayesian_analyses[key]["component_names"])
):
bayesian_analyses[key][
"fitted point source"
] = FittedPointSourceSpectralHandler(
bayesian_analyses[key]["analysis"],
bayesian_analyses[key]["source"],
energy_range,
energy_unit,
flux_unit,
confidence_level,
equal_tailed,
is_differential_flux=differential,
)
num_sources_to_use += 1
# if we want to use components
if use_components:
num_components_to_use = 0
component_dict = {}
for component in bayesian_analyses[key]["component_names"]:
# extracting all components
if not components_to_use:
component_dict[
component
] = FittedPointSourceSpectralHandler(
bayesian_analyses[key]["analysis"],
bayesian_analyses[key]["source"],
energy_range,
energy_unit,
flux_unit,
confidence_level,
equal_tailed,
component=component,
is_differential_flux=differential,
)
num_components_to_use += 1
# or just some of them
if component in components_to_use:
component_dict[
component
] = FittedPointSourceSpectralHandler(
bayesian_analyses[key]["analysis"],
bayesian_analyses[key]["source"],
energy_range,
energy_unit,
flux_unit,
confidence_level,
equal_tailed,
component=component,
is_differential_flux=differential,
)
num_components_to_use += 1
bayesian_analyses[key]["components"] = component_dict
# keep track of everything we added on
if use_components and num_components_to_use > 0:
num_sources_to_use += num_components_to_use
if "total" in components_to_use:
num_sources_to_use += 1
#
# else:
#
# num_sources_to_use += 1
# we may have the same source in a bayesian and mle analysis.
# we want to plot them, but make sure to label them differently.
# so let's keep track of them
duplicate_keys = []
for key in list(mle_analyses.keys()):
if key in list(bayesian_analyses.keys()):
duplicate_keys.append(key)
return mle_analyses, bayesian_analyses, num_sources_to_use, duplicate_keys
def _collect_sums_into_dictionaries(
analyses, use_components, components_to_use
):
"""
:param analyses:
:param use_components:
:param components_to_use:
:return:
"""
total_analysis = []
component_sum_dict = collections.OrderedDict()
num_sources_to_use = 0
for key in list(analyses.keys()):
# we won't assume to plot the total until the end
use_total = False
if use_components:
# append all the components we want to sum to their
# own key
if (not list(analyses[key]["components"].keys())) or (
"total" in components_to_use
):
use_total = True
for component in list(analyses[key]["components"].keys()):
component_sum_dict.setdefault(component, []).append(
analyses[key]["components"][component]
)
else:
use_total = True
if use_total:
# append the total spectrum
total_analysis.append(analyses[key]["fitted point source"])
if use_components:
for key, values in list(component_sum_dict.items()):
num_sources_to_use += len(values)
num_sources_to_use += len(total_analysis)
return total_analysis, component_sum_dict, num_sources_to_use
def _append_best_fit_and_errors(
samples, _defaults, label, fluxes, p_errors, n_errors, labels
):
if _defaults["best_fit"] == "average":
best_fit = samples.average[0, 0]
else:
best_fit = samples.median[0, 0]
positive_error = samples.upper_error[0, 0]
negative_error = samples.lower_error[0, 0]
fluxes.append(best_fit)
p_errors.append(positive_error)
n_errors.append(negative_error)
labels.append(label)
def _compute_output(analyses, _defaults, out):
fluxes = []
p_errors = []
n_errors = []
labels = []
# go thru the mle analysis and get the fluxes
for key in list(analyses.keys()):
# we won't assume to plot the total until the end
get_total = False
if _defaults["use_components"]:
# if this source has no components or none that we wish to plot
# then we will get the total flux after this
if (not list(analyses[key]["components"].keys())) or (
"total" in _defaults["components_to_use"]
):
get_total = True
for component in list(analyses[key]["components"].keys()):
# extract the information and plot it
samples = analyses[key]["components"][component]
label = f"{key}: {component}"
_append_best_fit_and_errors(
samples,
_defaults,
label,
fluxes,
p_errors,
n_errors,
labels,
)
else:
get_total = True
if get_total:
# it ends up that we need to plot the total spectrum
# which is just a repeat of the process
samples = analyses[key]["fitted point source"]
label = f"{key}: total"
_append_best_fit_and_errors(
samples, _defaults, label, fluxes, p_errors, n_errors, labels
)
if fluxes:
# now make a data frame
mle_df = pd.DataFrame(
{"flux": fluxes, "low bound": n_errors, "hi bound": p_errors},
index=labels,
)
mle_df = mle_df[["flux", "low bound", "hi bound"]]
mle_df = mle_df[["flux", "low bound", "hi bound"]]
out.append(mle_df)
# display(mle_df)
else:
out.append(None)
def _compute_output_with_components(
_defaults, component_sum_dict, total_analysis, out
):
fluxes = []
n_errors = []
p_errors = []
labels = []
if _defaults["use_components"] and list(component_sum_dict.keys()):
# we have components to calculate
for component, values in list(component_sum_dict.items()):
summed_analysis = sum(values)
if _defaults["best_fit"] == "average":
best_fit = summed_analysis.average[0, 0]
else:
best_fit = summed_analysis.median[0, 0]
positive_error = summed_analysis.upper_error[0, 0]
negative_error = summed_analysis.lower_error[0, 0]
label = component
fluxes.append(best_fit)
p_errors.append(positive_error)
n_errors.append(negative_error)
labels.append(label)
if total_analysis:
summed_analysis = sum(total_analysis)
if _defaults["best_fit"] == "average":
best_fit = summed_analysis.average[0, 0]
else:
best_fit = summed_analysis.median[0, 0]
positive_error = summed_analysis.upper_error[0, 0]
negative_error = summed_analysis.lower_error[0, 0]
label = "total"
fluxes.append(best_fit)
p_errors.append(positive_error)
n_errors.append(negative_error)
labels.append(label)
if fluxes:
# now make a data frame
df = pd.DataFrame(
{"flux": fluxes, "low bound": n_errors, "hi bound": p_errors},
index=labels,
)
df = df[["flux", "low bound", "hi bound"]]
out.append(df)
# display(df)
else:
out.append(None)
[docs]
def calculate_point_source_flux(*args, **kwargs):
log.error(
"The use of calculate_point_source_flux is deprecated. Please use the .get_point_source_flux()"
" method of the JointLikelihood.results or the BayesianAnalysis.results member. For example:"
" jl.results.get_point_source_flux()."
)
return _calculate_point_source_flux(*args, **kwargs)
def _calculate_point_source_flux(ene_min, ene_max, *analyses, **kwargs):
"""
:param ene_min: lower energy bound for the flux
:param ene_max: upper energy bound for the flux
:param analyses: fitted JointLikelihood or BayesianAnalysis objects
:param sources_to_use: (optional) list of PointSource string names to plot from the analysis
:param energy_unit: (optional) astropy energy unit in string form (can also be frequency)
:param flux_unit: (optional) astropy flux unit in string form
:param ene_min: (optional) minimum energy to plot
:param ene_max: (optional) maximum energy to plot
:param use_components: (optional) True or False to plot the spectral components
:param components_to_use: (optional) list of string names of the components to plot: including 'total'
will also plot the total spectrum
:param include_extended: (optional) if True, plot extended source spectra (spatially integrated) as well.
:return: mle_dataframe, bayes_dataframe
"""
_defaults = {
"confidence_level": 0.68,
"equal_tailed": True,
"best_fit": "median",
"energy_unit": "keV",
"flux_unit": "erg/(s cm2)",
"ene_min": ene_min,
"ene_max": ene_max,
"use_components": False,
"components_to_use": [],
"sources_to_use": [],
"sum_sources": False,
"include_extended": False,
}
for key, value in list(kwargs.items()):
if key in _defaults:
_defaults[key] = value
# set up the integral limits
energy_range = np.array([_defaults["ene_min"], _defaults["ene_max"]])
mle_analyses, bayesian_analyses, _, _ = _setup_analysis_dictionaries(
analyses,
energy_range,
_defaults["energy_unit"],
_defaults["flux_unit"],
_defaults["use_components"],
_defaults["components_to_use"],
_defaults["confidence_level"],
_defaults["equal_tailed"],
differential=False,
sources_to_use=_defaults["sources_to_use"],
include_extended=_defaults["include_extended"],
)
out = []
if not _defaults["sum_sources"]:
# Process the MLE analyses
_compute_output(mle_analyses, _defaults, out)
# now do the bayesian side
_compute_output(bayesian_analyses, _defaults, out)
else:
# instead we now sum the fluxes
# we keep bayes and mle apart
(
total_analysis_mle,
component_sum_dict_mle,
_,
) = _collect_sums_into_dictionaries(
mle_analyses,
_defaults["use_components"],
_defaults["components_to_use"],
)
_compute_output_with_components(
_defaults, component_sum_dict_mle, total_analysis_mle, out
)
# now do the bayesian side
(
total_analysis_bayes,
component_sum_dict_bayes,
_,
) = _collect_sums_into_dictionaries(
bayesian_analyses,
_defaults["use_components"],
_defaults["components_to_use"],
)
_compute_output_with_components(
_defaults, component_sum_dict_bayes, total_analysis_bayes, out
)
return out