Skip to content

Commit

Permalink
Merge pull request #1445 from metno/pyaro-postprocessing
Browse files Browse the repository at this point in the history
Add post-processing of observations from pyaro
  • Loading branch information
magnusuMET authored Dec 18, 2024
2 parents 5475122 + 2c36ec4 commit 82a50eb
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 28 deletions.
13 changes: 6 additions & 7 deletions pyaerocom/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def add_ungridded_post_dataset(
obs_vars,
obs_aux_requires,
obs_merge_how,
obs_aux_funs=None,
obs_aux_funs: dict[str, str] | None = None,
obs_aux_units=None,
**kwargs,
):
Expand Down Expand Up @@ -645,15 +645,14 @@ def add_ungridded_post_dataset(
combine). For valid input args see
:mod:`pyaerocom.combine_vardata_ungridded`. If value is string,
then the same method is used for all variables.
obs_aux_funs : dict, optional
obs_aux_funs : dict[str, str], optional
dictionary specifying computation methods for auxiliary variables
that are supposed to be retrieved via `obs_merge_how='eval'`.
Keys are variable names, values are respective computation methods
(which need to be strings as they will be evaluated via
:func:`pandas.DataFrame.eval` in
:mod:`pyaerocom.combine_vardata_ungridded`). This input is
optional, but mandatory if any of the `obs_vars` is
supposed to be retrieved via `merge_how='eval'`.
passed to :mod:`pyaerocom.combine_vardata_ungridded`.
The function only supports addition/multiplication/.. on two variables.
This input is optional, but mandatory if any of the `obs_vars`
is supposed to be retrieved via `merge_how='eval'`.
obs_aux_units : dict, optional
output units of auxiliary variables (only needed for varibales
that are derived via `merge_how='eval'`)
Expand Down
185 changes: 185 additions & 0 deletions pyaerocom/io/pyaro/postprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import dataclasses

from pyaro.timeseries import (
Reader,
Data,
)

from pyaerocom.units_helpers import get_unit_conversion_fac


@dataclasses.dataclass
class VariableScaling:
REQ_VAR: str
IN_UNIT: str
OUT_UNIT: str
SCALING_FACTOR: float
OUT_VARNAME: str
NOTE: str | None = None

def required_input_variables(self) -> list[str]:
return [self.REQ_VAR]

def out_varname(self) -> str:
return self.OUT_VARNAME


M_N = 14.006
M_O = 15.999
M_S = 32.065

TRANSFORMATIONS = {
"concNno_from_concno": VariableScaling(
REQ_VAR="concno",
IN_UNIT="ug m-3",
OUT_UNIT="ug N m-3",
SCALING_FACTOR=M_N / (M_N + M_O),
OUT_VARNAME="concNno",
),
"concNno2_from_concno2": VariableScaling(
REQ_VAR="concno2",
IN_UNIT="ug m-3",
OUT_UNIT="ug N m-3",
SCALING_FACTOR=M_N / (M_N + 2 * M_O),
OUT_VARNAME="concNno2",
),
"concSso2_from_concso2": VariableScaling(
REQ_VAR="concso2",
IN_UNIT="ug m-3",
OUT_UNIT="ug S m-3",
SCALING_FACTOR=M_S / (M_S + 2 * M_O),
OUT_VARNAME="concSso2",
),
"vmro3_from_conco3": VariableScaling(
REQ_VAR="conco3",
IN_UNIT="µg m-3",
OUT_UNIT="ppb",
SCALING_FACTOR=0.5011, # 20C and 1013 hPa
OUT_VARNAME="vmro3",
NOTE="The vmro3_from_conco3 transform is only valid at T=20C, p=1013hPa",
),
"vmro3max_from_conco3": VariableScaling( # Requires `resample_how`
REQ_VAR="conco3",
IN_UNIT="µg m-3",
OUT_UNIT="ppb",
SCALING_FACTOR=0.5011, # 20C and 1013 hPa
OUT_VARNAME="vmro3max",
NOTE="The vmro3max_from_conco3 transform is only valid at T=20C, p=1013hPa, and the transform requires the use of resample_how to obtain the daily maximum",
),
}


class PostProcessingReaderData(Data):
def __init__(self, data: Data, variable: str, units: str, scaling: float | None):
self._variable = variable
self._units = units
self.data = data
self.scaling = scaling

def keys(self):
return self.data.keys()

def slice(self, index):
return PostProcessingReaderData(
self.data.slice(index),
variable=self._variable,
units=self._units,
scaling=self.scaling,
)

def __len__(self):
return self.data.__len__()

@property
def values(self):
if self.scaling is None:
return self.data.values
else:
return self.data.values * self.scaling

@property
def stations(self):
return self.data.stations

@property
def latitudes(self):
return self.data.latitudes

@property
def longitudes(self):
return self.data.longitudes

@property
def altitudes(self):
return self.data.altitudes

@property
def start_times(self):
return self.data.start_times

@property
def end_times(self):
return self.data.end_times

@property
def flags(self):
return self.data.flags

@property
def standard_deviations(self):
return self.data.standard_deviations


class PostProcessingReader(Reader):
def __init__(
self,
reader: Reader,
compute_vars: list[str] | None = None,
):
self.reader = reader

self.compute_vars = dict()
if compute_vars is not None:
known_variables = reader.variables()
for compute_var in compute_vars:
transform = TRANSFORMATIONS.get(compute_var)
if transform is None:
raise Exception(f"Unknown transformation ({compute_var}) encountered")
required_input = transform.required_input_variables()
missing = set(required_input) - set(known_variables)
if len(missing) > 0:
raise Exception(
f"The transformation {compute_var} requires variables which are not present, missing {missing}"
)
self.compute_vars[transform.out_varname()] = transform

def data(self, varname: str) -> Data:
if varname not in self.compute_vars:
data = self.reader.data(varname)
return data
else:
transform = self.compute_vars[varname]
if isinstance(transform, VariableScaling):
data = self.reader.data(transform.REQ_VAR)
scaling = transform.SCALING_FACTOR * get_unit_conversion_fac(
from_unit=data.units, to_unit=transform.IN_UNIT, var_name=transform.REQ_VAR
)
return PostProcessingReaderData(
data, variable=varname, units=transform.OUT_UNIT, scaling=scaling
)
else:
raise Exception(
f"Unknown transform {transform} encountered for variable {varname}"
)

def variables(self) -> list[str]:
variables = list()
variables.extend(self.reader.variables())
variables.extend(self.compute_vars.keys())
return variables

def stations(self):
return self.reader.stations()

def close(self) -> None:
self.reader.close()
1 change: 1 addition & 0 deletions pyaerocom/io/pyaro/pyaro_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class PyaroConfig(BaseModel):
filename_or_obj_or_url: str | list[str] | Path | list[Path]
filters: dict[str, FilterArgs]
name_map: dict[str, str] | None = None # no Unit conversion option
post_processing: list[str] | None = None # List of variables to add through post-processing

##########################
# Save and load methods
Expand Down
31 changes: 16 additions & 15 deletions pyaerocom/io/pyaro/read_pyaro.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pyaro.timeseries.Wrappers import VariableNameChangingReader

from pyaerocom.io.pyaro.pyaro_config import PyaroConfig
from pyaerocom.io.pyaro.postprocess import PostProcessingReader
from pyaerocom.io.readungriddedbase import ReadUngriddedBase
from pyaerocom.tstype import TsType
from pyaerocom.ungriddeddata import UngriddedData
Expand Down Expand Up @@ -111,23 +112,23 @@ def _open_reader(self) -> Reader:
else:
kwargs = {}

if self.config.name_map is None:
return open_timeseries(
reader_id,
self.config.filename_or_obj_or_url,
filters=self.config.filters,
**kwargs,
)
else:
return VariableNameChangingReader(
open_timeseries(
reader_id,
self.config.filename_or_obj_or_url,
filters=self.config.filters,
**kwargs,
),
reader = open_timeseries(
reader_id,
self.config.filename_or_obj_or_url,
filters=self.config.filters,
**kwargs,
)
if self.config.name_map is not None:
reader = VariableNameChangingReader(
reader,
self.config.name_map,
)
if self.config.post_processing is not None:
reader = PostProcessingReader(
reader,
self.config.post_processing,
)
return reader

def _convert_to_ungriddeddata(self, pyaro_data: dict[str, Data]) -> UngriddedData:
total_size = sum(len(var) for var in pyaro_data.values())
Expand Down
2 changes: 1 addition & 1 deletion pyaerocom/io/readungridded.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ def _check_var_alias(self, var, supported):
return svar
raise ValueError()

def get_vars_supported(self, obs_id, vars_desired): # , config: Optional[PyaroConfig] = None):
def get_vars_supported(self, obs_id: str, vars_desired: list[str]):
"""
Filter input list of variables by supported ones for a certain data ID
Expand Down
7 changes: 4 additions & 3 deletions tests/fixtures/pyaro.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def make_csv_test_file(tmp_path: Path) -> Path:
stations = ["NO0002", "GB0881"]
countries = ["NO", "GB"]
coords = [(58, 8), (60, -1)]
species = ["NOx", "SOx", "AOD"]
species = ["NOx", "SOx", "AOD", "NO"]
area_type = ["Rural", "Urban"]

with open(file, "w") as f:
Expand All @@ -34,14 +34,15 @@ def make_csv_test_file(tmp_path: Path) -> Path:
delta_t = ["1h", "3D", "2D", "2h"][
j % 4
] # Rotates over the freqs in a deterministic fashion
unit = "Gg" if s != "NO" else "ng m-3"
f.write(
f"{s}, {station}, {coords[i][1]}, {coords[i][0]}, {np.random.normal(10, 5)}, Gg, {date}, {date+pd.Timedelta(delta_t)},{countries[i]},{area_type[i]} \n"
f"{s}, {station}, {coords[i][1]}, {coords[i][0]}, {np.random.normal(10, 5)}, {unit}, {date}, {date+pd.Timedelta(delta_t)},{countries[i]},{area_type[i]} \n"
)

return file


def testconfig(tmp_path: Path) -> PyaroConfig:
def testconfig(tmp_path: Path) -> tuple[PyaroConfig, PyaroConfig]:
reader_id = "csv_timeseries"

config1 = PyaroConfig(
Expand Down
31 changes: 29 additions & 2 deletions tests/io/pyaro/test_read_pyaro.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import pandas as pd
import numpy as np

from pyaerocom import UngriddedData
from pyaerocom.io import ReadPyaro
from pyaerocom.io import ReadPyaro, PyaroConfig


def test_testfile(pyaro_test_data_file):
Expand All @@ -18,7 +19,7 @@ def test_readpyaro(pyaro_testdata):

def test_variables(pyaro_testdata):
rp = pyaro_testdata
variables = ["NOx", "concso4", "od550aer"]
variables = ["NOx", "concso4", "od550aer", "NO"]

assert rp.PROVIDES_VARIABLES == variables
assert rp.DEFAULT_VARS == variables
Expand Down Expand Up @@ -87,3 +88,29 @@ def test_pyarotoungriddeddata_variables(pyaro_testdata):
obj = pyaro_testdata.converter

assert obj.get_variables() == pyaro_testdata.PROVIDES_VARIABLES


def test_postprocessing(pyaro_test_data_file):
config = PyaroConfig(
reader_id="csv_timeseries",
filename_or_obj_or_url=str(pyaro_test_data_file),
name="test",
name_map={
"NO": "concno",
},
post_processing=[
"concNno_from_concno",
],
filters=dict(),
)
reader = ReadPyaro(config)
assert set(reader.PROVIDES_VARIABLES).issuperset(["concno", "concNno"])
data = reader.read(["concno", "concNno"])

concno = data.all_datapoints_var("concno")
concNno = data.all_datapoints_var("concNno")

# Proportion of N in NO, ng -> ug conversion
conversion_factor = 14.006 / (14.006 + 15.999) * 1e-3

assert np.allclose(concno * conversion_factor, concNno)

0 comments on commit 82a50eb

Please sign in to comment.