Merge pull request #12 from MAK-Relic-Tool/issue-#40-and-drive-name-hotifx

SGA Packing Patch
ModernMAK authored Oct 19, 2023
2 parents f887f00 + ad5d98d commit fca6e51
Showing 9 changed files with 189 additions and 64 deletions.
1 change: 1 addition & 0 deletions setup.cfg
@@ -38,6 +38,7 @@ relic.cli =
relic.cli.sga =
unpack = relic.sga.core.cli:RelicSgaUnpackCli
pack = relic.sga.core.cli:RelicSgaPackCli
repack = relic.sga.core.cli:RelicSgaRepackCli

[options.packages.find]
where = src
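
The new repack command is registered alongside unpack and pack in the relic.cli.sga entry-point group. As a hedged aside (not part of this commit), the registered plugins can be enumerated with the standard library once the package is installed:

# Hedged sketch, not from the commit: list the CLI plugins registered under
# the 'relic.cli.sga' entry-point group (the group= keyword requires Python 3.10+).
from importlib.metadata import entry_points

for ep in entry_points(group="relic.cli.sga"):
    print(ep.name, "->", ep.value)
# Expected per the setup.cfg above: unpack, pack, and the new repack entry.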
2 changes: 1 addition & 1 deletion src/relic/sga/core/__init__.py
@@ -3,4 +3,4 @@
"""
from relic.sga.core.definitions import Version, MagicWord, StorageType, VerificationType

__version__ = "1.1.1"
__version__ = "1.1.2"
72 changes: 69 additions & 3 deletions src/relic/sga/core/cli.py
@@ -1,7 +1,9 @@
from __future__ import annotations

import argparse
import os.path
from argparse import ArgumentParser, Namespace
from typing import Optional
from typing import Optional, Callable

import fs.copy
from fs.base import FS
@@ -20,6 +22,42 @@ def _create_parser(
return command_group.add_parser("sga")


def _arg_exists_err(value: str) -> argparse.ArgumentTypeError:
return argparse.ArgumentTypeError(f"The given path '{value}' does not exist!")


def _get_dir_type_validator(exists: bool) -> Callable[[str], str]:
def _dir_type(path: str) -> str:
if not os.path.exists(path):
if exists:
raise _arg_exists_err(path)
else:
return path

if os.path.isdir(path):
return path

raise argparse.ArgumentTypeError(f"The given path '{path}' is not a directory!")

return _dir_type


def _get_file_type_validator(exists: Optional[bool]) -> Callable[[str], str]:
def _file_type(path: str) -> str:
if not os.path.exists(path):
if exists:
raise _arg_exists_err(path)
else:
return path

if os.path.isfile(path):
return path

raise argparse.ArgumentTypeError(f"The given path '{path}' is not a file!")

return _file_type


class RelicSgaUnpackCli(CliPlugin):
def _create_parser(
self, command_group: Optional[_SubParsersAction] = None
@@ -30,8 +68,16 @@ def _create_parser(
else:
parser = command_group.add_parser("unpack")

parser.add_argument("src_sga", type=str, help="Source SGA File")
parser.add_argument("out_dir", type=str, help="Output Directory")
parser.add_argument(
"src_sga",
type=_get_file_type_validator(exists=True),
help="Source SGA File",
)
parser.add_argument(
"out_dir",
type=_get_dir_type_validator(exists=False),
help="Output Directory",
)

return parser

@@ -64,3 +110,23 @@ def _create_parser(
# pack further delegates to version plugins

return parser


class RelicSgaRepackCli(CliPluginGroup):
"""An alternative to pack which 'repacks' an SGA. Intended for testing purposes."""

GROUP = "relic.cli.sga.repack"

def _create_parser(
self, command_group: Optional[_SubParsersAction] = None
) -> ArgumentParser:
parser: ArgumentParser
desc = "Debug Command; reads and repacks an SGA archive."
if command_group is None:
parser = ArgumentParser("repack", description=desc)
else:
parser = command_group.add_parser("repack", description=desc)

# pack further delegates to version plugins

return parser
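
The new _get_file_type_validator / _get_dir_type_validator helpers turn path checks into argparse type= callables, so invalid paths are rejected at parse time. A minimal standalone sketch of the pattern (not taken from the commit; it assumes only the helpers defined above):

# Minimal sketch (assumption: the helpers are importable as defined in the diff above).
import argparse
from relic.sga.core.cli import _get_dir_type_validator, _get_file_type_validator

parser = argparse.ArgumentParser("unpack-demo")  # hypothetical demo parser
parser.add_argument("src_sga", type=_get_file_type_validator(exists=True))
parser.add_argument("out_dir", type=_get_dir_type_validator(exists=False))

# A missing source file is rejected by argparse before the command runs:
#   error: argument src_sga: The given path 'missing.sga' does not exist!
parser.parse_args(["missing.sga", "out"])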
61 changes: 24 additions & 37 deletions src/relic/sga/core/filesystem.py
@@ -240,9 +240,10 @@ def to_info(self, namespaces=None):


class _EssenceDriveFS(MemoryFS):
def __init__(self, alias: str) -> None:
def __init__(self, alias: str, name: str) -> None:
super().__init__()
self.alias = alias
self.name = name

def _make_dir_entry(
self, resource_type: ResourceType, name: str
@@ -292,6 +293,22 @@ def setinfo(self, path: str, info: Mapping[str, Mapping[str, object]]) -> None:
# if LAZY_NAMESPACE in info and not resource_entry.is_dir:
# lazy

def getinfo(
self, path, namespaces=None
): # type: (Text, Optional[Collection[Text]]) -> Info
info = super().getinfo(path, namespaces)

_path = self.validatepath(path)
if _path == "/" and (
namespaces is not None and ESSENCE_NAMESPACE in namespaces
):
raw_info = info.raw
essence_ns = dict(raw_info[ESSENCE_NAMESPACE])
essence_ns["alias"] = self.alias
essence_ns["name"] = self.name
raw_info[ESSENCE_NAMESPACE] = essence_ns  # write the updated namespace back so alias/name are exposed
info = Info(raw_info)
return info

def getessence(self, path: str) -> Info:
return self.getinfo(path, [ESSENCE_NAMESPACE])

@@ -324,9 +341,12 @@ def setmeta(self, meta: Dict[str, Any], namespace: str = "standard") -> None:
def getessence(self, path: str) -> Info:
return self.getinfo(path, [ESSENCE_NAMESPACE])

def create_drive(self, name: str) -> _EssenceDriveFS:
drive = _EssenceDriveFS(name)
self.add_fs(name, drive)
def create_drive(self, alias: str, name: str) -> _EssenceDriveFS:
drive = _EssenceDriveFS(alias, name)
first_drive = len([*self.iterate_fs()]) == 0
self.add_fs(
alias, drive, write=first_drive
)  # TODO: check whether name would work here; using alias because that is what it was originally
return drive

def _delegate(self, path):
@@ -340,39 +360,6 @@ def _delegate(self, path):
return super()._delegate(path)


# if __name__ == "__main__":
# test_file = File("test.txt", b"This is a Test!", StorageType.STORE, False, None)
# test_folder = Folder("Test", [], [test_file])
# data_folders = [test_folder]
# data_files = []
# data_drive = Drive("data", "", data_folders, data_files)
# attr_drive = Drive("attr", "", [], [test_file])
# archive = Archive("Test", None, [data_drive, attr_drive])
#
# with SGAFS() as fs:
# data_fs = MemoryFS()
# fs.add_fs("data", data_fs)
# data_dir = data_fs.makedir("Test Data")
# with data_dir.open("sample_data.txt", "wb") as data_sample_text:
# data_sample_text.write(b"Sample Data Text!")
#
# attr_fs = MemoryFS()
# fs.add_fs("attr", attr_fs)
# attr_dir = attr_fs.makedir("Test Attr")
# with attr_dir.open("sample_attr.txt", "wb") as attr_sample_text:
# attr_sample_text.write(b"Sample Attr Text!")
#
# for root, folders, files in fs.walk():
# print(root, "\n\t", folders, "\n\t", files)
#
# for name, sub_fs in fs.iterate_fs():
# print(name)
# for root, folders, files in sub_fs.walk():
# print("\t", root, "\n\t\t", folders, "\n\t\t", files)
#
# print(fs.getinfo("/", ["basic", "access"]).raw)
# pass

__all__ = [
"ESSENCE_NAMESPACE",
"EssenceFSHandler",
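
With the filesystem change, a drive now carries both an alias and a name, and getinfo on the drive root reports them under the essence namespace. A hedged usage sketch (not from the commit; it assumes EssenceFS, ESSENCE_NAMESPACE, and the two-argument create_drive shown above, and that the alias/name round-trip as the getinfo override intends):

# Hedged sketch, assuming EssenceFS and ESSENCE_NAMESPACE are importable as above.
from relic.sga.core.filesystem import ESSENCE_NAMESPACE, EssenceFS

sga = EssenceFS()
drive = sga.create_drive("data", "test data")  # alias, name (new two-argument form)

info = drive.getessence("/")                   # shorthand for getinfo(path, [ESSENCE_NAMESPACE])
essence = info.raw[ESSENCE_NAMESPACE]
print(essence["alias"], essence["name"])       # -> data test data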
70 changes: 53 additions & 17 deletions src/relic/sga/core/serialization.py
@@ -16,6 +16,7 @@
Iterable,
TypeVar,
Generic,
Type,
)

from fs.base import FS
@@ -220,6 +221,8 @@ def _write_data(data: bytes, stream: BinaryIO) -> int:


def _get_or_write_name(name: str, stream: BinaryIO, lookup: Dict[str, int]) -> int:
# Tools don't like "/" so coerce "/" to "\"
name = name.replace("/", "\\")
if name in lookup:
return lookup[name]

@@ -399,7 +402,7 @@ def assemble_drive(
drive_folder_index = drive_def.root_folder - folder_offset
drive_folder_def = local_folder_defs[drive_folder_index]

drive = essence_fs.create_drive(drive_def.alias)
drive = essence_fs.create_drive(drive_def.alias, drive_def.name)
self._assemble_container(
drive,
drive_folder_def.file_range,
@@ -508,41 +511,70 @@ def flatten_folder_collection(self, container_fs: FS, path: str) -> Tuple[int, i
self.flat_folders[subfolder_start:subfolder_end] = subfolder_defs
return subfolder_start, subfolder_end

def _flatten_folder_names(self, fs: FS, path: str) -> None:
folders = [file_info.name for file_info in fs.scandir("/") if file_info.is_dir]
files = [file_info.name for file_info in fs.scandir("/") if file_info.is_file]

if len(path) > 0 and path[0] == "/":
path = path[1:] # strip leading '/'
_get_or_write_name(path, self.name_stream, self.flat_names)

for fold_path in folders:
full_fold_path = f"{path}/{fold_path}"
full_fold_path = str(full_fold_path).split(":", 1)[
-1
] # Strip 'alias:' from path
if full_fold_path[0] == "/":
full_fold_path = full_fold_path[1:] # strip leading '/'
_get_or_write_name(full_fold_path, self.name_stream, self.flat_names)

for file_path in files:
_get_or_write_name(file_path, self.name_stream, self.flat_names)

def disassemble_folder(self, folder_fs: FS, path: str) -> FolderDef:
folder_def = FolderDef(None, None, None) # type: ignore

# Subfiles
subfile_range = self.flatten_file_collection(folder_fs)
# Subfolders
# Since Relic typically uses the first folder as the root folder, try to preserve the invariant that parent folders come before their child folders
subfolder_range = self.flatten_folder_collection(folder_fs, path)
# Write Name
self._flatten_folder_names(folder_fs, path)

folder_name = str(path).split(":", 1)[-1] # Strip 'alias:' from path

if folder_name[0] == "/":
folder_name = folder_name[1:] # strip leading '/'

folder_def.name_pos = _get_or_write_name(
folder_name, self.name_stream, self.flat_names
)

# Subfolders
# Since Relic typically uses the first folder as the root folder, try to preserve the invariant that parent folders come before their child folders
subfolder_range = self.flatten_folder_collection(folder_fs, path)

# Subfiles
subfile_range = self.flatten_file_collection(folder_fs)

folder_def.file_range = subfile_range
folder_def.folder_range = subfolder_range

return folder_def

def disassemble_drive(self, drive: _EssenceDriveFS, alias: str) -> DriveDef:
name = ""
def disassemble_drive(self, drive: _EssenceDriveFS) -> DriveDef:
name = drive.name
folder_name = ""
alias = drive.alias
drive_folder_def = FolderDef(None, None, None) # type: ignore
self._flatten_folder_names(drive, folder_name)

root_folder = len(self.flat_folders)
folder_start = len(self.flat_folders)
file_start = len(self.flat_files)
self.flat_folders.append(drive_folder_def)

# Name should be an empty string?
drive_folder_def.name_pos = _get_or_write_name(
name, self.name_stream, self.flat_names
folder_name, self.name_stream, self.flat_names
)
drive_folder_def.file_range = self.flatten_file_collection(drive)
drive_folder_def.folder_range = self.flatten_folder_collection(drive, name)
drive_folder_def.folder_range = self.flatten_folder_collection(
drive, folder_name
)

folder_end = len(self.flat_folders)
file_end = len(self.flat_files)
@@ -593,9 +625,9 @@ def write_toc(self) -> TocBlock:
)

def disassemble(self) -> TocBlock:
for name, drive_fs in self.fs.iterate_fs():
for _, drive_fs in self.fs.iterate_fs():
drive_fs = typing.cast(_EssenceDriveFS, drive_fs)
drive_def = self.disassemble_drive(drive_fs, name)
drive_def = self.disassemble_drive(drive_fs)
self.flat_drives.append(drive_def)

return self.write_toc()
@@ -740,6 +772,8 @@ def __init__(
gen_empty_meta: Callable[[], TMetaBlock],
finalize_meta: Callable[[BinaryIO, TMetaBlock], None],
meta2def: Callable[[Dict[str, object]], TFileDef],
assembler: Optional[Type[FSAssembler[TFileDef]]] = None,
disassembler: Optional[Type[FSDisassembler[TFileDef]]] = None,
):
self.version = version
self.meta_serializer = meta_serializer
@@ -752,6 +786,8 @@ def __init__(
self.gen_empty_meta = gen_empty_meta
self.finalize_meta = finalize_meta
self.meta2def = meta2def
self.assembler_type = assembler or FSAssembler
self.disassembler_type = disassembler or FSDisassembler

def read(self, stream: BinaryIO) -> EssenceFS:
# Magic & Version; skippable so that we can check for a valid file and read the version elsewhere
@@ -773,7 +809,7 @@ def read(self, stream: BinaryIO) -> EssenceFS:
name, metadata = meta_block.name, self.assemble_meta(
stream, meta_block, toc_meta_block
)
assembler: FSAssembler[TFileDef] = FSAssembler(
assembler: FSAssembler[TFileDef] = self.assembler_type(
stream=stream,
ptrs=meta_block.ptrs,
toc=toc_block,
@@ -806,7 +842,7 @@ def write(self, stream: BinaryIO, essence_fs: EssenceFS) -> int:
with BytesIO() as data_stream:
with BytesIO() as toc_stream:
with BytesIO() as name_stream:
disassembler = FSDisassembler(
disassembler: FSDisassembler[TFileDef] = self.disassembler_type(
fs=essence_fs,
toc_stream=toc_stream,
data_stream=data_stream,
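
The serializer constructor now accepts optional assembler / disassembler classes, falling back to FSAssembler / FSDisassembler, so version plugins can swap in their own read/write logic. A hedged sketch of what such an override might look like (LoggingAssembler and SomeVersionSerializer are hypothetical names, not from this commit):

# Hedged sketch; LoggingAssembler and SomeVersionSerializer are hypothetical.
from relic.sga.core.serialization import FSAssembler

class LoggingAssembler(FSAssembler):
    """Example override: log each drive as it is assembled."""

    def assemble_drive(self, *args, **kwargs):
        print("assembling drive...")
        return super().assemble_drive(*args, **kwargs)

# A version plugin could then pass the subclass via the new keyword argument:
#   serializer = SomeVersionSerializer(..., assembler=LoggingAssembler)
# read() would instantiate LoggingAssembler through self.assembler_type(...).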
7 changes: 4 additions & 3 deletions tests/issues/test_issue_39.py
@@ -44,7 +44,7 @@ def _generate_fake_osfs() -> FS:


def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS:
# Create 'SGA'
# Create 'SGA' V2
sga = EssenceFS()
sga.setmeta(
{
@@ -57,13 +57,14 @@ def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS:
"essence",
)

alias = "test"
alias = "data"
name = "test data"
sga_drive = None # sga.create_drive(alias)
for path in osfs.walk.files():
if (
sga_drive is None
): # Lazily create drive, to avoid empty drives from being created
sga_drive = sga.create_drive(alias)
sga_drive = sga.create_drive(alias, name)

if "stream" in path:
storage = StorageType.STREAM_COMPRESS