Source code for threeML.utils.spectrum.pha_spectrum

from __future__ import division
from builtins import range
from past.utils import old_div
import collections

import astropy.io.fits as fits
import numpy as np
import os
import warnings
import six


from threeML.io.progress_bar import progress_bar
from threeML.utils.OGIP.response import OGIPResponse, InstrumentResponse
from threeML.utils.OGIP.pha import PHAII
from threeML.utils.spectrum.binned_spectrum import BinnedSpectrumWithDispersion, Quality
from threeML.utils.spectrum.binned_spectrum_set import BinnedSpectrumSet
from threeML.utils.time_interval import TimeIntervalSet

_required_keywords = {}
_required_keywords["observed"] = (
    "mission:TELESCOP,instrument:INSTRUME,filter:FILTER,"
    + "exposure:EXPOSURE,backfile:BACKFILE,"
    + "respfile:RESPFILE,"
    + "ancrfile:ANCRFILE,hduclass:HDUCLASS,"
    + "hduclas1:HDUCLAS1,poisserr:POISSERR,"
    + "chantype:CHANTYPE,detchans:DETCHANS,"
    "backscal:BACKSCAL"
).split(",")

# python types, not fits
_required_keyword_types = {"POISSERR": bool}

# hduvers:HDUVERS

_required_keywords["background"] = (
    "mission:TELESCOP,instrument:INSTRUME,filter:FILTER,"
    + "exposure:EXPOSURE,"
    + "hduclass:HDUCLASS,"
    + "hduclas1:HDUCLAS1,poisserr:POISSERR,"
    + "chantype:CHANTYPE,detchans:DETCHANS,"
    "backscal:BACKSCAL"
).split(",")

# hduvers:HDUVERS

_might_be_columns = {}
_might_be_columns["observed"] = (
    "EXPOSURE,BACKFILE," + "CORRFILE,CORRSCAL," + "RESPFILE,ANCRFILE," "BACKSCAL"
).split(",")
_might_be_columns["background"] = ("EXPOSURE,BACKSCAL").split(",")


def _read_pha_or_pha2_file(
    pha_file_or_instance,
    spectrum_number=None,
    file_type="observed",
    rsp_file=None,
    arf_file=None,
    treat_as_time_series=False,
):
    """
    A function to extract information from pha and pha2 files. It is kept separate because the same method is
    used for reading time series (MUCH faster than building a lot of individual spectra) and single spectra.


    :param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance
    :param spectrum_number: (optional) the spectrum number of the TypeII file to be used
    :param file_type: observed or background
    :param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance
    :param arf_file: (optional) and ARF filename
    :param treat_as_time_series:
    :return:
    """

    assert isinstance(pha_file_or_instance, six.string_types) or isinstance(
        pha_file_or_instance, PHAII
    ), "Must provide a FITS file name or PHAII instance"

    if isinstance(pha_file_or_instance, six.string_types):

        ext = os.path.splitext(pha_file_or_instance)[-1]

        if "{" in ext:
            spectrum_number = int(ext.split("{")[-1].replace("}", ""))

            pha_file_or_instance = pha_file_or_instance.split("{")[0]

        # Read the data

        filename = pha_file_or_instance

        # create a FITS_FILE instance

        pha_file_or_instance = PHAII.from_fits_file(pha_file_or_instance)

    # If this is already a FITS_FILE instance,

    elif isinstance(pha_file_or_instance, PHAII):

        # we simply create a dummy filename

        filename = "pha_instance"

    else:

        raise RuntimeError("This is a bug")

    file_name = filename

    assert file_type.lower() in [
        "observed",
        "background",
    ], "Unrecognized filetype keyword value"

    file_type = file_type.lower()

    try:

        HDUidx = pha_file_or_instance.index_of("SPECTRUM")

    except:

        raise RuntimeError(
            "The input file %s is not in PHA format" % (pha_file_or_instance)
        )

    # spectrum_number = spectrum_number

    spectrum = pha_file_or_instance[HDUidx]

    data = spectrum.data
    header = spectrum.header

    # We don't support yet the rescaling

    if "CORRFILE" in header:

        if (header.get("CORRFILE").upper().strip() != "NONE") and (
            header.get("CORRFILE").upper().strip() != ""
        ):
            raise RuntimeError("CORRFILE is not yet supported")

    # See if there is there is a QUALITY==0 in the header

    if "QUALITY" in header:

        has_quality_column = False

        if header["QUALITY"] == 0:

            is_all_data_good = True

        else:

            is_all_data_good = False

    else:

        if "QUALITY" in data.columns.names:

            has_quality_column = True

            is_all_data_good = False

        else:

            has_quality_column = False

            is_all_data_good = True

            warnings.warn(
                "Could not find QUALITY in columns or header of PHA file. This is not a valid OGIP file. Assuming QUALITY =0 (good)"
            )

    # looking for tstart and tstop

    tstart = None
    tstop = None

    has_tstart = False
    has_tstop = False
    has_telapse = False

    if "TSTART" in header:

        has_tstart_column = False

        has_tstart = True

    else:

        if "TSTART" in data.columns.names:

            has_tstart_column = True

            has_tstart = True

    if "TELAPSE" in header:

        has_telapse_column = False

        has_telapse = True

    else:

        if "TELAPSE" in data.columns.names:
            has_telapse_column = True

            has_telapse = True

    if "TSTOP" in header:

        has_tstop_column = False

        has_tstop = True

    else:

        if "TSTOP" in data.columns.names:
            has_tstop_column = True

            has_tstop = True

    if has_tstop and has_telapse:

        warnings.warn("Found TSTOP and TELAPSE. This file is invalid. Using TSTOP.")

        has_telapse = False

    # Determine if this file contains COUNTS or RATES

    if "COUNTS" in data.columns.names:

        has_rates = False
        data_column_name = "COUNTS"

    elif "RATE" in data.columns.names:

        has_rates = True
        data_column_name = "RATE"

    else:

        raise RuntimeError(
            "This file does not contain a RATE nor a COUNTS column. "
            "This is not a valid PHA file"
        )

    # Determine if this is a PHA I or PHA II
    if len(data.field(data_column_name).shape) == 2:

        typeII = True

        if spectrum_number == None and not treat_as_time_series:
            raise RuntimeError(
                "This is a PHA Type II file. You have to provide a spectrum number"
            )

    else:

        typeII = False

    # Collect information from mandatory keywords

    keys = _required_keywords[file_type]

    gathered_keywords = {}

    for k in keys:

        internal_name, keyname = k.split(":")

        key_has_been_collected = False

        if keyname in header:
            if (
                keyname in _required_keyword_types
                and type(header.get(keyname)) is not _required_keyword_types[keyname]
            ):
                warnings.warn(
                    "unexpected type of %(keyname)s, expected %(expected_type)s\n found %(found_type)s: %(found_value)s"
                    % dict(
                        keyname=keyname,
                        expected_type=_required_keyword_types[keyname],
                        found_type=type(header.get(keyname)),
                        found_value=header.get(keyname),
                    )
                )
            else:
                gathered_keywords[internal_name] = header.get(keyname)

                # Fix "NONE" in None
                if (
                    gathered_keywords[internal_name] == "NONE"
                    or gathered_keywords[internal_name] == "none"
                ):
                    gathered_keywords[internal_name] = None

                key_has_been_collected = True

        # Note that we check again because the content of the column can override the content of the header

        if keyname in _might_be_columns[file_type] and typeII:

            # Check if there is a column with this name

            if keyname in data.columns.names:
                # This will set the exposure, among other things

                if not treat_as_time_series:

                    # if we just want a single spectrum

                    gathered_keywords[internal_name] = data[keyname][
                        spectrum_number - 1
                    ]

                else:

                    # else get all the columns

                    gathered_keywords[internal_name] = data[keyname]

                # Fix "NONE" in None
                if (
                    gathered_keywords[internal_name] == "NONE"
                    or gathered_keywords[internal_name] == "none"
                ):
                    gathered_keywords[internal_name] = None

                key_has_been_collected = True

        if not key_has_been_collected:

            # The keyword POISSERR is a special case, because even if it is missing,
            # it is assumed to be False if there is a STAT_ERR column in the file

            if keyname == "POISSERR" and "STAT_ERR" in data.columns.names:

                warnings.warn(
                    "POISSERR is not set. Assuming non-poisson errors as given in the "
                    "STAT_ERR column"
                )

                gathered_keywords["poisserr"] = False

            elif keyname == "ANCRFILE":

                # Some non-compliant files have no ARF because they don't need one. Don't fail, but issue a
                # warning

                warnings.warn(
                    "ANCRFILE is not set. This is not a compliant OGIP file. Assuming no ARF."
                )

                gathered_keywords["ancrfile"] = None

            elif keyname == "FILTER":

                # Some non-compliant files have no FILTER because they don't need one. Don't fail, but issue a
                # warning

                warnings.warn(
                    "FILTER is not set. This is not a compliant OGIP file. Assuming no FILTER."
                )

                gathered_keywords["filter"] = None

            else:

                raise RuntimeError(
                    "Keyword %s not found. File %s is not a proper PHA "
                    "file" % (keyname, filename)
                )

    is_poisson = gathered_keywords["poisserr"]

    exposure = gathered_keywords["exposure"]

    # now we need to get the response file so that we can extract the EBOUNDS

    if file_type == "observed":

        if rsp_file is None:

            # this means it should be specified in the header
            rsp_file = gathered_keywords["respfile"]

            if arf_file is None:
                arf_file = gathered_keywords["ancrfile"]

                # Read in the response

        if isinstance(rsp_file, six.string_types) or isinstance(rsp_file, str):
            rsp = OGIPResponse(rsp_file, arf_file=arf_file)

        else:

            # assume a fully formed OGIPResponse
            rsp = rsp_file

    if file_type == "background":
        # we need the rsp ebounds from response to build the histogram

        assert isinstance(
            rsp_file, InstrumentResponse
        ), "You must supply and OGIPResponse to extract the energy bounds"

        rsp = rsp_file

    # Now get the data (counts or rates) and their errors. If counts, transform them in rates

    if typeII:

        # PHA II file
        if has_rates:

            if not treat_as_time_series:

                rates = data.field(data_column_name)[spectrum_number - 1, :]

                rate_errors = None

                if not is_poisson:
                    rate_errors = data.field("STAT_ERR")[spectrum_number - 1, :]

            else:

                rates = data.field(data_column_name)

                rate_errors = None

                if not is_poisson:
                    rate_errors = data.field("STAT_ERR")

        else:

            if not treat_as_time_series:

                rates = old_div(
                    data.field(data_column_name)[spectrum_number - 1, :], exposure
                )

                rate_errors = None

                if not is_poisson:
                    rate_errors = old_div(
                        data.field("STAT_ERR")[spectrum_number - 1, :], exposure
                    )

            else:

                rates = old_div(data.field(data_column_name), np.atleast_2d(exposure).T)

                rate_errors = None

                if not is_poisson:
                    rate_errors = old_div(
                        data.field("STAT_ERR"), np.atleast_2d(exposure).T
                    )

        if "SYS_ERR" in data.columns.names:

            if not treat_as_time_series:

                sys_errors = data.field("SYS_ERR")[spectrum_number - 1, :]

            else:

                sys_errors = data.field("SYS_ERR")

        else:

            sys_errors = np.zeros(rates.shape)

        if has_quality_column:

            if not treat_as_time_series:

                try:

                    quality = data.field("QUALITY")[spectrum_number - 1, :]

                except (IndexError):

                    # GBM CSPEC files do not follow OGIP conventions and instead
                    # list simply QUALITY=0 for each spectrum
                    # so we have to read them differently

                    quality_element = data.field("QUALITY")[spectrum_number - 1]

                    warnings.warn(
                        "The QUALITY column has the wrong shape. This PHAII file does not follow OGIP standards"
                    )

                    if quality_element == 0:

                        quality = np.zeros_like(rates, dtype=int)

                    else:

                        quality = np.zeros_like(rates, dtype=int) + 5

            else:

                # we need to be careful again because the QUALITY column is not always the correct shape

                quality_element = data.field("QUALITY")

                if quality_element.shape == rates.shape:

                    # This is the proper way for the quality to be stored

                    quality = quality_element

                else:

                    quality = np.zeros_like(rates, dtype=int)

                    for i, q in enumerate(quality_element):

                        if q != 0:
                            quality[i, :] = 5

        else:

            if is_all_data_good:

                quality = np.zeros_like(rates, dtype=int)

            else:

                quality = np.zeros_like(rates, dtype=int) + 5

        if has_tstart:

            if has_tstart_column:

                if not treat_as_time_series:

                    tstart = data.field("TSTART")[spectrum_number - 1]

                else:

                    tstart = data.field("TSTART")

        if has_tstop:

            if has_tstop_column:

                if not treat_as_time_series:

                    tstop = data.field("TSTOP")[spectrum_number - 1]

                else:

                    tstop = data.field("TSTOP")

        if has_telapse:

            if has_telapse_column:

                if not treat_as_time_series:

                    tstop = tstart + data.field("TELAPSE")[spectrum_number - 1]

                else:

                    tstop = tstart + data.field("TELAPSE")

    elif typeII == False:

        assert (
            not treat_as_time_series
        ), "This is not a PHAII file but you specified to treat it as a time series"

        # PHA 1 file
        if has_rates:

            rates = data.field(data_column_name)

            rate_errors = None

            if not is_poisson:
                rate_errors = data.field("STAT_ERR")

        else:

            rates = old_div(data.field(data_column_name), exposure)

            rate_errors = None

            if not is_poisson:
                rate_errors = old_div(data.field("STAT_ERR"), exposure)

        if "SYS_ERR" in data.columns.names:

            sys_errors = data.field("SYS_ERR")

        else:

            sys_errors = np.zeros(rates.shape)

        if has_quality_column:

            quality = data.field("QUALITY")

        else:

            if is_all_data_good:

                quality = np.zeros_like(rates, dtype=int)

            else:

                quality = np.zeros_like(rates, dtype=int) + 5

        # read start and stop times if needed

        if has_tstart:

            if has_tstart_column:

                tstart = data.field("TSTART")

            else:

                tstart = header["TSTART"]

        if has_tstop:

            if has_tstop_column:

                tstop = data.field("TSTOP")

            else:

                tstop = header["TSTOP"]

        if has_telapse:

            if has_telapse_column:

                tstop = tstart + data.field("TELAPSE")

            else:

                tstop = tstart + header["TELAPSE"]

        # Now that we have read it, some safety checks

        assert rates.shape[0] == gathered_keywords["detchans"], (
            "The data column (RATES or COUNTS) has a different number of entries than the "
            "DETCHANS declared in the header"
        )

    quality = Quality.from_ogip(quality)

    if not treat_as_time_series:

        counts = rates * exposure

        if not is_poisson:

            count_errors = rate_errors * exposure

        else:

            count_errors = None

    else:

        exposure = np.atleast_2d(exposure).T

        counts = rates * exposure

        if not is_poisson:

            count_errors = rate_errors * exposure

        else:

            count_errors = None

    out = collections.OrderedDict(
        counts=counts,
        count_errors=count_errors,
        rates=rates,
        rate_errors=rate_errors,
        sys_errors=sys_errors,
        exposure=exposure,
        is_poisson=is_poisson,
        rsp=rsp,
        gathered_keywords=gathered_keywords,
        quality=quality,
        file_name=file_name,
        tstart=tstart,
        tstop=tstop,
    )

    return out


[docs]class PHASpectrum(BinnedSpectrumWithDispersion): def __init__( self, pha_file_or_instance, spectrum_number=None, file_type="observed", rsp_file=None, arf_file=None, ): """ A spectrum with dispersion build from an OGIP-compliant PHA FITS file. Both Type I & II files can be read. Type II spectra are selected either by specifying the spectrum_number or via the {spectrum_number} file name convention used in XSPEC. If the file_type is background, a 3ML InstrumentResponse or subclass must be passed so that the energy bounds can be obtained. :param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance :param spectrum_number: (optional) the spectrum number of the TypeII file to be used :param file_type: observed or background :param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance :param arf_file: (optional) and ARF filename """ # extract the spectrum number if needed assert isinstance(pha_file_or_instance, six.string_types) or isinstance( pha_file_or_instance, PHAII ), "Must provide a FITS file name or PHAII instance" pha_information = _read_pha_or_pha2_file( pha_file_or_instance, spectrum_number, file_type, rsp_file, arf_file, treat_as_time_series=False, ) # default the grouping to all open bins # this will only be altered if the spectrum is rebinned self._grouping = np.ones_like(pha_information["counts"]) # this saves the extra properties to the class self._gathered_keywords = pha_information["gathered_keywords"] self._file_type = file_type self._file_name = pha_information["file_name"] # pass the needed spectrum values back up # remember that Spectrum reads counts, but returns # rates! super(PHASpectrum, self).__init__( counts=pha_information["counts"], exposure=pha_information["exposure"], response=pha_information["rsp"], count_errors=pha_information["count_errors"], sys_errors=pha_information["sys_errors"], is_poisson=pha_information["is_poisson"], quality=pha_information["quality"], mission=pha_information["gathered_keywords"]["mission"], instrument=pha_information["gathered_keywords"]["instrument"], tstart=pha_information["tstart"], tstop=pha_information["tstop"], ) def _return_file(self, key): if key in self._gathered_keywords: return self._gathered_keywords[key] else: return None
[docs] def set_ogip_grouping(self, grouping): """ If the counts are rebinned, this updates the grouping :param grouping: """ self._grouping = grouping
@property def filename(self): return self._file_name @property def background_file(self): """ Returns the background file definied in the header, or None if there is none defined p :return: a path to a file, or None """ back_file = self._return_file('backfile') if back_file == "": back_file = None return back_file @property def scale_factor(self): """ This is a scale factor (in the BACKSCAL keyword) which must be used to rescale background and source regions :return: """ return self._gathered_keywords["backscal"] @property def response_file(self): """ Returns the response file definied in the header, or None if there is none defined :return: a path to a file, or None """ return self._return_file("respfile") @property def ancillary_file(self): """ Returns the ancillary file definied in the header, or None if there is none defined :return: a path to a file, or None """ return self._return_file("ancrfile") @property def grouping(self): return self._grouping
[docs] def clone( self, new_counts=None, new_count_errors=None, new_exposure=None, new_scale_factor=None, ): """ make a new spectrum with new counts and errors and all other parameters the same :param new_exposure: the new exposure for the clone :param new_scale_factor: the new scale factor for the clone :param new_counts: new counts for the spectrum :param new_count_errors: new errors from the spectrum :return: new pha spectrum """ if new_exposure is None: new_exposure = self.exposure if new_counts is None: new_counts = self.counts new_count_errors = self.count_errors if new_count_errors is None: stat_err = None else: stat_err = old_div(new_count_errors, new_exposure) if self._tstart is None: tstart = 0 else: tstart = self._tstart if self._tstop is None: telapse = new_exposure else: telapse = self._tstop - tstart if new_scale_factor is None: new_scale_factor = self.scale_factor # create a new PHAII instance pha = PHAII( instrument_name=self.instrument, telescope_name=self.mission, tstart=tstart, telapse=telapse, channel=list(range(1, len(self) + 1)), rate=old_div(new_counts, self.exposure), stat_err=stat_err, quality=self.quality.to_ogip(), grouping=self.grouping, exposure=new_exposure, backscale=new_scale_factor, respfile=None, ancrfile=None, is_poisson=self.is_poisson, ) return pha
[docs] @classmethod def from_dispersion_spectrum(cls, dispersion_spectrum, file_type="observed"): # type: (BinnedSpectrumWithDispersion, str) -> PHASpectrum if dispersion_spectrum.is_poisson: rate_errors = None else: rate_errors = dispersion_spectrum.rate_errors if dispersion_spectrum.tstart is None: tstart = 0 else: tstart = dispersion_spectrum.tstart if dispersion_spectrum.tstop is None: telapse = dispersion_spectrum.exposure else: telapse = dispersion_spectrum.tstop - tstart pha = PHAII( instrument_name=dispersion_spectrum.instrument, telescope_name=dispersion_spectrum.mission, tstart=tstart, # TODO: add this in so that we have proper time! telapse=telapse, channel=list(range(1, len(dispersion_spectrum) + 1)), rate=dispersion_spectrum.rates, stat_err=rate_errors, quality=dispersion_spectrum.quality.to_ogip(), grouping=np.ones(len(dispersion_spectrum)), exposure=dispersion_spectrum.exposure, backscale=dispersion_spectrum.scale_factor, respfile=None, ancrfile=None, is_poisson=dispersion_spectrum.is_poisson, ) return cls( pha_file_or_instance=pha, spectrum_number=1, file_type=file_type, rsp_file=dispersion_spectrum.response, )
[docs]class PHASpectrumSet(BinnedSpectrumSet): def __init__( self, pha_file_or_instance, file_type="observed", rsp_file=None, arf_file=None ): """ A spectrum with dispersion build from an OGIP-compliant PHA FITS file. Both Type I & II files can be read. Type II spectra are selected either by specifying the spectrum_number or via the {spectrum_number} file name convention used in XSPEC. If the file_type is background, a 3ML InstrumentResponse or subclass must be passed so that the energy bounds can be obtained. :param pha_file_or_instance: either a PHA file name or threeML.plugins.OGIP.pha.PHAII instance :param spectrum_number: (optional) the spectrum number of the TypeII file to be used :param file_type: observed or background :param rsp_file: RMF filename or threeML.plugins.OGIP.response.InstrumentResponse instance :param arf_file: (optional) and ARF filename """ # extract the spectrum number if needed assert isinstance(pha_file_or_instance, six.string_types) or isinstance( pha_file_or_instance, PHAII ), "Must provide a FITS file name or PHAII instance" with fits.open(pha_file_or_instance) as f: try: HDUidx = f.index_of("SPECTRUM") except: raise RuntimeError( "The input file %s is not in PHA format" % (pha2_file) ) spectrum = f[HDUidx] data = spectrum.data if "COUNTS" in data.columns.names: has_rates = False data_column_name = "COUNTS" elif "RATE" in data.columns.names: has_rates = True data_column_name = "RATE" else: raise RuntimeError( "This file does not contain a RATE nor a COUNTS column. " "This is not a valid PHA file" ) # Determine if this is a PHA I or PHA II if len(data.field(data_column_name).shape) == 2: num_spectra = data.field(data_column_name).shape[0] else: raise RuntimeError("This appears to be a PHA I and not PHA II file") pha_information = _read_pha_or_pha2_file( pha_file_or_instance, None, file_type, rsp_file, arf_file, treat_as_time_series=True, ) # default the grouping to all open bins # this will only be altered if the spectrum is rebinned self._grouping = np.ones_like(pha_information["counts"]) # this saves the extra properties to the class self._gathered_keywords = pha_information["gathered_keywords"] self._file_type = file_type # need to see if we have count errors, tstart, tstop # if not, we create an list of None if pha_information["count_errors"] is None: count_errors = [None] * num_spectra else: count_errors = pha_information["count_errors"] if pha_information["tstart"] is None: tstart = [None] * num_spectra else: tstart = pha_information["tstart"] if pha_information["tstop"] is None: tstop = [None] * num_spectra else: tstop = pha_information["tstop"] # now build the list of binned spectra list_of_binned_spectra = [] with progress_bar(num_spectra, title="Loading PHAII spectra") as p: for i in range(num_spectra): list_of_binned_spectra.append( BinnedSpectrumWithDispersion( counts=pha_information["counts"][i], exposure=pha_information["exposure"][i, 0], response=pha_information["rsp"], count_errors=count_errors[i], sys_errors=pha_information["sys_errors"][i], is_poisson=pha_information["is_poisson"], quality=pha_information["quality"].get_slice(i), mission=pha_information["gathered_keywords"]["mission"], instrument=pha_information["gathered_keywords"]["instrument"], tstart=tstart[i], tstop=tstop[i], ) ) p.increase() # now get the time intervals start_times = data.field("TIME") stop_times = data.field("ENDTIME") time_intervals = TimeIntervalSet.from_starts_and_stops(start_times, stop_times) reference_time = 0 # see if there is a reference time in the file if "TRIGTIME" in spectrum.header: reference_time = spectrum.header["TRIGTIME"] for t_number in range(spectrum.header["TFIELDS"]): if "TZERO%d" % t_number in spectrum.header: reference_time = spectrum.header["TZERO%d" % t_number] super(PHASpectrumSet, self).__init__( list_of_binned_spectra, reference_time=reference_time, time_intervals=time_intervals, ) def _return_file(self, key): if key in self._gathered_keywords: return self._gathered_keywords[key] else: return None
[docs] def set_ogip_grouping(self, grouping): """ If the counts are rebinned, this updates the grouping :param grouping: """ self._grouping = grouping
@property def filename(self): return self._file_name @property def background_file(self): """ Returns the background file definied in the header, or None if there is none defined p :return: a path to a file, or None """ return self._return_file("backfile") @property def scale_factor(self): """ This is a scale factor (in the BACKSCAL keyword) which must be used to rescale background and source regions :return: """ return self._gathered_keywords["backscal"] @property def response_file(self): """ Returns the response file definied in the header, or None if there is none defined :return: a path to a file, or None """ return self._return_file("respfile") @property def ancillary_file(self): """ Returns the ancillary file definied in the header, or None if there is none defined :return: a path to a file, or None """ return self._return_file("ancrfile") @property def grouping(self): return self._grouping
[docs] def clone( self, new_counts=None, new_count_errors=None, ): """ make a new spectrum with new counts and errors and all other parameters the same :param new_counts: new counts for the spectrum :param new_count_errors: new errors from the spectrum :return: new pha spectrum """ if new_counts is None: new_counts = self.counts new_count_errors = self.count_errors if new_count_errors is None: stat_err = None else: stat_err = old_div(new_count_errors, self.exposure) # create a new PHAII instance pha = PHAII( instrument_name=self.instrument, telescope_name=self.mission, tstart=0, telapse=self.exposure, channel=list(range(1, len(self) + 1)), rate=old_div(new_counts, self.exposure), stat_err=stat_err, quality=self.quality.to_ogip(), grouping=self.grouping, exposure=self.exposure, backscale=self.scale_factor, respfile=None, ancrfile=None, is_poisson=self.is_poisson, ) return pha
[docs] @classmethod def from_dispersion_spectrum(cls, dispersion_spectrum, file_type="observed"): # type: (BinnedSpectrumWithDispersion, str) -> PHASpectrum if dispersion_spectrum.is_poisson: rate_errors = None else: rate_errors = dispersion_spectrum.rate_errors pha = PHAII( instrument_name=dispersion_spectrum.instrument, telescope_name=dispersion_spectrum.mission, tstart=dispersion_spectrum.tstart, telapse=dispersion_spectrum.tstop - dispersion_spectrum.tstart, channel=list(range(1, len(dispersion_spectrum) + 1)), rate=dispersion_spectrum.rates, stat_err=rate_errors, quality=dispersion_spectrum.quality.to_ogip(), grouping=np.ones(len(dispersion_spectrum)), exposure=dispersion_spectrum.exposure, backscale=dispersion_spectrum.scale_factor, respfile=None, ancrfile=None, is_poisson=dispersion_spectrum.is_poisson, ) return cls( pha_file_or_instance=pha, spectrum_number=1, file_type=file_type, rsp_file=dispersion_spectrum.response, )