Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maybe support hdf4 #494

Merged
merged 14 commits into from
Sep 12, 2024
356 changes: 356 additions & 0 deletions kerchunk/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import fsspec.core
from fsspec.implementations.reference import LazyReferenceMapper
import numpy as np
from numba.core.pythonapi import reflect

martindurant marked this conversation as resolved.
Show resolved Hide resolved
import zarr
import numcodecs

Expand Down Expand Up @@ -681,3 +683,357 @@ def _is_netcdf_variable(dataset: h5py.Dataset):

def has_visititems_links():
return hasattr(h5py.Group, "visititems_links")


decoders = {}


def reg(name):
def f(func):
decoders[name] = func
return func

return f


class HDF4ToZarr:
def __init__(
self,
path,
storage_options=None,
inline_threshold=100,
out=None,
remote_protocol=None,
remote_options=None,
):
self.path = path
self.st = storage_options
self.thresh = inline_threshold
self.out = out or {}
self.remote_protocol = remote_protocol
self.remote_options = remote_options

def read_int(self, n):
return int.from_bytes(self.f.read(n), "big")

def read_ddh(self):
return {"ndd": self.read_int(2), "next": self.read_int(4)}

def read_dd(self):
loc = self.f.tell()
i = int.from_bytes(self.f.read(2), "big")
if i & 0x4000:
extended = True
i = i - 0x4000
else:
extended = False
tag = tags.get(i, i)
no_data = tag not in {"NULL"}
ref = (tag, int.from_bytes(self.f.read(2), "big"))
info = {
"offset": int.from_bytes(self.f.read(4), "big") * no_data,
"length": int.from_bytes(self.f.read(4), "big") * no_data,
"extended": extended,
"loc": loc,
}
return ref, info

def decode(self, tag, info):
self.f.seek(info["offset"])
ident = lambda _, __: info
return decoders.get(tag, ident)(self, info)

def translate(self):
import zarr

self.f = fsspec.open(self.path, **(self.st or {})).open()
fs = fsspec.filesystem(
"reference",
fo=self.out,
remote_protocol=self.remote_protocol,
remote_options=self.remote_options,
)
g = zarr.open_group("reference://", storage_options=dict(fs=fs))
assert self.f.read(4) == b"\x0e\x03\x13\x01"
self.tags = {}
while True:
ddh = self.read_ddh()

for _ in range(ddh["ndd"]):
ident, info = self.read_dd()
self.tags[ident] = info
if ddh["next"] == 0:
# "finished" sentry
break
# or continue
self.f.seek(ddh["next"])

for tag, ref in self.tags:
self._dec(tag, ref)
return self.tags

def _dec(self, tag, ref):
info = self.tags[(tag, ref)]
if not set(info) - {"length", "offset", "extended", "loc"}:
self.f.seek(info["offset"])
if info["extended"]:
info["data"] = self._dec_extended()
else:
info.update(self.decode(tag, info))
return info

def _dec_extended(self):
ext_type = spec[self.read_int(2)]
if ext_type == "CHUNKED":
return self._dec_chunked()
elif ext_type == "LINKED":
return self._dec_linked_header()
elif ext_type == "COMP":
return self._dec_comp()

def _dec_linked_header(self):
# get the bytes of a linked set - these will always be inlined
length = self.read_int(4)
blk_len = self.read_int(4)
num_blk = self.read_int(4)
next_ref = self.read_int(2)
out = []
while next_ref:
next_ref, data = self._dec_linked_block(self.tags[("LINKED", next_ref)])
out.extend([d for d in data if d])
bits = []
for ref in out:
info = self.tags[("LINKED", ref)]
self.f.seek(info["offset"])
bits.append(self.f.read(info["length"]))
return b"".join(bits)

def _dec_linked_block(self, block):
self.f.seek(block["offset"])
next_ref = self.read_int(2)
refs = [self.read_int(2) for _ in range((block["length"] // 2) - 1)]
return next_ref, refs

def _dec_chunked(self):
# we want to turn the chunks table into references
tag_head_len = self.read_int(4)
version = self.f.read(1)[0]
flag = self.read_int(4)
elem_tot_len = self.read_int(4)
chunk_size = self.read_int(4)
nt_size = self.read_int(4)
chk_tbl_tag = tags[self.read_int(2)] # should be VH
chk_tbl_ref = self.read_int(2)
sp_tag = tags[self.read_int(2)]
sp_ref = self.read_int(2)
ndims = self.read_int(4)
dims = [
{
"flag": self.read_int(4),
"dim_length": self.read_int(4),
"chunk_length": self.read_int(4),
}
for _ in range(ndims)
]
fill_value = self.f.read(
self.read_int(4)
) # to be interpreted as a number later
header = self._dec(chk_tbl_tag, chk_tbl_ref)
data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table
# header gives the field pattern for the rows of data, one per chunk
dt = [(f"ind{i}", ">u4") for i in range(len(dims))] + [
("tag", ">u2"),
("ref", ">u2"),
]
rows = np.frombuffer(data, dtype=dt, count=header["nvert"])
# rows["tag"] should always be 61 -> CHUNK
refs = []
for *ind, tag, ref in rows:
# maybe ind needs reversing since everything is FORTRAN
chunk_tag = self.tags[(tags[tag], ref)]
if chunk_tag["extended"]:
self.f.seek(chunk_tag["offset"])
# these are always COMP?
ctype, offset, length = self._dec_extended()
refs.append([".".join(str(_) for _ in ind), offset, length, ctype])
else:
refs.append(
[
".".join(str(_) for _ in ind),
chunk_tag["offset"],
chunk_tag["length"],
]
)
return refs

def _dec_comp(self):
version = self.read_int(2) # always 0
len_uncomp = self.read_int(4)
data_ref = self.read_int(2)
model = self.read_int(2) # always 0
ctype = comp[self.read_int(2)]
tag = self.tags[("COMPRESSED", data_ref)]
offset = tag["offset"]
length = tag["length"]
return ctype, offset, length


@reg("NDG")
def _dec_ndg(self, info):
# links together these things as a Data Group
return [(tags[self.read_int(2)], self.read_int(2)) for _ in range(0, info["length"], 4)]


@reg("SDD")
def _dec_sdd(self, info):
rank = self.read_int(2)
dims = [self.read_int(4) for _ in range(rank)]
data_tag = (tags[self.read_int(2)], self.read_int(2))
scale_tags = [(tags[self.read_int(2)], self.read_int(2)) for _ in range(rank)]
return _pl(locals())


@reg("VERSION")
def _dec_version(self, info):
return {
"major": self.read_int(4),
"minor": self.read_int(4),
"release": self.read_int(4),
"string:": _null_str(self.f.read(info["length"] - 10).decode()),
}


@reg("VH")
def _dec_vh(self, info):
# virtual group ("table") header
interface = self.read_int(2)
nvert = self.read_int(4)
ivsize = self.read_int(2)
nfields = self.read_int(2)
types = [self.read_int(2) for _ in range(nfields)]
isize = [self.read_int(2) for _ in range(nfields)]
offset = [self.read_int(2) for _ in range(nfields)]
order = [self.read_int(2) for _ in range(nfields)]
names = [self.f.read(self.read_int(2)).decode() for _ in range(nfields)]
namelen = self.read_int(2)
name = self.f.read(namelen).decode()
classlen = self.read_int(2)
cls = self.f.read(classlen).decode()
ref = (self.read_int(2), self.read_int(2))
return _pl(locals())


def _null_str(s):
return s.split("\00", 1)[0]


def _pl(l):
return {k: v for k, v in l.items() if k not in {"info", "f", "self"}}


# hdf/src/htags.h
tags = {
1: "NULL",
20: "LINKED",
30: "VERSION",
40: "COMPRESSED",
50: "VLINKED",
51: "VLINKED_DATA",
60: "CHUNKED",
61: "CHUNK",
100: "FID",
101: "FD",
102: "TID",
103: "TD",
104: "DIL",
105: "DIA",
106: "NT",
107: "MT",
108: "FREE",
200: "ID8",
201: "IP8",
202: "RI8",
203: "CI8",
204: "II8",
300: "ID",
301: "LUT",
302: "RI",
303: "CI",
304: "NRI",
306: "RIG",
307: "LD",
308: "MD",
309: "MA",
310: "CCN",
311: "CFM",
312: "AR",
400: "DRAW",
401: "RUN",
500: "XYP",
501: "MTO",
602: "T14",
603: "T105",
700: "SDG",
701: "SDD",
702: "SD",
703: "SDS",
704: "SDL",
705: "SDU",
706: "SDF",
707: "SDM",
708: "SDC",
709: "SDT",
710: "SDLNK",
720: "NDG",
721: "RESERVED",
# "Objects of tag 721 are never actually written to the file. The tag is
# needed to make things easier mixing DFSD and SD style objects in the same file"
731: "CAL",
732: "FV",
799: "BREQ",
781: "SDRAG",
780: "EREQ",
1965: "VG",
1962: "VH",
1963: "VS",
11: "RLE",
12: "IMCOMP",
13: "JPEG",
14: "GREYJPEG",
15: "JPEG5",
16: "GREYJPEG5",
}
spec = {
1: "LINKED",
2: "EXT",
3: "COMP",
4: "VLINKED",
5: "CHUNKED",
6: "BUFFERED",
7: "COMPRAS",
}

# hdf4/hdf/src/hntdefs.h
dtypes = {
5: "f4",
6: "f8",
20: "i1", # = CHAR, 3?
21: "u1", # = UCHAR, 4?
22: "i2",
23: "u2",
24: "i4",
25: "u4",
26: "i8",
27: "u8",
}

# hdf4/hdf/src/hcomp.h
comp = {
0: "NONE",
1: "RLE",
2: "NBIT",
3: "SKPHUFF",
4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip"
5: "SZIP",
7: "JPEG",
}
Loading