Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic clean up #722

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 49 additions & 69 deletions mikeio/generic.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
"""Generic functions for working with all types of dfs files."""

from __future__ import annotations
from dataclasses import dataclass
import math
import operator
import os
import pathlib
from copy import deepcopy
from datetime import datetime, timedelta
from shutil import copyfile
from collections.abc import Iterable, Sequence
from typing import Union
from typing import Callable, Union
import warnings


import numpy as np
Expand Down Expand Up @@ -38,7 +41,19 @@

show_progress = True

__all__ = [
"avg_time",
"concat",
"diff",
"extract",
"fill_corrupt",
"quantile",
"scale",
"sum",
]


@dataclass
class _ChunkInfo:
"""Class for keeping track of an chunked processing.

Expand Down Expand Up @@ -109,7 +124,7 @@ def _clone(
outfilename: str | pathlib.Path,
start_time: datetime | None = None,
timestep: float | None = None,
items: Sequence[int | str | DfsDynamicItemInfo] | None = None,
items: Sequence[int | DfsDynamicItemInfo] | None = None,
) -> DfsFile:
"""Clone a dfs file.

Expand All @@ -123,7 +138,7 @@ def _clone(
new start time for the new file, by default None
timestep : float, optional
new timestep (in seconds) for the new file, by default None
items : list(int,str,eum.ItemInfo), optional
items : list(int,eum.ItemInfo), optional
list of items for new file, either as a list of
ItemInfo or a list of str/int referring to original file,
default: all items from original file
Expand Down Expand Up @@ -162,9 +177,6 @@ def _clone(
for customBlock in fi.CustomBlocks:
builder.AddCustomBlock(customBlock)

names = [x.Name for x in source.ItemInfo]
item_lookup = {name: i for i, name in enumerate(names)}

if isinstance(items, Iterable) and not isinstance(items, str):
for item in items:
if isinstance(item, ItemInfo):
Expand All @@ -177,11 +189,8 @@ def _clone(
builder.AddDynamicItem(item)
elif isinstance(item, int):
builder.AddDynamicItem(source.ItemInfo[item])
elif isinstance(item, str):
item_no = item_lookup[item]
builder.AddDynamicItem(source.ItemInfo[item_no])

elif isinstance(items, (int, str)) or items is None:
elif isinstance(items, (int)) or items is None:
# must be str/int refering to original file (or None)
item_numbers = _valid_item_numbers(source.ItemInfo, items)
items = [source.ItemInfo[item] for item in item_numbers]
Expand All @@ -190,17 +199,11 @@ def _clone(
else:
raise ValueError("Items of type: {type(items)} is not supported")

# Create file
builder.CreateFile(str(outfilename))

# Copy static items
while True:
static_item = source.ReadStaticItemNext()
if static_item is None:
break
for static_item in iter(source.ReadStaticItemNext, None):
builder.AddStaticItem(static_item)

# Get the file
file = builder.GetFile()

source.Close()
Expand Down Expand Up @@ -322,12 +325,13 @@ def fill_corrupt(
dfs.Close()


def sum(
def _process_dfs_files(
infilename_a: str | pathlib.Path,
infilename_b: str | pathlib.Path,
outfilename: str | pathlib.Path,
op: Callable[[np.ndarray, np.ndarray], np.ndarray],
) -> None:
"""Sum two dfs files (a+b).
"""Process two dfs files with a specified operation.

Parameters
----------
Expand All @@ -337,6 +341,8 @@ def sum(
full path to the second input file
outfilename: str | pathlib.Path
full path to the output file
op: Callable[[np.ndarray, np.ndarray], np.ndarray]
operation to perform on the data arrays

"""
infilename_a = str(infilename_a)
Expand All @@ -362,10 +368,10 @@ def sum(

itemdata_b = dfs_i_b.ReadItemTimeStep(item + 1, timestep)
d_b = itemdata_b.Data
d_a[d_a == deletevalue] = np.nan
d_b[d_b == deletevalue] = np.nan
time = itemdata_a.Time

outdata = d_a + d_b
outdata = op(d_a, d_b)

darray = outdata.astype(np.float32)

Expand All @@ -376,60 +382,34 @@ def sum(
dfs_o.Close()


def diff(
# TODO sum is conflicting with the built-in sum function, which we could haved used above.
def sum(
infilename_a: str | pathlib.Path,
infilename_b: str | pathlib.Path,
outfilename: str | pathlib.Path,
) -> None:
"""Calculate difference between two dfs files (a-b).

Parameters
----------
infilename_a: str | pathlib.Path
full path to the first input file
infilename_b: str | pathlib.Path
full path to the second input file
outfilename: str | pathlib.Path
full path to the output file

"""
infilename_a = str(infilename_a)
infilename_b = str(infilename_b)
outfilename = str(outfilename)

copyfile(infilename_a, outfilename)

dfs_i_a = DfsFileFactory.DfsGenericOpen(infilename_a)
dfs_i_b = DfsFileFactory.DfsGenericOpen(infilename_b)
dfs_o = DfsFileFactory.DfsGenericOpenEdit(outfilename)

deletevalue = dfs_i_a.FileInfo.DeleteValueFloat

n_time_steps = dfs_i_a.FileInfo.TimeAxis.NumberOfTimeSteps
n_items = len(dfs_i_a.ItemInfo)
# TODO Add checks to verify identical structure of file a and b
"""Sum two dfs files (a+b)."""
# deprecated
warnings.warn(FutureWarning("This function is deprecated. Use add instead."))
_process_dfs_files(infilename_a, infilename_b, outfilename, operator.add)

for timestep in trange(n_time_steps):
for item in range(n_items):
itemdata_a = dfs_i_a.ReadItemTimeStep(item + 1, timestep)
d_a = itemdata_a.Data
d_a[d_a == deletevalue] = np.nan

itemdata_b = dfs_i_b.ReadItemTimeStep(item + 1, timestep)
d_b = itemdata_b.Data
d_b[d_b == deletevalue] = np.nan
time = itemdata_a.Time

outdata = d_a - d_b

d = outdata.astype(np.float32)
d[np.isnan(d)] = deletevalue
def add(
infilename_a: str | pathlib.Path,
infilename_b: str | pathlib.Path,
outfilename: str | pathlib.Path,
) -> None:
"""Add two dfs files (a+b)."""
_process_dfs_files(infilename_a, infilename_b, outfilename, operator.add)

dfs_o.WriteItemTimeStep(item + 1, timestep, time, d)

dfs_i_a.Close()
dfs_i_b.Close()
dfs_o.Close()
def diff(
infilename_a: str | pathlib.Path,
infilename_b: str | pathlib.Path,
outfilename: str | pathlib.Path,
) -> None:
"""Calculate difference between two dfs files (a-b)."""
_process_dfs_files(infilename_a, infilename_b, outfilename, operator.sub)


def concat(
Expand Down Expand Up @@ -558,8 +538,8 @@ def concat(
def extract(
infilename: str | pathlib.Path,
outfilename: str | pathlib.Path,
start: int = 0,
end: int = -1,
start: int | float | str | datetime = 0,
end: int | float | str | datetime = -1,
step: int = 1,
items: Sequence[int | str] | None = None,
) -> None:
Expand Down
Loading