Source code for threeML.test.test_hawc

from __future__ import division
from __future__ import print_function
from builtins import zip
from builtins import range
from past.utils import old_div
import pytest
import os
import numpy as np

from threeML import *


try:

    from threeML.plugins.HAWCLike import HAWCLike

except ImportError:

    has_HAWC = False

else:

    has_HAWC = True

from threeML.io.file_utils import sanitize_filename

# This defines a decorator which can be applied to single tests to
# skip them if the condition is not met
skip_if_hawc_is_not_available = pytest.mark.skipif(
    (os.environ.get("HAWC_3ML_TEST_DATA_DIR") is None) or (not has_HAWC),
    reason="HAWC test dataset or HAWC environment is not available",
)


[docs] def is_within_tolerance(truth, value, relative_tolerance=0.01): assert truth != 0 if abs(old_div((truth - value), truth)) <= relative_tolerance: return True else: return False
[docs] def is_null_within_tolerance(value, absolute_tolerance): if abs(value) <= absolute_tolerance: return True else: return False
_maptree_name = "maptree_256.root" _response_name = "detector_response.root"
[docs] @pytest.fixture(scope="session") def hawc_point_source_fitted_joint_like(): data_path = sanitize_filename( os.environ.get("HAWC_3ML_TEST_DATA_DIR"), abspath=True ) maptree = os.path.join(data_path, _maptree_name) response = os.path.join(data_path, _response_name) assert os.path.exists(maptree) and os.path.exists(response), ( "Data files do not exist at %s" % data_path ) # The simulated source has this spectrum (credits for simulation: Colas Riviere): # CutOffPowerLaw,3.15e-11,2.37,42.3 # at this position: # 100,22 # Define the spectral and spatial models for the source spectrum = Cutoff_powerlaw() source = PointSource("TestSource", ra=100.0, dec=22.0, spectral_shape=spectrum) spectrum.K = old_div(3.15e-11, (u.TeV * u.cm ** 2 * u.s)) spectrum.K.bounds = (1e-22, 1e-18) # without units energies are in keV spectrum.piv = 1 * u.TeV spectrum.piv.fix = True spectrum.index = -2.37 spectrum.index.bounds = (-4, -1) spectrum.xc = 42.3 * u.TeV spectrum.xc.bounds = (1 * u.TeV, 100 * u.TeV) q = source(1 * u.keV) assert np.isclose(q.value, 67.3458058177) # Set up a likelihood model using the source. # Then create a HAWCLike object using the model, the maptree, and detector # response. lm = Model(source) llh = HAWCLike("HAWC", maptree, response) llh.set_active_measurements(1, 9) # Double check the free parameters print("Likelihood model:\n") print(lm) # Set up the likelihood and run the fit print("Performing likelihood fit...\n") datalist = DataList(llh) jl = JointLikelihood(lm, datalist, verbose=True) jl.set_minimizer("ROOT") parameter_frame, like = jl.fit(compute_covariance=False) return jl, parameter_frame, like
[docs] @skip_if_hawc_is_not_available def test_set_active_measurements(): data_path = sanitize_filename( os.environ.get("HAWC_3ML_TEST_DATA_DIR"), abspath=True ) maptree = os.path.join(data_path, _maptree_name) response = os.path.join(data_path, _response_name) assert os.path.exists(maptree) and os.path.exists(response), ( "Data files do not exist at %s" % data_path ) llh = HAWCLike("HAWC", maptree, response) # Test one way llh.set_active_measurements(1, 9) # Test the other way llh.set_active_measurements(bin_list=["4", "5", "6", "7", "8", "9"])
[docs] @skip_if_hawc_is_not_available def test_hawc_fullsky_options(): assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" data_path = sanitize_filename( os.environ.get("HAWC_3ML_TEST_DATA_DIR"), abspath=True ) maptree = os.path.join(data_path, _maptree_name) response = os.path.join(data_path, _response_name) assert os.path.exists(maptree) and os.path.exists(response), ( "Data files do not exist at %s" % data_path ) # The simulated source has this spectrum (credits for simulation: Colas Riviere): # CutOffPowerLaw,3.15e-11,2.37,42.3 # at this position: # 100,22 # Define the spectral and spatial models for the source spectrum = Cutoff_powerlaw() source = PointSource("TestSource", ra=100.0, dec=22.0, spectral_shape=spectrum) spectrum.K = old_div(3.15e-11, (u.TeV * u.cm ** 2 * u.s)) spectrum.K.bounds = (1e-22, 1e-18) # without units energies are in keV spectrum.piv = 1 * u.TeV spectrum.piv.fix = True spectrum.index = -2.37 spectrum.index.bounds = (-4, -1) spectrum.xc = 42.3 * u.TeV spectrum.xc.bounds = (1 * u.TeV, 100 * u.TeV) q = source(1 * u.keV) assert np.isclose(q.value, 67.3458058177) # Set up a likelihood model using the source. # Then create a HAWCLike object using the model, the maptree, and detector # response. lm = Model(source) # Test with fullsky=True, and try to perform a fit to verify that we throw an exception llh = HAWCLike("HAWC", maptree, response, fullsky=True) llh.set_active_measurements(1, 9) # Double check the free parameters print("Likelihood model:\n") print(lm) # Set up the likelihood and run the fit print("Performing likelihood fit...\n") datalist = DataList(llh) with pytest.raises(RuntimeError): jl = JointLikelihood(lm, datalist, verbose=False) # Now we use set_ROI and this should work llh.set_ROI(100.0, 22.0, 2.0) jl = JointLikelihood(lm, datalist, verbose=False) # Now test that we can use set_ROI even though fullsky=False llh = HAWCLike("HAWC", maptree, response, fullsky=False) llh.set_active_measurements(1, 9) llh.set_ROI(100.0, 22.0, 1.0) # Double check the free parameters print("Likelihood model:\n") print(lm) # Set up the likelihood print("Performing likelihood fit...\n") datalist = DataList(llh) jl = JointLikelihood(lm, datalist, verbose=False)
[docs] @skip_if_hawc_is_not_available def test_hawc_point_source_fit(hawc_point_source_fitted_joint_like): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" jl, parameter_frame, like = hawc_point_source_fitted_joint_like spectrum = jl.likelihood_model.TestSource.spectrum.main.shape # Check that we have converged to the right solution # (the true value of course are not exactly the value simulated, # they are just the point where the fit should converge) assert is_within_tolerance( 3.3246428894535895e-20, parameter_frame["value"]["TestSource.spectrum.main.Cutoff_powerlaw.K"], ) assert is_within_tolerance( -2.33736923856, parameter_frame["value"]["TestSource.spectrum.main.Cutoff_powerlaw.index"], ) assert is_within_tolerance( 37478522636.504425, parameter_frame["value"]["TestSource.spectrum.main.Cutoff_powerlaw.xc"], ) assert is_within_tolerance(55979.424031, like["-log(likelihood)"]["HAWC"]) # Print up the TS, significance, and fit parameters, and then plot stuff print("\nTest statistic:") TS = jl.data_list["HAWC"].calc_TS() sigma = np.sqrt(TS) print("Test statistic: %g" % TS) print("Significance: %g\n" % sigma) assert is_within_tolerance(14366.4, TS) assert is_within_tolerance(119.86, sigma) # Get the differential flux at 1 TeV diff_flux = spectrum(1 * u.TeV) # Convert it to 1 / (TeV cm2 s) diff_flux_TeV = diff_flux.to(old_div(1, (u.TeV * u.cm ** 2 * u.s))) print("Norm @ 1 TeV: %s \n" % diff_flux_TeV) assert is_within_tolerance(3.2371079347638675e-11, diff_flux_TeV.value) spectrum.display()
[docs] @skip_if_hawc_is_not_available def test_hawc_extended_source_fit(): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" data_path = sanitize_filename( os.environ.get("HAWC_3ML_TEST_DATA_DIR"), abspath=True ) maptree = os.path.join(data_path, _maptree_name) response = os.path.join(data_path, _response_name) assert os.path.exists(maptree) and os.path.exists(response), ( "Data files do not exist at %s" % data_path ) # The simulated source has this spectrum (credits for simulation: Colas Riviere): # CutOffPowerLaw,1.32e-07,2.37,42.3 # at this position: # 100,22 # with a disk shape with an extension of 1.5 deg # Define the spectral and spatial models for the source spectrum = Cutoff_powerlaw() shape = Disk_on_sphere() source = ExtendedSource("ExtSource", spatial_shape=shape, spectral_shape=spectrum) shape.lon0 = 100.0 shape.lon0.fix = True shape.lat0 = 22.0 shape.lat0.fix = True shape.radius = 1.5 * u.degree shape.radius.bounds = (0.5 * u.degree, 1.55 * u.degree) # shape.radius.fix = True spectrum.K = 4.39964273e-20 spectrum.K.bounds = (1e-24, 1e-17) spectrum.piv = 1 * u.TeV # spectrum.piv.fix = True spectrum.index = -2.37 spectrum.index.bounds = (-4, -1) # spectrum.index.fix = True spectrum.xc = 42.3 * u.TeV spectrum.xc.bounds = (1 * u.TeV, 100 * u.TeV) spectrum.xc.fix = True # Set up a likelihood model using the source. # Then create a HAWCLike object using the model, the maptree, and detector # response. lm = Model(source) llh = HAWCLike("HAWC", maptree, response) llh.set_active_measurements(1, 9) # Double check the free parameters print("Likelihood model:\n") print(lm) # Set up the likelihood and run the fit print("Performing likelihood fit...\n") datalist = DataList(llh) jl = JointLikelihood(lm, datalist, verbose=True) jl.set_minimizer("ROOT") parameter_frame, like = jl.fit(compute_covariance=False) # Check that we have converged to the right solution # (the true value of course are not exactly the value simulated, # they are just the point where the fit should converge) assert is_within_tolerance( 4.7805737823025172e-20, parameter_frame["value"]["ExtSource.spectrum.main.Cutoff_powerlaw.K"], ) assert is_within_tolerance( -2.44931279819, parameter_frame["value"]["ExtSource.spectrum.main.Cutoff_powerlaw.index"], ) assert is_within_tolerance( 1.4273457159139373, parameter_frame["value"]["ExtSource.Disk_on_sphere.radius"] ) assert is_within_tolerance(186389.581117, like["-log(likelihood)"]["HAWC"]) # Print up the TS, significance, and fit parameters, and then plot stuff print("\nTest statistic:") TS = llh.calc_TS() sigma = np.sqrt(TS) assert is_within_tolerance(3510.26, TS) assert is_within_tolerance(59.2475, sigma) print("Test statistic: %g" % TS) print("Significance: %g\n" % sigma) # Get the differential flux at 1 TeV diff_flux = spectrum(1 * u.TeV) # Convert it to 1 / (TeV cm2 s) diff_flux_TeV = diff_flux.to(old_div(1, (u.TeV * u.cm ** 2 * u.s))) print("Norm @ 1 TeV: %s \n" % diff_flux_TeV) assert is_within_tolerance(4.66888328668e-11, diff_flux_TeV.value) spectrum.display() shape.display()
[docs] @skip_if_hawc_is_not_available def test_hawc_display_residuals(hawc_point_source_fitted_joint_like): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" jl, parameter_frame, like = hawc_point_source_fitted_joint_like source = jl.likelihood_model.TestSource # Check the 'display' functions (plot model&data/residuals vs analysis bins) llh = jl.data_list["HAWC"] llh.display(radius=0.5) llh.display_residuals_at_position( source.position.ra.value, source.position.dec.value, radius=0.5 ) # Now check the bin-dependent radius llh.display_residuals_at_position( source.position.ra.value, source.position.dec.value, radius=[0.5] * 9 )
[docs] @skip_if_hawc_is_not_available def test_null_hyp_prob(hawc_point_source_fitted_joint_like): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" jl, parameter_frame, like = hawc_point_source_fitted_joint_like source = jl.likelihood_model.TestSource llh = jl.data_list["HAWC"] p_value = llh.calc_p_value( source.position.ra.value, source.position.dec.value, radius=[0.5] * 9 ) assert np.isclose(p_value, 0.88173524636, rtol=0.1)
[docs] @skip_if_hawc_is_not_available def test_radial_profile(hawc_point_source_fitted_joint_like): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" jl, parameter_frame, like = hawc_point_source_fitted_joint_like source = jl.likelihood_model.TestSource llh = jl.data_list["HAWC"] lm = jl.likelihood_model correct_radii = [0.1, 0.3, 0.5, 0.7, 0.9, 1.1, 1.3, 1.5, 1.7, 1.9] correct_model = [ 1.006176e07, 3.775266e06, 6.518357e05, 1.390542e05, 4.952657e04, 2.139160e04, 1.067152e04, 5.250257e03, 2.334314e03, 9.489685e02, ] correct_data = [ 9.851449e06, 3.862865e06, 6.724352e05, 1.395860e05, 4.972864e04, 3.894414e04, -5.817591e04, 1.270122e04, 4.575470e03, -1.882920e04, ] correct_error = [ 2.360076e05, 8.138953e04, 4.063915e04, 3.042109e04, 2.590773e04, 2.452525e04, 2.383261e04, 2.194618e04, 1.990404e04, 1.751396e04, ] correct_bins = ["4", "5", "6", "7", "8", "9"] subtracted_data = [d - m for m, d in zip(correct_model, correct_data)] max_radius = 2.0 n_bins = 10 bins_to_use = ["4", "5", "6", "7", "8", "9"] ( radii, excess_model, excess_data, excess_error, list_of_bin_names, ) = llh.get_radial_profile( source.position.ra.value, source.position.dec.value, bins_to_use, max_radius, n_bins, ) # Un-comment the next lines to re-generate the "correct_" values if needed # print 'model, data, error:' # for v in [excess_model, excess_data, excess_error]: # print '[', # for vv in v: # print '%e,' %vv, # print ']' assert len(radii) == n_bins assert len(excess_model) == n_bins assert len(excess_data) == n_bins assert len(excess_error) == n_bins assert list_of_bin_names == correct_bins for i in range(0, n_bins): assert is_within_tolerance(radii[i], correct_radii[i]) assert is_within_tolerance(excess_model[i], correct_model[i]) assert is_within_tolerance(excess_data[i], correct_data[i]) assert is_within_tolerance(excess_error[i], correct_error[i]) # Now again subtracting the model from data and model ( radii, excess_model, excess_data, excess_error, list_of_bin_names, ) = llh.get_radial_profile( source.position.ra.value, source.position.dec.value, bins_to_use, max_radius, n_bins, model_to_subtract=lm, subtract_model_from_model=True, ) assert len(radii) == n_bins assert len(excess_model) == n_bins assert len(excess_data) == n_bins assert len(excess_error) == n_bins assert list_of_bin_names == correct_bins for i in range(0, n_bins): assert is_within_tolerance(radii[i], correct_radii[i]) assert is_null_within_tolerance(excess_model[i], 0.01 * correct_model[i]) assert is_within_tolerance(excess_data[i], correct_data[i] - correct_model[i]) assert is_within_tolerance(excess_error[i], correct_error[i])
[docs] @skip_if_hawc_is_not_available def test_CommonNorm_fit(): assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" data_path = sanitize_filename( os.environ.get("HAWC_3ML_TEST_DATA_DIR"), abspath=True ) maptree = os.path.join(data_path, _maptree_name) response = os.path.join(data_path, _response_name) assert os.path.exists(maptree) and os.path.exists(response), ( "Data files do not exist at %s" % data_path ) # The simulated source has this spectrum (credits for simulation: Colas Riviere): # CutOffPowerLaw,3.15e-11,2.37,42.3 # at this position: # 100,22 # Define the spectral and spatial models for the source spectrum = Cutoff_powerlaw() source = PointSource("TestSource", ra=100.0, dec=22.0, spectral_shape=spectrum) spectrum.K = old_div(3.15e-11, (u.TeV * u.cm ** 2 * u.s)) spectrum.K.bounds = (1e-22, 1e-18) # without units energies are in keV spectrum.K.fix = True spectrum.piv = 1 * u.TeV spectrum.piv.fix = True spectrum.index = -2.37 spectrum.index.bounds = (-4, -1) spectrum.index.free = False spectrum.xc = 42.3 * u.TeV spectrum.xc.bounds = (1 * u.TeV, 100 * u.TeV) spectrum.xc.free = False q = source(1 * u.keV) assert np.isclose(q.value, 67.3458058177) # Set up a likelihood model using the source. # Then create a HAWCLike object using the model, the maptree, and detector # response. lm = Model(source) llh = HAWCLike("HAWC", maptree, response) llh.set_active_measurements(1, 9) llh.activate_CommonNorm() # Double check the free parameters print("Likelihood model:\n") print(lm) # Set up the likelihood and run the fit print("Performing likelihood fit...\n") datalist = DataList(llh) jl = JointLikelihood(lm, datalist, verbose=True) jl.set_minimizer("ROOT") parameter_frame, like = jl.fit(compute_covariance=False) assert np.isclose(lm.HAWC_ComNorm.value, 1.0756519971562115, rtol=1e-2)
[docs] @skip_if_hawc_is_not_available def test_hawc_get_number_of_data_points(hawc_point_source_fitted_joint_like): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" jl, parameter_frame, like = hawc_point_source_fitted_joint_like llh = jl.data_list["HAWC"] assert llh.get_number_of_data_points() == 13428
[docs] @skip_if_hawc_is_not_available def test_hawc_write_map(hawc_point_source_fitted_joint_like): # Ensure test environment is valid assert is_plugin_available("HAWCLike"), "HAWCLike is not available!" jl, parameter_frame, like = hawc_point_source_fitted_joint_like llh = jl.data_list["HAWC"] file_name = "__hawc_map.root" llh.write_map(file_name) assert os.path.exists(file_name) os.remove(file_name)