Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start developing compression for Uproot writing. #416

Merged
merged 7 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/uproot/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2458,25 +2458,24 @@ def compression(self):
@property
def compressed_bytes(self):
"""
The number of compressed bytes in all ``TBaskets`` of this ``TBranch``.
The number of compressed bytes in all ``TBaskets`` of this ``TBranch``,
including the TKey headers (which are always uncompressed).

The number of compressed bytes is specified in the ``TBranch`` metadata
and can be determined without reading any additional data. The
uncompressed bytes requires reading all of the ``TBasket`` ``TKeys`` at
least.
This information is specified in the ``TBranch`` metadata (``fZipBytes``)
and can be determined without reading any additional data.
"""
return sum(self.basket_compressed_bytes(i) for i in range(self.num_baskets))
return self.member("fZipBytes")

@property
def uncompressed_bytes(self):
"""
The number of uncompressed bytes in all ``TBaskets`` of this ``TBranch``.
The number of uncompressed bytes in all ``TBaskets`` of this ``TBranch``,
including the TKey headers.

The number of uncompressed bytes cannot be determined without reading a
``TKey``, which are small, but may be slow for remote connections because
of the latency of round-trip requests.
This information is specified in the ``TBranch`` metadata (``fTotBytes``)
and can be determined without reading any additional data.
"""
return sum(self.basket_uncompressed_bytes(i) for i in range(self.num_baskets))
return self.member("fTotBytes")

@property
def compression_ratio(self):
Expand Down Expand Up @@ -2554,7 +2553,8 @@ def basket_chunk_cursor(self, basket_num):

def basket_compressed_bytes(self, basket_num):
"""
The number of compressed bytes for the ``TBasket`` at ``basket_num``.
The number of compressed bytes for the ``TBasket`` at ``basket_num``,
including the TKey header.

The number of compressed bytes is specified in the ``TBranch`` metadata
and can be determined without reading any additional data. The
Expand All @@ -2576,7 +2576,8 @@ def basket_compressed_bytes(self, basket_num):

def basket_uncompressed_bytes(self, basket_num):
"""
The number of uncompressed bytes for the ``TBasket`` at ``basket_num``.
The number of uncompressed bytes for the ``TBasket`` at ``basket_num``,
including the TKey header.

The number of uncompressed bytes cannot be determined without reading a
``TKey``, which are small, but may be slow for remote connections because
Expand Down
34 changes: 34 additions & 0 deletions src/uproot/behaviors/TTree.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,40 @@ def tree(self):
"""
return self

@property
def compressed_bytes(self):
"""
The number of compressed bytes in all ``TBaskets`` of all ``TBranches``
of this ``TTree``, including all the TKey headers (which are always
uncompressed).

This information is specified in the ``TTree`` metadata (``fZipBytes``)
and can be determined without reading any additional data.
"""
return self.member("fZipBytes")

@property
def uncompressed_bytes(self):
"""
The number of uncompressed bytes in all ``TBaskets`` of all ``TBranches``
of this ``TTree``, including all the TKey headers.

This information is specified in the ``TTree`` metadata (``fTotBytes``)
and can be determined without reading any additional data.
"""
return self.member("fTotBytes")

@property
def compression_ratio(self):
"""
The number of uncompressed bytes divided by the number of compressed
bytes for this ``TTree``.

See :ref:`uproot.behaviors.TTree.TTree.compressed_bytes` and
:ref:`uproot.behaviors.TTree.TTree.uncompressed_bytes`.
"""
return float(self.uncompressed_bytes) / float(self.compressed_bytes)

@property
def aliases(self):
u"""
Expand Down
194 changes: 162 additions & 32 deletions src/uproot/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,25 @@ def level(self, value):
raise ValueError("Compression level must be between 0 and 9 (inclusive)")
self._level = int(value)

def __eq__(self, other):
if isinstance(other, Compression):
return self.name == other.name and self.level == other.level
else:
return False


class _DecompressZLIB(object):
name = "ZLIB"
_2byte = b"ZL"
_method = b"\x08"

def decompress(self, data, uncompressed_bytes=None):
import zlib

class ZLIB(Compression):
return zlib.decompress(data)


class ZLIB(Compression, _DecompressZLIB):
"""
Args:
level (int, 0-9): Compression level: 0 is uncompressed, 1 is minimally
Expand All @@ -98,14 +115,27 @@ class ZLIB(Compression):
Uproot uses ``zlib`` from the Python standard library.
"""

@classmethod
def decompress(cls, data, uncompressed_bytes=None):
def __init__(self, level):
_DecompressZLIB.__init__(self)
Compression.__init__(self, level)

def compress(self, data):
import zlib

return zlib.decompress(data)
return zlib.compress(data, level=self._level)


class _DecompressLZMA(object):
name = "LZMA"
_2byte = b"XZ"
_method = b"\x00"

def decompress(self, data, uncompressed_bytes=None):
lzma = uproot.extras.lzma()
return lzma.decompress(data)


class LZMA(Compression):
class LZMA(Compression, _DecompressLZMA):
"""
Args:
level (int, 0-9): Compression level: 0 is uncompressed, 1 is minimally
Expand All @@ -118,13 +148,30 @@ class LZMA(Compression):
In Python 2, ``backports.lzma`` must be installed.
"""

@classmethod
def decompress(cls, data, uncompressed_bytes=None):
def __init__(self, level):
_DecompressLZMA.__init__(self)
Compression.__init__(self, level)

def compress(self, data):
lzma = uproot.extras.lzma()
return lzma.decompress(data)
return lzma.compress(data, preset=self._level)


class _DecompressLZ4(object):
name = "LZ4"
_2byte = b"L4"
_method = b"\x01"

class LZ4(Compression):
def decompress(self, data, uncompressed_bytes=None):
lz4_block = uproot.extras.lz4_block()
if uncompressed_bytes is None:
raise ValueError(
"lz4 block decompression requires the number of uncompressed bytes"
)
return lz4_block.decompress(data, uncompressed_size=uncompressed_bytes)


class LZ4(Compression, _DecompressLZ4):
"""
Args:
level (int, 0-9): Compression level: 0 is uncompressed, 1 is minimally
Expand All @@ -135,17 +182,35 @@ class LZ4(Compression):
The ``zl4`` and ``xxhash`` libraries must be installed.
"""

@classmethod
def decompress(cls, data, uncompressed_bytes=None):
def __init__(self, level):
_DecompressLZ4.__init__(self)
Compression.__init__(self, level)

def compress(self, data):
lz4_block = uproot.extras.lz4_block()
if uncompressed_bytes is None:
raise ValueError(
"lz4 block decompression requires the number of uncompressed bytes"
)
return lz4_block.decompress(data, uncompressed_size=uncompressed_bytes)
return lz4_block.compress(data, compression=self._level, store_size=False)


class _DecompressZSTD(object):
name = "ZSTD"
_2byte = b"ZS"
_method = b"\x01"

def __init__(self):
self._decompressor = None

@property
def decompressor(self):
if self._decompressor is None:
zstandard = uproot.extras.zstandard()
self._decompressor = zstandard.ZstdDecompressor()
return self._decompressor

def decompress(self, data, uncompressed_bytes=None):
return self.decompressor.decompress(data)

class ZSTD(Compression):

class ZSTD(Compression, _DecompressZSTD):
"""
Args:
level (int, 0-9): Compression level: 0 is uncompressed, 1 is minimally
Expand All @@ -156,11 +221,20 @@ class ZSTD(Compression):
The ``zstandard`` library must be installed.
"""

@classmethod
def decompress(cls, data, uncompressed_bytes=None):
zstandard = uproot.extras.zstandard()
dctx = zstandard.ZstdDecompressor()
return dctx.decompress(data)
def __init__(self, level):
_DecompressZSTD.__init__(self)
Compression.__init__(self, level)
self._compressor = None

@property
def compressor(self):
if self._compressor is None:
zstandard = uproot.extras.zstandard()
self._compressor = zstandard.ZstdCompressor(level=self._level)
return self._compressor

def compress(self, data):
return self.compressor.compress(data)


algorithm_codes = {
Expand All @@ -170,6 +244,10 @@ def decompress(cls, data, uncompressed_bytes=None):
uproot.const.kZSTD: ZSTD,
}

_decompress_ZLIB = _DecompressZLIB()
_decompress_LZMA = _DecompressLZMA()
_decompress_LZ4 = _DecompressLZ4()
_decompress_ZSTD = _DecompressZSTD()

_decompress_header_format = struct.Struct("2sBBBBBBB")
_decompress_checksum_format = struct.Struct(">Q")
Expand Down Expand Up @@ -227,16 +305,16 @@ def decompress(
block_compressed_bytes = c1 + (c2 << 8) + (c3 << 16)
block_uncompressed_bytes = u1 + (u2 << 8) + (u3 << 16)

if algo == b"ZL":
cls = ZLIB
if algo == _decompress_ZLIB._2byte:
decompressor = _decompress_ZLIB
data = cursor.bytes(chunk, block_compressed_bytes, context)

elif algo == b"XZ":
cls = LZMA
elif algo == _decompress_LZMA._2byte:
decompressor = _decompress_LZMA
data = cursor.bytes(chunk, block_compressed_bytes, context)

elif algo == b"L4":
cls = LZ4
elif algo == _decompress_LZ4._2byte:
decompressor = _decompress_LZ4
block_compressed_bytes -= 8
expected_checksum = cursor.field(
chunk, _decompress_checksum_format, context
Expand All @@ -253,8 +331,8 @@ def decompress(
)
)

elif algo == b"ZS":
cls = ZSTD
elif algo == _decompress_ZSTD._2byte:
decompressor = _decompress_ZSTD
data = cursor.bytes(chunk, block_compressed_bytes, context)

elif algo == b"CS":
Expand All @@ -275,9 +353,13 @@ def decompress(
)

if block_info is not None:
block_info.append((cls, block_compressed_bytes, block_uncompressed_bytes))
block_info.append(
(decompressor.name, block_compressed_bytes, block_uncompressed_bytes)
)

uncompressed_bytestring = cls.decompress(data, block_uncompressed_bytes)
uncompressed_bytestring = decompressor.decompress(
data, block_uncompressed_bytes
)

if len(uncompressed_bytestring) != block_uncompressed_bytes:
raise ValueError(
Expand Down Expand Up @@ -337,3 +419,51 @@ def hook_after_block(**kwargs): # noqa: D103

decompress.hook_before_block = hook_before_block
decompress.hook_after_block = hook_after_block

_3BYTE_MAX = 2 ** 24 - 1
_4byte = struct.Struct("<I") # compressed sizes are 3-byte little endian!


def compress(data, compression):
"""
FIXME: docstring
"""
if compression is None or compression.level == 0:
return data

out = []
next = data

while len(next) > 0:
block, next = next[:_3BYTE_MAX], next[_3BYTE_MAX:]

compressed = compression.compress(block)
len_compressed = len(compressed)

if isinstance(compression, LZ4):
xxhash = uproot.extras.xxhash()
computed_checksum = xxhash.xxh64(compressed).intdigest()
checksum = _decompress_checksum_format.pack(computed_checksum)
len_compressed += 8
else:
checksum = b""

uncompressed_size = _4byte.pack(len(block))[:-1]
compressed_size = _4byte.pack(len_compressed)[:-1]

out.append(
compression._2byte
+ compression._method
+ compressed_size
+ uncompressed_size
+ checksum
)

out.append(compressed)

out = b"".join(out)

if len(out) < len(data):
return out
else:
return data
Loading