Source code for threeML.utils.fitted_objects.fitted_point_sources

from __future__ import division

from past.utils import old_div

__author__ = "grburgess"

import collections

import numba as nb
import numpy as np
import scipy.integrate as integrate
from astropy import units as u

from threeML.config import threeML_config
from threeML.config.point_source_structure import IntegrateMethod
from threeML.io.logging import setup_logger
from threeML.utils.fitted_objects.fitted_source_handler import \
    GenericFittedSourceHandler

log = setup_logger(__name__)


@nb.njit(fastmath=True, cache=True)
def _trapz(x, y):
    return np.trapz(x, y)


[docs] class NotCompositeModelError(RuntimeError): pass
[docs] class InvalidUnitError(RuntimeError): pass
[docs] class FluxConversion(object): def __init__(self, flux_unit, energy_unit, flux_model): """ a generic flux conversion class to handle transforming spectra between different flux units :param flux_unit: the desired flux unit :param energy_unit: the energy unit :param flux_model: the model to be transformed """ self._flux_unit = flux_unit self._energy_unit = energy_unit self._model = flux_model self._test_value = 1.0 * energy_unit self._flux_type = None self._determine_quantity() self._calculate_conversion() def _determine_quantity(self): # scroll thru conversions until one works for k, v in self._flux_lookup.items(): try: self._flux_unit.to(v) self._flux_type = k except (u.UnitConversionError): continue if self._flux_type is None: raise InvalidUnitError( "The flux_unit provided is not a valid flux quantity" ) def _calculate_conversion(self): # convert the model to the right units so that we can # convert back later for speed tmp = self._model_converter[self._flux_type](self._test_value) if ( tmp.unit == u.dimensionless_unscaled or tmp.unit == self._test_value.unit or tmp.unit == (self._test_value.unit) ** 2 ): # this is a multiplicative model self._conversion = 1.0 self._is_dimensionless = True else: self._conversion = tmp.unit.to( self._flux_unit, equivalencies=u.spectral()) self._is_dimensionless = False @property def is_dimensionless(self): return self._is_dimensionless @property def model(self): """ the model converted :return: a model in the proper units """ return self._model_builder[self._flux_type] @property def conversion_factor(self): """ the conversion factor needed to finalize the model into the proper units after computations :return: """ return self._conversion
[docs] class DifferentialFluxConversion(FluxConversion): def __init__(self, flux_unit, energy_unit, flux_model, test_model): """ Handles differential flux conversion and model building for point sources :param test_model: model to test the flux on :param flux_unit: an astropy unit string for differential flux :param energy_unit: an astropy unit string for energy :param flux_model: the base flux model to use """ self._flux_lookup = { "photon_flux": 1.0 / (u.keV * u.cm ** 2 * u.s), "energy_flux": old_div(u.erg, (u.keV * u.cm ** 2 * u.s)), "nufnu_flux": old_div(u.erg ** 2, (u.keV * u.cm ** 2 * u.s)), } self._model_converter = { "photon_flux": test_model, "energy_flux": lambda x: x * test_model(x), "nufnu_flux": lambda x: x * x * test_model(x), } self._model_builder = { "photon_flux": flux_model, "energy_flux": lambda x, **param_specification: x * flux_model(x, **param_specification), "nufnu_flux": lambda x, **param_specification: x * x * flux_model(x, **param_specification), } super(DifferentialFluxConversion, self).__init__( flux_unit, energy_unit, flux_model )
[docs] def trap_integral(func, e1, e2, **args): if e2/e1 > 100: e_grid = np.logspace(np.log10(e1), np.log10(e2), 50) else: e_grid = np.linspace(e1, e2, 50) y = func(e_grid, args) return _trapz(y, e_grid)
[docs] class IntegralFluxConversion(FluxConversion): def __init__(self, flux_unit, energy_unit, flux_model, test_model): """ Handles integral flux conversion and model building for point sources :param flux_unit: an astropy unit string for integral flux :param energy_unit: an astropy unit string for energy :param flux_model: the base flux model to use """ self._flux_lookup = { "photon_flux": 1.0 / (u.cm ** 2 * u.s), "energy_flux": old_div(u.erg, (u.cm ** 2 * u.s)), "nufnu_flux": old_div(u.erg ** 2, (u.cm ** 2 * u.s)), } self._model_converter = { "photon_flux": lambda x: x * test_model(x), "energy_flux": lambda x: x * x * test_model(x), "nufnu_flux": lambda x: x ** 3 * test_model(x), } def photon_integrand(x, param_specification): return flux_model(x, **param_specification) def energy_integrand(x, param_specification): return x * flux_model(x, **param_specification) def nufnu_integrand(x, param_specification): return x * x * flux_model(x, **param_specification) if threeML_config.point_source.integrate_flux_method == IntegrateMethod.trapz: self._model_builder = { "photon_flux": lambda e1, e2, **param_specification: trap_integral( photon_integrand, e1, e2, **param_specification ), "energy_flux": lambda e1, e2, **param_specification: trap_integral( energy_integrand, e1, e2, **param_specification ), "nufnu_flux": lambda e1, e2, **param_specification: trap_integral( nufnu_integrand, e1, e2, **param_specification ), } elif threeML_config.point_source.integrate_flux_method == IntegrateMethod.quad: self._model_builder = { "photon_flux": lambda e1, e2, **param_specification: integrate.quad( photon_integrand, e1, e2, args=(param_specification) )[0], "energy_flux": lambda e1, e2, **param_specification: integrate.quad( energy_integrand, e1, e2, args=(param_specification) )[0], "nufnu_flux": lambda e1, e2, **param_specification: integrate.quad( nufnu_integrand, e1, e2, args=(param_specification) )[0], } else: log.error("This is not a valid integratio method") raise RuntimeError super(IntegralFluxConversion, self).__init__( flux_unit, energy_unit, flux_model)
[docs] class FittedPointSourceSpectralHandler(GenericFittedSourceHandler): def __init__( self, analysis_result, source, energy_range, energy_unit, flux_unit, confidence_level=0.68, equal_tailed=True, component=None, is_differential_flux=True, ): """ A 3ML fitted point source. :param confidence_level: :param equal_tailed: :param is_differential_flux: :param analysis_result: a 3ML analysis :param source: the source to solve for :param energy_range: an array of energies to calculate the source over :param energy_unit: string astropy unit :param flux_unit: string astropy flux unit :param component: the component name to calculate """ # first extract the source self._point_source = analysis_result.optimized_model.sources[source] # extract the components try: composite_model = self._point_source.spectrum.main.composite self._components = self._solve_for_component_flux(composite_model) except: try: self._components = self._point_source.components except: self._components = None if component is not None: if self._components is not None: if isinstance(self._components[component],dict): model = self._components[component]["function"].evaluate_at parameters = self._components[component]["function"].parameters test_model = self._components[component]["function"] parameter_names = self._components[component]["parameter_names"] else: model = self._components[component].shape.evaluate_at parameters = self._components[component].shape.parameters test_model = self._components[component].shape parameter_names = [ par.name for par in list( self._components[component].shape.parameters.values() ) ] else: raise NotCompositeModelError("This is not a composite model!") else: model = self._point_source.spectrum.main.shape.evaluate_at parameters = self._point_source.spectrum.main.shape.parameters test_model = self._point_source.spectrum.main.shape parameter_names = [ par.name for par in list( self._point_source.spectrum.main.shape.parameters.values() ) ] energy_unit = u.Unit(energy_unit) # for any plotting, the x-axis remains unaltered outside of this class # therefore we convert to keV using the spectral equiv. helper from # astropy so that we can easily except energy, wavelength, or frequency # energy units if isinstance(energy_range, u.Quantity): energy_range = (energy_range).to("keV", equivalencies=u.spectral()) else: energy_range = (energy_range * energy_unit).to( "keV", equivalencies=u.spectral() ) energy_unit = energy_range.unit energy_range = energy_range.value flux_unit = u.Unit(flux_unit) self._flux_unit = flux_unit # now we will will find out what type of units we need # if we are doing differential flux plotting: if is_differential_flux: converter = DifferentialFluxConversion( flux_unit, energy_unit, model, test_model ) flux_function = converter.model self._conversion = converter.conversion_factor super(FittedPointSourceSpectralHandler, self).__init__( analysis_result, flux_function, parameter_names, parameters, confidence_level, equal_tailed, energy_range, ) else: converter = IntegralFluxConversion( flux_unit, energy_unit, model, test_model ) flux_function = converter.model self._conversion = converter.conversion_factor # we treat the energy range as the range we want to integrate over e1 = np.array([energy_range.min()]) e2 = np.array([energy_range.max()]) # super will by pass the MLE/Bayes class if it is # inherited as well and go right to the general and # use the e1, e2 as the integral bounds super(FittedPointSourceSpectralHandler, self).__init__( analysis_result, flux_function, parameter_names, parameters, confidence_level, equal_tailed, e1, e2, ) self._is_dimensionless = converter.is_dimensionless @property def is_dimensionless(self): return self._is_dimensionless @property def components(self): """ :return: the components of the function """ return self._components def _transform(self, value): """ transform the values into the proper flux unit and apply the units :param value: :return: """ return self._conversion * self._flux_unit * value @staticmethod def _solve_for_component_flux(composite_model): """ now that we are using RandomVariates, we only need to compute the function directly to see the error in a component :param composite_model: an astromodels composite model :return: dict of component properties """ function_dict = {} names = [f.name for f in composite_model.functions] counts = collections.Counter(names) for s, num in list(counts.items()): if num > 1: # ignore strings that only appear once for suffix in range( 1, num + 1 ): # suffix starts at 1 and increases by 1 each time names[names.index(s)] = "%s_n%i" % ( s, suffix, ) # replace each appearance of s for i, function in enumerate(composite_model.functions): tmp_dict = {} # extract the parameter names using the static_name property # because this is what the children will use in evaluate_at parameter_names = [ par.static_name for par in list(function.parameters.values()) ] tmp_dict["parameter_names"] = parameter_names tmp_dict["function"] = function function_dict[names[i]] = tmp_dict return function_dict