Skip to content

Commit

Permalink
first shot at pattern auto-identification
Browse files Browse the repository at this point in the history
  • Loading branch information
qkaiser committed Jan 2, 2024
1 parent c722811 commit 89f76da
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 6 deletions.
3 changes: 2 additions & 1 deletion tests/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ChunkReport,
FileMagicReport,
HashReport,
PaddingChunkReport,
StatReport,
UnknownChunkReport,
)
Expand Down Expand Up @@ -133,7 +134,7 @@ def hello_kitty_task_results(
size=7,
entropy=None,
),
UnknownChunkReport(
PaddingChunkReport(
id=ANY,
start_offset=263,
end_offset=264,
Expand Down
13 changes: 10 additions & 3 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import errno
import os
from pathlib import Path
from typing import Union

from structlog import get_logger

from .file_utils import carve, is_safe_path
from .models import Chunk, File, TaskResult, UnknownChunk, ValidChunk
from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
from .report import MaliciousSymlinkRemoved

logger = get_logger()
Expand Down Expand Up @@ -113,8 +114,14 @@ def _fix_extracted_directory(directory: Path):
_fix_extracted_directory(outdir)


def carve_unknown_chunk(extract_dir: Path, file: File, chunk: UnknownChunk) -> Path:
filename = f"{chunk.start_offset}-{chunk.end_offset}.unknown"
def carve_unknown_chunk(
extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
extension = "unknown"
if isinstance(chunk, PaddingChunk):
extension = "padding"

filename = f"{chunk.start_offset}-{chunk.end_offset}.{extension}"
carve_path = extract_dir / filename
logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
carve_chunk_to_file(carve_path, file, chunk)
Expand Down
22 changes: 22 additions & 0 deletions unblob/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
EntropyReport,
ErrorReport,
MultiFileReport,
PaddingChunkReport,
Report,
UnknownChunkReport,
)
Expand Down Expand Up @@ -147,6 +148,27 @@ def as_report(self, entropy: Optional[EntropyReport]) -> UnknownChunkReport:
)


@attr.define(repr=False)
class PaddingChunk(Chunk):
    r"""Chunk whose content is a single repeated byte value (e.g. \x00 or \xFF).

    Produced from unknown chunks detected as padding between valid chunks.
    Keeping them distinct from generic unknown chunks is important for manual
    analysis and analytical certainty: entropy, metadata, and further analysis
    can be reported per padding gap.

    NOTE(review): ``carve_unknown_chunk`` carves these with a ``.padding``
    extension, so the "not extracted, just logged" behavior inherited from
    the old docstring no longer holds — confirm against extractor.py.
    """

    def as_report(self, entropy: Optional[EntropyReport]) -> PaddingChunkReport:
        """Build the serializable report mirroring this chunk's coordinates."""
        return PaddingChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            entropy=entropy,
        )


@attrs.define
class MultiFile(Blob):
name: str = attr.field(kw_only=True)
Expand Down
35 changes: 33 additions & 2 deletions unblob/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import shutil
from operator import attrgetter
from pathlib import Path
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type, Union

import attr
import magic
Expand All @@ -24,6 +24,7 @@
ExtractError,
File,
MultiFile,
PaddingChunk,
ProcessResult,
Task,
TaskResult,
Expand Down Expand Up @@ -450,6 +451,35 @@ def _iterate_directory(self, extract_dirs, processed_paths):
)


def is_padding(file: File, chunk: UnknownChunk):
    """Tell whether the chunk's content is one single repeated byte value.

    Compares each byte with its successor over the chunk's range; an empty
    or one-byte chunk is vacuously considered padding.
    """
    content = file[chunk.start_offset : chunk.end_offset]
    return all(
        previous == current
        for previous, current in zip(content, content[1:])
    )


def process_patterns(
    unknown_chunks: List[UnknownChunk], file: File
) -> List[Union[UnknownChunk, PaddingChunk]]:
    """Re-label unknown chunks that are pure padding as ``PaddingChunk``.

    Every chunk made of a single repeated byte (per ``is_padding``) is
    replaced by an equivalent ``PaddingChunk`` carrying the same offsets,
    id and file; all other chunks pass through untouched. Input order is
    preserved.
    """
    return [
        PaddingChunk(
            start_offset=chunk.start_offset,
            end_offset=chunk.end_offset,
            id=chunk.id,
            file=chunk.file,
        )
        if is_padding(file, chunk)
        else chunk
        for chunk in unknown_chunks
    ]


class _FileTask:
def __init__(
self,
Expand Down Expand Up @@ -487,6 +517,7 @@ def process(self):
)
outer_chunks = remove_inner_chunks(all_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
unknown_chunks = process_patterns(unknown_chunks, file)
assign_file_to_chunks(outer_chunks, file=file)
assign_file_to_chunks(unknown_chunks, file=file)

Expand All @@ -503,7 +534,7 @@ def _process_chunks(
self,
file: File,
outer_chunks: List[ValidChunk],
unknown_chunks: List[UnknownChunk],
unknown_chunks: List[Union[UnknownChunk, PaddingChunk]],
):
if unknown_chunks:
logger.warning("Found unknown Chunks", chunks=unknown_chunks)
Expand Down
10 changes: 10 additions & 0 deletions unblob/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@ class UnknownChunkReport(Report):
entropy: Optional[EntropyReport]


@final
@attr.define(kw_only=True, frozen=True)
class PaddingChunkReport(Report):
    """Report for a padding chunk — a gap made of one repeated byte value.

    Field-for-field mirror of ``UnknownChunkReport`` so both chunk kinds
    serialize the same way in task results.
    """

    id: str # noqa: A003
    start_offset: int
    end_offset: int
    size: int
    entropy: Optional[EntropyReport]


@final
@attr.define(kw_only=True, frozen=True)
class MultiFileReport(Report):
Expand Down

0 comments on commit 89f76da

Please sign in to comment.