Source code for plio.io.io_spectral_profiler
import os
import pandas as pd
import pvl
import numpy as np
from plio.utils.utils import find_in_dict
[docs]class Spectral_Profiler(object):
"""
Attributes
----------
spectra : panel
A pandas panel containing n individual spectra.
ancillary_data : dataframe
A pandas DataFrame of the parsed ancillary data (PVL label)
label : object
The raw PVL label object
offsets : dict
with key as the spectra index and value as the start byte offset
"""
def __init__(self, input_data, label=None, cleaned=True, qa_threshold=2000):
"""
Read the .spc file, parse the label, and extract the spectra
Parameters
----------
input_data : string
The PATH to the input .spc file
label : string
The PATH to an optional detached label associated with the .spc
cleaned : boolean
If True, mask the data based on the QA array.
nspectra : int
The number of spectra in the given data file
qa_threshold : int
The threshold value over which observations are masked as noise
if cleaned is True.
"""
label_dtype_map = {'IEEE_REAL':'f',
'MSB_INTEGER':'i',
'MSB_UNSIGNED_INTEGER':'u'}
if label:
label = pvl.load(label)
else:
label = pvl.load(input_data)
self.label = label
self.input_data = input_data
with open(input_data, 'rb') as indata:
# Extract and handle the ancillary data
ancillary_data = find_in_dict(label, "ANCILLARY_AND_SUPPLEMENT_DATA")
self.nspectra = nrows = ancillary_data['ROWS']
ncols = ancillary_data['COLUMNS']
rowbytes = ancillary_data['ROW_BYTES']
columns = []
bytelengths = []
datatypes = []
try:
ancillary_data_offset = find_in_dict(self.label, "^ANCILLARY_AND_SUPPLEMENT_DATA").value
except:
ancillary_data_offset = find_in_dict(self.label, "^ANCILLARY_AND_SUPPLEMENT_DATA")[1].value
indata.seek(ancillary_data_offset - 1)
for i in ancillary_data.items():
if i[0] == 'COLUMN':
entry = i[1]
# Level 2B2 PVL has entries with 0 bytes, e.g. omitted.
if entry['BYTES'] > 0:
columns.append(str(entry['NAME']))
datatypes.append(label_dtype_map[entry['DATA_TYPE']])
bytelengths.append(entry['BYTES'])
else:
ncols -= 1
strbytes = map(str, bytelengths)
rowdtype = list(zip(columns, map(''.join, zip(['>'] * ncols, datatypes, strbytes))))
d = np.frombuffer(indata.read(rowbytes * nrows), dtype=rowdtype,
count=nrows)
self.ancillary_data = pd.DataFrame(d, columns=columns,
index=np.arange(nrows))
keys = []
vals = []
for k, v in label.items():
if k in ["ANCILLARY_AND_SUPPLEMENT_DATA", "L2D_RESULT_ARRAY",
"SP_SPECTRUM_QA", "SP_SPECTRUM_REF1", "SP_SPECTRUM_RAD",
"SP_SPECTRUM_REF2", "SP_SPECTRUM_RAW", "SP_SPECTRUM_WAV",
"^ANCILLARY_AND_SUPPLEMENT_DATA", "^SP_SPECTRUM_WAV",
"^SP_SPECTRUM_RAW", "^SP_SPECTRUM_REF2"," ^SP_SPECTRUM_RAD",
"^SP_SPECTRUM_REF1", "^SP_SPECTRUM_QA", "^L2D_RESULT_ARRAY",
"^SP_SPECTRUM_RAD"]:
continue
if isinstance(v, pvl.collections.Quantity):
k = "{}_{}".format(k, v.units)
v = v.value
keys.append(k)
vals.append(v)
vals = [vals] * len(self.ancillary_data)
new_anc = pd.DataFrame(vals, index=self.ancillary_data.index, columns=keys)
self.ancillary_data = self.ancillary_data.join(new_anc, how='inner')
assert(ncols == len(columns))
keys = []
array_offsets = []
for d in ['WAV', 'RAW', 'REF', 'REF1', 'REF2', 'DAR', 'QA', 'RAD']:
search_key = '^SP_SPECTRUM_{}'.format(d)
result = find_in_dict(label, search_key)
if result:
try:
array_offsets.append(result.value)
except:
array_offsets.append(result[1].value) # 2C V3.0
keys.append('SP_SPECTRUM_{}'.format(d))
offsets = dict(zip(keys, array_offsets))
arrays = {}
for k, offset in offsets.items():
indata.seek(offset - 1)
newk = k.split('_')[-1]
d = find_in_dict(label, k)
unit = d['UNIT']
lines = d['LINES']
scaling_factor = d['SCALING_FACTOR']
arr = np.frombuffer(indata.read(lines * 296*2), dtype='>H').astype(np.float64)
arr = arr.reshape(lines, -1)
# If the data is scaled, apply the scaling factor
if isinstance(scaling_factor, float):
arr *= scaling_factor
arrays[newk] = arr
self.wavelengths = pd.Series(arrays['WAV'][0])
self.spectra = {}
for i in range(nrows):
self.spectra[i] = pd.DataFrame(index=self.wavelengths)
for k in keys:
k = k.split('_')[-1]
if k == 'WAV':
continue
self.spectra[i][k] = arrays[k][i]
if cleaned:
mask = self.spectra[i]['QA'] < qa_threshold
self.spectra[i] = self.spectra[i][mask]
# If the spectra have been cleaned, the wavelength ids also need to be cleaned.
if cleaned:
self.wavelengths = self.wavelengths[mask.values].values
dfs = [v for k, v in self.spectra.items()]
self.spectra = pd.concat(dfs, axis=1, keys=range(nrows))
[docs] def open_browse(self, extension='.jpg'):
"""
Attempt to open the browse image corresponding to the spc file
Parameters
----------
extension : str
The file type extension to be added to the base name
of the spc file.
Returns
-------
"""
from plio.io.io_gdal import GeoDataset
path, ext = os.path.splitext(self.input_data)
self.browse = GeoDataset(path + extension)