Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
KOLANICH committed Oct 15, 2023
0 parents commit eaf2d36
Show file tree
Hide file tree
Showing 17 changed files with 735 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/.templateMarker
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
KOLANICH/python_project_boilerplate
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "daily"
allow:
- dependency-type: "all"
15 changes: 15 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: CI
on:
push:
branches: [master]
pull_request:
branches: [master]

jobs:
build:
runs-on: ubuntu-22.04
steps:
- name: typical python workflow
uses: KOLANICH-GHActions/typical-python-workflow@master
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/kaitai_struct_formats

__pycache__
*.pyc
*.pyo
/*.egg-info
/build
/dist
/.eggs
/monkeytype.sqlite3
/.coverage
*.py,cover
1 change: 1 addition & 0 deletions Code_Of_Conduct.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
No codes of conduct!
4 changes: 4 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
include UNLICENSE
include *.md
include tests
include .editorconfig
9 changes: 9 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
lime.py [![Unlicensed work](https://raw.githubusercontent.com/unlicense/unlicense.org/master/static/favicon.png)](https://unlicense.org/)
===============
~~[wheel](https://gitlab.com/KOLANICH/lime.py/-/jobs/artifacts/master/raw/dist/lime-0.CI-py3-none-any.whl?job=build)~~
~~![GitLab Build Status](https://gitlab.com/KOLANICH/lime.py/badges/master/pipeline.svg)~~
~~![GitLab Coverage](https://gitlab.com/KOLANICH/lime.py/badges/master/coverage.svg)~~
[![Libraries.io Status](https://img.shields.io/librariesio/github/KOLANICH/lime.py.svg)](https://libraries.io/github/KOLANICH/lime.py)
[![Code style: antiflash](https://img.shields.io/badge/code%20style-antiflash-FFF.svg)](https://codeberg.org/KOLANICH-tools/antiflash.py)

Just a lib implementing object-oriented interface to a LiME file format. Allows you to generate/read them using python. You are likely already familiar to the API, if you used other serialization libs.
24 changes: 24 additions & 0 deletions UNLICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
This is free and unencumbered software released into the public domain.

Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.

In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

For more information, please refer to <https://unlicense.org/>
170 changes: 170 additions & 0 deletions lime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import struct
import sys
import typing
from collections.abc import ByteString, Mapping
from enum import IntEnum
from io import BytesIO
from warnings import warn

from rangeslicetools.tree import IndexProto, RangesTree
from rangeslicetools.utils import slen

__all__ = ("Signatures", "defaultFormat", "dumpRecord", "dump", "dumps", "load", "loads")


ByteStringT = typing.Union[ByteString, "mmap.mmap"]
_memFragTypeMap = (int, ByteStringT)
MemFragT = typing.Tuple[_memFragTypeMap]
MemFragIter = typing.Iterator[MemFragT]
MemFragMapping = typing.Mapping[_memFragTypeMap]


headerFormatStr = "IIQQQ"
headerSize = struct.calcsize(headerFormatStr)

headerStruct = {
None: struct.Struct("=" + headerFormatStr),
True: struct.Struct(">" + headerFormatStr),
False: struct.Struct("<" + headerFormatStr),
}


class Signatures(IntEnum):
LiME = struct.unpack(">I", b"LiME")[0]
AVML = struct.unpack("<I", b"AVML")[0]

@classmethod
def _missing_(cls, value):
raise ValueError("Invalid signature: " + repr(struct.pack("=I", value)), value)


compressionHeaderSizes = {
Signatures.LiME: 0,
Signatures.AVML: None # ToDo!
}

# If the first byte is L, then the format is BE
formats = {
Signatures.LiME: (0, lambda d: d, lambda d: d),
}
defaultFormat = Signatures.LiME

try:
import snappy

defaultFormat = Signatures.AVML
formats[Signatures.AVML] = (1, snappy.decompress, snappy.compress)
except BaseException:
warn("AVML format requires `snappy` compression. Install its lib and its python bindings.")


def dumpRecord(stream, start: int, data: ByteStringT, format: Signatures = defaultFormat, isBE: typing.Optional[bool] = None) -> int:
version, processor, unprocessor = formats[format]
stream.write(headerStruct[isBE].pack(format, version, start, start + len(data) - 1, 0))
written = headerSize
dataTransformed = unprocessor(data)
stream.write(dataTransformed)
written += len(dataTransformed)
return written


def dump(t: MemFragMapping, stream, format: Signatures = defaultFormat, isBE: typing.Optional[bool] = None) -> int:
if isinstance(t, Mapping):
t = t.items()

total = 0
for k, v in t:
total += dumpRecord(stream, k, v, format=format, isBE=isBE)
return total


def dumps(t: MemFragMapping, format: Signatures = defaultFormat, isBE: typing.Optional[bool] = None) -> bytes:
with BytesIO() as s:
dump(t, s, format=format, isBE=isBE)
#return s.getbuffer()
return s.getvalue()


def sortOfsDataPairsList(ofsDataPairs) -> typing.List[MemFragT]:
return sorted(ofsDataPairs, key=lambda x: x[0])


ctorCustomizers = {
"rangeslicetools.tree": (
"RangesTree", lambda loaded: RangesTree.build(*zip((slice(ofs, ofs + len(d)), d) for ofs, d in loaded))
)
}


def estimateEmptySize(recordCount: int, format: Signatures) -> int:
"""Estimates size of a dump blob containing all empty records. In this case there is no data, so no compression headers."""
return recordCount * headerSize


def estimateMinSize(recordCount: int, format: Signatures) -> int:
"""Estimates lower bound of size of a dump blob containing all nonempty records, assumming that every record data in compressed form occupies 0 bytes."""
recordOverhead = headerSize + compressionHeaderSizes[format]
return recordCount * recordOverhead


def estimateMaxSize(recordCount: int, totalDataSize: int, format: Signatures) -> int:
"""Estimates upper bound of size of a dump blob. When compression is used, the overall size may be less, than predicted. When compression is used the overall size may be more than uncompressed depending on the data."""
return estimateMinSize(recordCount=recordCount, format=format) + totalDataSize


def loadRecordNative(stream, desiredResultCtor=sortOfsDataPairsList) -> typing.Optional[MemFragT]:
header = stream.read(headerSize)
if not header:
return None

isBE = header[0] == ord(b"L")
(format, version, start, end, padding) = headerStruct[isBE].unpack(header)

format = Signatures(format)
expectedVersion, processor, unprocessor = formats[format]

if version != expectedVersion:
raise ValueError("Version for the format must be ", format, expectedVersion)

end += 1
l = end - start
if l < 0:
raise ValueError("end < start", end, start)
data = processor(stream.read(l))
return (start, data)


def loadNative(stream) -> typing.Iterator[MemFragT]:
r = loadRecordNative(stream)

while r:
yield r
r = loadRecordNative(stream)


def loadKaitai(stream) -> typing.Iterator[MemFragT]:
from kaitaistruct import KaitaiStream

from lime.kaitai.lime_avml_memory_dump import LimeAvmlMemoryDump

ks = KaitaiStream(stream)
p = LimeAvmlMemoryDump(ks)
for r in p.records:
yield (r.header.range.start, r.payload)


def load(stream, desiredResultCtor=sortOfsDataPairsList, loaderBackend=loadNative):
customizer = desiredResultCtor
customizerDtor = ctorCustomizers.get(desiredResultCtor.__module__, None)
if customizerDtor is not None:
className, ctor = customizerDtor
cls2Check = getattr(sys.modules[desiredResultCtor.__module__], className, None)
if cls2Check is not None and isinstance(cls2Check, type) and issubclass(desiredResultCtor, cls2Check):
customizer = ctor

return desiredResultCtor(loaderBackend(stream))


def loads(d, desiredResultCtor=sortOfsDataPairsList, loaderBackend=loadNative) -> MemFragMapping:
with BytesIO(d) as s:
return load(s, desiredResultCtor=desiredResultCtor, loaderBackend=loaderBackend)
76 changes: 76 additions & 0 deletions lime/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import inspect
import sys
from os import isatty, truncate
from pathlib import Path

from plumbum import cli

from . import Signatures, defaultFormat, dump, load
from .testTools import estimateMaxRandDumpSize, genRandDump
from .utils import *


class LiMECLI(cli.Application):
pass


@LiMECLI.subcommand("2sparse")
class LiME2Sparse(cli.Application):
def main(self, limeFile):
limeFile = Path(limeFile)
outFile = limeFile.parent / limeFile.stem

with limeFile.open("rb") as iF:
lFn = iF.fileno()
with mmap.mmap(lFn, limeFile.stat().st_size, access=mmap.ACCESS_READ) as lM:
loaded = load(lM)
dumpSparse(loaded, outFile)


class DumpCommand(cli.Application):
format = cli.SwitchAttr(["-f", "--format"], Signatures, default=defaultFormat, help="Selects the format to be used")


def genParams(f, locs, randDumpArgs):
for par in inspect.signature(f).parameters.values():
shortArfName, doc = randDumpArgs[par.name]
locs[par.name] = cli.SwitchAttr(["-" + shortArfName, "--" + par.name], par.annotation, help=doc, default=par.default)


@LiMECLI.subcommand("genRand")
class RandDump(DumpCommand):
genParams(
genRandDump,
locals(),
{
"count": ("c", "sets count of records"),
"minAddr": ("a", "sets minimum address of a dump"),
"maxAddr": ("A", "sets maximum address of a dump"),
"minSize": ("s", "Sets minimum size of uncompressed data in the record"),
"maxSize": ("S", "Sets maximum size of uncompressed data in the record"),
},
)

def dumpRandomData(self, buf):
l = estimateMaxRandDumpSize(count=self.count, maxSize=self.maxSize, format=self.format)
dI = list(genRandDump(count=self.count, minAddr=self.minAddr, minSize=self.minSize, maxSize=self.maxSize, maxAddr=self.maxAddr)) # iterator, evaluates lazily
fn = buf.fileno()
fallocate_native(fn, 0, 0, l)
l = 0
with mmap.mmap(fn, l, flags=mmap.MAP_SHARED, access=mmap.ACCESS_WRITE) as fM:
l = dump(dI, fM, format=self.format)
truncate(fn, l)

def main(self, limeFile="-"):
if limeFile == "-":
buf = sys.stdout.buffer
if isatty(buf.fileno()):
raise Exception("Refusing output binary data into terminal!")
self.dumpRandomData(buf)
else:
with open(limeFile, "wb+") as buf:
self.dumpRandomData(buf)


if __name__ == "__main__":
LiMECLI.run()
Empty file added lime/kaitai/__init__.py
Empty file.
Loading

0 comments on commit eaf2d36

Please sign in to comment.