Source code for pyspextools.io.rmf

#!/usr/bin/env python

import pyspextools.messages as message
import numpy as np
import astropy.io.fits as fits
from pyspextools.io.arf import Arf


[docs]class RmfEbounds: """Class to read the EBOUNDS extension from an RMF or RSP file. :ivar FirstChannel: First channel number. :vartype FirstChannel: int :ivar Channel: Channel numbers. :vartype Channel: numpy.ndarray :ivar ChannelLowEnergy: Start energy of channel. :vartype ChannelLowEnergy: numpy.ndarray :ivar ChannelHighEnergy: End energy of channel. :vartype ChannelHighEnergy: numpy.ndarray :ivar NumberChannels: Number of data channels. :vartype NumberChannels: int :ivar EnergyUnits: Unit of the energy scale :vartype EnergyUnits: string """ def __init__(self): self.FirstChannel = 0 # First channel number self.Channel = np.array([], dtype=int) # Channel numbers self.ChannelLowEnergy = np.array([], dtype=float) # Start energy of channel self.ChannelHighEnergy = np.array([], dtype=float) # End energy of channel self.NumberChannels = 0 # Number of data channels self.EnergyUnits = '' # Unit of the energy scale def read(self, rmffile): # Read the Ebounds table (data, header) = fits.getdata(rmffile, 'EBOUNDS', header=True) self.Channel = data['CHANNEL'] self.ChannelLowEnergy = data['E_MIN'] self.ChannelHighEnergy = data['E_MAX'] self.NumberChannels = self.Channel.size self.FirstChannel = self.Channel[0] try: self.EnergyUnits = header['TUNIT2'] except KeyError: message.warning("Could not find energy units in the Energy column. Assuming unit keV") self.EnergyUnits = 'keV'
[docs]class RmfMatrix: """Class to read a MATRIX extension from an OGIP RMF or RSP file. :ivar NumberGroups: Number of response groups. :vartype NumberGroups: numpy.ndarray :ivar FirstGroup: First response group for this energy bin. :vartype FirstGroup: numpy.ndarray :ivar FirstChannelGroup: First channel number in this group. :vartype FirstChannelGroup: numpy.ndarray :ivar NumberChannelsGroup: Number of channels in this group. :vartype NumberChannelsGroup: numpy.ndarray :ivar FirstElement: First response element for this group. :vartype FirstElement: numpy.ndarray :ivar LowEnergy: Start energy of bin. :vartype LowEnergy: numpy.ndarray :ivar HighEnergy: End energy of bin. :vartype HighEnergy: numpy.ndarray :ivar Matrix: Response matrix elements. :vartype Matrix: numpy.ndarray :ivar NumberEnergyBins: Number of energy bins. :vartype NumberEnergyBins: int :ivar NumberTotalGroups: Total number of groups. :vartype NumberTotalGroups: int :ivar NumberTotalElements: Total number of response elements. :vartype NumberTotalElements: int :ivar AreaScaling: Value of EFFAREA keyword. :vartype AreaScaling: float :ivar ResponseThreshold: Minimum value in response. :vartype ResponseThreshold: float :ivar EnergyUnits: Units of the energy scale. :vartype EnergyUnits: str :ivar RMFUnits: Units for RMF values. :vartype RMFUnits: str :ivar AreaIncluded: Is the effective area included in the response? :vartype AreaIncluded: bool :ivar Order: Order of the matrix. :vartype Order: int """ def __init__(self): self.NumberGroups = np.array([], dtype=int) # Number of response groups self.FirstGroup = np.array([], dtype=int) # First response group for this energy bin self.FirstChannelGroup = np.array([], dtype=int) # First channel number in this group self.NumberChannelsGroup = np.array([], dtype=int) # Number of channels in this group self.FirstElement = np.array([], dtype=int) # First response element for this group self.LowEnergy = np.array([], dtype=float) # Start energy of bin self.HighEnergy = np.array([], dtype=float) # End energy of bin self.Matrix = np.array([], dtype=float) # Matrix elements self.NumberEnergyBins = 0 # Number of energy bins self.NumberTotalGroups = 0 # Total number of groups self.NumberTotalElements = 0 # Total number of response elements self.AreaScaling = 1.0 # Value of EFFAREA keyword self.ResponseThreshold = 1E-7 # Minimum value in response self.EnergyUnits = '' # Units of the energy scale self.RMFUnits = '' # Units for RMF values self.AreaIncluded = False # Is the effective area included in the response? self.Order = 0 # Order of the matrix def read(self, rmfhdu): # Read the Matrix table data = rmfhdu.data header = rmfhdu.header if rmfhdu.name == 'MATRIX': pass elif rmfhdu.name == 'SPECRESP MATRIX': message.warning("This is an RSP file with the effective area included.") print("Do not read an ARF file, unless you know what you are doing.") self.AreaIncluded = True else: message.error("MATRIX extension not successfully found in RMF file.") return self.LowEnergy = data['ENERG_LO'] self.HighEnergy = data['ENERG_HI'] self.NumberEnergyBins = self.LowEnergy.size try: self.EnergyUnits = header['TUNIT1'] except KeyError: message.warning("Could not find units in the file for the Energy grid. Assuming keV.") self.EnergyUnits = 'keV' self.NumberGroups = data['N_GRP'] self.NumberTotalGroups = np.sum(self.NumberGroups) self.FirstGroup = np.zeros(self.NumberEnergyBins, dtype=int) self.FirstChannelGroup = np.zeros(self.NumberTotalGroups, dtype=int) self.NumberChannelsGroup = np.zeros(self.NumberTotalGroups, dtype=int) self.FirstElement = np.zeros(self.NumberTotalGroups, dtype=int) self.Matrix = np.array([], dtype=float) try: self.Order = header['ORDER'] except KeyError: pass fgroup = 0 # Count total number of groups felem = 0 # Count total number of response elements nelem = np.zeros(self.NumberEnergyBins, dtype=int) # Count number of response elements per energy bin k = 0 fchan_local = data['F_CHAN'] nchan_local = data['N_CHAN'] matrix_local = data['MATRIX'] for i in np.arange(self.NumberEnergyBins): self.FirstGroup[i] = fgroup fgroup = fgroup + self.NumberGroups[i] if self.NumberGroups[i] != 0: for j in np.arange(self.NumberGroups[i]): try: self.FirstChannelGroup[k] = fchan_local[i][j] self.NumberChannelsGroup[k] = nchan_local[i][j] except IndexError: self.FirstChannelGroup[k] = fchan_local[i] self.NumberChannelsGroup[k] = nchan_local[i] self.FirstElement[k] = felem felem = felem + self.NumberChannelsGroup[k] nelem[i] = nelem[i] + self.NumberChannelsGroup[k] k = k + 1 self.Matrix = np.zeros(felem, dtype=float) r = 0 for i in np.arange(self.NumberEnergyBins): if nelem[i] != 0: for j in np.arange(nelem[i]): self.Matrix[r] = matrix_local[i][j] r = r + 1 self.NumberTotalElements = self.Matrix.size self.ResponseThreshold = np.amin(self.Matrix)
[docs]class Rmf: """Class to read OGIP RMF files. The response is given in two parts: an EBOUNDS extension, containing the energy boundries of the instrument channels, and one or more MATRIX extensions, which contain components of the response matrix. :ivar ebounds: Represents the EBOUNDS extension in the RMF file, which contains the channel energy scale. :vartype ebounds: pyspextools.io.rmf.RmfEbounds :ivar matrix: List containing the matrix extensions (type pyspextools.io.rmf.RmfMatrix) :vartype matrix: list :ivar NumberMatrixExt: The number of matrix extensions. :vartype NumberMatrixExt: int :ivar MatrixExt: Array containing the FITS extension numbers that contain a response matrix. :vartype MatrixExt: numpy.ndarray """ def __init__(self): self.ebounds = RmfEbounds() self.matrix = [] self.NumberMatrixExt = 0 self.MatrixExt = np.array([], dtype=int)
[docs] def read(self, rmffile): """Method to read OGIP RMF files. The variable naming is made consistent with the HEASOFT HEASP module by Keith Arnaud. :param rmffile: RMF file name to read. :type rmffile: str """ # Read the Ebounds table self.ebounds.read(rmffile) # Empty lists for safety self.NumberMatrixExt = 0 self.MatrixExt = np.array([], dtype=int) self.matrix = [] # Read the number of MATRIX extensions rmf = fits.open(rmffile) for i in range(len(rmf)): if rmf[i].name == 'MATRIX' or rmf[i].name == 'SPECRESP MATRIX': self.NumberMatrixExt += 1 self.MatrixExt = np.append(self.MatrixExt, i) # Read the individual matrix extensions for i in self.MatrixExt: mat = RmfMatrix() mat.read(rmf[i]) self.matrix.append(mat) rmf.close() return 0
[docs] def write(self, rmffile, telescop=None, instrume=None, filterkey=None, overwrite=False): """Method to write an OGIP format RMF file. :param rmffile: RMF file name to write. :type rmffile: str :param telescop: Name of the telescope to be put in the TELESCOP keyword. :type telescop: str :param instrume: Name of the instrument to be put in the INSTRUME keyword. :type instrume: str :param filterkey: Name of the filter to be put in the FILTER keyword. :type filterkey: str :param overwrite: Overwrite existing file names? (True/False) :type overwrite: bool """ # # Generate warning if there are multiple groups per energy # if np.amax(self.matrix[0].NumberGroups) != 1: message.warning("This method has not been tested for responses with multiple response groups per energy.") # # Create Primary HDU # primary = fits.PrimaryHDU() # # Create the EBOUNDS extension # ecol1 = fits.Column(name='CHANNEL', format='J', array=self.ebounds.Channel) ecol2 = fits.Column(name='E_MIN', format='D', unit=self.ebounds.EnergyUnits, array=self.ebounds.ChannelLowEnergy) ecol3 = fits.Column(name='E_MAX', format='D', unit=self.ebounds.EnergyUnits, array=self.ebounds.ChannelHighEnergy) ebnds = fits.BinTableHDU.from_columns([ecol1, ecol2, ecol3]) ehdr = ebnds.header ehdr.set('EXTNAME', 'EBOUNDS') ehdr.set('DETCHANS', self.ebounds.NumberChannels) # Set the TELESCOP keyword (optional) if telescop is None: ehdr.set('TELESCOP', 'None', 'Telescope name') else: ehdr.set('TELESCOP', telescop, 'Telescope name') # Set the INSTRUME keyword (optional) if instrume is None: ehdr.set('INSTRUME', 'None', 'Instrument name') else: ehdr.set('INSTRUME', instrume, 'Instrument name') # Set the FILTER keyword (optional) if filterkey is None: ehdr.set('FILTER', 'None', 'Filter setting') else: ehdr.set('FILTER', filterkey, 'Filter setting') ehdr.set('DETNAM ', 'None') ehdr.set('CHANTYPE', 'PI') ehdr.set('HDUCLASS', 'OGIP') ehdr.set('HDUCLAS1', 'RESPONSE') ehdr.set('HDUCLAS2', 'EBOUNDS ') ehdr.set('HDUVERS1', '1.2.0') ehdr.set('ORIGIN ', 'SRON') hdu = fits.HDUList([primary, ebnds]) # # Create SPECRESP MATRIX extension # for e in range(self.NumberMatrixExt): print("Writing matrix for matrix number: {0}".format(e)) mcol1 = fits.Column(name='ENERG_LO', format='D', unit=self.matrix[e].EnergyUnits, array=self.matrix[e].LowEnergy) mcol2 = fits.Column(name='ENERG_HI', format='D', unit=self.matrix[e].EnergyUnits, array=self.matrix[e].HighEnergy) mcol3 = fits.Column(name='N_GRP', format='J', array=self.matrix[e].NumberGroups) mcol4 = fits.Column(name='F_CHAN', format='J', array=self.matrix[e].FirstChannelGroup) mcol5 = fits.Column(name='N_CHAN', format='J', array=self.matrix[e].NumberChannelsGroup) # Determine the width of the matrix width = np.amax(self.matrix[e].NumberChannelsGroup) formatstr = str(width)+'D' # # THIS PART COULD BE UPDATED TO OPTIMIZE THE SIZE USING VARIABLE SIZE ARRAYS IN FITS. # # Building the MATRIX column newmatrix = np.zeros(self.matrix[e].NumberEnergyBins * width, dtype=float).reshape(self.matrix[e].NumberEnergyBins, width) re = 0 for i in np.arange(self.matrix[e].NumberEnergyBins): for j in np.arange(self.matrix[e].NumberGroups[i]): for k in np.arange(self.matrix[e].NumberChannelsGroup[i]): newmatrix[i, k] = self.matrix[e].Matrix[re] re = re + 1 mcol6 = fits.Column(name='MATRIX', format=formatstr, array=newmatrix) matrix = fits.BinTableHDU.from_columns([mcol1, mcol2, mcol3, mcol4, mcol5, mcol6]) mhdr = matrix.header if self.matrix[e].AreaIncluded: mhdr.set('EXTNAME', 'SPECRESP MATRIX') else: mhdr.set('EXTNAME', 'MATRIX') # Set the TELESCOP keyword (optional) if telescop is None: mhdr.set('TELESCOP', 'None', 'Telescope name') else: mhdr.set('TELESCOP', telescop, 'Telescope name') # Set the INSTRUME keyword (optional) if instrume is None: mhdr.set('INSTRUME', 'None', 'Instrument name') else: mhdr.set('INSTRUME', instrume, 'Instrument name') # Set the FILTER keyword (optional) if filterkey is None: mhdr.set('FILTER', 'None', 'Filter setting') else: mhdr.set('FILTER', filterkey, 'Filter setting') mhdr.set('DETCHANS', self.ebounds.NumberChannels) mhdr.set('LO_THRES', self.matrix[e].ResponseThreshold) mhdr.set('CHANTYPE', 'PI') mhdr.set('HDUCLASS', 'OGIP') mhdr.set('HDUCLAS1', 'RESPONSE') mhdr.set('HDUCLAS2', 'RSP_MATRIX') if self.matrix[e].AreaIncluded: mhdr.set('HDUCLAS3', 'FULL') else: mhdr.set('HDUCLAS3', 'REDIST') mhdr.set('HDUVERS1', '1.3.0') mhdr.set('ORIGIN ', 'SRON') matrix.header['HISTORY'] = 'Created by pyspextools:' matrix.header['HISTORY'] = 'https://github.com/spex-xray/pyspextools' hdu.append(matrix) try: hdu.writeto(rmffile, overwrite=overwrite) except IOError: message.error("File {0} already exists. I will not overwrite it!".format(rmffile)) return 1 return 0
[docs] def check(self): """Check the RMF for internal consistency.""" if self.ebounds.NumberChannels <= 0: message.error("Number of Channels in response is zero.") return 1 for e in range(self.NumberMatrixExt): if self.matrix[e].NumberEnergyBins <= 0: message.error("Number of Energy bins in response is zero.") return 1 c = 0 r = 0 # Check if matrix array is consistent with the indexing for i in np.arange(self.matrix[e].NumberEnergyBins): for j in np.arange(self.matrix[e].NumberGroups[i]): for k in np.arange(self.matrix[e].NumberChannelsGroup[c]): r = r + 1 c = c + 1 if r != self.matrix[e].Matrix.size: message.error("Matrix size does not correspond to index arrays. Response inconsistent.") return 1 # Check for energy bins with zero width wzero = np.where(self.matrix[e].LowEnergy == self.matrix[e].HighEnergy)[0] if wzero.size != 0: for i in np.arange(wzero.size): message.warning("Energy bin with zero width found in row {0}!".format(wzero[i]+1)) return 2 return 0
[docs] def disp(self): """Display a summary of the RMF object.""" print("RMF Response matrix:") print("") print("Channel energy bounds:") print("FirstChannel: {0:>20} First channel number".format(self.ebounds.FirstChannel)) print("NumberChannels: {0:>20} Number of data channels".format(self.ebounds.NumberChannels)) print("Channel {0:>20} Channel numbers".format(self.ebounds.Channel.size)) print("ChannelLowEnergy {0:>20} Start energy of channel".format(self.ebounds.ChannelLowEnergy.size)) print("ChannelHighEnergy {0:>20} End energy of channel".format(self.ebounds.ChannelHighEnergy.size)) print("") print("NumberMatrixExt {0:>10} Number of MATRIX extensions".format(self.NumberMatrixExt)) for i in range(self.NumberMatrixExt): print("") print("Information for MATRIX number {0}:".format(i+1)) print("NumberEnergyBins: {0:>20} Number of energy bins".format(self.matrix[i].NumberEnergyBins)) print("NumberTotalGroups: {0:>20} Total number of groups".format(self.matrix[i].NumberTotalGroups)) print("NumberTotalElements: {0:>20} Total number of response elements".format(self.matrix[i].NumberTotalElements)) print("AreaScaling {0:>20} Value of EFFAREA keyword".format(self.matrix[i].AreaScaling)) print("ResponseThreshold {0:>20g} Minimum value in response".format(self.matrix[i].ResponseThreshold)) print("EnergyUnits {0:>20} Units of the energy scale".format(self.matrix[i].EnergyUnits)) print("RMFUnits {0:>20} Units for RMF values".format(self.matrix[i].RMFUnits)) print("") print("NumberGroups {0:>20} Number of response groups".format(self.matrix[i].NumberGroups.size)) print("FirstGroup {0:>20} First response group for this energy bin".format(self.matrix[i].FirstGroup.size)) print("FirstChannelGroup {0:>20} First channel number in this group".format(self.matrix[i].FirstChannelGroup.size)) print("NumberChannelsGroup {0:>20} Number of channels in this group".format(self.matrix[i].NumberChannelsGroup.size)) print("FirstElement {0:>20} First response element for this group".format(self.matrix[i].FirstElement.size)) print("LowEnergy {0:>20} Start energy of bin".format(self.matrix[i].LowEnergy.size)) print("HighEnergy {0:>20} End energy of bin".format(self.matrix[i].HighEnergy.size)) print("Matrix {0:>20} Matrix elements".format(self.matrix[i].Matrix.size)) print("") return
[docs] def check_compatibility(self, arf): """Check whether the input arf object is really an ARF object with data and consistent with this RMF file. :param arf: Input arf object to check. :type arf: pyspextools.io.Arf """ # Check if arf is really an Arf object if not isinstance(arf, Arf): message.error("Input arf is not an Arf class instance.") return 1 # Check if the size of the energy arrays is the same. if arf.LowEnergy.size != self.matrix[0].LowEnergy.size: message.error("Size of ARF and RMF are not the same.") return 1 # Check if the numbers of the high energy column are the same # Here we check the high values, because sometimes the first low value is different... if arf.HighEnergy[0] != self.matrix[0].HighEnergy[0]: message.error("First high-energy boundaries of arrays are not the same.") return 1 # Check if the last values of the array are similar. size = arf.HighEnergy.size - 1 if arf.HighEnergy[size] != self.matrix[0].HighEnergy[size]: message.error("Last high-energy boundaries of arrays are not the same.") return 1 return 0
[docs] def fix_energy_grid(self): """In some exceptional cases, like with ROSAT response matrices, the matrix contains energy bins with zero width, which is not physical. This method changes them to a small finite width to allow them to be processed with ogip2spex and trafo.""" for e in range(self.NumberMatrixExt): wzero = np.where(self.matrix[e].LowEnergy == self.matrix[e].HighEnergy)[0] if wzero.size != 0: for i in np.arange(wzero.size): self.matrix[e].HighEnergy[wzero[i]] = self.matrix[e].HighEnergy[wzero[i]] + 1.0E-6 if i < self.matrix[e].NumberEnergyBins: self.matrix[e].LowEnergy[wzero[i]+1] = self.matrix[e].LowEnergy[wzero[i]+1] + 1.0E-6