Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent enumerating entire non-ISO formatted syslog files in is_iso_fmt #972

Merged
merged 10 commits into from
Jan 30, 2025
18 changes: 13 additions & 5 deletions dissect/target/plugins/os/unix/log/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import itertools
from __future__ import annotations

import logging
import re
from datetime import datetime
Expand All @@ -22,12 +23,17 @@
)


def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]:
def iso_readlines(file: Path, max_lines: int | None = None) -> Iterator[tuple[datetime, str]]:
"""Iterator reading the provided log file in ISO format. Mimics ``year_rollover_helper`` behaviour."""
with open_decompress(file, "rt") as fh:
for line in fh:
for i, line in enumerate(fh):
if max_lines is not None and i >= max_lines:
log.debug("Stopping iso_readlines enumeration in %s: max_lines=%s was reached", file, max_lines)
break

if not (match := RE_TS_ISO.match(line)):
log.warning("No timestamp found in one of the lines in %s!", file)
if not max_lines:
log.warning("No timestamp found in one of the lines in %s!", file)
log.debug("Skipping line: %s", line)
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
continue

Expand All @@ -43,4 +49,6 @@ def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]:

def is_iso_fmt(file: Path) -> bool:
    """Determine if the provided log file uses ISO 8601 timestamp format logging or not.

    Only the first three lines of the file are inspected, so a large file that is
    not ISO formatted is never read in full.

    Args:
        file: Path of the (optionally compressed) log file to inspect.

    Returns:
        ``True`` if at least one of the first three lines yields an ISO timestamp
        from ``iso_readlines``, ``False`` otherwise.
    """
    # We do not want to iterate over the entire file, so we limit iso_readlines to the first few lines.
    # We cannot use itertools.islice here: it only bounds the number of *yielded* items, so it would
    # stop early only if the file is ISO formatted. A non-ISO file yields nothing and would be read
    # to the end.
    return any(iso_readlines(file, max_lines=3))
42 changes: 42 additions & 0 deletions tests/plugins/os/unix/log/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import gzip
import textwrap
from io import BytesIO

import pytest

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.plugins.os.unix.log.helpers import is_iso_fmt, iso_readlines

# Sample log shared by the tests below: four lines with non-ISO ("Mon D HH:MM:SS")
# timestamps followed by four lines with ISO 8601 timestamps. The first ISO
# formatted line is therefore the fifth line of the file.
syslog = """\
Dec 31 03:14:15 localhost systemd[1]: Starting Journal Service...
Jan 1 13:21:34 localhost systemd: Stopped target Swap.
Jan 2 03:14:15 localhost systemd[1]: Starting Journal Service...
Jan 3 13:21:34 localhost systemd: Stopped target Swap.
2024-12-31T13:37:00.123456+02:00 hostname systemd[1]: Started anacron.service - Run anacron jobs.
2024-12-31T13:37:00.123456+02:00 hostname anacron[1337]: Anacron 2.3 started on 2024-12-31
2024-12-31T13:37:00.123456+02:00 hostname anacron[1337]: Normal exit (0 jobs run)
2024-12-31T13:37:00.123456+02:00 hostname systemd[1]: anacron.service: Deactivated successfully.
"""


@pytest.mark.parametrize(
    "max_lines, expected_return_value",
    [
        (3, False),
        (4, False),
        (5, True),
        (9, True),
    ],
)
def test_iso_readlines_max_lines(fs_unix: VirtualFilesystem, max_lines: int, expected_return_value: bool) -> None:
    """Assert that ``iso_readlines`` does not parse more than the provided ``max_lines``.

    The sample log is mapped as a gzip-compressed file. Whether any entry is
    yielded must match the expectation for the given line limit.
    """
    log_path = "/var/log/syslog.2"
    compressed_log = gzip.compress(textwrap.dedent(syslog).encode())
    fs_unix.map_file_fh(log_path, BytesIO(compressed_log))

    yielded_any = any(iso_readlines(fs_unix.path(log_path), max_lines))
    assert yielded_any == expected_return_value


def test_is_iso_fmt(fs_unix: VirtualFilesystem) -> None:
    """Assert that ``is_iso_fmt`` reports the sample log as not ISO formatted.

    The sample log mapped here begins with four non-ISO lines before its first
    ISO formatted line.
    """
    log_path = "/var/log/syslog.3"
    compressed_log = gzip.compress(textwrap.dedent(syslog).encode())
    fs_unix.map_file_fh(log_path, BytesIO(compressed_log))

    detected_iso = is_iso_fmt(fs_unix.path(log_path))
    assert not detected_iso
Loading