Skip to content

Commit

Permalink
Add a zlib stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Dec 24, 2023
1 parent 7041f19 commit 87f6471
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
92 changes: 92 additions & 0 deletions dissect/util/stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import io
import os
import sys
import zlib
from bisect import bisect_left, bisect_right
from threading import Lock
from typing import BinaryIO, Optional, Union
Expand Down Expand Up @@ -550,3 +552,93 @@ def _read(self, offset: int, length: int) -> bytes:
overlay_idx += 1

return b"".join(result)


class ZlibStream(AlignedStream):
    """Create a zlib stream from another file-like object.

    Basically the same as ``gzip.GzipFile`` but for raw zlib streams.

    Due to the nature of zlib streams, seeking backwards requires resetting the decompression context.

    Args:
        fh: The source file-like object.
        size: The size the stream should be.
        **kwargs: Keyword arguments passed to ``zlib.decompressobj`` each time the
            decompression context is (re)created.
    """

    # NOTE(review): Codecov reported the rewind, forward-skip, prepend-buffer and ``readall``
    # code paths of this class as not covered by tests; they deserve dedicated test cases.

    def __init__(self, fh: BinaryIO, size: Optional[int] = None, **kwargs):
        self._fh = fh

        # Decompression state; (re)initialised by _rewind().
        self._zlib = None
        self._zlib_args = kwargs
        # Number of decompressed bytes produced since the last rewind.
        self._zlib_offset = 0
        # Compressed bytes the decompressor handed back (``unconsumed_tail``) that must be
        # fed to it again before any new data is read from the source file.
        self._zlib_prepend = b""
        # Read position inside ``_zlib_prepend``; ``None`` means there is nothing to replay.
        self._zlib_prepend_offset = None
        self._rewind()

        super().__init__(size)

    def _rewind(self) -> None:
        """Seek the source back to offset 0 and start over with a fresh decompression context."""
        self._fh.seek(0)
        self._zlib = zlib.decompressobj(**self._zlib_args)
        self._zlib_offset = 0
        self._zlib_prepend = b""
        self._zlib_prepend_offset = None

    def _seek_zlib(self, offset: int) -> None:
        """Position the decompressor at ``offset`` bytes into the decompressed data.

        Going backwards restarts decompression from the beginning; going forwards
        decompresses and discards data until ``offset`` is reached or no more data comes out.
        """
        if offset < self._zlib_offset:
            self._rewind()

        while self._zlib_offset < offset:
            # Skip in chunks of at most ``self.align`` bytes
            # (``align`` is presumably provided by AlignedStream - TODO confirm).
            read_size = min(offset - self._zlib_offset, self.align)
            if self._read_zlib(read_size) == b"":
                break

    def _read_fh(self, length: int) -> bytes:
        """Read up to ``length`` compressed bytes, replaying the prepend buffer first.

        Bytes previously left unconsumed by the decompressor are returned before any
        new data is read from the source file.
        """
        if self._zlib_prepend_offset is None:
            return self._fh.read(length)

        start = self._zlib_prepend_offset
        end = start + length
        if end <= len(self._zlib_prepend):
            # The request fits entirely inside the prepend buffer. Slice the buffer itself:
            # the previous code sliced the integer offset, which raises TypeError.
            self._zlib_prepend_offset = end
            return self._zlib_prepend[start:end]

        # Drain what is left of the prepend buffer and top it up from the source file.
        self._zlib_prepend_offset = None
        pending = self._zlib_prepend[start:]
        return pending + self._fh.read(length - len(pending))

    def _read_zlib(self, length: int) -> bytes:
        """Decompress and return up to ``length`` bytes from the current position.

        A negative ``length`` reads everything that is left (see :meth:`readall`).
        Fewer bytes than requested are returned only when the source runs out of data.
        """
        if length < 0:
            return self.readall()

        result = []
        while length > 0:
            buf = self._read_fh(io.DEFAULT_BUFFER_SIZE)
            decompressed = self._zlib.decompress(buf, length)

            if self._zlib.unconsumed_tail != b"":
                # The output limit was hit before all input was used; replay the rest later.
                self._zlib_prepend = self._zlib.unconsumed_tail
                self._zlib_prepend_offset = 0

            # Stop only when there is neither input left nor output produced. Output that
            # is produced while the input is already exhausted must be kept: the
            # decompressor has advanced past it, so dropping it would silently lose data.
            if buf == b"" and decompressed == b"":
                break

            result.append(decompressed)
            length -= len(decompressed)

        data = b"".join(result)
        self._zlib_offset += len(data)
        return data

    def _read(self, offset: int, length: int) -> bytes:
        """Return up to ``length`` decompressed bytes starting at decompressed ``offset``."""
        self._seek_zlib(offset)
        return self._read_zlib(length)

    def readall(self) -> bytes:
        """Decompress and return everything from the current decompressor position onwards."""
        chunks = []
        # sys.maxsize means the max length of output buffer is unlimited,
        # so that the whole input buffer can be decompressed within one
        # .decompress() call.
        while data := self._read_zlib(sys.maxsize):
            chunks.append(data)

        return b"".join(chunks)

Check warning on line 644 in dissect/util/stream.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/stream.py#L644

Added line #L644 was not covered by tests
21 changes: 21 additions & 0 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import zlib

import pytest

Expand Down Expand Up @@ -170,3 +171,23 @@ def test_overlay_stream():
fh.add((512 * 8) - 4, b"\x04" * 100)
fh.seek((512 * 8) - 4)
assert fh.read(100) == b"\x04" * 4


def test_zlib_stream():
    """Check sequential reads, EOF behaviour and seeking on a ``ZlibStream``."""
    # Four 512-byte blocks filled with 0x01 .. 0x04.
    blocks = [bytes([value]) * 512 for value in (1, 2, 3, 4)]
    compressed = io.BytesIO(zlib.compress(b"".join(blocks)))
    fh = stream.ZlibStream(compressed, size=512 * 4)

    # Reading front to back yields each block in turn, then nothing at EOF.
    for expected in blocks:
        assert fh.read(512) == expected
    assert fh.read(1) == b""

    # Seek back to the start.
    fh.seek(0)
    assert fh.read(512) == blocks[0]

    # Seek forward.
    fh.seek(1024)
    assert fh.read(512) == blocks[2]

    # Seek backwards and read across a block boundary.
    fh.seek(512)
    assert fh.read(1024) == blocks[1] + blocks[2]

0 comments on commit 87f6471

Please sign in to comment.