Skip to content

Commit

Permalink
Merge branch 'release/0.17.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Sep 11, 2023
2 parents d724baa + f19d806 commit ce4745e
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 89 deletions.
92 changes: 72 additions & 20 deletions climetlab/loaders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@
LOG = logging.getLogger(__name__)


def get_packages_versions():
    """Return the versions of the packages recorded in the dataset metadata.

    Returns:
        dict: maps the package names ``"climetlab"`` and ``"earthkit.meteo"``
        to the ``__version__`` attribute of the corresponding module.
    """
    # The imports stay local to the function, as in the original code
    # (presumably to avoid import cycles at module load time -- confirm).
    import climetlab
    import earthkit.meteo

    return {
        "climetlab": climetlab.__version__,
        "earthkit.meteo": earthkit.meteo.__version__,
    }


def _tidy(o):
if isinstance(o, dict):
return {k: _tidy(v) for k, v in o.items()}
Expand Down Expand Up @@ -150,10 +164,13 @@ def __init__(self, *, loader, parts, **kwargs):
i_chunk, n_chunks = int(i_chunk), int(n_chunks)

total = len(self.loader.registry.get_flags())
print(self.loader.registry.get_flags())
print(total)
assert i_chunk > 0, f"Chunk number {i_chunk} must be positive."
assert (
i_chunk <= total
), f"Chunk number {i_chunk} must be less than {total}+1."
if n_chunks <= total:
warnings.warn(
f"Number of chunks {n_chunks} is larger than the total number of chunks: {total}+1."
)

chunk_size = total / n_chunks
parts = [
Expand All @@ -164,6 +181,8 @@ def __init__(self, *, loader, parts, **kwargs):

parts = [int(_) for _ in parts]
print(f"Running parts: {parts}")
if not parts:
warnings.warn(f"Nothing to do for chunk {i_chunk}.")

self.parts = parts

Expand Down Expand Up @@ -284,8 +303,13 @@ class ZarrBuiltRegistry:
z = None

def __init__(self, path):
    """Record the zarr store location and build its synchronizer.

    Args:
        path: location of the zarr store; must be a ``str``.

    The synchronizer is a ``zarr.ProcessSynchronizer`` rooted at
    ``<path>/registry.sync``; it is stored on ``self.synchronizer`` so the
    ``_open_read`` / ``_open_write`` helpers can pass it to ``zarr.open``.
    """
    import zarr

    assert isinstance(path, str), path

    sync_path = os.path.join(path, "registry.sync")

    self.zarr_path = path
    self.synchronizer = zarr.ProcessSynchronizer(sync_path)

def get_slice_for(self, iloop):
lengths = self.get_lengths()
Expand Down Expand Up @@ -314,12 +338,12 @@ def set_flag(self, iloop, value=True):
def _open_read(self):
    """Open the zarr store at ``self.zarr_path`` in read-only mode (``"r"``).

    The store is opened with ``self.synchronizer`` (created in
    ``__init__``). Previously an earlier, unsynchronized ``return`` made the
    synchronized call unreachable.

    Returns:
        The object returned by ``zarr.open``.
    """
    import zarr

    return zarr.open(self.zarr_path, mode="r", synchronizer=self.synchronizer)

def _open_write(self):
    """Open the zarr store at ``self.zarr_path`` with mode ``"r+"``.

    The store is opened with ``self.synchronizer`` (created in
    ``__init__``). Previously an earlier, unsynchronized ``return`` made the
    synchronized call unreachable, so writers never used the synchronizer.

    Returns:
        The object returned by ``zarr.open``.
    """
    import zarr

    return zarr.open(self.zarr_path, mode="r+", synchronizer=self.synchronizer)

def create(self, lengths, overwrite=False):
z = self._open_write()
Expand Down Expand Up @@ -399,41 +423,71 @@ def initialise(self):
"""Create empty zarr from self.main_config and self.path"""
import zarr

def get_one_element_config():
from climetlab.utils.dates import to_datetime # avoid circular imports

def get_first_and_last_element_configs():
first = None
for i, vars in enumerate(self.iter_loops()):
keys = list(vars.keys())
assert len(vars) == 1, keys
key = keys[0]
vars = {key: vars[key][0]}
return self.main_config.substitute(vars)

config = get_one_element_config()

cube, grid_points = self.config_to_data_cube(config, with_gridpoints=True)

shape = list(cube.extended_user_shape)
if first is None:
first = self.main_config.substitute({key: vars[key][0]})
last = self.main_config.substitute({key: vars[key][-1]})
return first, last

first, last = get_first_and_last_element_configs()

first_cube, grid_points = self.config_to_data_cube(first, with_gridpoints=True)
last_cube, grid_points_ = self.config_to_data_cube(last, with_gridpoints=True)
for _ in zip(grid_points, grid_points_):
assert (_[0] == _[1]).all(), (grid_points_, grid_points)

shape = list(first_cube.extended_user_shape)
assert shape == list(last_cube.extended_user_shape), (
shape,
list(last_cube.extended_user_shape),
)
# Notice that shape[0] can be >1
# we are assuming that all data has the same shape
one_element_length = shape[0]
lengths = self._compute_lengths(one_element_length)

shape[0] = sum(lengths)
total_shape = [sum(lengths), *shape[1:]]

chunks = cube.chunking(self.main_config.output.chunking)
chunks = first_cube.chunking(self.main_config.output.chunking)
dtype = self.main_config.output.dtype

self.print(
f"Creating ZARR '{self.path}', with {shape=}, " f"{chunks=} and {dtype=}"
f"Creating ZARR '{self.path}', with {total_shape=}, "
f"{chunks=} and {dtype=}"
)
self.z = zarr.open(self.path, mode="w")
self.z.create_dataset("data", shape=shape, chunks=chunks, dtype=dtype)
self.z.create_dataset("data", shape=total_shape, chunks=chunks, dtype=dtype)

lat = self._add_dataset("latitude", grid_points[0])
lon = self._add_dataset("longitude", grid_points[1])
assert lat.shape == lon.shape

metadata = {}
metadata["create_yaml_config"] = _tidy(self.main_config)
metadata["creation_timestamp"] = datetime.datetime.utcnow().isoformat()

first_date = to_datetime(first_cube.user_coords["valid_datetime"][0])
last_date = to_datetime(last_cube.user_coords["valid_datetime"][-1])
metadata["start_date"] = first_date.isoformat()
metadata["end_date"] = last_date.isoformat()

frequency = (
(last_date - first_date).total_seconds() / 3600 / (total_shape[0] - 1)
)
assert int(frequency) == frequency, frequency
frequency = int(frequency)
metadata["frequency"] = frequency

metadata["resolution"] = first_cube.source[0].resolution

metadata["versions"] = get_packages_versions()
metadata["name_to_index"] = {
name: i
for i, name in enumerate(
Expand All @@ -444,8 +498,6 @@ def get_one_element_config():
self.z.attrs[k] = v

metadatastr = yaml.dump(metadata, sort_keys=False)
# metadatastr = json.dumps(metadata, sort_keys=False)

self.z.attrs["climetlab"] = metadata
self.z.attrs["_climetlab"] = metadatastr
self.z["data"].attrs["climetlab"] = metadata
Expand Down
15 changes: 15 additions & 0 deletions climetlab/readers/grib/codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,21 @@ def field_metadata(self):
m["shape"] = self.shape
return m

@property
def resolution(self):
    """Return the resolution of the field's grid.

    Returns:
        For ``gridType == "reduced_gg"``: the value of the ``gridName`` key.
        For ``gridType == "regular_ll"``: the value of ``DxInDegrees``
        (which must equal ``DyInDegrees``).

    Raises:
        ValueError: if a ``regular_ll`` grid has different x and y
            increments, or if the grid type is not one of the two above.
    """
    grid_type = self["gridType"]

    if grid_type == "reduced_gg":
        return self["gridName"]

    if grid_type == "regular_ll":
        x = self["DxInDegrees"]
        y = self["DyInDegrees"]
        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O`, which would silently report x for a non-square grid.
        if x != y:
            raise ValueError(
                f"Cannot define a single resolution for regular_ll grid: "
                f"DxInDegrees={x}, DyInDegrees={y}"
            )
        return x

    raise ValueError(f"Unknown gridType={grid_type}")

def datetime(self):
date = self.handle.get("date")
time = self.handle.get("time")
Expand Down
41 changes: 28 additions & 13 deletions climetlab/readers/grib/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,30 @@
ACCUMULATIONS = {("tp", 2): {"productDefinitionTemplateNumber": 8}}


# Keys that must sort first, in exactly this order, when `order` is used as
# a sort key (the GRIB writer sorts its metadata this way before setting the
# keys one by one on the handle).
_ORDER = (
    "edition",
    "setLocalDefinition",
    "typeOfGeneratingProcess",
    "productDefinitionTemplateNumber",
)

# Maps a key to its rank. Pre-populated from _ORDER and extended lazily by
# `order` for every key not seen before.
ORDER = {key: rank for rank, key in enumerate(_ORDER)}


def order(key):
    """Return the sort rank of ``key``.

    Keys listed in ``_ORDER`` get their position in that tuple. Any other key
    is appended to ``ORDER`` the first time it is seen and receives the next
    free rank, so unknown keys sort after the known ones, in first-seen
    order, and keep the same rank on later calls.

    Args:
        key: hashable key (a metadata key name in the writer).

    Returns:
        int: the rank associated with ``key``.
    """
    return ORDER.setdefault(key, len(ORDER))


class Combined:
def __init__(self, handle, metadata):
self.handle = handle
self.metadata = metadata

def __contains__(self, key):
# return key in self.metadata or key in self.handle
raise NotImplementedError()

def __getitem__(self, key):
Expand Down Expand Up @@ -89,13 +107,9 @@ def write(
else:
handle = template.handle.clone()

# print("->", metadata)
self.update_metadata(handle, metadata, compulsary)

other = {}

for k, v in list(metadata.items()):
if not isinstance(v, (int, float, str)):
other[k] = metadata.pop(k)
# print("<-", metadata)

if check_nans:
import numpy as np
Expand All @@ -107,11 +121,13 @@ def write(
metadata["missingValue"] = missing_value
metadata["bitmapPresent"] = 1

LOG.debug("GribOutput.metadata %s, other %s", metadata, other)
metadata = {
k: v for k, v in sorted(metadata.items(), key=lambda x: order(x[0]))
}

handle.set_multiple(metadata)
LOG.debug("GribOutput.metadata %s", metadata)

for k, v in other.items():
for k, v in metadata.items():
handle.set(k, v)

handle.set_values(values)
Expand Down Expand Up @@ -166,10 +182,9 @@ def update_metadata(self, handle, metadata, compulsary):
if "number" in metadata:
compulsary += ("numberOfForecastsInEnsemble",)
productDefinitionTemplateNumber = {"tp": 11}
metadata.setdefault(
"productDefinitionTemplateNumber",
productDefinitionTemplateNumber.get(handle.get("shortName"), 1),
)
metadata[
"productDefinitionTemplateNumber"
] = productDefinitionTemplateNumber.get(handle.get("shortName"), 1)

if metadata.get("type") in ("pf", "cf"):
metadata.setdefault("typeOfGeneratingProcess", 4)
Expand Down
Loading

0 comments on commit ce4745e

Please sign in to comment.