Feature/impact write from hdf5 #606

Merged
13 commits merged on Feb 2, 2023
238 changes: 237 additions & 1 deletion climada/engine/impact.py
@@ -28,6 +28,10 @@
import warnings
import datetime as dt
from itertools import zip_longest
from typing import Union, Any
from collections.abc import Collection
from pathlib import Path

import contextily as ctx
import numpy as np
from scipy import sparse
@@ -36,7 +40,7 @@
import pandas as pd
import xlsxwriter
from tqdm import tqdm

import h5py

from climada.entity import Exposures, Tag
from climada.hazard import Tag as TagHaz
@@ -862,6 +866,120 @@ def write_col(i_col, imp_ws, xls_data):

imp_wb.close()

def write_hdf5(self, file_path: Union[str, Path], dense_imp_mat: bool=False):
"""Write the data stored in this object into an H5 file.

Try to write all attributes of this class into H5 datasets or attributes.
By default, any iterable will be stored in a dataset and any string or scalar
will be stored in an attribute. Dictionaries will be stored as groups, with
the previous rules being applied recursively to their values.

The impact matrix can be stored in a sparse or dense format.

Notes
-----
This writer does not support attributes whose elements have mixed types. Please
make sure that ``event_name`` is a list of uniformly typed values, e.g., all ``str``.

Parameters
----------
file_path : str or Path
File path to write data into. The enclosing directory must exist.
dense_imp_mat : bool
If ``True``, write the impact matrix as a dense matrix, which can be interpreted
more easily by common H5 file readers but takes up (vastly) more space.
Defaults to ``False``.
"""
# Define writers for all types (will be filled later)
type_writers = dict()

def write(group: h5py.Group, name: str, value: Any, default_writer):
"""Write the given name-value pair with a type-specific writer.

This selects a writer by calling ``isinstance(value, key)``, where ``key``
iterates through the keys of ``type_writers``. If a type matches multiple
entries in ``type_writers``, the *first* match is chosen. If none matches,
the ``default_writer`` is used.

Parameters
----------
group : h5py.Group
The group in the H5 file to write into
name : str
The identifier of the value
value : scalar or array
The value/data to write
default_writer : callable
Fallback writer if no writer in ``type_writers`` matches
"""
for key, writer in type_writers.items():
if isinstance(value, key):
writer(group, name, value)
return

default_writer(group, name, value)

def _str_type_helper(values: Collection):
"""Return string datatype if we assume 'values' contains strings"""
if isinstance(next(iter(values)), str):
return h5py.string_dtype()
return None

def write_attribute(group, name, value):
"""Write any attribute. This should work for almost any data"""
group.attrs[name] = value

def write_dataset(group, name, value):
"""Write a dataset"""
group.create_dataset(name, data=value, dtype=_str_type_helper(value))

def write_dict(group, name, value):
"""Write a dictionary with unknown level recursively into a group"""
group = group.create_group(name)
for key, val in value.items():
write(group, key, val, write_attribute)

def write_tag(group, name, value):
"""Write a tag object using the dict writer"""
write_dict(group, name, value.__dict__)

def _write_csr_dense(group, name, value):
"""Write a CSR Matrix in dense format"""
group.create_dataset(name, data=value.toarray())

def _write_csr_sparse(group, name, value):
"""Write a CSR Matrix in sparse format"""
group = group.create_group(name)
group.create_dataset("data", data=value.data)
group.create_dataset("indices", data=value.indices)
group.create_dataset("indptr", data=value.indptr)
group.attrs["shape"] = value.shape

def write_csr(group, name, value):
"""Write a CSR matrix depending on user input"""
if dense_imp_mat:
_write_csr_dense(group, name, value)
else:
_write_csr_sparse(group, name, value)

# Set up writers based on types
# NOTE: Many things are 'Collection', so make sure that the precedence fits!
type_writers = {
str: write_attribute,
Tag: write_tag,
TagHaz: write_tag,
dict: write_dict,
sparse.csr_matrix: write_csr,
Collection: write_dataset,
}
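
Because a Python ``str`` is itself a ``Collection``, the insertion order of the entries above decides which writer wins; a minimal sketch of that precedence check (the string literal is purely illustrative)::

    from collections.abc import Collection

    # A string is a Collection of characters, so if 'Collection' were listed
    # before 'str', plain string attributes such as 'unit' would end up as
    # character datasets instead of H5 attributes.
    assert isinstance("USD", str)
    assert isinstance("USD", Collection)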

# Open file in write mode
with h5py.File(file_path, "w") as file:

# Now write all attributes
for name, value in self.__dict__.items():
write(file, name, value, write_attribute)
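
A minimal usage sketch for the new writer, assuming an existing ``Impact`` instance named ``imp`` and illustrative file names::

    # Default: store the impact matrix in the sparse data/indices/indptr layout
    imp.write_hdf5("impact.h5")

    # Dense alternative: easier to open with generic H5 tools, but far larger on disk
    imp.write_hdf5("impact_dense.h5", dense_imp_mat=True)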

def write_sparse_csr(self, file_name):
"""Write imp_mat matrix in numpy's npz format."""
LOGGER.info('Writing %s', file_name)
@@ -994,6 +1112,124 @@ def read_excel(self, *args, **kwargs):
"Use Impact.from_excel instead.")
self.__dict__ = Impact.from_excel(*args, **kwargs).__dict__

@classmethod
def from_hdf5(cls, file_path: Union[str, Path]):
"""Create an impact object from an H5 file.

This assumes a specific layout of the file. If values are not found in the
expected places, they will be set to the default values for an ``Impact`` object.

The following H5 file structure is assumed (H5 groups are terminated with ``/``,
attributes are denoted by ``.attrs/``)::

file.h5
├─ at_event
├─ coord_exp
├─ eai_exp
├─ event_id
├─ event_name
├─ frequency
├─ imp_mat
├─ tag/
│ ├─ exp/
│ │ ├─ .attrs/
│ │ │ ├─ file_name
│ │ │ ├─ description
│ ├─ haz/
│ │ ├─ .attrs/
│ │ │ ├─ haz_type
│ │ │ ├─ file_name
│ │ │ ├─ description
│ ├─ impf_set/
│ │ ├─ .attrs/
│ │ │ ├─ file_name
│ │ │ ├─ description
├─ .attrs/
│ ├─ aai_agg
│ ├─ crs
│ ├─ frequency_unit
│ ├─ tot_value
│ ├─ unit

As per :py:func:`climada.engine.impact.Impact.__init__`, any of these entries
is optional. If an entry is not found, its default value will be used when
constructing the Impact.

The impact matrix ``imp_mat`` can either be an H5 dataset, in which case it is
interpreted as a dense representation of the matrix, or an H5 group, in which case
the group is expected to contain the following data for instantiating a
`scipy.sparse.csr_matrix <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html>`_::

imp_mat/
├─ data
├─ indices
├─ indptr
├─ .attrs/
│ ├─ shape

Parameters
----------
file_path : str or Path
Path of the file to read.

Returns
-------
imp : Impact
Impact with data from the given file
"""
kwargs = dict()
with h5py.File(file_path, "r") as file:

# Impact matrix
if "imp_mat" in file:
impact_matrix = file["imp_mat"]
if isinstance(impact_matrix, h5py.Dataset): # Dense
impact_matrix = sparse.csr_matrix(impact_matrix)
else: # Sparse
impact_matrix = sparse.csr_matrix(
(
impact_matrix["data"],
impact_matrix["indices"],
impact_matrix["indptr"],
),
shape=impact_matrix.attrs["shape"],
)
kwargs["imp_mat"] = impact_matrix

# Scalar attributes
scalar_attrs = set(
("crs", "tot_value", "unit", "aai_agg", "frequency_unit")
).intersection(file.attrs.keys())
kwargs.update({attr: file.attrs[attr] for attr in scalar_attrs})

# Array attributes
# NOTE: Need [:] to copy array data. Otherwise, it would be a view that is
# invalidated once we close the file.
array_attrs = set(
("event_id", "date", "coord_exp", "eai_exp", "at_event", "frequency")
).intersection(file.keys())
kwargs.update({attr: file[attr][:] for attr in array_attrs})

# Special handling for 'event_name' because it's a list of strings
if "event_name" in file:
# pylint: disable=no-member
kwargs["event_name"] = list(file["event_name"].asstr()[:])

# Tags
if "tag" in file:
tag_kwargs = dict()
tag_group = file["tag"]
subtags = set(("exp", "impf_set")).intersection(tag_group.keys())
tag_kwargs.update({st: Tag(**tag_group[st].attrs) for st in subtags})

# Special handling for hazard because it has another tag type
if "haz" in tag_group:
tag_kwargs["haz"] = TagHaz(**tag_group["haz"].attrs)
kwargs["tag"] = tag_kwargs

# Create the impact object
return cls(**kwargs)
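
A minimal round-trip sketch, assuming a file produced by ``write_hdf5`` above; the import path follows standard CLIMADA usage and the file name is illustrative::

    from climada.engine import Impact

    imp = Impact.from_hdf5("impact.h5")
    # Entries missing from the file fall back to the Impact defaults,
    # so partial files still yield a valid object.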

@staticmethod
def video_direct_impact(exp, impf_set, haz_list, file_name='',
writer=animation.PillowWriter(bitrate=500),