from __future__ import division
from builtins import range
from past.utils import old_div
import collections
from pathlib import Path
import astropy.io.fits as fits
import numpy as np
import os
import warnings
import six
from threeML.io.progress_bar import progress_bar
from threeML.utils.OGIP.response import OGIPResponse, InstrumentResponse
from threeML.utils.OGIP.pha import PHAII
from threeML.utils.spectrum.binned_spectrum import BinnedSpectrumWithDispersion, Quality
from threeML.utils.spectrum.binned_spectrum_set import BinnedSpectrumSet
from threeML.utils.time_interval import TimeIntervalSet
_required_keywords = {}
_required_keywords["observed"] = (
"mission:TELESCOP,instrument:INSTRUME,filter:FILTER,"
+ "exposure:EXPOSURE,backfile:BACKFILE,"
+ "respfile:RESPFILE,"
+ "ancrfile:ANCRFILE,hduclass:HDUCLASS,"
+ "hduclas1:HDUCLAS1,poisserr:POISSERR,"
+ "chantype:CHANTYPE,detchans:DETCHANS,"
"backscal:BACKSCAL"
).split(",")
# python types, not fits
_required_keyword_types = {"POISSERR": bool}
# hduvers:HDUVERS
_required_keywords["background"] = (
"mission:TELESCOP,instrument:INSTRUME,filter:FILTER,"
+ "exposure:EXPOSURE,"
+ "hduclass:HDUCLASS,"
+ "hduclas1:HDUCLAS1,poisserr:POISSERR,"
+ "chantype:CHANTYPE,detchans:DETCHANS,"
"backscal:BACKSCAL"
).split(",")
# hduvers:HDUVERS
_might_be_columns = {}
_might_be_columns["observed"] = (
"EXPOSURE,BACKFILE," + "CORRFILE,CORRSCAL," + "RESPFILE,ANCRFILE," "BACKSCAL"
).split(",")
_might_be_columns["background"] = ("EXPOSURE,BACKSCAL").split(",")
def _read_pha_or_pha2_file(
pha_file_or_instance,
spectrum_number=None,
file_type="observed",
rsp_file=None,
arf_file=None,
treat_as_time_series=False,
):
"""
A function to extract information from pha and pha2 files. It is kept separate because the same method is
used for reading time series (MUCH faster than building a lot of individual spectra) and single spectra.
:param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance
:param spectrum_number: (optional) the spectrum number of the TypeII file to be used
:param file_type: observed or background
:param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance
:param arf_file: (optional) and ARF filename
:param treat_as_time_series:
:return:
"""
assert isinstance(pha_file_or_instance, six.string_types) or isinstance(
pha_file_or_instance, PHAII
) or isinstance(pha_file_or_instance, Path), "Must provide a FITS file name or PHAII instance"
if isinstance(pha_file_or_instance, six.string_types) or isinstance(pha_file_or_instance, Path):
ext = os.path.splitext(pha_file_or_instance)[-1]
if "{" in ext:
spectrum_number = int(ext.split("{")[-1].replace("}", ""))
pha_file_or_instance = pha_file_or_instance.split("{")[0]
# Read the data
filename = pha_file_or_instance
# create a FITS_FILE instance
pha_file_or_instance = PHAII.from_fits_file(pha_file_or_instance)
# If this is already a FITS_FILE instance,
elif isinstance(pha_file_or_instance, PHAII):
# we simply create a dummy filename
filename = "pha_instance"
else:
raise RuntimeError("This is a bug")
file_name = filename
assert file_type.lower() in [
"observed",
"background",
], "Unrecognized filetype keyword value"
file_type = file_type.lower()
try:
HDUidx = pha_file_or_instance.index_of("SPECTRUM")
except:
raise RuntimeError(
"The input file %s is not in PHA format" % (pha_file_or_instance)
)
# spectrum_number = spectrum_number
spectrum = pha_file_or_instance[HDUidx]
data = spectrum.data
header = spectrum.header
# We don't support yet the rescaling
if "CORRFILE" in header:
if (header.get("CORRFILE").upper().strip() != "NONE") and (
header.get("CORRFILE").upper().strip() != ""
):
raise RuntimeError("CORRFILE is not yet supported")
# See if there is there is a QUALITY==0 in the header
if "QUALITY" in header:
has_quality_column = False
if header["QUALITY"] == 0:
is_all_data_good = True
else:
is_all_data_good = False
else:
if "QUALITY" in data.columns.names:
has_quality_column = True
is_all_data_good = False
else:
has_quality_column = False
is_all_data_good = True
warnings.warn(
"Could not find QUALITY in columns or header of PHA file. This is not a valid OGIP file. Assuming QUALITY =0 (good)"
)
# looking for tstart and tstop
tstart = None
tstop = None
has_tstart = False
has_tstop = False
has_telapse = False
if "TSTART" in header:
has_tstart_column = False
has_tstart = True
else:
if "TSTART" in data.columns.names:
has_tstart_column = True
has_tstart = True
if "TELAPSE" in header:
has_telapse_column = False
has_telapse = True
else:
if "TELAPSE" in data.columns.names:
has_telapse_column = True
has_telapse = True
if "TSTOP" in header:
has_tstop_column = False
has_tstop = True
else:
if "TSTOP" in data.columns.names:
has_tstop_column = True
has_tstop = True
if has_tstop and has_telapse:
warnings.warn("Found TSTOP and TELAPSE. This file is invalid. Using TSTOP.")
has_telapse = False
# Determine if this file contains COUNTS or RATES
if "COUNTS" in data.columns.names:
has_rates = False
data_column_name = "COUNTS"
elif "RATE" in data.columns.names:
has_rates = True
data_column_name = "RATE"
else:
raise RuntimeError(
"This file does not contain a RATE nor a COUNTS column. "
"This is not a valid PHA file"
)
# Determine if this is a PHA I or PHA II
if len(data.field(data_column_name).shape) == 2:
typeII = True
if spectrum_number == None and not treat_as_time_series:
raise RuntimeError(
"This is a PHA Type II file. You have to provide a spectrum number"
)
else:
typeII = False
# Collect information from mandatory keywords
keys = _required_keywords[file_type]
gathered_keywords = {}
for k in keys:
internal_name, keyname = k.split(":")
key_has_been_collected = False
if keyname in header:
if (
keyname in _required_keyword_types
and type(header.get(keyname)) is not _required_keyword_types[keyname]
):
warnings.warn(
"unexpected type of %(keyname)s, expected %(expected_type)s\n found %(found_type)s: %(found_value)s"
% dict(
keyname=keyname,
expected_type=_required_keyword_types[keyname],
found_type=type(header.get(keyname)),
found_value=header.get(keyname),
)
)
else:
gathered_keywords[internal_name] = header.get(keyname)
# Fix "NONE" in None
if (
gathered_keywords[internal_name] == "NONE"
or gathered_keywords[internal_name] == "none"
):
gathered_keywords[internal_name] = None
key_has_been_collected = True
# Note that we check again because the content of the column can override the content of the header
if keyname in _might_be_columns[file_type] and typeII:
# Check if there is a column with this name
if keyname in data.columns.names:
# This will set the exposure, among other things
if not treat_as_time_series:
# if we just want a single spectrum
gathered_keywords[internal_name] = data[keyname][
spectrum_number - 1
]
else:
# else get all the columns
gathered_keywords[internal_name] = data[keyname]
# Fix "NONE" in None
if (
gathered_keywords[internal_name] == "NONE"
or gathered_keywords[internal_name] == "none"
):
gathered_keywords[internal_name] = None
key_has_been_collected = True
if not key_has_been_collected:
# The keyword POISSERR is a special case, because even if it is missing,
# it is assumed to be False if there is a STAT_ERR column in the file
if keyname == "POISSERR" and "STAT_ERR" in data.columns.names:
warnings.warn(
"POISSERR is not set. Assuming non-poisson errors as given in the "
"STAT_ERR column"
)
gathered_keywords["poisserr"] = False
elif keyname == "ANCRFILE":
# Some non-compliant files have no ARF because they don't need one. Don't fail, but issue a
# warning
warnings.warn(
"ANCRFILE is not set. This is not a compliant OGIP file. Assuming no ARF."
)
gathered_keywords["ancrfile"] = None
elif keyname == "FILTER":
# Some non-compliant files have no FILTER because they don't need one. Don't fail, but issue a
# warning
warnings.warn(
"FILTER is not set. This is not a compliant OGIP file. Assuming no FILTER."
)
gathered_keywords["filter"] = None
else:
raise RuntimeError(
"Keyword %s not found. File %s is not a proper PHA "
"file" % (keyname, filename)
)
is_poisson = gathered_keywords["poisserr"]
exposure = gathered_keywords["exposure"]
# now we need to get the response file so that we can extract the EBOUNDS
if file_type == "observed":
if rsp_file is None:
# this means it should be specified in the header
rsp_file = gathered_keywords["respfile"]
if arf_file is None:
arf_file = gathered_keywords["ancrfile"]
# Read in the response
if isinstance(rsp_file, six.string_types) or isinstance(rsp_file, str) or isinstance(rsp_file, Path):
rsp = OGIPResponse(rsp_file, arf_file=arf_file)
else:
# assume a fully formed OGIPResponse
rsp = rsp_file
if file_type == "background":
# we need the rsp ebounds from response to build the histogram
assert isinstance(
rsp_file, InstrumentResponse
), "You must supply and OGIPResponse to extract the energy bounds"
rsp = rsp_file
# Now get the data (counts or rates) and their errors. If counts, transform them in rates
if typeII:
# PHA II file
if has_rates:
if not treat_as_time_series:
rates = data.field(data_column_name)[spectrum_number - 1, :]
rate_errors = None
if not is_poisson:
rate_errors = data.field("STAT_ERR")[spectrum_number - 1, :]
else:
rates = data.field(data_column_name)
rate_errors = None
if not is_poisson:
rate_errors = data.field("STAT_ERR")
else:
if not treat_as_time_series:
rates = old_div(
data.field(data_column_name)[spectrum_number - 1, :], exposure
)
rate_errors = None
if not is_poisson:
rate_errors = old_div(
data.field("STAT_ERR")[spectrum_number - 1, :], exposure
)
else:
rates = old_div(data.field(data_column_name), np.atleast_2d(exposure).T)
rate_errors = None
if not is_poisson:
rate_errors = old_div(
data.field("STAT_ERR"), np.atleast_2d(exposure).T
)
if "SYS_ERR" in data.columns.names:
if not treat_as_time_series:
sys_errors = data.field("SYS_ERR")[spectrum_number - 1, :]
else:
sys_errors = data.field("SYS_ERR")
else:
sys_errors = np.zeros(rates.shape)
if has_quality_column:
if not treat_as_time_series:
try:
quality = data.field("QUALITY")[spectrum_number - 1, :]
except (IndexError):
# GBM CSPEC files do not follow OGIP conventions and instead
# list simply QUALITY=0 for each spectrum
# so we have to read them differently
quality_element = data.field("QUALITY")[spectrum_number - 1]
warnings.warn(
"The QUALITY column has the wrong shape. This PHAII file does not follow OGIP standards"
)
if quality_element == 0:
quality = np.zeros_like(rates, dtype=int)
else:
quality = np.zeros_like(rates, dtype=int) + 5
else:
# we need to be careful again because the QUALITY column is not always the correct shape
quality_element = data.field("QUALITY")
if quality_element.shape == rates.shape:
# This is the proper way for the quality to be stored
quality = quality_element
else:
quality = np.zeros_like(rates, dtype=int)
for i, q in enumerate(quality_element):
if q != 0:
quality[i, :] = 5
else:
if is_all_data_good:
quality = np.zeros_like(rates, dtype=int)
else:
quality = np.zeros_like(rates, dtype=int) + 5
if has_tstart:
if has_tstart_column:
if not treat_as_time_series:
tstart = data.field("TSTART")[spectrum_number - 1]
else:
tstart = data.field("TSTART")
if has_tstop:
if has_tstop_column:
if not treat_as_time_series:
tstop = data.field("TSTOP")[spectrum_number - 1]
else:
tstop = data.field("TSTOP")
if has_telapse:
if has_telapse_column:
if not treat_as_time_series:
tstop = tstart + data.field("TELAPSE")[spectrum_number - 1]
else:
tstop = tstart + data.field("TELAPSE")
elif typeII == False:
assert (
not treat_as_time_series
), "This is not a PHAII file but you specified to treat it as a time series"
# PHA 1 file
if has_rates:
rates = data.field(data_column_name)
rate_errors = None
if not is_poisson:
rate_errors = data.field("STAT_ERR")
else:
rates = old_div(data.field(data_column_name), exposure)
rate_errors = None
if not is_poisson:
rate_errors = old_div(data.field("STAT_ERR"), exposure)
if "SYS_ERR" in data.columns.names:
sys_errors = data.field("SYS_ERR")
else:
sys_errors = np.zeros(rates.shape)
if has_quality_column:
quality = data.field("QUALITY")
else:
if is_all_data_good:
quality = np.zeros_like(rates, dtype=int)
else:
quality = np.zeros_like(rates, dtype=int) + 5
# read start and stop times if needed
if has_tstart:
if has_tstart_column:
tstart = data.field("TSTART")
else:
tstart = header["TSTART"]
if has_tstop:
if has_tstop_column:
tstop = data.field("TSTOP")
else:
tstop = header["TSTOP"]
if has_telapse:
if has_telapse_column:
tstop = tstart + data.field("TELAPSE")
else:
tstop = tstart + header["TELAPSE"]
# Now that we have read it, some safety checks
assert rates.shape[0] == gathered_keywords["detchans"], (
"The data column (RATES or COUNTS) has a different number of entries than the "
"DETCHANS declared in the header"
)
quality = Quality.from_ogip(quality)
if not treat_as_time_series:
counts = rates * exposure
if not is_poisson:
count_errors = rate_errors * exposure
else:
count_errors = None
else:
exposure = np.atleast_2d(exposure).T
counts = rates * exposure
if not is_poisson:
count_errors = rate_errors * exposure
else:
count_errors = None
out = collections.OrderedDict(
counts=counts,
count_errors=count_errors,
rates=rates,
rate_errors=rate_errors,
sys_errors=sys_errors,
exposure=exposure,
is_poisson=is_poisson,
rsp=rsp,
gathered_keywords=gathered_keywords,
quality=quality,
file_name=file_name,
tstart=tstart,
tstop=tstop,
)
return out
[docs]class PHASpectrum(BinnedSpectrumWithDispersion):
def __init__(
self,
pha_file_or_instance,
spectrum_number=None,
file_type="observed",
rsp_file=None,
arf_file=None,
):
"""
A spectrum with dispersion build from an OGIP-compliant PHA FITS file. Both Type I & II files can be read. Type II
spectra are selected either by specifying the spectrum_number or via the {spectrum_number} file name convention used
in XSPEC. If the file_type is background, a 3ML InstrumentResponse or subclass must be passed so that the energy
bounds can be obtained.
:param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance
:param spectrum_number: (optional) the spectrum number of the TypeII file to be used
:param file_type: observed or background
:param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance
:param arf_file: (optional) and ARF filename
"""
# extract the spectrum number if needed
assert isinstance(pha_file_or_instance, six.string_types) or isinstance(
pha_file_or_instance, PHAII
), "Must provide a FITS file name or PHAII instance"
pha_information = _read_pha_or_pha2_file(
pha_file_or_instance,
spectrum_number,
file_type,
rsp_file,
arf_file,
treat_as_time_series=False,
)
# default the grouping to all open bins
# this will only be altered if the spectrum is rebinned
self._grouping = np.ones_like(pha_information["counts"])
# this saves the extra properties to the class
self._gathered_keywords = pha_information["gathered_keywords"]
self._file_type = file_type
self._file_name = pha_information["file_name"]
# pass the needed spectrum values back up
# remember that Spectrum reads counts, but returns
# rates!
super(PHASpectrum, self).__init__(
counts=pha_information["counts"],
exposure=pha_information["exposure"],
response=pha_information["rsp"],
count_errors=pha_information["count_errors"],
sys_errors=pha_information["sys_errors"],
is_poisson=pha_information["is_poisson"],
quality=pha_information["quality"],
mission=pha_information["gathered_keywords"]["mission"],
instrument=pha_information["gathered_keywords"]["instrument"],
tstart=pha_information["tstart"],
tstop=pha_information["tstop"],
)
def _return_file(self, key):
if key in self._gathered_keywords:
return self._gathered_keywords[key]
else:
return None
[docs] def set_ogip_grouping(self, grouping):
"""
If the counts are rebinned, this updates the grouping
:param grouping:
"""
self._grouping = grouping
@property
def filename(self):
return self._file_name
@property
def background_file(self):
"""
Returns the background file definied in the header, or None if there is none defined
p
:return: a path to a file, or None
"""
back_file = self._return_file('backfile')
if back_file == "":
back_file = None
return back_file
@property
def scale_factor(self):
"""
This is a scale factor (in the BACKSCAL keyword) which must be used to rescale background and source
regions
:return:
"""
return self._gathered_keywords["backscal"]
@property
def response_file(self):
"""
Returns the response file definied in the header, or None if there is none defined
:return: a path to a file, or None
"""
return self._return_file("respfile")
@property
def ancillary_file(self):
"""
Returns the ancillary file definied in the header, or None if there is none defined
:return: a path to a file, or None
"""
return self._return_file("ancrfile")
@property
def grouping(self):
return self._grouping
[docs] def clone(
self,
new_counts=None,
new_count_errors=None,
new_exposure=None,
new_scale_factor=None,
):
"""
make a new spectrum with new counts and errors and all other
parameters the same
:param new_exposure: the new exposure for the clone
:param new_scale_factor: the new scale factor for the clone
:param new_counts: new counts for the spectrum
:param new_count_errors: new errors from the spectrum
:return: new pha spectrum
"""
if new_exposure is None:
new_exposure = self.exposure
if new_counts is None:
new_counts = self.counts
new_count_errors = self.count_errors
if new_count_errors is None:
stat_err = None
else:
stat_err = old_div(new_count_errors, new_exposure)
if self._tstart is None:
tstart = 0
else:
tstart = self._tstart
if self._tstop is None:
telapse = new_exposure
else:
telapse = self._tstop - tstart
if new_scale_factor is None:
new_scale_factor = self.scale_factor
# create a new PHAII instance
pha = PHAII(
instrument_name=self.instrument,
telescope_name=self.mission,
tstart=tstart,
telapse=telapse,
channel=list(range(1, len(self) + 1)),
rate=old_div(new_counts, self.exposure),
stat_err=stat_err,
quality=self.quality.to_ogip(),
grouping=self.grouping,
exposure=new_exposure,
backscale=new_scale_factor,
respfile=None,
ancrfile=None,
is_poisson=self.is_poisson,
)
return pha
[docs] @classmethod
def from_dispersion_spectrum(cls, dispersion_spectrum, file_type="observed"):
# type: (BinnedSpectrumWithDispersion, str) -> PHASpectrum
if dispersion_spectrum.is_poisson:
rate_errors = None
else:
rate_errors = dispersion_spectrum.rate_errors
if dispersion_spectrum.tstart is None:
tstart = 0
else:
tstart = dispersion_spectrum.tstart
if dispersion_spectrum.tstop is None:
telapse = dispersion_spectrum.exposure
else:
telapse = dispersion_spectrum.tstop - tstart
pha = PHAII(
instrument_name=dispersion_spectrum.instrument,
telescope_name=dispersion_spectrum.mission,
tstart=tstart, # TODO: add this in so that we have proper time!
telapse=telapse,
channel=list(range(1, len(dispersion_spectrum) + 1)),
rate=dispersion_spectrum.rates,
stat_err=rate_errors,
quality=dispersion_spectrum.quality.to_ogip(),
grouping=np.ones(len(dispersion_spectrum)),
exposure=dispersion_spectrum.exposure,
backscale=dispersion_spectrum.scale_factor,
respfile=None,
ancrfile=None,
is_poisson=dispersion_spectrum.is_poisson,
)
return cls(
pha_file_or_instance=pha,
spectrum_number=1,
file_type=file_type,
rsp_file=dispersion_spectrum.response,
)
[docs]class PHASpectrumSet(BinnedSpectrumSet):
def __init__(
self, pha_file_or_instance, file_type="observed", rsp_file=None, arf_file=None
):
"""
A spectrum with dispersion build from an OGIP-compliant PHA FITS file. Both Type I & II files can be read. Type II
spectra are selected either by specifying the spectrum_number or via the {spectrum_number} file name convention used
in XSPEC. If the file_type is background, a 3ML InstrumentResponse or subclass must be passed so that the energy
bounds can be obtained.
:param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance
:param spectrum_number: (optional) the spectrum number of the TypeII file to be used
:param file_type: observed or background
:param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance
:param arf_file: (optional) and ARF filename
"""
# extract the spectrum number if needed
assert isinstance(pha_file_or_instance, six.string_types) or isinstance(
pha_file_or_instance, PHAII
) or isinstance(pha_file_or_instance, Path), "Must provide a FITS file name or PHAII instance"
with fits.open(pha_file_or_instance) as f:
try:
HDUidx = f.index_of("SPECTRUM")
except:
raise RuntimeError(
"The input file %s is not in PHA format" % (pha2_file)
)
spectrum = f[HDUidx]
data = spectrum.data
if "COUNTS" in data.columns.names:
has_rates = False
data_column_name = "COUNTS"
elif "RATE" in data.columns.names:
has_rates = True
data_column_name = "RATE"
else:
raise RuntimeError(
"This file does not contain a RATE nor a COUNTS column. "
"This is not a valid PHA file"
)
# Determine if this is a PHA I or PHA II
if len(data.field(data_column_name).shape) == 2:
num_spectra = data.field(data_column_name).shape[0]
else:
raise RuntimeError("This appears to be a PHA I and not PHA II file")
pha_information = _read_pha_or_pha2_file(
pha_file_or_instance,
None,
file_type,
rsp_file,
arf_file,
treat_as_time_series=True,
)
# default the grouping to all open bins
# this will only be altered if the spectrum is rebinned
self._grouping = np.ones_like(pha_information["counts"])
# this saves the extra properties to the class
self._gathered_keywords = pha_information["gathered_keywords"]
self._file_type = file_type
# need to see if we have count errors, tstart, tstop
# if not, we create an list of None
if pha_information["count_errors"] is None:
count_errors = [None] * num_spectra
else:
count_errors = pha_information["count_errors"]
if pha_information["tstart"] is None:
tstart = [None] * num_spectra
else:
tstart = pha_information["tstart"]
if pha_information["tstop"] is None:
tstop = [None] * num_spectra
else:
tstop = pha_information["tstop"]
# now build the list of binned spectra
list_of_binned_spectra = []
with progress_bar(num_spectra, title="Loading PHAII spectra") as p:
for i in range(num_spectra):
list_of_binned_spectra.append(
BinnedSpectrumWithDispersion(
counts=pha_information["counts"][i],
exposure=pha_information["exposure"][i, 0],
response=pha_information["rsp"],
count_errors=count_errors[i],
sys_errors=pha_information["sys_errors"][i],
is_poisson=pha_information["is_poisson"],
quality=pha_information["quality"].get_slice(i),
mission=pha_information["gathered_keywords"]["mission"],
instrument=pha_information["gathered_keywords"]["instrument"],
tstart=tstart[i],
tstop=tstop[i],
)
)
p.increase()
# now get the time intervals
start_times = data.field("TIME")
stop_times = data.field("ENDTIME")
time_intervals = TimeIntervalSet.from_starts_and_stops(start_times, stop_times)
reference_time = 0
# see if there is a reference time in the file
if "TRIGTIME" in spectrum.header:
reference_time = spectrum.header["TRIGTIME"]
for t_number in range(spectrum.header["TFIELDS"]):
if "TZERO%d" % t_number in spectrum.header:
reference_time = spectrum.header["TZERO%d" % t_number]
super(PHASpectrumSet, self).__init__(
list_of_binned_spectra,
reference_time=reference_time,
time_intervals=time_intervals,
)
def _return_file(self, key):
if key in self._gathered_keywords:
return self._gathered_keywords[key]
else:
return None
[docs] def set_ogip_grouping(self, grouping):
"""
If the counts are rebinned, this updates the grouping
:param grouping:
"""
self._grouping = grouping
@property
def filename(self):
return self._file_name
@property
def background_file(self):
"""
Returns the background file definied in the header, or None if there is none defined
p
:return: a path to a file, or None
"""
return self._return_file("backfile")
@property
def scale_factor(self):
"""
This is a scale factor (in the BACKSCAL keyword) which must be used to rescale background and source
regions
:return:
"""
return self._gathered_keywords["backscal"]
@property
def response_file(self):
"""
Returns the response file definied in the header, or None if there is none defined
:return: a path to a file, or None
"""
return self._return_file("respfile")
@property
def ancillary_file(self):
"""
Returns the ancillary file definied in the header, or None if there is none defined
:return: a path to a file, or None
"""
return self._return_file("ancrfile")
@property
def grouping(self):
return self._grouping
[docs] def clone(
self, new_counts=None, new_count_errors=None,
):
"""
make a new spectrum with new counts and errors and all other
parameters the same
:param new_counts: new counts for the spectrum
:param new_count_errors: new errors from the spectrum
:return: new pha spectrum
"""
if new_counts is None:
new_counts = self.counts
new_count_errors = self.count_errors
if new_count_errors is None:
stat_err = None
else:
stat_err = old_div(new_count_errors, self.exposure)
# create a new PHAII instance
pha = PHAII(
instrument_name=self.instrument,
telescope_name=self.mission,
tstart=0,
telapse=self.exposure,
channel=list(range(1, len(self) + 1)),
rate=old_div(new_counts, self.exposure),
stat_err=stat_err,
quality=self.quality.to_ogip(),
grouping=self.grouping,
exposure=self.exposure,
backscale=self.scale_factor,
respfile=None,
ancrfile=None,
is_poisson=self.is_poisson,
)
return pha
[docs] @classmethod
def from_dispersion_spectrum(cls, dispersion_spectrum, file_type="observed"):
# type: (BinnedSpectrumWithDispersion, str) -> PHASpectrum
if dispersion_spectrum.is_poisson:
rate_errors = None
else:
rate_errors = dispersion_spectrum.rate_errors
pha = PHAII(
instrument_name=dispersion_spectrum.instrument,
telescope_name=dispersion_spectrum.mission,
tstart=dispersion_spectrum.tstart,
telapse=dispersion_spectrum.tstop - dispersion_spectrum.tstart,
channel=list(range(1, len(dispersion_spectrum) + 1)),
rate=dispersion_spectrum.rates,
stat_err=rate_errors,
quality=dispersion_spectrum.quality.to_ogip(),
grouping=np.ones(len(dispersion_spectrum)),
exposure=dispersion_spectrum.exposure,
backscale=dispersion_spectrum.scale_factor,
respfile=None,
ancrfile=None,
is_poisson=dispersion_spectrum.is_poisson,
)
return cls(
pha_file_or_instance=pha,
spectrum_number=1,
file_type=file_type,
rsp_file=dispersion_spectrum.response,
)