# Source code for threeML.test.conftest

import os
import signal
import subprocess
import time
from pathlib import Path

import numpy as np
import numba as nb
import pytest
from astromodels import *
from astromodels import (Blackbody, Gaussian, Line, Log_uniform_prior, Model,
                         PointSource, Powerlaw, Uniform_prior)

from threeML.bayesian.bayesian_analysis import BayesianAnalysis
from threeML.classicMLE.joint_likelihood import JointLikelihood
from threeML.data_list import DataList
from threeML.io.package_data import get_path_of_data_dir
from threeML.plugins.OGIPLike import OGIPLike
from threeML.plugins.PhotometryLike import PhotometryLike
from threeML.plugins.XYLike import XYLike
from threeML.utils.photometry import get_photometric_filter_library, PhotometericObservation
from threeML.plugins.UnbinnedPoissonLike import EventObservation
from threeML.plugins.XYLike import XYLike
from threeML.utils.numba_utils import VectorFloat64

from threeML.io.logging import debug_mode

# Fix the module-level seed so anything random done at import time is
# reproducible (each individual test is re-seeded again by the
# reset_random_seed fixture below).
np.random.seed(12345)

# useful for testing
debug_mode()

# Set up an ipyparallel cluster for the tests to use


@pytest.fixture(scope="session", autouse=True)
def setup_ipcluster():
    """Start a 2-engine ipyparallel cluster for the whole test session.

    Yields the cluster process; on teardown the process is interrupted
    (SIGINT) to let it shut down cleanly, then force-killed.
    """
    cluster_proc = subprocess.Popen(["ipcluster", "start", "-n", "2"])
    # give the engines time to come up before any test tries to use them
    time.sleep(10.0)

    yield cluster_proc

    # graceful stop first, then kill whatever is left
    cluster_proc.send_signal(signal.SIGINT)
    time.sleep(10.0)
    cluster_proc.kill()
# This is run automatically before *every* test (autouse=True)
@pytest.fixture(scope="function", autouse=True)
def reset_random_seed():
    """Make every test deterministic and quiet.

    Re-seeds numpy's RNG before each test (so tests that draw random
    numbers are predictable) and silences numpy floating-point warnings.
    """
    np.random.seed(1234)

    # Suppress numpy warnings
    np.seterr(invalid="ignore", divide="ignore", over="ignore", under="ignore")
def get_grb_model(spectrum):
    """Wrap *spectrum* in a point source at the position of GRB bn090217206.

    Returns an astromodels Model containing that single source.
    """
    source = PointSource("bn090217206", 204.9, -8.4, spectral_shape=spectrum)
    return Model(source)
def get_test_datasets_directory():
    """Absolute path of the 'datasets' folder inside the package data dir."""
    datasets_dir = Path(get_path_of_data_dir()) / "datasets"
    return datasets_dir.absolute()
def get_dataset():
    """Build the NaI6 OGIP plugin for GRB bn090217206 with standard selections."""
    data_dir = get_test_datasets_directory() / "bn090217206"

    plugin = OGIPLike(
        "NaI6",
        str(data_dir / "bn090217206_n6_srcspectra.pha{1}"),
        str(data_dir / "bn090217206_n6_bkgspectra.bak{1}"),
        str(data_dir / "bn090217206_n6_weightedrsp.rsp{1}"),
    )
    plugin.set_active_measurements("10.0-30.0", "40.0-950.0")

    return plugin
def get_dataset_det(det):
    """Build an OGIP plugin for detector *det* (e.g. 'n6', 'b1') of bn090217206.

    BGO detectors (name starting with 'b') get a 250-25000 selection,
    NaI detectors the usual 10.0-30.0 / 40.0-950.0 bands.
    """
    data_dir = get_test_datasets_directory() / "bn090217206"

    plugin = OGIPLike(
        det,
        str(data_dir / f"bn090217206_{det}_srcspectra.pha{{1}}"),
        str(data_dir / f"bn090217206_{det}_bkgspectra.bak{{1}}"),
        str(data_dir / f"bn090217206_{det}_weightedrsp.rsp{{1}}"),
    )

    if det[0] == "b":
        plugin.set_active_measurements("250-25000")
    else:
        plugin.set_active_measurements("10.0-30.0", "40.0-950.0")

    return plugin
@pytest.fixture(scope="session")
def data_list_bn090217206_nai6():
    """Session-wide DataList containing only the NaI6 plugin."""
    return DataList(get_dataset())
@pytest.fixture(scope="session")
def data_list_bn090217206_nai6_nai9_bgo1():
    """Session-wide DataList with the NaI6, NaI9 and BGO1 plugins."""
    plugins = [get_dataset_det(name) for name in ("n6", "n9", "b1")]
    return DataList(*plugins)
# This is going to be run every time a test needs it, so the jl object # is always "fresh"
@pytest.fixture(scope="function")
def joint_likelihood_bn090217206_nai(data_list_bn090217206_nai6):
    """Fresh JointLikelihood (power-law model, NaI6 data) for each test."""
    model = get_grb_model(Powerlaw())
    return JointLikelihood(model, data_list_bn090217206_nai6, verbose=False)
# This is going to be run every time a test needs it, so the jl object # is always "fresh"
@pytest.fixture(scope="function")
def joint_likelihood_bn090217206_nai6_nai9_bgo1(data_list_bn090217206_nai6_nai9_bgo1):
    """Fresh JointLikelihood (power-law model, three-detector data) for each test."""
    model = get_grb_model(Powerlaw())
    return JointLikelihood(
        model, data_list_bn090217206_nai6_nai9_bgo1, verbose=False
    )
# Re-fit for every test that requests this fixture (scope="function"), # so the fit results are always fresh
@pytest.fixture(scope="function")
def fitted_joint_likelihood_bn090217206_nai(joint_likelihood_bn090217206_nai):
    """Fit the NaI6 joint likelihood; returns (jl, fit results, likelihood frame)."""
    jl = joint_likelihood_bn090217206_nai
    results, likelihood_frame = jl.fit()
    return jl, results, likelihood_frame
# Re-fit for every test that requests this fixture (scope="function"), # so the fit results are always fresh
@pytest.fixture(scope="function")
def fitted_joint_likelihood_bn090217206_nai6_nai9_bgo1(
    joint_likelihood_bn090217206_nai6_nai9_bgo1,
):
    """Fit the three-detector joint likelihood; returns (jl, results, frame)."""
    jl = joint_likelihood_bn090217206_nai6_nai9_bgo1
    results, likelihood_frame = jl.fit()
    return jl, results, likelihood_frame
def set_priors(model):
    """Attach priors to the bn090217206 power-law for Bayesian runs."""
    pl = model.bn090217206.spectrum.main.Powerlaw

    pl.index.prior = Uniform_prior(lower_bound=-5.0, upper_bound=5.0)
    pl.K.prior = Log_uniform_prior(lower_bound=1.0, upper_bound=10)
def remove_priors(model):
    """Clear the prior of every parameter yielded by iterating over *model*."""
    for par in model:
        par.prior = None
@pytest.fixture(scope="function")
def bayes_fitter(fitted_joint_likelihood_bn090217206_nai):
    """BayesianAnalysis built from the restored best-fit NaI6 model."""
    jl, _fit_results, _like_frame = fitted_joint_likelihood_bn090217206_nai

    datalist = jl.data_list
    model = jl.likelihood_model

    # other tests may have moved the parameters: go back to the best fit
    jl.restore_best_fit()
    set_priors(model)

    return BayesianAnalysis(model, datalist)
@pytest.fixture(scope="function")
def completed_bn090217206_bayesian_analysis(fitted_joint_likelihood_bn090217206_nai):
    """Run emcee on the restored best-fit NaI6 model; returns (bayes, samples)."""
    jl, _, _ = fitted_joint_likelihood_bn090217206_nai

    jl.restore_best_fit()

    model = jl.likelihood_model
    data_list = jl.data_list

    pl = model.bn090217206.spectrum.main.Powerlaw
    pl.index.prior = Uniform_prior(lower_bound=-5.0, upper_bound=5.0)
    pl.K.prior = Log_uniform_prior(lower_bound=1.0, upper_bound=10)

    bayes = BayesianAnalysis(model, data_list)
    bayes.set_sampler("emcee")
    bayes.sampler.setup(n_walkers=50, n_burn_in=200, n_iterations=500, seed=1234)

    samples = bayes.sample()

    return bayes, samples
@pytest.fixture(scope="session")
def joint_likelihood_bn090217206_nai_multicomp(data_list_bn090217206_nai6):
    """Session-wide JointLikelihood with a power-law + blackbody model (NaI6)."""
    model = get_grb_model(Powerlaw() + Blackbody())
    return JointLikelihood(model, data_list_bn090217206_nai6, verbose=False)
# No need to keep refitting, so we fit once (scope=session)
# No need to keep refitting, so we fit once (scope=session)
@pytest.fixture(scope="session")
def fitted_joint_likelihood_bn090217206_nai_multicomp(
    joint_likelihood_bn090217206_nai_multicomp,
):
    """Fit the multi-component model once per session; returns (jl, results, frame)."""
    jl = joint_likelihood_bn090217206_nai_multicomp
    results, likelihood_frame = jl.fit()
    return jl, results, likelihood_frame
@pytest.fixture(scope="session")
def completed_bn090217206_bayesian_analysis_multicomp(
    fitted_joint_likelihood_bn090217206_nai_multicomp,
):
    """Run emcee on the multi-component model; returns (bayes, bayes.samples)."""
    jl, _, _ = fitted_joint_likelihood_bn090217206_nai_multicomp

    # Other tests/functions might have messed with the model stored within;
    # go back to the best fit before sampling
    jl.restore_best_fit()

    model = jl.likelihood_model
    data_list = jl.data_list

    shape = model.bn090217206.spectrum.main.shape
    shape.index_1.prior = Uniform_prior(lower_bound=-5.0, upper_bound=5.0)
    shape.K_1.prior = Log_uniform_prior(lower_bound=1.0, upper_bound=10)
    shape.K_2.prior = Log_uniform_prior(lower_bound=1e-20, upper_bound=10)
    shape.kT_2.prior = Log_uniform_prior(lower_bound=1e0, upper_bound=1e3)

    bayes = BayesianAnalysis(model, data_list)
    bayes.set_sampler("emcee")
    bayes.sampler.setup(n_walkers=50, n_burn_in=500, n_iterations=500, seed=1234)

    bayes.sample()

    return bayes, bayes.samples
# Synthetic dataset for the XY-plugin fixtures below: 50 grid points on
# [0, 10] and Poisson counts with a clear bump near x ~ 5
x = np.linspace(0, 10, 50)

poiss_sig = np.array(
    [
        44, 43, 38, 25, 51, 37, 46, 47, 55, 36,
        40, 32, 46, 37, 44, 42, 50, 48, 52, 47,
        39, 55, 80, 93, 123, 135, 96, 74, 43, 49,
        43, 51, 27, 32, 35, 42, 43, 49, 38, 43,
        59, 54, 50, 40, 50, 57, 55, 47, 38, 64,
    ]
)
@pytest.fixture(scope="session")
def xy_model_and_datalist():
    """(model, data) pair: line + Gaussian point source vs the synthetic XY data."""
    xy = XYLike("test", x, np.array(poiss_sig), poisson_data=True)

    fitfun = Line() + Gaussian()

    fitfun.b_1.bounds = (-10, 10.0)
    fitfun.a_1.bounds = (-100, 100.0)
    fitfun.F_2 = 60.0
    fitfun.F_2.bounds = (1e-3, 200.0)
    fitfun.mu_2 = 5.0
    fitfun.mu_2.bounds = (0.0, 100.0)
    fitfun.sigma_2.bounds = (1e-3, 10.0)

    model = Model(PointSource("fake", 0.0, 0.0, fitfun))

    return model, DataList(xy)
@pytest.fixture(scope="session")
def xy_fitted_joint_likelihood(xy_model_and_datalist):
    """Fit the XY model once per session; returns (jl, result frame, like frame)."""
    model, data = xy_model_and_datalist
    jl = JointLikelihood(model, data)
    result_frame, likelihood_frame = jl.fit()
    return jl, result_frame, likelihood_frame
@pytest.fixture(scope="session")
def xy_completed_bayesian_analysis(xy_fitted_joint_likelihood):
    """Run emcee on the XY model after restoring its best fit; returns (bs, samples)."""
    jl, _, _ = xy_fitted_joint_likelihood

    jl.restore_best_fit()

    model = jl.likelihood_model
    data = jl.data_list

    composite = model.fake.spectrum.main.composite

    # uninformative priors: uniform for location-like parameters,
    # log-uniform for scale-like ones
    for par_name, prior_class in (
        ("a_1", Uniform_prior),
        ("b_1", Uniform_prior),
        ("F_2", Log_uniform_prior),
        ("mu_2", Uniform_prior),
        ("sigma_2", Log_uniform_prior),
    ):
        getattr(composite, par_name).set_uninformative_prior(prior_class)

    bs = BayesianAnalysis(model, data)
    bs.set_sampler("emcee")
    bs.sampler.setup(n_burn_in=100, n_iterations=100, n_walkers=20)

    samples = bs.sample()

    return bs, samples
@pytest.fixture(scope="function")
def test_directory():
    """Create an empty 'dummy_dir' for a test and remove it afterwards."""
    dummy = Path("dummy_dir")
    dummy.mkdir(parents=True, exist_ok=True)

    yield dummy

    dummy.rmdir()
@pytest.fixture(scope="function")
def test_file():
    """Create an empty 'dummy_file' for a test and delete it afterwards."""
    dummy = Path("dummy_file")
    dummy.touch(exist_ok=True)

    yield dummy

    dummy.unlink()
@pytest.fixture(scope="session")
def threeML_filter_library():
    """Session-wide photometric filter library."""
    yield get_photometric_filter_library()
@pytest.fixture(scope="session")
def photo_obs():
    """GROND photometric observation, round-tripped through an HDF5 file.

    Writes the observation to 'grond_observation.h5', reads it back (so the
    HDF5 serialization path is exercised), yields the restored object, and
    removes the file on teardown.
    """
    bands = dict(
        g=(19.92, 0.1),
        r=(19.75, 0.1),
        i=(19.65, 0.1),
        z=(19.56, 0.1),
        J=(19.38, 0.1),
        H=(19.22, 0.1),
        K=(19.07, 0.1),
    )
    observation = PhotometericObservation.from_kwargs(**bands)

    fn = Path("grond_observation.h5")
    observation.to_hdf5(fn, overwrite=True)

    yield PhotometericObservation.from_hdf5(fn)

    fn.unlink()
@pytest.fixture(scope="function")
def grond_plugin(threeML_filter_library, photo_obs):
    """PhotometryLike plugin for the GROND instrument at La Silla."""
    yield PhotometryLike(
        "GROND",
        filters=threeML_filter_library.LaSilla.GROND,
        observation=photo_obs,
    )
@pytest.fixture(scope="function")
def photometry_data_model(grond_plugin):
    """(model, datalist) pair for photometry tests: bare power-law point source."""
    spec = Powerlaw()  # * XS_zdust() * XS_zdust()
    model = Model(PointSource("grb", 0, 0, spectral_shape=spec))
    yield model, DataList(grond_plugin)
@nb.njit(fastmath=True, cache=True)
def poisson_generator(tstart, tstop, slope, intercept, seed=1234):
    """
    Non-homogeneous poisson process generator
    for a given max rate and time range, this
    function generates time tags sampled from the
    energy integrated lightcurve.
    """
    # Deterministic output for a given seed
    # (inside @njit this seeds numba's own RNG state)
    np.random.seed(seed)

    num_time_steps = 1000

    # The rate is linear in time: rate(t) = intercept + slope * t.
    # Evaluate it on a grid to find the maximum rate, which is used as
    # the envelope for rejection (thinning) sampling below.
    time_grid = np.linspace(tstart, tstop + 1.0, num_time_steps)
    tmp = intercept + slope * time_grid

    fmax = tmp.max()

    time = tstart
    arrival_times = VectorFloat64(0)
    # tstart itself is always recorded as the first "event"
    arrival_times.append(tstart)

    while time < tstop:
        # Candidate arrival from a homogeneous process at the max rate fmax...
        time = time - (1.0 / fmax) * np.log(np.random.rand())
        test = np.random.rand()
        p_test = (intercept + slope * time) / fmax
        # ...accepted with probability rate(time)/fmax (thinning).
        # NOTE(review): the final accepted time can slightly exceed tstop.
        if test <= p_test:
            arrival_times.append(time)

    return arrival_times.arr
@pytest.fixture(scope="session")
def event_time_series():
    """Arrival times from a constant-rate process (intercept=100, slope=0) over [-10, 60]."""
    yield poisson_generator(tstart=-10, tstop=60, slope=0, intercept=100, seed=1234)
@pytest.fixture(scope="session")
def event_observation_contiguous():
    """EventObservation over a single contiguous interval [0, 10]."""
    events = poisson_generator(tstart=0, tstop=10, slope=1., intercept=10, seed=1234)
    yield EventObservation(events, exposure=10, start=0., stop=10.)
@pytest.fixture(scope="session")
def event_observation_split():
    """EventObservation built from two disjoint intervals, [0, 2] and [30, 40]."""
    first = poisson_generator(tstart=0, tstop=2, slope=.2, intercept=1, seed=1234)
    second = poisson_generator(tstart=30, tstop=40, slope=.2, intercept=1, seed=1234)
    events = np.append(first, second)

    yield EventObservation(events, exposure=12, start=[0., 30.], stop=[2., 40.])