Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resources extracted from pak_wii being unusable #139

Merged
25 changes: 25 additions & 0 deletions src/retro_data_structures/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,29 @@
]


class LZOCompressedBlockCorruption(LZOCompressedBlock):
# Distinct implementation for Corruption's indexing needs :
# Corruption needs distinct indices for the current segment and the current block within the segment
def _actual_segment_size(self, context):
mem = context._index
context._index = context._._index

decompressed_size = construct.evaluate(self.decompressed_size, context)
segment_size = construct.evaluate(self.segment_size, context)
context._index = mem

previous_segments = context._index * segment_size
if previous_segments > decompressed_size:
# This segment is redundant!
raise construct.StopFieldError

elif previous_segments + segment_size > decompressed_size:
# Last segment
return decompressed_size - previous_segments

else:
# Another segment with this size
return segment_size

Check warning on line 122 in src/retro_data_structures/compression.py

View check run for this annotation

Codecov / codecov/patch

src/retro_data_structures/compression.py#L122

Added line #L122 was not covered by tests


ZlibCompressedBlock = construct.Compressed(construct.GreedyBytes, "zlib", level=9)
74 changes: 65 additions & 9 deletions src/retro_data_structures/formats/pak_wii.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from typing import TYPE_CHECKING

import construct
from construct import Bytes, Const, FocusedSeq, IfThenElse, Int32ub, PrefixedArray, Rebuild, Struct, len_, this
from construct import Bytes, Const, IfThenElse, Int32ub, PrefixedArray, Struct

from retro_data_structures import game_check
from retro_data_structures.base_resource import AssetId, AssetType, Dependency
from retro_data_structures.common_types import AssetId64, FourCC, String
from retro_data_structures.compression import LZOCompressedBlock, ZlibCompressedBlock
from retro_data_structures.compression import LZOCompressedBlockCorruption
from retro_data_structures.construct_extensions.alignment import AlignTo
from retro_data_structures.construct_extensions.dict import make_dict

Expand Down Expand Up @@ -70,14 +70,70 @@
_resources_end=construct.Tell,
)

CompressedPakResource = FocusedSeq(
"data",
decompressed_size=Rebuild(Int32ub, len_(this.data)),
# Added Zlib check for DKCR
data=IfThenElse(game_check.uses_lzo, LZOCompressedBlock(this.decompressed_size), ZlibCompressedBlock),
CMPD_Pak_Resource = Struct(
Belokuikuini marked this conversation as resolved.
Show resolved Hide resolved
magic=Const(b"CMPD"),
block_count=Int32ub,
block_header=construct.Array(
construct.this.block_count,
Struct(
flag=construct.Byte,
compressed_size=construct.Int24ub,
uncompressed_size=construct.Int32ub,
),
),
blocks=construct.Array(
construct.this.block_count,
IfThenElse(
lambda this: this.block_header[this._index].compressed_size
< this.block_header[this._index].uncompressed_size,
Struct(
block=LZOCompressedBlockCorruption(lambda this: this._.block_header[this._index].uncompressed_size),
),
Bytes(lambda this: this.block_header[this._index].uncompressed_size),
),
),
)


class CMPD_Pak_Adapter(construct.Adapter):
Belokuikuini marked this conversation as resolved.
Show resolved Hide resolved
def _decode(self, obj, context, path):
return b"".join([block if type(block) is bytes else block.block for block in obj.blocks])

# Going to rip a page out of PWE's book and compress everything in a single block
def _encode(self, uncompressed, context, path):
res = construct.Container(
[
("magic", b"CMPD"),
("block_count", 1),
(
"block_header",
construct.ListContainer(
[
construct.Container(
[
("flag", 0xA0),
("compressed_size", None),
("uncompressed_size", len(uncompressed)),
]
),
]
),
),
(
"blocks",
construct.ListContainer(
LZOCompressedBlockCorruption(len(uncompressed))._encode(uncompressed, context, path)
),
),
]
)
res.block_header[0].compressed_size = len(res.blocks[0])
return res


CompressedPakResource = CMPD_Pak_Adapter(CMPD_Pak_Resource)


@dataclasses.dataclass
class PakFile:
asset_id: AssetId
Expand All @@ -89,12 +145,12 @@

def get_decompressed(self, target_game: Game) -> bytes:
if self.uncompressed_data is None:
self.uncompressed_data = CompressedPakResource.parse(self.compressed_data, target_game=target_game)
self.uncompressed_data = CMPD_Pak_Resource.parse(self.compressed_data, target_game=target_game)
Belokuikuini marked this conversation as resolved.
Show resolved Hide resolved
return self.uncompressed_data

def get_compressed(self, target_game: Game) -> bytes:
if self.compressed_data is None:
self.compressed_data = CompressedPakResource.build(self.uncompressed_data, target_game=target_game)
self.compressed_data = CMPD_Pak_Resource.build(self.uncompressed_data, target_game=target_game)

Check warning on line 153 in src/retro_data_structures/formats/pak_wii.py

View check run for this annotation

Codecov / codecov/patch

src/retro_data_structures/formats/pak_wii.py#L153

Added line #L153 was not covered by tests
return self.compressed_data

def set_new_data(self, data: bytes):
Expand Down
135 changes: 59 additions & 76 deletions tests/formats/test_pak_wii.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,14 @@
from __future__ import annotations

# The two following imports are only used by file tests
# from glob import glob
# from os import path
import pytest

from retro_data_structures.formats.pak import Pak
from retro_data_structures.formats.pak_wii import (
PAK_WII,
PAKNoData,
)

# The two following classes are only used by file tests
# PakFile,
# PakBody
from retro_data_structures.formats.pak_wii import PAK_WII, CompressedPakResource, PAKNoData
from retro_data_structures.game_check import Game

# ruff: noqa: E501

# The following variables are only used for the file tests and should be set before executing said tests locally
# pak_target = "."
# pak_build_dir = "."

paks = {
"FrontEnd",
"GuiDVD",
Expand All @@ -44,6 +33,48 @@
}


@pytest.fixture(name="compressed_resources")
def _compressed_resources():
"""
The resources can be found in Metroid3.pak
"""
return [
{ # 2 segments resource
"compressed": 1,
"asset": {"type": "TXTR", "id": 4849971089334802081},
"contents": {
"data": b"CMPD\x00\x00\x00\x02\x00\x00\x00\x0c\x00\x00\x00\x0c\xc0\x00\x00\x11\x00\x00\x00\xa0"
b"\x00\x00\x00\n\x00\x10\x00\x10\x00\x00\x00\x02\x00\x0f\x16\x01\xe0\x02\xa0\xaa@\x00 "
b"w\x1c\x00\x11\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",
"value": (
b"\x00\x00\x00\n\x00\x10\x00\x10\x00\x00\x00\x02\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02"
b"\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01"
b"\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa"
b"\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa"
b"\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02"
b"\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01"
b"\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa"
b"\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa\x01\xe0\x02\xa0\xaa\xaa\xaa\xaa"
),
},
},
{ # 1 segment resource
"compressed": 1,
"asset": {"type": "CSKR", "id": 2135532429836327754},
"contents": {
"data": b"CMPD\x00\x00\x00\x01\xa0\x00\x00*\x00\x00\x00J\x00(\x19SKIN\x00\x00\x00\x02M\x00\x01"
b"\xcf\x00\x03?\x80W\x00\x00\n\x1as\x00\x00\x00\xff/\x00\x00\xdc\x04+\x02\x00\x01\x00\x11\x00\x00"
b"\xff\xff\xff\xff\xff\xff",
"value": (
b"SKIN\x00\x00\x00\x02\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x03?\x80\x00\x00\x00\x00\n"
b"\x1a\x00\x00\x00\n\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
b"\xff\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00"
),
},
},
]


def test_identical_when_keep_data(prime3_iso_provider):
game = Game.CORRUPTION
for pakfile in paks:
Expand Down Expand Up @@ -77,65 +108,17 @@ def test_compare_header_keep_data(prime3_iso_provider):
assert custom_sizes == raw_sizes


# The following tests are what I call file tests :
# They produce or read local files specified by the two global variables pak_target and pak_build_dir
# They are NOT to be executed as tests by CI and are only here for reviewing the testing process

# def test_write_new_pak():
# game = Game.CORRUPTION
# files = [
# PakFile(0xDEADBEEF, "STRG", False, b"abcdefg", None),
# PakFile(0xDEADD00D, "STRG", False, b"hijklmn", None)
# ]
# body = PakBody(b"joe mama so fat ", [
# ("Hey its me Jack Block from minecraft",
# Dependency("STRG", 0xDEADBEEF))
# ],
# files
# )

# output_pak = Pak(body, game)
# encoded = output_pak.build()

# with open(pak_target, "wb") as fd :
# fd.write(encoded)

# def test_build_from_extracted_pak():
# game = Game.CORRUPTION

# files = []
# for file in glob(pak_build_dir + "/*"):
# asset_id, asset_type = file.split(".")
# asset_id = int(path.basename(asset_id), 16)

# data = b""
# with open(file, "rb") as fd:
# data = fd.read()

# files.append(PakFile(asset_id, asset_type, False, data, None))

# body = PakBody(b"\x1B\x62\xF7\xCA\x15\x60\xB1\x85\xC1\xE1\x09\x43\x99\x4F\xB9\xAC", [
# ("03b_Bryyo_Fire_#SERIAL#",
# Dependency("MLVL", 0x9BA9292D588D6EB8)),
# ("03b_Bryyo_Reptilicus_#SERIAL#",
# Dependency("MLVL", 0x9F059B53561A9695)),
# ("03b_Bryyo_Ice_#SERIAL#",
# Dependency("MLVL", 0xB0D67636D61F3868))
# ],
# files
# )

# output_pak = Pak(body, game)
# encoded = output_pak.build()

# with open(pak_target, "wb") as fd:
# fd.write(encoded)

# def test_parse_new_pak():
# game = Game.CORRUPTION

# with open(pak_target, "rb") as fd:
# raw = fd.read()

# decoded = Pak.parse(raw, game)
# return decoded
def test_corruption_resource_decode(compressed_resources):
for compressed_resource in compressed_resources:
decoded = CompressedPakResource.parse(compressed_resource["contents"]["data"], target_game=Game.CORRUPTION)

assert len(decoded) == len(compressed_resource["contents"]["value"])
assert decoded == compressed_resource["contents"]["value"]


def test_corruption_resource_encode_decode(compressed_resources):
for compressed_resource in compressed_resources:
raw = compressed_resource["contents"]["value"]
decoded = CompressedPakResource.build(raw, target_game=Game.CORRUPTION)
encoded = CompressedPakResource.parse(decoded, target_game=Game.CORRUPTION)
assert raw == encoded