Skip to content

Commit

Permalink
feat(processing): pattern auto-identification
Browse files Browse the repository at this point in the history
Integrate pattern recognition for unknown chunks in order to help
identify parts. Here we simply detect padding, but this could be
extended in the future to detect recurring patterns, encrypted
content, or even fingerprints.

Co-authored-by: Krisztián Fekete <1246751+e3krisztian@users.noreply.github.com>
  • Loading branch information
qkaiser and e3krisztian committed Jan 3, 2024
1 parent 00b25fa commit 89cd491
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 10 deletions.
15 changes: 10 additions & 5 deletions tests/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def hello_kitty_task_results(
extract_root: Path,
hello_id: str,
kitty_id: str,
padding_id: str,
container_id="",
start_depth=0,
):
Expand Down Expand Up @@ -133,12 +134,14 @@ def hello_kitty_task_results(
size=7,
entropy=None,
),
UnknownChunkReport(
id=ANY,
ChunkReport(
id=padding_id,
start_offset=263,
end_offset=264,
size=1,
entropy=None,
handler_name="padding",
is_encrypted=False,
extraction_reports=[],
),
ChunkReport(
id=hello_id,
Expand Down Expand Up @@ -286,13 +289,14 @@ def test_flat_report_structure(hello_kitty: Path, extract_root):
task_results = get_normalized_task_results(process_result)

# extract the ids from the chunks
hello_id, kitty_id = get_chunk_ids(task_results[0])
padding_id, hello_id, kitty_id = get_chunk_ids(task_results[0])

assert task_results == hello_kitty_task_results(
hello_kitty=hello_kitty,
extract_root=extract_root,
hello_id=hello_id,
kitty_id=kitty_id,
padding_id=padding_id,
)


Expand Down Expand Up @@ -416,7 +420,7 @@ def test_chunk_in_chunk_report_structure(hello_kitty_container: Path, extract_ro
# and they should be the only differences
[main_id] = get_chunk_ids(task_results[0])

hello_id, kitty_id = get_chunk_ids(task_results[2])
padding_id, hello_id, kitty_id = get_chunk_ids(task_results[2])

# We test, that the container is referenced from the internal file
# through the chunk id `main_id`
Expand All @@ -428,6 +432,7 @@ def test_chunk_in_chunk_report_structure(hello_kitty_container: Path, extract_ro
extract_root=extract_root / "container_extract",
hello_id=hello_id,
kitty_id=kitty_id,
padding_id=padding_id,
container_id=main_id,
start_depth=1,
)
Expand Down
13 changes: 10 additions & 3 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import errno
import os
from pathlib import Path
from typing import Union

from structlog import get_logger

from .file_utils import carve, is_safe_path
from .models import Chunk, File, TaskResult, UnknownChunk, ValidChunk
from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
from .report import MaliciousSymlinkRemoved

logger = get_logger()
Expand Down Expand Up @@ -113,8 +114,14 @@ def _fix_extracted_directory(directory: Path):
_fix_extracted_directory(outdir)


def carve_unknown_chunk(extract_dir: Path, file: File, chunk: UnknownChunk) -> Path:
filename = f"{chunk.start_offset}-{chunk.end_offset}.unknown"
def carve_unknown_chunk(
extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
extension = "unknown"
if isinstance(chunk, PaddingChunk):
extension = "padding"

filename = f"{chunk.start_offset}-{chunk.end_offset}.{extension}"
carve_path = extract_dir / filename
logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
carve_chunk_to_file(carve_path, file, chunk)
Expand Down
22 changes: 22 additions & 0 deletions unblob/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,28 @@ def as_report(self, entropy: Optional[EntropyReport]) -> UnknownChunkReport:
)


@attr.define(repr=False)
class PaddingChunk(Chunk):
    """Chunk that has been identified as padding.

    It is reported like a handled chunk rather than an unknown one: the
    report is a ``ChunkReport`` carrying the ``"padding"`` handler name,
    flagged as not encrypted, and with no extraction reports attached.
    """

    def as_report(
        self, entropy: Optional[EntropyReport]  # noqa: ARG002
    ) -> ChunkReport:
        """Return the ``ChunkReport`` describing this padding chunk.

        ``entropy`` is accepted but deliberately not forwarded to the report.
        """
        report_fields = {
            "id": self.id,
            "start_offset": self.start_offset,
            "end_offset": self.end_offset,
            "size": self.size,
            "handler_name": "padding",
            "is_encrypted": False,
            "extraction_reports": [],
        }
        return ChunkReport(**report_fields)


@attrs.define
class MultiFile(Blob):
name: str = attr.field(kw_only=True)
Expand Down
29 changes: 27 additions & 2 deletions unblob/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import shutil
from operator import attrgetter
from pathlib import Path
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type, Union

import attr
import magic
Expand All @@ -24,6 +24,7 @@
ExtractError,
File,
MultiFile,
PaddingChunk,
ProcessResult,
Task,
TaskResult,
Expand Down Expand Up @@ -458,6 +459,29 @@ def _iterate_directory(self, extract_dirs, processed_paths):
)


def is_padding(file: File, chunk: UnknownChunk):
    """Tell whether the chunk's byte range consists of one repeated byte value.

    The range ``[chunk.start_offset, chunk.end_offset)`` of ``file`` is
    scanned in bounded windows, so a large chunk is never copied into memory
    as a whole and the scan stops at the first window containing a different
    byte.

    Returns ``True`` when every byte in the range equals the first one
    (a single-byte range therefore counts as padding), ``False`` for an
    empty range or as soon as a differing byte is found.

    NOTE(review): offsets are assumed to be non-negative file offsets.
    """
    window = 1024 * 1024  # upper bound on bytes materialized per slice
    start, end = chunk.start_offset, chunk.end_offset

    if end <= start:
        return False

    first = file[start : start + 1]
    if not first:
        # start lies at or beyond the end of the data: nothing to inspect
        return False

    for position in range(start, end, window):
        piece = file[position : min(position + window, end)]
        # count() runs at C speed; a mismatch means some other byte value exists
        if piece.count(first) != len(piece):
            return False
    return True


def process_patterns(
    unknown_chunks: List[UnknownChunk], file: File
) -> List[Union[UnknownChunk, PaddingChunk]]:
    """Swap every unknown chunk recognised as padding for a ``PaddingChunk``.

    Chunks for which ``is_padding`` is false are passed through untouched.
    A replacement keeps the offsets, id and file reference of the chunk it
    stands in for. The input order is preserved in the returned list.
    """

    def classify(candidate):
        # Anything that is not padding stays exactly as it came in.
        if not is_padding(file, candidate):
            return candidate
        return PaddingChunk(
            start_offset=candidate.start_offset,
            end_offset=candidate.end_offset,
            id=candidate.id,
            file=candidate.file,
        )

    return [classify(candidate) for candidate in unknown_chunks]


class _FileTask:
def __init__(
self,
Expand Down Expand Up @@ -495,6 +519,7 @@ def process(self):
)
outer_chunks = remove_inner_chunks(all_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
unknown_chunks = process_patterns(unknown_chunks, file)
assign_file_to_chunks(outer_chunks, file=file)
assign_file_to_chunks(unknown_chunks, file=file)

Expand All @@ -511,7 +536,7 @@ def _process_chunks(
self,
file: File,
outer_chunks: List[ValidChunk],
unknown_chunks: List[UnknownChunk],
unknown_chunks: List[Union[UnknownChunk, PaddingChunk]],
):
if unknown_chunks:
logger.warning("Found unknown Chunks", chunks=unknown_chunks)
Expand Down

0 comments on commit 89cd491

Please sign in to comment.