Source code for threeML.utils.spectrum.pha_spectrum

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Union

import astropy.io.fits as fits
import numpy as np

from threeML.io.fits_file import FITSFile
from threeML.io.logging import setup_logger
from threeML.utils.OGIP.pha import PHAII
from threeML.utils.OGIP.response import InstrumentResponse, OGIPResponse
from threeML.utils.progress_bar import trange
from threeML.utils.spectrum.binned_spectrum import (
    BinnedSpectrumWithDispersion,
    Quality,
)
from threeML.utils.spectrum.binned_spectrum_set import BinnedSpectrumSet
from threeML.utils.time_interval import TimeIntervalSet

log = setup_logger(__name__)

_required_keywords = {}
_required_keywords["observed"] = (
    "mission:TELESCOP,instrument:INSTRUME,filter:FILTER,"
    + "exposure:EXPOSURE,backfile:BACKFILE,"
    + "respfile:RESPFILE,"
    + "ancrfile:ANCRFILE,hduclass:HDUCLASS,"
    + "hduclas1:HDUCLAS1,poisserr:POISSERR,"
    + "chantype:CHANTYPE,detchans:DETCHANS,"
    "backscal:BACKSCAL"
).split(",")

# python types, not fits
_required_keyword_types = {"POISSERR": bool}

# hduvers:HDUVERS

_required_keywords["background"] = (
    "mission:TELESCOP,instrument:INSTRUME,filter:FILTER,"
    + "exposure:EXPOSURE,"
    + "hduclass:HDUCLASS,"
    + "hduclas1:HDUCLAS1,poisserr:POISSERR,"
    + "chantype:CHANTYPE,detchans:DETCHANS,"
    "backscal:BACKSCAL"
).split(",")

# hduvers:HDUVERS

_might_be_columns = {}
_might_be_columns["observed"] = (
    "EXPOSURE,BACKFILE," + "CORRFILE,CORRSCAL," + "RESPFILE,ANCRFILE,BACKSCAL"
).split(",")
_might_be_columns["background"] = ("EXPOSURE,BACKSCAL").split(",")


_valid_input_types = (str, Path, PHAII, FITSFile)


@dataclass(frozen=True)
class _PHAInfo:
    """A container to hold all the gathered information."""

    counts: Iterable[float]
    rates: Iterable[float]
    exposure: Iterable[float]
    is_poisson: bool
    rsp: InstrumentResponse
    gathered_keywords: Dict[str, Any]
    quality: Quality
    file_name: str
    tstart: Optional[Union[float, Iterable[float]]]
    tstop: Optional[Union[float, Iterable[float]]]
    rate_errors: Optional[Iterable[float]]
    sys_errors: Optional[Iterable[float]]
    count_errors: Optional[Iterable[float]]


def _read_pha_or_pha2_file(
    pha_file_or_instance: Union[str, Path, PHAII, FITSFile],
    spectrum_number: Optional[int] = None,
    file_type: str = "observed",
    rsp_file: Optional[Union[str, InstrumentResponse]] = None,
    arf_file: Optional[str] = None,
    treat_as_time_series: bool = False,
) -> _PHAInfo:
    """A function to extract information from pha and pha2 files. It is kept
    separate because the same method is used for reading time series (MUCH
    faster than building a lot of individual spectra) and single spectra.

    :param pha_file_or_instance: either a PHA file name or
        threeML.plugins.OGIP.pha.PHAII instance
    :param spectrum_number: (optional) the spectrum number of the TypeII
        file to be used
    :param file_type: observed or background
    :param rsp_file: RMF filename or
        threeML.plugins.OGIP.response.InstrumentResponse instance
    :param arf_file: (optional) and ARF filename
    :param treat_as_time_series:
    :return:
    """

    for t in _valid_input_types:
        if isinstance(pha_file_or_instance, t):
            break

    else:
        log.error(
            "Must provide a FITS file name or PHAII/FITSFile instance. "
            f"Got {type(pha_file_or_instance)}"
        )

        raise RuntimeError()

    if not isinstance(pha_file_or_instance, (PHAII, FITSFile)):
        pha_file_or_instance: Path = Path(pha_file_or_instance)

        ext = pha_file_or_instance.suffix

        if "{" in ext:
            spectrum_number = int(ext.split("{")[-1].replace("}", ""))

            pha_file_or_instance = str(pha_file_or_instance).split("{")[0]

        # Read the data

        file_name: Path = Path(pha_file_or_instance)

        # create a FITS_FILE instance

        pha_file_or_instance = PHAII.from_fits_file(pha_file_or_instance)

    # If this is already a FITS_FILE instance,

    elif isinstance(pha_file_or_instance, (PHAII, FITSFile)):
        # we simply create a dummy filename

        file_name: Path = Path("pha_instance")

    else:
        log.error("This is a bug. Should never get here!")

        raise RuntimeError()

    if not file_type.lower() in [
        "observed",
        "background",
    ]:
        log.error("Unrecognized filetype keyword value")

        raise RuntimeError()

    file_type = file_type.lower()

    try:
        HDUidx = pha_file_or_instance.index_of("SPECTRUM")

    except KeyError:
        log.error(f"The input file {file_name} is not in PHA format")
        raise RuntimeError()

    # spectrum_number = spectrum_number

    spectrum = pha_file_or_instance[HDUidx]

    data = spectrum.data

    header = spectrum.header

    # We don't support yet the rescaling

    if "CORRFILE" in header:
        if (header.get("CORRFILE").upper().strip() != "NONE") and (
            header.get("CORRFILE").upper().strip() != ""
        ):
            log.error("CORRFILE is not yet supported")

            raise RuntimeError()

    # See if there is there is a QUALITY==0 in the header

    if "QUALITY" in header:
        has_quality_column = False

        if header["QUALITY"] == 0:
            is_all_data_good = True

        else:
            is_all_data_good = False

    else:
        if "QUALITY" in data.columns.names:
            has_quality_column = True

            is_all_data_good = False

        else:
            has_quality_column = False

            is_all_data_good = True

            log.warning(
                "Could not find QUALITY in columns or header of PHA file. This is not a"
                " valid OGIP file. Assuming QUALITY =0 (good)"
            )

    # looking for tstart and tstop

    tstart = None
    tstop = None

    has_tstart = False
    has_tstop = False
    has_telapse = False

    if "TSTART" in header:
        has_tstart_column = False

        has_tstart = True

    else:
        if "TSTART" in data.columns.names:
            has_tstart_column = True

            has_tstart = True

    if "TELAPSE" in header:
        has_telapse_column = False

        has_telapse = True

    else:
        if "TELAPSE" in data.columns.names:
            has_telapse_column = True

            has_telapse = True

    if "TSTOP" in header:
        has_tstop_column = False

        has_tstop = True

    else:
        if "TSTOP" in data.columns.names:
            has_tstop_column = True

            has_tstop = True

    if has_tstop and has_telapse:
        log.warning("Found TSTOP and TELAPSE. This file is invalid. Using TSTOP.")

        has_telapse = False

    # Determine if this file contains COUNTS or RATES

    if "COUNTS" in data.columns.names:
        has_rates = False
        data_column_name = "COUNTS"

    elif "RATE" in data.columns.names:
        has_rates = True
        data_column_name = "RATE"

    else:
        log.error(
            "This file does not contain a RATE nor a COUNTS column. "
            "This is not a valid PHA file"
        )
        raise RuntimeError()

    # Determine if this is a PHA I or PHA II

    if len(data.field(data_column_name).shape) == 2:
        is_typeII_file = True

        if spectrum_number is None and not treat_as_time_series:
            log.error(
                "This is a PHA Type II file. You have to provide a spectrum number"
            )

            raise RuntimeError(
                "This is a PHA Type II file. You have to provide a spectrum number"
            )

    else:
        is_typeII_file = False

    # Collect information from mandatory keywords

    keys = _required_keywords[file_type]

    gathered_keywords = {}

    for k in keys:
        internal_name, keyname = k.split(":")

        key_has_been_collected = False

        if keyname in header:
            if (
                keyname in _required_keyword_types
                and type(header.get(keyname)) is not _required_keyword_types[keyname]
            ):
                log.warning(
                    f"unexpected type of {keyname}, expected "
                    f"{_required_keyword_types[keyname]}\n found "
                    f"{type(header.get(keyname))}: {header.get(keyname)}"
                )
            else:
                gathered_keywords[internal_name] = header.get(keyname)

                # Fix "NONE" in None

                if (
                    gathered_keywords[internal_name] == "NONE"
                    or gathered_keywords[internal_name] == "none"
                ):
                    gathered_keywords[internal_name] = None

                key_has_been_collected = True

        # Note that we check again because the content of the column can override the
        # content of the header

        if keyname in _might_be_columns[file_type] and is_typeII_file:
            # Check if there is a column with this name

            if keyname in data.columns.names:
                # This will set the exposure, among other things

                if not treat_as_time_series:
                    # if we just want a single spectrum

                    gathered_keywords[internal_name] = data[keyname][
                        spectrum_number - 1
                    ]

                else:
                    # else get all the columns

                    gathered_keywords[internal_name] = data[keyname]

                # Fix "NONE" in None
                if isinstance(gathered_keywords[internal_name], np.ndarray):
                    idx = np.where(
                        (gathered_keywords[internal_name] == "NONE")
                        | (gathered_keywords[internal_name] == "none")
                    )
                    gathered_keywords[internal_name][idx] = None
                else:
                    if (
                        gathered_keywords[internal_name] == "NONE"
                        or gathered_keywords[internal_name] == "none"
                    ):
                        gathered_keywords[internal_name] = None

                key_has_been_collected = True

        if not key_has_been_collected:
            # The keyword POISSERR is a special case, because even if it is missing,
            # it is assumed to be False if there is a STAT_ERR column in the file

            if keyname == "POISSERR" and "STAT_ERR" in data.columns.names:
                log.warning(
                    "POISSERR is not set. Assuming non-poisson errors as given in the "
                    "STAT_ERR column"
                )

                gathered_keywords["poisserr"] = False

            elif keyname == "ANCRFILE":
                # Some non-compliant files have no ARF because they don't need one.
                # Don't fail, but issue a warning

                log.warning(
                    "ANCRFILE is not set. This is not a compliant OGIP file. Assuming "
                    "no ARF."
                )

                gathered_keywords["ancrfile"] = None

            elif keyname == "FILTER":
                # Some non-compliant files have no FILTER because they don't need one.
                # Don't fail, but issue a warning

                log.warning(
                    "FILTER is not set. This is not a compliant OGIP file. Assuming no "
                    "FILTER."
                )

                gathered_keywords["filter"] = None

            else:
                log.error(
                    f"Keyword {keyname} not found. File {file_name} is not a proper PHA"
                    " file"
                )

                raise RuntimeError()

    is_poisson = gathered_keywords["poisserr"]

    exposure = gathered_keywords["exposure"]

    # now we need to get the response file so that we can extract the EBOUNDS

    if file_type == "observed":
        if rsp_file is None:
            # this means it should be specified in the header
            rsp_file = gathered_keywords["respfile"]

            if arf_file is None:
                arf_file = gathered_keywords["ancrfile"]

                # Read in the response

        if (
            isinstance(rsp_file, str)
            or isinstance(rsp_file, str)
            or isinstance(rsp_file, Path)
        ):
            rsp: InstrumentResponse = OGIPResponse(rsp_file, arf_file=arf_file)

        elif isinstance(rsp_file, InstrumentResponse):
            # assume a fully formed OGIPResponse
            rsp = rsp_file

        else:
            log.error(f"{rsp_file} is not correct type")

            raise RuntimeError()

    if file_type == "background":
        # we need the rsp ebounds from response to build the histogram

        if not isinstance(rsp_file, InstrumentResponse):
            log.error("You must supply and OGIPResponse to extract the energy bounds")

            raise RuntimeError()

        rsp = rsp_file

    # Now get the data (counts or rates) and their errors. If counts, transform them in
    # rates

    if is_typeII_file:
        # PHA II file
        if has_rates:
            log.debug(f"{file_name} has rates and NOT counts")

            if not treat_as_time_series:
                rates = data.field(data_column_name)[spectrum_number - 1, :]

                rate_errors = None

                if not is_poisson:
                    rate_errors = data.field("STAT_ERR")[spectrum_number - 1, :]

            else:
                rates = data.field(data_column_name)

                rate_errors = None

                if not is_poisson:
                    rate_errors = data.field("STAT_ERR")

        else:
            log.debug(f"{file_name} has counts and NOT rates")

            if not treat_as_time_series:
                # extract the counts

                counts = data.field(data_column_name)[spectrum_number - 1, :].astype(
                    np.int64
                )

                # count the rates

                rates = counts / exposure

                rate_errors = None

                if not is_poisson:
                    rate_errors = (
                        data.field("STAT_ERR")[spectrum_number - 1, :] / exposure
                    )

            else:
                counts = data.field(data_column_name).astype(np.int64)

                rates = counts / np.atleast_2d(exposure).T

                rate_errors = None

                if not is_poisson:
                    rate_errors = data.field("STAT_ERR") / np.atleast_2d(exposure).T

        if "SYS_ERR" in data.columns.names:
            if not treat_as_time_series:
                sys_errors = data.field("SYS_ERR")[spectrum_number - 1, :]

            else:
                sys_errors = data.field("SYS_ERR")

        else:
            sys_errors = np.zeros(rates.shape)

        if has_quality_column:
            if not treat_as_time_series:
                try:
                    quality = data.field("QUALITY")[spectrum_number - 1, :]

                except IndexError:
                    # GBM CSPEC files do not follow OGIP conventions and instead
                    # list simply QUALITY=0 for each spectrum
                    # so we have to read them differently

                    quality_element = data.field("QUALITY")[spectrum_number - 1]

                    log.warning(
                        "The QUALITY column has the wrong shape. This PHAII file does "
                        "not follow OGIP standards"
                    )

                    if quality_element == 0:
                        quality = np.zeros_like(rates, dtype=int)

                    else:
                        quality = np.zeros_like(rates, dtype=int) + 5

            else:
                # we need to be careful again because the QUALITY column is not always
                # the correct shape

                quality_element = data.field("QUALITY")

                if quality_element.shape == rates.shape:
                    # This is the proper way for the quality to be stored

                    quality = quality_element

                else:
                    quality = np.zeros_like(rates, dtype=int)

                    for i, q in enumerate(quality_element):
                        if q != 0:
                            quality[i, :] = 5

        else:
            if is_all_data_good:
                quality = np.zeros_like(rates, dtype=int)

            else:
                quality = np.zeros_like(rates, dtype=int) + 5

        if has_tstart:
            if has_tstart_column:
                if not treat_as_time_series:
                    tstart = data.field("TSTART")[spectrum_number - 1]

                else:
                    tstart = data.field("TSTART")

        if has_tstop:
            if has_tstop_column:
                if not treat_as_time_series:
                    tstop = data.field("TSTOP")[spectrum_number - 1]

                else:
                    tstop = data.field("TSTOP")

        if has_telapse:
            if has_telapse_column:
                if not treat_as_time_series:
                    tstop = tstart + data.field("TELAPSE")[spectrum_number - 1]

                else:
                    tstop = tstart + data.field("TELAPSE")

    elif not is_typeII_file:
        if treat_as_time_series:
            log.error(
                "This is not a PHAII file but you specified to treat it as a time "
                "series"
            )
            raise RuntimeError()

        # PHA 1 file
        if has_rates:
            rates = data.field(data_column_name)

            rate_errors = None

            if not is_poisson:
                rate_errors = data.field("STAT_ERR")

        else:
            counts = data.field(data_column_name).astype(np.int64)

            rates = counts / exposure

            rate_errors = None

            if not is_poisson:
                rate_errors = data.field("STAT_ERR") / exposure

        if "SYS_ERR" in data.columns.names:
            sys_errors = data.field("SYS_ERR")

        else:
            sys_errors = np.zeros(rates.shape)

        if has_quality_column:
            quality = data.field("QUALITY")

        else:
            if is_all_data_good:
                quality = np.zeros_like(rates, dtype=int)

            else:
                quality = np.zeros_like(rates, dtype=int) + 5

        # read start and stop times if needed

        if has_tstart:
            if has_tstart_column:
                tstart = data.field("TSTART")

            else:
                tstart = header["TSTART"]

        if has_tstop:
            if has_tstop_column:
                tstop = data.field("TSTOP")

            else:
                tstop = header["TSTOP"]

        if has_telapse:
            if has_telapse_column:
                tstop = tstart + data.field("TELAPSE")

            else:
                tstop = tstart + header["TELAPSE"]

        # Now that we have read it, some safety checks

        if rates.shape[0] != gathered_keywords["detchans"]:
            log.error(
                "The data column (RATES or COUNTS) has a different number of entries "
                "than the DETCHANS declared in the header"
            )
            raise RuntimeError()

    quality = Quality.from_ogip(quality)

    if not treat_as_time_series:
        log.debug(f"{file_name} is not a time series")

        if has_rates:
            counts = rates * exposure

        if not is_poisson:
            log.debug(f"{file_name} is not Poisson")

            count_errors = rate_errors * exposure

        else:
            log.debug(f"{file_name} is Poisson")

            count_errors = None

    else:
        log.debug(f"{file_name} is a time series")

        exposure = np.atleast_2d(exposure).T

        if has_rates:
            counts = rates * exposure

        if not is_poisson:
            log.debug(f"{file_name} is not Poisson")

            count_errors = rate_errors * exposure

        else:
            log.debug(f"{file_name} is Poisson")

            count_errors = None

    return _PHAInfo(
        counts=counts,
        count_errors=count_errors,
        rates=rates,
        rate_errors=rate_errors,
        sys_errors=sys_errors,
        exposure=exposure,
        is_poisson=is_poisson,
        rsp=rsp,
        gathered_keywords=gathered_keywords,
        quality=quality,
        file_name=file_name,
        tstart=tstart,
        tstop=tstop,
    )


[docs] class PHASpectrum(BinnedSpectrumWithDispersion): def __init__( self, pha_file_or_instance: Union[str, Path, PHAII, FITSFile], spectrum_number: Optional[int] = None, file_type: str = "observed", rsp_file: Optional[Union[str, InstrumentResponse]] = None, arf_file: Optional[str] = None, ) -> None: """A spectrum with dispersion build from an OGIP-compliant PHA FITS file. Both Type I & II files can be read. Type II spectra are selected either by specifying the spectrum_number or via the {spectrum_number} file name convention used in XSPEC. If the file_type is background, a 3ML InstrumentResponse or subclass must be passed so that the energy bounds can be obtained. :param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance :param spectrum_number: (optional) the spectrum number of the TypeII file to be used :param file_type: observed or background :param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance :param arf_file: (optional) and ARF filename """ # extract the spectrum number if needed for t in _valid_input_types: if isinstance(pha_file_or_instance, t): break else: log.error( "Must provide a FITS file name or PHAII/FITSFile instance. " f"Got {type(pha_file_or_instance)}" ) raise RuntimeError() pha_information: _PHAInfo = _read_pha_or_pha2_file( pha_file_or_instance, spectrum_number, file_type, rsp_file, arf_file, treat_as_time_series=False, ) # default the grouping to all open bins # this will only be altered if the spectrum is rebinned self._grouping: np.ndarray = np.ones_like(pha_information.counts) # this saves the extra properties to the class self._gathered_keywords = pha_information.gathered_keywords self._file_type: str = file_type self._file_name: str = pha_information.file_name # pass the needed spectrum values back up # remember that Spectrum reads counts, but returns # rates! super(PHASpectrum, self).__init__( counts=pha_information.counts, exposure=pha_information.exposure, response=pha_information.rsp, count_errors=pha_information.count_errors, sys_errors=pha_information.sys_errors, is_poisson=pha_information.is_poisson, quality=pha_information.quality, mission=pha_information.gathered_keywords["mission"], instrument=pha_information.gathered_keywords["instrument"], tstart=pha_information.tstart, tstop=pha_information.tstop, ) def _return_file(self, key) -> Union[None, str]: if key in self._gathered_keywords: return self._gathered_keywords[key] else: return None
[docs] def set_ogip_grouping(self, grouping) -> None: """If the counts are rebinned, this updates the grouping :param grouping:""" self._grouping = grouping
[docs] def to_binned_spectrum(self) -> BinnedSpectrumWithDispersion: """Convert directly to as Binned Spectrum :returns:""" return BinnedSpectrumWithDispersion( counts=self.counts, exposure=self.exposure, response=self.response, count_errors=self.count_errors, sys_errors=self.sys_errors, quality=self.quality, scale_factor=self.scale_factor, is_poisson=self.is_poisson, mission=self.mission, instrument=self.instrument, tstart=self.tstart, tstop=self.tstart, )
@property def filename(self) -> str: return self._file_name @property def background_file(self) -> Union[None, str]: """Returns the background file definied in the header, or None if there is none defined :return: a path to a file, or None.""" back_file = self._return_file("backfile") if back_file == "": back_file = None return back_file @property def scale_factor(self) -> float: """This is a scale factor (in the BACKSCAL keyword) which must be used to rescale background and source regions. :return: """ return self._gathered_keywords["backscal"] @property def response_file(self) -> Union[str, None]: """Returns the response file definied in the header, or None if there is none defined. :return: a path to a file, or None """ return self._return_file("respfile") @property def ancillary_file(self) -> Union[str, None]: """Returns the ancillary file definied in the header, or None if there is none defined. :return: a path to a file, or None """ return self._return_file("ancrfile") @property def grouping(self) -> np.ndarray: return self._grouping
[docs] def clone( self, new_counts=None, new_count_errors=None, new_exposure=None, new_scale_factor=None, ) -> "PHASpectrum": """Make a new spectrum with new counts and errors and all other parameters the same. :param new_exposure: the new exposure for the clone :param new_scale_factor: the new scale factor for the clone :param new_counts: new counts for the spectrum :param new_count_errors: new errors from the spectrum :return: new pha spectrum """ if new_exposure is None: new_exposure = self.exposure if new_counts is None: new_counts = self.counts new_count_errors = self.count_errors if new_count_errors is None: stat_err = None else: stat_err = new_count_errors / new_exposure if self._tstart is None: tstart = 0 else: tstart = self._tstart if self._tstop is None: telapse = new_exposure else: telapse = self._tstop - tstart if new_scale_factor is None: new_scale_factor = self.scale_factor # create a new PHAII instance pha = PHAII( instrument_name=self.instrument, telescope_name=self.mission, tstart=tstart, telapse=telapse, channel=list(range(1, len(self) + 1)), rate=new_counts / self.exposure, stat_err=stat_err, quality=self.quality.to_ogip(), grouping=self.grouping, exposure=new_exposure, backscale=new_scale_factor, respfile=None, ancrfile=None, is_poisson=self.is_poisson, ) return pha
[docs] @classmethod def from_dispersion_spectrum( cls, dispersion_spectrum, file_type="observed", response=None ): # type: (BinnedSpectrumWithDispersion, str) -> PHASpectrum if dispersion_spectrum.is_poisson: rate_errors = None else: rate_errors = dispersion_spectrum.rate_errors if dispersion_spectrum.tstart is None: tstart = 0 else: tstart = dispersion_spectrum.tstart if dispersion_spectrum.tstop is None: telapse = dispersion_spectrum.exposure else: telapse = dispersion_spectrum.tstop - tstart pha = PHAII( instrument_name=dispersion_spectrum.instrument, telescope_name=dispersion_spectrum.mission, tstart=tstart, # TODO: add this in so that we have proper time! telapse=telapse, channel=list(range(1, len(dispersion_spectrum) + 1)), rate=dispersion_spectrum.rates, stat_err=rate_errors, quality=dispersion_spectrum.quality.to_ogip(), grouping=np.ones(len(dispersion_spectrum)), exposure=dispersion_spectrum.exposure, backscale=dispersion_spectrum.scale_factor, respfile=None, ancrfile=None, is_poisson=dispersion_spectrum.is_poisson, ) if file_type == "background": if response is None: log.error( "passed a background file but no response to extract energy " "spectra." ) raise AssertionError() else: response = dispersion_spectrum.response return cls( pha_file_or_instance=pha, spectrum_number=1, file_type=file_type, rsp_file=response, )
[docs] class PHASpectrumSet(BinnedSpectrumSet): def __init__( self, pha_file_or_instance: Union[str, Path, PHAII], file_type: str = "observed", rsp_file: Optional[str] = None, arf_file: Optional[str] = None, ): """A spectrum with dispersion build from an OGIP-compliant PHA FITS file. Both Type I & II files can be read. Type II spectra are selected either by specifying the spectrum_number or via the {spectrum_number} file name convention used in XSPEC. If the file_type is background, a 3ML InstrumentResponse or subclass must be passed so that the energy bounds can be obtained. :param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance :param spectrum_number: (optional) the spectrum number of the TypeII file to be used :param file_type: observed or background :param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance :param arf_file: (optional) and ARF filename """ # extract the spectrum number if needed for t in _valid_input_types: if isinstance(pha_file_or_instance, t): break else: log.error( "Must provide a FITS file name or PHAII instance. " f"Got {type(pha_file_or_instance)}" ) raise RuntimeError() with fits.open(pha_file_or_instance) as f: try: HDUidx = f.index_of("SPECTRUM") except KeyError: raise RuntimeError( "The input file %s is not in PHA format" % (pha_file_or_instance) ) spectrum = f[HDUidx] data = spectrum.data if "COUNTS" in data.columns.names: data_column_name = "COUNTS" elif "RATE" in data.columns.names: data_column_name = "RATE" else: log.error( "This file does not contain a RATE nor a COUNTS column. " "This is not a valid PHA file" ) raise RuntimeError() # Determine if this is a PHA I or PHA II if len(data.field(data_column_name).shape) == 2: num_spectra = data.field(data_column_name).shape[0] else: log.error("This appears to be a PHA I and not PHA II file") raise RuntimeError() pha_information: _PHAInfo = _read_pha_or_pha2_file( pha_file_or_instance, None, file_type, rsp_file, arf_file, treat_as_time_series=True, ) # default the grouping to all open bins # this will only be altered if the spectrum is rebinned self._grouping = np.ones_like(pha_information.counts) # this saves the extra properties to the class self._gathered_keywords = pha_information.gathered_keywords self._file_type = file_type # need to see if we have count errors, tstart, tstop # if not, we create an list of None if pha_information.count_errors is None: count_errors = [None] * num_spectra else: count_errors = pha_information.count_errors if pha_information.tstart is None: tstart = [None] * num_spectra else: tstart = pha_information.tstart if pha_information.tstop is None: tstop = [None] * num_spectra else: tstop = pha_information.tstop # now build the list of binned spectra list_of_binned_spectra = [] for i in trange(num_spectra, desc="Loading PHAII Spectra"): list_of_binned_spectra.append( BinnedSpectrumWithDispersion( counts=pha_information.counts[i], exposure=pha_information.exposure[i, 0], response=pha_information.rsp, count_errors=count_errors[i], sys_errors=pha_information.sys_errors[i], is_poisson=pha_information.is_poisson, quality=pha_information.quality.get_slice(i), mission=pha_information.gathered_keywords["mission"], instrument=pha_information.gathered_keywords["instrument"], tstart=tstart[i], tstop=tstop[i], ) ) # now get the time intervals _allowed_time_keys = (("TIME", "ENDTIME"), ("TSTART", "TSTOP")) for keys in _allowed_time_keys: try: start_times = data.field(keys[0]) stop_times = data.field(keys[1]) break except KeyError: pass else: log.error( f"Could not find times in {pha_file_or_instance}. Tried: " f"{_allowed_time_keys}" ) raise RuntimeError() time_intervals = TimeIntervalSet.from_starts_and_stops(start_times, stop_times) reference_time = 0 # see if there is a reference time in the file if "TRIGTIME" in spectrum.header: reference_time = spectrum.header["TRIGTIME"] for t_number in range(spectrum.header["TFIELDS"]): if "TZERO%d" % t_number in spectrum.header: reference_time = spectrum.header["TZERO%d" % t_number] super(PHASpectrumSet, self).__init__( list_of_binned_spectra, reference_time=reference_time, time_intervals=time_intervals, ) def _return_file(self, key): if key in self._gathered_keywords: return self._gathered_keywords[key] else: return None
[docs] def set_ogip_grouping(self, grouping): """If the counts are rebinned, this updates the grouping :param grouping:""" self._grouping = grouping
@property def filename(self): return self._file_name @property def background_file(self): """Returns the background file definied in the header, or None if there is none defined :return: a path to a file, or None.""" return self._return_file("backfile") @property def scale_factor(self): """This is a scale factor (in the BACKSCAL keyword) which must be used to rescale background and source regions. :return: """ return self._gathered_keywords["backscal"] @property def response_file(self): """Returns the response file definied in the header, or None if there is none defined. :return: a path to a file, or None """ return self._return_file("respfile") @property def ancillary_file(self): """Returns the ancillary file definied in the header, or None if there is none defined. :return: a path to a file, or None """ return self._return_file("ancrfile") @property def grouping(self): return self._grouping
[docs] def clone( self, new_counts=None, new_count_errors=None, ): """Make a new spectrum with new counts and errors and all other parameters the same. :param new_counts: new counts for the spectrum :param new_count_errors: new errors from the spectrum :return: new pha spectrum """ if new_counts is None: new_counts = self.counts new_count_errors = self.count_errors if new_count_errors is None: stat_err = None else: stat_err = new_count_errors / self.exposure # create a new PHAII instance pha = PHAII( instrument_name=self.instrument, telescope_name=self.mission, tstart=0, telapse=self.exposure, channel=list(range(1, len(self) + 1)), rate=new_counts / self.exposure, stat_err=stat_err, quality=self.quality.to_ogip(), grouping=self.grouping, exposure=self.exposure, backscale=self.scale_factor, respfile=None, ancrfile=None, is_poisson=self.is_poisson, ) return pha
[docs] @classmethod def from_dispersion_spectrum(cls, dispersion_spectrum, file_type="observed"): # type: (BinnedSpectrumWithDispersion, str) -> PHASpectrum if dispersion_spectrum.is_poisson: rate_errors = None else: rate_errors = dispersion_spectrum.rate_errors pha = PHAII( instrument_name=dispersion_spectrum.instrument, telescope_name=dispersion_spectrum.mission, tstart=dispersion_spectrum.tstart, telapse=dispersion_spectrum.tstop - dispersion_spectrum.tstart, channel=list(range(1, len(dispersion_spectrum) + 1)), rate=dispersion_spectrum.rates, stat_err=rate_errors, quality=dispersion_spectrum.quality.to_ogip(), grouping=np.ones(len(dispersion_spectrum)), exposure=dispersion_spectrum.exposure, backscale=dispersion_spectrum.scale_factor, respfile=None, ancrfile=None, is_poisson=dispersion_spectrum.is_poisson, ) return cls( pha_file_or_instance=pha, spectrum_number=1, file_type=file_type, rsp_file=dispersion_spectrum.response, )