Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unknown chunks auto-identification (padding) #697

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions tests/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def hello_kitty_task_results(
extract_root: Path,
hello_id: str,
kitty_id: str,
padding_id: str,
container_id="",
start_depth=0,
):
Expand Down Expand Up @@ -133,12 +134,14 @@ def hello_kitty_task_results(
size=7,
entropy=None,
),
UnknownChunkReport(
id=ANY,
ChunkReport(
id=padding_id,
start_offset=263,
end_offset=264,
size=1,
entropy=None,
handler_name="padding",
is_encrypted=False,
extraction_reports=[],
),
ChunkReport(
id=hello_id,
Expand Down Expand Up @@ -286,13 +289,14 @@ def test_flat_report_structure(hello_kitty: Path, extract_root):
task_results = get_normalized_task_results(process_result)

# extract the ids from the chunks
hello_id, kitty_id = get_chunk_ids(task_results[0])
padding_id, hello_id, kitty_id = get_chunk_ids(task_results[0])

assert task_results == hello_kitty_task_results(
hello_kitty=hello_kitty,
extract_root=extract_root,
hello_id=hello_id,
kitty_id=kitty_id,
padding_id=padding_id,
)


Expand Down Expand Up @@ -416,7 +420,7 @@ def test_chunk_in_chunk_report_structure(hello_kitty_container: Path, extract_ro
# and they should be the only differences
[main_id] = get_chunk_ids(task_results[0])

hello_id, kitty_id = get_chunk_ids(task_results[2])
padding_id, hello_id, kitty_id = get_chunk_ids(task_results[2])

# We test, that the container is referenced from the internal file
# through the chunk id `main_id`
Expand All @@ -428,6 +432,7 @@ def test_chunk_in_chunk_report_structure(hello_kitty_container: Path, extract_ro
extract_root=extract_root / "container_extract",
hello_id=hello_id,
kitty_id=kitty_id,
padding_id=padding_id,
container_id=main_id,
start_depth=1,
)
Expand Down
13 changes: 10 additions & 3 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import errno
import os
from pathlib import Path
from typing import Union

from structlog import get_logger

from .file_utils import carve, is_safe_path
from .models import Chunk, File, TaskResult, UnknownChunk, ValidChunk
from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
from .report import MaliciousSymlinkRemoved

logger = get_logger()
Expand Down Expand Up @@ -113,8 +114,14 @@ def _fix_extracted_directory(directory: Path):
_fix_extracted_directory(outdir)


def carve_unknown_chunk(extract_dir: Path, file: File, chunk: UnknownChunk) -> Path:
filename = f"{chunk.start_offset}-{chunk.end_offset}.unknown"
def carve_unknown_chunk(
extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
extension = "unknown"
if isinstance(chunk, PaddingChunk):
extension = "padding"

filename = f"{chunk.start_offset}-{chunk.end_offset}.{extension}"
carve_path = extract_dir / filename
logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
carve_chunk_to_file(carve_path, file, chunk)
Expand Down
22 changes: 22 additions & 0 deletions unblob/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,28 @@ def as_report(self, entropy: Optional[EntropyReport]) -> UnknownChunkReport:
)


@attr.define(repr=False)
class PaddingChunk(Chunk):
    """Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis and analytical certainty: for example
    entropy, other chunks inside it, metadata, etc.
    """

    def as_report(self, entropy: Optional[EntropyReport]) -> ChunkReport:  # noqa: ARG002
        """Describe this chunk as a ``ChunkReport`` handled by ``"padding"``.

        ``entropy`` is accepted but deliberately not used: the report is
        built only from the chunk's own id, offsets and size, is never
        marked as encrypted, and carries no extraction reports.
        """
        return ChunkReport(
            id=self.id,
            handler_name="padding",
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            extraction_reports=[],
        )


@attrs.define
class MultiFile(Blob):
name: str = attr.field(kw_only=True)
Expand Down
29 changes: 27 additions & 2 deletions unblob/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import shutil
from operator import attrgetter
from pathlib import Path
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type, Union

import attr
import magic
Expand All @@ -24,6 +24,7 @@
ExtractError,
File,
MultiFile,
PaddingChunk,
ProcessResult,
Task,
TaskResult,
Expand Down Expand Up @@ -458,6 +459,29 @@ def _iterate_directory(self, extract_dirs, processed_paths):
)


def is_padding(file: File, chunk: UnknownChunk) -> bool:
    """Return True when every byte of ``chunk`` within ``file`` is the same value.

    The byte range ``[chunk.start_offset, chunk.end_offset)`` is inspected in
    fixed-size windows instead of one big slice. Slicing the whole range at
    once can copy an arbitrarily large unknown chunk into memory (presumably
    ``File`` is mmap-backed -- TODO confirm), and it always scans the complete
    range even when the first two bytes already differ.

    An empty range contains no byte value at all and is therefore not padding.
    """
    window_size = 1024 * 1024  # upper bound on bytes held in memory per step
    distinct_bytes = set()

    for window_start in range(chunk.start_offset, chunk.end_offset, window_size):
        window_end = min(window_start + window_size, chunk.end_offset)
        distinct_bytes.update(file[window_start:window_end])

        # A second distinct value settles the answer; skip the rest of the range.
        if len(distinct_bytes) > 1:
            return False

    return len(distinct_bytes) == 1


def process_patterns(
    unknown_chunks: List[UnknownChunk], file: File
) -> List[Union[UnknownChunk, PaddingChunk]]:
    """Reclassify unknown chunks that ``is_padding`` recognizes as padding.

    Every chunk for which ``is_padding(file, chunk)`` holds is replaced by a
    ``PaddingChunk`` carrying the same offsets, id and file reference; all
    other chunks are passed through unchanged. A new list is returned and the
    input order is preserved.
    """

    def _reclassify(candidate: UnknownChunk) -> Union[UnknownChunk, PaddingChunk]:
        # Anything that is not padding stays exactly the object it was.
        if not is_padding(file, candidate):
            return candidate
        return PaddingChunk(
            start_offset=candidate.start_offset,
            end_offset=candidate.end_offset,
            id=candidate.id,
            file=candidate.file,
        )

    return [_reclassify(candidate) for candidate in unknown_chunks]


class _FileTask:
def __init__(
self,
Expand Down Expand Up @@ -495,6 +519,7 @@ def process(self):
)
outer_chunks = remove_inner_chunks(all_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
unknown_chunks = process_patterns(unknown_chunks, file)
assign_file_to_chunks(outer_chunks, file=file)
assign_file_to_chunks(unknown_chunks, file=file)

Expand All @@ -511,7 +536,7 @@ def _process_chunks(
self,
file: File,
outer_chunks: List[ValidChunk],
unknown_chunks: List[UnknownChunk],
unknown_chunks: List[Union[UnknownChunk, PaddingChunk]],
):
if unknown_chunks:
logger.warning("Found unknown Chunks", chunks=unknown_chunks)
Expand Down
Loading