Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XZ (LZMA) checksum repair utility #39

Merged
merged 6 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions dissect/util/compression/xz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import io
from binascii import crc32
from typing import BinaryIO

from dissect.util.stream import OverlayStream


def repair_checksum(fh: BinaryIO) -> BinaryIO:
    """Repair CRC32 checksums for all headers in an XZ stream.

    FortiOS XZ files have (on purpose) corrupt streams which they read using a modified ``xz`` binary.
    The only things changed are the CRC32 checksums, so partially parse the XZ file and fix all of them.

    The input is not modified. The corrected CRC32 values are layered on top of ``fh`` using an
    ``OverlayStream``. The stream header is read at offset 0 and the stream footer at the very end of
    the file, so a single stream without trailing stream padding is expected.

    References:
        - https://tukaani.org/xz/xz-file-format-1.1.0.txt
        - https://github.com/Rogdham/python-xz

    Args:
        fh: A file-like object of an LZMA stream to repair.

    Returns:
        A stream over ``fh`` with corrected CRC32 values for the stream header, the stream footer,
        the index and every block header.

    Raises:
        ValueError: If the header or footer magic is wrong, or if the index or a block header
            is malformed or points outside of the file.
    """
    size = fh.seek(0, io.SEEK_END)
    repaired = OverlayStream(fh, size)
    fh.seek(0)

    header = fh.read(12)
    # Check header magic
    if header[:6] != b"\xfd7zXZ\x00":
        raise ValueError("Not an XZ file")

    # A stream needs at least a 12 byte header and a 12 byte footer; without this check the
    # relative seek below would fail with a negative offset on truncated input.
    if size < 24:
        raise ValueError("Not an XZ file")

    # Add correct header CRC32 (covers only the two stream flag bytes)
    repaired.add(8, _crc32(header[6:8]))

    footer_offset = size - 12
    fh.seek(footer_offset)
    footer = fh.read(12)

    # Check footer magic
    if footer[10:12] != b"YZ":
        raise ValueError("Not an XZ file")

    # Add correct footer CRC32 (covers the backward size and the stream flags)
    repaired.add(footer_offset, _crc32(footer[4:10]))

    # The footer stores the size of the index as (real size / 4) - 1
    backward_size = (int.from_bytes(footer[4:8], "little") + 1) * 4
    index_offset = footer_offset - backward_size
    if index_offset < 12:
        # The index can never overlap the stream header
        raise ValueError("Invalid backward size")

    fh.seek(index_offset)
    index = fh.read(backward_size)

    # Add correct index CRC32 (the last four bytes of the index)
    repaired.add(index_offset + backward_size - 4, _crc32(index[:-4]))

    # Parse the index: skip the indicator byte and leave out the trailing CRC32 so that a
    # malformed integer can never be decoded from the checksum bytes.
    index = index[1:-4]
    isize, nb_records = _mbi(index)
    index = index[isize:]

    unpadded_sizes = []
    for _ in range(nb_records):
        if not index:
            raise ValueError("index size")

        isize, unpadded_size = _mbi(index)
        if not unpadded_size:
            raise ValueError("index record unpadded size")

        index = index[isize:]
        if not index:
            raise ValueError("index size")

        isize, uncompressed_size = _mbi(index)
        if not uncompressed_size:
            raise ValueError("index record uncompressed size")

        index = index[isize:]
        unpadded_sizes.append(unpadded_size)

    # Blocks are stored back to back, each padded to a multiple of four bytes, directly before the index
    padded_sizes = [(unpadded_size + 3) & ~3 for unpadded_size in unpadded_sizes]
    block_start = index_offset - sum(padded_sizes)
    if block_start < 12:
        # The blocks can never overlap the stream header
        raise ValueError("Invalid block sizes in index")

    # Iterate over all blocks and add the correct block header CRC32
    for padded_size in padded_sizes:
        fh.seek(block_start)

        block_header = fh.read(1)
        # A size byte of 0x00 is the index indicator, not a block header
        if not block_header or not block_header[0]:
            raise ValueError("Invalid block header")

        # The first byte stores the size of the block header as (real size / 4) - 1
        block_header_size = (block_header[0] + 1) * 4
        block_header += fh.read(block_header_size - 1)
        if len(block_header) != block_header_size:
            raise ValueError("Invalid block header")

        repaired.add(block_start + block_header_size - 4, _crc32(block_header[:-4]))

        block_start += padded_size

    return repaired


def _mbi(data: bytes) -> tuple[int, int]:
Schamper marked this conversation as resolved.
Show resolved Hide resolved
value = 0
for size, byte in enumerate(data):
value |= (byte & 0x7F) << (size * 7)
if not byte & 0x80:
return size + 1, value
raise ValueError("Invalid mbi")

Check warning on line 97 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L97

Added line #L97 was not covered by tests


def _crc32(data: bytes) -> bytes:
return int.to_bytes(crc32(data), 4, "little")
Schamper marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 16 additions & 0 deletions tests/test_compression.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import hashlib
import lzma
from io import BytesIO

from dissect.util.compression import (
lz4,
Expand All @@ -7,6 +9,7 @@
lzxpress,
lzxpress_huffman,
sevenbit,
xz,
)


Expand Down Expand Up @@ -254,3 +257,16 @@ def test_sevenbit_decompress_wide():
result = sevenbit.decompress(bytes.fromhex("b796384d078ddf6db8bc3c9fa7df6e10bd3ca783e67479da7d06"), wide=True)
target = "7-bit compression test string".encode("utf-16-le")
assert result == target


def test_xz_repair_checksum():
    """Check that an XZ file with bogus header CRC32 values decompresses after repair."""
    # The stream header, block header, index and stream footer CRC32 fields all hold the
    # placeholder value ``deadbeef``.
    buf = BytesIO(
        bytes.fromhex(
            "fd377a585a000004deadbeef0200210116000000deadbeefe00fff001e5d003a"
            "194ace2b0f238ce989a29cfeb182a4e814985366b771770233ca314836000000"
            "2972e8fd62b18ee300013a8020000000deadbeefdeadbeef020000000004595a"
        )
    )
    repaired = xz.repair_checksum(buf)

    assert lzma.decompress(repaired.read()) == b"test" * 1024