Skip to content

Commit

Permalink
initial metadata parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Alrobbertz committed Jul 10, 2023
1 parent 6b201c8 commit 18a1359
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 29 deletions.
122 changes: 93 additions & 29 deletions hermes_core/util/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ def save_data(self, data, file_path):
json_filename = f"{data.meta['Logical_file_id']}.json"
output_json_filepath = str(Path(file_path) / json_filename)

if Path(output_json_filepath).exists():
raise FileExistsError(
f"Cannot Save JSON Data to path: {output_json_filepath}. File alrady exists!"
)

# Create a New JSON Object (dict) to Write to a JSON File
output_json_data = {}

Expand Down Expand Up @@ -472,6 +477,13 @@ class CSVDataHandler(TimeDataIOHandler):
This class provides methods to load and save heliophysics data from/to a CSV file.
"""

def __init__(self):
super().__init__()

self.global_metadata_tag = "GLOBAL_METADATA"
self.variable_metadata_tag = "VARIABLE_METADATA"
self.variable_name_tag = "VARIABLE_NAME"

def load_data(self, file_path):
"""
Load heliophysics data from a CSV file.
Expand Down Expand Up @@ -587,34 +599,55 @@ def _convert_header_metadata(self, header, ts):
if not hasattr(ts, "meta"):
ts.meta = OrderedDict()

# Add the the Header Information
ts.meta["Header"] = "".join(header)

# Initialize Variable Metadata Dicts
for column_name in ts.columns:
if not hasattr(ts[column_name], "meta"):
ts[column_name].meta = OrderedDict({"VAR_TYPE": "data"})

# Add Dummy Meta
# TODO Add "Real" Metadata Parsing
random_inst_id = randrange(len(INST_NAMES))
random_inst_shortname = INST_NAMES[random_inst_id]
random_inst_longname = INST_FULLNAMES[random_inst_id]

random_data_level_id = randrange(len(VALID_DATA_LEVELS))
random_data_level_shortname = VALID_DATA_LEVELS[random_data_level_id]
if random_data_level_shortname != "ql":
random_data_level_longname = f"Level {random_data_level_shortname[1]}"
else:
random_data_level_longname = f"Quicklook"

ts.meta.update(
{
"Descriptor": f"{random_inst_shortname}>{random_inst_longname}",
"Data_level": f"{random_data_level_shortname}>{random_data_level_longname}",
"Data_version": "v0.0.1",
}
)
# Split the Global and Variable Metadata Sections
global_metadata_lines = []
_in_global_section = False
variable_metadata_lines = []
_in_variable_section = False
for line in header:
if self.global_metadata_tag in line:
_in_global_section = True
_in_variable_section = False
elif self.variable_metadata_tag in line:
_in_global_section = False
_in_variable_section = True
elif _in_global_section:
global_metadata_lines.append(line)
elif _in_variable_section:
variable_metadata_lines.append(line)
else:
warn_user(f"Cannot Parse Header Line: {line}")

# Parse the Global Metadata and add to the TimeSeries
self._parse_global_metadata_lines(global_metadata_lines, ts)

# Parse the Variable Metadata and add to the TimeSeries
self._parse_variable_metadata_lines(variable_metadata_lines, ts)

def _parse_global_metadata_lines(self, global_metadata_lines, ts):
# Loop through the Lines
for line in global_metadata_lines:
# Strip and Split the line
attr_name, attr_value = line.strip("#\n").split(",")
# Add to the TimeSeries Meta Dict
ts.meta.update({attr_name: attr_value})

def _parse_variable_metadata_lines(self, variable_metadata_lines, ts):
targeted_variable = None
# Loop though the Lines
for line in variable_metadata_lines:
if self.variable_name_tag in line:
# Strip and Split the line
_, targeted_variable = line.strip("#\n").split(",")
elif targeted_variable is not None:
# Strip and Split the line
attr_name, attr_value = line.strip("#\n").split(",")
ts[targeted_variable].meta.update({attr_name: attr_value})

def save_data(self, data, file_path):
"""
Expand Down Expand Up @@ -652,26 +685,57 @@ def save_data(self, data, file_path):
return output_csv_filepath

def _convert_metadata_to_csv(self, data, csv_file):
# Write the Global Metadata Tag
csv_file.write(f"#{self.global_metadata_tag}\n")

# Loop though Global Attributes in target_dict
for attr_name, attr_value in data.meta.items():
# Add the Attribute to the CSV File
attr_type = type(attr_value).__name__
if isinstance(attr_value, datetime):
csv_file.write(f"#{attr_name},{attr_value.isoformat()}\n")
elif attr_value[-1] == "\n":
csv_file.write(f"#{attr_name},{attr_value}")
csv_file.write(f"#{attr_name},{attr_value.isoformat()},{attr_type}\n")
else:
csv_file.write(f"#{attr_name},{attr_value}\n")
csv_file.write(f"#{attr_name},{attr_value},{attr_type}\n")

# Write the Variable Metadata Tag
csv_file.write(f"#{self.variable_metadata_tag}\n")

# Loop through the Variables in the TimeData container
for column_name in data.columns:
# Write a tag for the specific variable
csv_file.write(f"#Variable_Name,{column_name}\n")
# Loop through the Variable Attributes for the Variable
for attr_name, attr_value in data[column_name].meta.items():
# Add the Attribute to the CSV File
attr_type = type(attr_value).__name__
if isinstance(attr_value, datetime):
csv_file.write(
f"#{attr_name},{attr_value.isoformat()},{attr_type}\n"
)
else:
csv_file.write(f"#{attr_name},{attr_value},{attr_type}\n")

def _convert_measurement_data_to_csv(self, data, csv_file):
# Write a Header Row with the Column Names from the TimeSeries
csv_file.write(",".join(data.columns) + "\n")
csv_column_header = []
for column_name in data.columns:
# Ensure the UNITS are incliuded in the CSV Header for each column
unit_suffix = f"__{data[column_name].meta['UNITS']}"
if unit_suffix in column_name:
csv_column_header.append(column_name)
else:
csv_column_header.append(column_name + unit_suffix)
csv_file.write(",".join(csv_column_header) + "\n")

# Loop through the Rows of the TimeSeries Data
for row in data.data.iterrows():
vals = []
# Each of the Rows is represnted as a `tuple[Quantity]` so we want to
# convert this into a list of values
for item in row:
vals.append(str(item.value))
if isinstance(item, Time):
vals.append(f"{item.to_value('isot')}")
else:
vals.append(str(item.value))
# Write the row to the CSV File
csv_file.write(",".join(vals) + "\n")
94 changes: 94 additions & 0 deletions hermes_core/util/tests/test_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Tests for Loading and Saving data from data containers"""

from collections import OrderedDict
from pathlib import Path
import datetime
import pytest
import numpy as np
from numpy.random import random
import tempfile
from astropy.timeseries import TimeSeries
from astropy.time import Time
from astropy.units import Quantity
from spacepy.pycdf import CDFError
from hermes_core.timedata import TimeData


def get_test_timedata():
ts = TimeSeries()
ts.meta.update(
{
"Descriptor": "EEA>Electron Electrostatic Analyzer",
"Data_level": "l1>Level 1",
"Data_version": "v0.0.1",
"Start_time": datetime.datetime.now(),
}
)

# Create an astropy.Time object
time = np.arange(10)
time_col = Time(time, format="unix")
ts["time"] = time_col

# Add Measurement
quant = Quantity(value=random(size=(10)), unit="m", dtype=np.uint16)
ts["measurement"] = quant
ts["measurement"].meta = OrderedDict(
{
"VAR_TYPE": "metadata",
"CATDESC": "Test Metadata",
}
)
timedata = TimeData(data=ts)
return timedata


def test_cdf_io():
# Get Test Datas
td = get_test_timedata()

with tempfile.TemporaryDirectory() as tmpdirname:
# Convert TimeData the to a CDF File
test_file_output_path = td.save(output_path=tmpdirname, file_extension=".cdf")

# Load the CDF to a TimeData Object
td_loaded = TimeData.load(test_file_output_path)

assert td.shape == td_loaded.shape

with pytest.raises(CDFError):
td_loaded.save(output_path=tmpdirname, file_extension=".cdf")


def test_json_io():
# Get Test Datas
td = get_test_timedata()

with tempfile.TemporaryDirectory() as tmpdirname:
# Convert TimeData the to a JSON File
test_file_output_path = td.save(output_path=tmpdirname, file_extension=".json")

# Load the JSON to a TimeData Object
td_loaded = TimeData.load(test_file_output_path)

assert td.shape == td_loaded.shape

with pytest.raises(FileExistsError):
td_loaded.save(output_path=tmpdirname, file_extension=".json")


def test_csv_io():
# Get Test Datas
td = get_test_timedata()

with tempfile.TemporaryDirectory() as tmpdirname:
# Convert TimeData the to a JSON File
test_file_output_path = td.save(output_path=tmpdirname, file_extension=".csv")
# Load the JSON to a TimeData Object
td_loaded = TimeData.load(test_file_output_path)

assert td.shape == td_loaded.shape

print(test_file_output_path)
with pytest.raises(FileExistsError):
td_loaded.save(output_path=tmpdirname, file_extension=".csv")

0 comments on commit 18a1359

Please sign in to comment.