Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve robustness of journal plugin #872

Merged
merged 15 commits into from
Oct 16, 2024
Merged
199 changes: 108 additions & 91 deletions dissect/target/plugins/os/unix/log/journal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import logging
import lzma
from typing import BinaryIO, Callable, Iterator
from typing import Any, BinaryIO, Callable, Iterator

import zstandard
from dissect.cstruct import cstruct
Expand All @@ -13,6 +14,8 @@
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export

log = logging.getLogger(__name__)

# The events have undocumented fields that are not part of the record
JournalRecord = TargetRecordDescriptor(
"linux/log/journal",
Expand All @@ -28,7 +31,7 @@
("varint", "errno"),
("string", "invocation_id"),
("string", "user_invocation_id"),
("varint", "syslog_facility"),
("string", "syslog_facility"),
("string", "syslog_identifier"),
("varint", "syslog_pid"),
("string", "syslog_raw"),
Expand Down Expand Up @@ -70,11 +73,13 @@
("path", "udev_devlink"),
# Other fields
("string", "journal_hostname"),
("path", "filepath"),
("path", "source"),
],
)

journal_def = """
#define HEADER_SIGNATURE b"LPKSHHRH"

typedef uint8 uint8_t;
typedef uint32 le32_t;
typedef uint64 le64_t;
Expand All @@ -100,7 +105,7 @@
};

struct Header {
uint8_t signature[8];
char signature[8];
le32_t compatible_flags;
IncompatibleFlag incompatible_flags;
State state;
Expand Down Expand Up @@ -165,7 +170,7 @@

// The first four members are copied from ObjectHeader, so that the size can be used as the length of payload
struct DataObject {
ObjectType type;
// ObjectType type;
ObjectFlag flags;
uint8_t reserved[6];
le64_t size;
Expand All @@ -181,7 +186,7 @@
// If the HEADER_INCOMPATIBLE_COMPACT flag is set, two extra fields are stored to allow immediate access
// to the tail entry array in the DATA object's entry array chain.
struct DataObject_Compact {
ObjectType type;
// ObjectType type;
ObjectFlag flags;
uint8_t reserved[6];
le64_t size;
Expand Down Expand Up @@ -236,7 +241,7 @@

// The first four members are copied from from ObjectHeader, so that the size can be used as the length of entry_object_offsets
struct EntryArrayObject {
ObjectType type;
// ObjectType type;
uint8_t flags;
uint8_t reserved[6];
le64_t size;
Expand All @@ -245,7 +250,7 @@
};

struct EntryArrayObject_Compact {
ObjectType type;
// ObjectType type;
uint8_t flags;
uint8_t reserved[6];
le64_t size;
Expand All @@ -257,9 +262,19 @@
c_journal = cstruct().load(journal_def)


def get_optional(value: str, to_type: Callable):
def get_optional(value: str, to_type: Callable) -> Any | None:
"""Return the value if True, otherwise return None."""
return to_type(value) if value else None

if not value:
return None

try:
return to_type(value)

except ValueError as e:
log.error("Unable to cast '%s' to %s", value, to_type)
log.debug("", exc_info=e)
return None

Check warning on line 277 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L274-L277

Added lines #L274 - L277 were not covered by tests


class JournalFile:
Expand All @@ -273,136 +288,138 @@
def __init__(self, fh: BinaryIO, target: Target):
self.fh = fh
self.target = target
self.header = c_journal.Header(self.fh)
self.signature = "".join(chr(c) for c in self.header.signature)
self.entry_array_offset = self.header.entry_array_offset

def entry_object_offsets(self) -> Iterator[int]:
"""Read object entry arrays."""
try:
self.header = c_journal.Header(self.fh)
except EOFError as e:
raise ValueError(f"Invalid systemd Journal file: {e}")

Check warning on line 295 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L294-L295

Added lines #L294 - L295 were not covered by tests

offset = self.entry_array_offset

# Entry Array with next_entry_array_offset set to 0 is the last in the list
while offset != 0:
self.fh.seek(offset)

object = c_journal.ObjectHeader(self.fh)

if object.type == c_journal.ObjectType.OBJECT_ENTRY_ARRAY:
# After the object is checked, read again but with EntryArrayObject instead of ObjectHeader
self.fh.seek(offset)

if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT:
entry_array_object = c_journal.EntryArrayObject_Compact(self.fh)
else:
entry_array_object = c_journal.EntryArrayObject(self.fh)

for entry_object_offset in entry_array_object.entry_object_offsets:
# Check if the offset is not zero and points to nothing
if entry_object_offset:
yield entry_object_offset

offset = entry_array_object.next_entry_array_offset
if self.header.signature != c_journal.HEADER_SIGNATURE:
raise ValueError(f"Journal file has invalid magic header: {self.header.signature!r}'")

Check warning on line 298 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L298

Added line #L298 was not covered by tests

def decode_value(self, value: bytes) -> tuple[str, str]:
value = value.decode(encoding="utf-8", errors="surrogateescape").strip()

# Strip leading underscores part of the field name
value = value.lstrip("_")

"""Decode the given bytes to a key value pair."""
value = value.decode(errors="surrogateescape").strip().lstrip("_")
key, value = value.split("=", 1)
key = key.lower()

return key, value

def __iter__(self) -> Iterator[dict[str, int | str]]:
"Iterate over the entry objects to read payloads."

for offset in self.entry_object_offsets():
offset = self.header.entry_array_offset
while offset != 0:
self.fh.seek(offset)

if int.from_bytes(self.fh.read(1), "little") != c_journal.ObjectType.OBJECT_ENTRY_ARRAY:
raise ValueError(f"Expected OBJECT_ENTRY_ARRAY at offset {offset}")

Check warning on line 315 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L315

Added line #L315 was not covered by tests

if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT:
entry_array_object = c_journal.EntryArrayObject_Compact(self.fh)

Check warning on line 318 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L318

Added line #L318 was not covered by tests
else:
entry_array_object = c_journal.EntryArrayObject(self.fh)

for entry_object_offset in entry_array_object.entry_object_offsets:
if entry_object_offset:
yield from self._parse_entry_object(offset=entry_object_offset)

offset = entry_array_object.next_entry_array_offset

def _parse_entry_object(self, offset: int) -> Iterator[dict]:
self.fh.seek(offset)

try:
if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT:
entry = c_journal.EntryObject_Compact(self.fh)
else:
entry = c_journal.EntryObject(self.fh)

event = {}
event["ts"] = ts.from_unix_us(entry.realtime)
except EOFError as e:
self.target.log.warning("Unable to read Journal EntryObject at offset %s in: %s", offset, self.fh)
self.target.log.debug("", exc_info=e)
return

Check warning on line 340 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L337-L340

Added lines #L337 - L340 were not covered by tests

for item in entry.items:
try:
self.fh.seek(item.object_offset)
event = {"ts": ts.from_unix_us(entry.realtime)}
for item in entry.items:
try:
self.fh.seek(item.object_offset)

object = c_journal.ObjectHeader(self.fh)
if int.from_bytes(self.fh.read(1), "little") != c_journal.ObjectType.OBJECT_DATA:
continue

Check warning on line 348 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L348

Added line #L348 was not covered by tests

if object.type == c_journal.ObjectType.OBJECT_DATA:
self.fh.seek(item.object_offset)
if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT:
data_object = c_journal.DataObject_Compact(self.fh)

Check warning on line 351 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L351

Added line #L351 was not covered by tests
else:
data_object = c_journal.DataObject(self.fh)

if self.header.incompatible_flags & c_journal.IncompatibleFlag.HEADER_INCOMPATIBLE_COMPACT:
data_object = c_journal.DataObject_Compact(self.fh)
else:
data_object = c_journal.DataObject(self.fh)
if not data_object.payload:
continue

Check warning on line 356 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L356

Added line #L356 was not covered by tests

data = data_object.payload
data = data_object.payload

if not data:
# If the payload is empty
continue
elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_XZ:
data = lzma.decompress(data)
elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_LZ4:
data = lz4.decompress(data[8:])
elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_ZSTD:
data = zstandard.decompress(data)
if data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_XZ:
data = lzma.decompress(data)

Check warning on line 361 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L361

Added line #L361 was not covered by tests

key, value = self.decode_value(data)
event[key] = value
elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_LZ4:
data = lz4.decompress(data[8:])

Check warning on line 364 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L364

Added line #L364 was not covered by tests

except Exception as e:
self.target.log.warning(
"The data object in Journal file %s could not be parsed",
getattr(self.fh, "name", None),
exc_info=e,
)
continue
elif data_object.flags & c_journal.ObjectFlag.OBJECT_COMPRESSED_ZSTD:
data = zstandard.decompress(data)

Check warning on line 367 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L367

Added line #L367 was not covered by tests

key, value = self.decode_value(data)
event[key] = value

except Exception as e:
self.target.log.warning(

Check warning on line 373 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L372-L373

Added lines #L372 - L373 were not covered by tests
"Journal DataObject could not be parsed at offset %s in %s",
item.object_offset,
getattr(self.fh, "name", None),
)
self.target.log.debug("", exc_info=e)
continue

Check warning on line 379 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L378-L379

Added lines #L378 - L379 were not covered by tests

yield event
yield event


class JournalPlugin(Plugin):
"""Systemd Journal plugin."""

JOURNAL_PATHS = ["/var/log/journal"] # TODO: /run/systemd/journal
JOURNAL_GLOB = "*/*.journal*" # The extensions .journal and .journal~
JOURNAL_SIGNATURE = "LPKSHHRH"

def __init__(self, target: Target):
super().__init__(target)
self.journal_paths = []
self.journal_files = []

for _path in self.JOURNAL_PATHS:
self.journal_paths.extend(self.target.fs.path(_path).glob(self.JOURNAL_GLOB))
for journal_path in self.JOURNAL_PATHS:
self.journal_files.extend(self.target.fs.path(journal_path).glob(self.JOURNAL_GLOB))

def check_compatible(self) -> None:
if not len(self.journal_paths):
if not self.journal_files:
raise UnsupportedPluginError("No journald files found")

@export(record=JournalRecord)
def journal(self) -> Iterator[JournalRecord]:
"""Return the content of Systemd Journal log files.
"""Return the contents of Systemd Journal log files.

References:
- https://wiki.archlinux.org/title/Systemd/Journal
- https://github.com/systemd/systemd/blob/9203abf79f1d05fdef9b039e7addf9fc5a27752d/man/systemd.journal-fields.xml
""" # noqa: E501

path_function = self.target.fs.path

for _path in self.journal_paths:
fh = _path.open()
for journal_file in self.journal_files:
if not journal_file.is_file():
self.target.log.warning("Unable to parse journal file as it is not a file: %s", journal_file)
continue

Check warning on line 414 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L413-L414

Added lines #L413 - L414 were not covered by tests

journal = JournalFile(fh, self.target)
try:
fh = journal_file.open()
journal = JournalFile(fh, self.target)

if not journal.signature == self.JOURNAL_SIGNATURE:
self.target.log.warning("The Journal log file %s has an invalid magic header", _path)
except Exception as e:
self.target.log.warning("Unable to parse journal file structure: %s: %s", journal_file, str(e))
self.target.log.debug("", exc_info=e)

Check warning on line 422 in dissect/target/plugins/os/unix/log/journal.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/journal.py#L420-L422

Added lines #L420 - L422 were not covered by tests
continue

for entry in journal:
Expand All @@ -417,7 +434,7 @@
errno=get_optional(entry.get("errno"), int),
invocation_id=entry.get("invocation_id"),
user_invocation_id=entry.get("user_invocation_id"),
syslog_facility=get_optional(entry.get("syslog_facility"), int),
syslog_facility=entry.get("syslog_facility"),
syslog_identifier=entry.get("syslog_identifier"),
syslog_pid=get_optional(entry.get("syslog_pid"), int),
syslog_raw=entry.get("syslog_raw"),
Expand Down Expand Up @@ -456,6 +473,6 @@
udev_devnode=get_optional(entry.get("udev_devnode"), path_function),
udev_devlink=get_optional(entry.get("udev_devlink"), path_function),
journal_hostname=entry.get("hostname"),
filepath=_path,
source=journal_file,
_target=self.target,
)
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
from flow.record.fieldtypes import datetime as dt

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.plugins.os.unix.log.journal import JournalPlugin
from dissect.target.target import Target
from tests._utils import absolute_path


def test_journal_plugin(target_unix, fs_unix):
def test_journal_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
"""test linux systemd journal file parsing."""

data_file = absolute_path("_data/plugins/os/unix/log/journal/journal")
fs_unix.map_file("var/log/journal/1337/user-1000.journal", data_file)

target_unix.add_plugin(JournalPlugin)

results = list(target_unix.journal())
record = results[0]

assert len(results) == 2

record = results[0]
assert record.ts == dt("2023-05-19T16:22:38.841870+00:00")
assert (
record.message
== "Window manager warning: last_user_time (928062) is greater than comparison timestamp (928031). This most likely represents a buggy client sending inaccurate timestamps in messages such as _NET_ACTIVE_WINDOW. Trying to work around..." # noqa: E501
assert record.message == (
"Window manager warning: last_user_time (928062) is greater than comparison timestamp (928031). "
"This most likely represents a buggy client sending inaccurate timestamps in messages such as "
"_NET_ACTIVE_WINDOW. Trying to work around..."
)
assert record.syslog_facility == 3
assert record.syslog_facility == "3"
assert record.syslog_identifier == "gnome-shell"
assert record.pid == 2096
assert record.transport == "stdout"
assert str(record.filepath) == "/var/log/journal/1337/user-1000.journal"
assert record.source == "/var/log/journal/1337/user-1000.journal"
Loading