Skip to content

Commit

Permalink
Move schedlgu plugin to its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
Horofic committed Jan 25, 2024
1 parent 75739c6 commit 55b75d9
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 145 deletions.
155 changes: 155 additions & 0 deletions dissect/target/plugins/os/windows/log/schedlgu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from __future__ import annotations

import logging
import re
import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import Iterator, Optional

from dissect.target import Target
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export

warnings.simplefilter(action="ignore", category=FutureWarning)
log = logging.getLogger(__name__)

SchedLgURecord = TargetRecordDescriptor(
"windows/tasks/log/schedlgu",
[
("datetime", "ts"),
("string", "job"),
("string", "command"),
("string", "status"),
("uint32", "exit_code"),
("string", "version"),
],
)

JOB_REGEX_PATTERN = re.compile(r"\"(.*?)\" \((.*?)\)")
SCHEDLGU_REGEX_PATTERN = re.compile(r"\".+\n.+\n\s{4}.+\n|\".+\n.+", re.MULTILINE)


@dataclass(order=True)
class SchedLgU:
ts: datetime = None
job: str = None
status: str = None
command: str = None
exit_code: int = None
version: str = None

@staticmethod
def _sanitize_ts(ts: str) -> datetime:
# sometimes "at" exists before the timestamp
ts = ts.strip("at ")
try:
ts = datetime.strptime(ts, "%m/%d/%Y %I:%M:%S %p")
except ValueError:
ts = datetime.strptime(ts, "%d-%m-%Y %H:%M:%S")

return ts

@staticmethod
def _parse_job(line: str) -> tuple[str, Optional[str]]:
matches = JOB_REGEX_PATTERN.match(line)
if matches:
return matches.groups()

log.warning("SchedLgU failed to parse job and command from line: '%s'. Returning line.", line)
return line, None

@classmethod
def from_line(cls, line: str) -> SchedLgU:
"""Parse a group of SchedLgU.txt lines."""
event = cls()
lines = line.splitlines()

# Events can have 2 or 3 lines as a group in total. An example of a complete task job event is:
# "Symantec NetDetect.job" (NDETECT.EXE)
# Finished 14-9-2003 13:21:01
# Result: The task completed with an exit code of (65).
if len(lines) == 3:
event.job, event.command = cls._parse_job(lines[0])
event.status, event.ts = lines[1].split(maxsplit=1)
event.exit_code = int(lines[2].split("(")[1].rstrip(")."))

# Events that have 2 lines as a group can be started task job event or the Task Scheduler Service. Examples:
# "Symantec NetDetect.job" (NDETECT.EXE)
# Started at 14-9-2003 13:26:00
elif len(lines) == 2 and ".job" in lines[0]:
event.job, event.command = cls._parse_job(lines[0])
event.status, event.ts = lines[1].split(maxsplit=1)

# Events without a task job event are the Task Scheduler Service events. Which can look like this:
# "Task Scheduler Service"
# Exited at 14-9-2003 13:40:24
# OR
# "Task Scheduler Service"
# 6.0.6000.16386 (vista_rtm.061101-2205)
elif len(lines) == 2:
event.job = lines[0].strip('"')

if lines[1].startswith("\t") or lines[1].startswith(" "):
event.status, event.ts = lines[1].split(maxsplit=1)
else:
event.version = lines[1]

if event.ts:
event.ts = cls._sanitize_ts(event.ts)

return event


class SchedLgUPlugin(Plugin):
"""Plugin for parsing the Task Scheduler Service transaction log file (SchedLgU.txt)."""

PATHS = {
"sysvol/SchedLgU.txt",
"sysvol/windows/SchedLgU.txt",
"sysvol/windows/tasks/SchedLgU.txt",
"sysvol/winnt/tasks/SchedLgU.txt",
}

def __init__(self, target: Target) -> None:
self.target = target
self.paths = [self.target.fs.path(path) for path in self.PATHS if self.target.fs.path(path).exists()]

def check_compatible(self) -> None:
if len(self.paths) == 0:
raise UnsupportedPluginError("No SchedLgU.txt file found.")

@export(record=SchedLgURecord)
def schedlgu(self) -> Iterator[SchedLgURecord]:
"""Return all events in the Task Scheduler Service transaction log file (SchedLgU.txt).
Older Windows systems may log ``.job`` tasks that get started remotely in the SchedLgU.txt file.
In addition, this log file records when the Task Scheduler service starts and stops.
Adversaries may use malicious ``.job`` files to gain persistence on a system.
Yield:
ts (datetime): The timestamp of the event.
job (str): The name of the ``.job`` file.
command (str): The command executed.
status (str): The status of the event (finished, completed, exited, stopped).
exit_code (int): The exit code of the event.
version (str): The version of the Task Scheduler service.
"""

for path in self.paths:
content = path.read_text(encoding="UTF-16", errors="surrogateescape")

for match in re.findall(SCHEDLGU_REGEX_PATTERN, content):
event = SchedLgU.from_line(match)

yield SchedLgURecord(
ts=event.ts,
job=event.job,
command=event.command,
status=event.status,
exit_code=event.exit_code,
version=event.version,
_target=self.target,
)
144 changes: 1 addition & 143 deletions dissect/target/plugins/os/windows/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from __future__ import annotations

import logging
import re
import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import Iterator, Optional, Union
from typing import Iterator, Union

from flow.record import GroupedRecord

Expand Down Expand Up @@ -78,92 +75,6 @@
],
)

SchedLgURecord = TargetRecordDescriptor(
"windows/tasks/log/schedlgu",
[
("datetime", "ts"),
("string", "job"),
("string", "command"),
("string", "status"),
("uint32", "exit_code"),
("string", "version"),
],
)

JOB_REGEX_PATTERN = re.compile(r"\"(.*?)\" \((.*?)\)")
SCHEDLGU_REGEX_PATTERN = re.compile(r"\".+\n.+\n\s{4}.+\n|\".+\n.+", re.MULTILINE)


@dataclass(order=True)
class SchedLgU:
ts: datetime = None
job: str = None
status: str = None
command: str = None
exit_code: int = None
version: str = None

@staticmethod
def _sanitize_ts(ts: str) -> datetime:
# sometimes "at" exists before the timestamp
ts = ts.strip("at ")
try:
ts = datetime.strptime(ts, "%m/%d/%Y %I:%M:%S %p")
except ValueError:
ts = datetime.strptime(ts, "%d-%m-%Y %H:%M:%S")

return ts

@staticmethod
def _parse_job(line: str) -> tuple[str, Optional[str]]:
matches = JOB_REGEX_PATTERN.match(line)
if matches:
return matches.groups()

log.warning("SchedLgU failed to parse job and command from line: '%s'. Returning line.", line)
return line, None

@classmethod
def from_line(cls, line: str) -> SchedLgU:
"""Parse a group of SchedLgU.txt lines."""
event = cls()
lines = line.splitlines()

# Events can have 2 or 3 lines as a group in total. An example of a complete task job event is:
# "Symantec NetDetect.job" (NDETECT.EXE)
# Finished 14-9-2003 13:21:01
# Result: The task completed with an exit code of (65).
if len(lines) == 3:
event.job, event.command = cls._parse_job(lines[0])
event.status, event.ts = lines[1].split(maxsplit=1)
event.exit_code = int(lines[2].split("(")[1].rstrip(")."))

# Events that have 2 lines as a group can be started task job event or the Task Scheduler Service. Examples:
# "Symantec NetDetect.job" (NDETECT.EXE)
# Started at 14-9-2003 13:26:00
elif len(lines) == 2 and ".job" in lines[0]:
event.job, event.command = cls._parse_job(lines[0])
event.status, event.ts = lines[1].split(maxsplit=1)

# Events without a task job event are the Task Scheduler Service events. Which can look like this:
# "Task Scheduler Service"
# Exited at 14-9-2003 13:40:24
# OR
# "Task Scheduler Service"
# 6.0.6000.16386 (vista_rtm.061101-2205)
elif len(lines) == 2:
event.job = lines[0].strip('"')

if lines[1].startswith("\t") or lines[1].startswith(" "):
event.status, event.ts = lines[1].split(maxsplit=1)
else:
event.version = lines[1]

if event.ts:
event.ts = cls._sanitize_ts(event.ts)

return event


class TasksPlugin(Plugin):
"""Plugin for retrieving scheduled tasks on a Windows system.
Expand Down Expand Up @@ -242,56 +153,3 @@ def tasks(self) -> Iterator[Union[TaskRecord, GroupedRecord]]:
for trigger in task_object.get_triggers():
grouped = GroupedRecord("filesystem/windows/task/grouped", [record, trigger])
yield grouped


class SchedLgUPlugin(Plugin):
"""Plugin for parsing the Task Scheduler Service transaction log file (SchedLgU.txt)."""

PATHS = {
"sysvol/SchedLgU.txt",
"sysvol/windows/SchedLgU.txt",
"sysvol/windows/tasks/SchedLgU.txt",
"sysvol/winnt/tasks/SchedLgU.txt",
}

def __init__(self, target: Target) -> None:
self.target = target
self.paths = [self.target.fs.path(path) for path in self.PATHS if self.target.fs.path(path).exists()]

def check_compatible(self) -> None:
if len(self.paths) == 0:
raise UnsupportedPluginError("No SchedLgU.txt file found.")

@export(record=SchedLgURecord)
def schedlgu(self) -> Iterator[SchedLgURecord]:
"""Return all events in the Task Scheduler Service transaction log file (SchedLgU.txt).
Older Windows systems may log ``.job`` tasks that get started remotely in the SchedLgU.txt file.
In addition, this log file records when the Task Scheduler service starts and stops.
Adversaries may use malicious ``.job`` files to gain persistence on a system.
Yield:
ts (datetime): The timestamp of the event.
job (str): The name of the ``.job`` file.
command (str): The command executed.
status (str): The status of the event (finished, completed, exited, stopped).
exit_code (int): The exit code of the event.
version (str): The version of the Task Scheduler service.
"""

for path in self.paths:
content = path.read_text(encoding="UTF-16", errors="surrogateescape")

for match in re.findall(SCHEDLGU_REGEX_PATTERN, content):
event = SchedLgU.from_line(match)

yield SchedLgURecord(
ts=event.ts,
job=event.job,
command=event.command,
status=event.status,
exit_code=event.exit_code,
version=event.version,
_target=self.target,
)
3 changes: 1 addition & 2 deletions tests/plugins/os/windows/log/test_schedlgu.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from flow.record.fieldtypes import datetime

from dissect.target.plugins.os.windows.tasks import SchedLgUPlugin, SchedLgURecord
from dissect.target.plugins.os.windows.log.schedlgu import SchedLgUPlugin
from tests._utils import absolute_path


Expand All @@ -16,7 +16,6 @@ def test_shedlgu(target_win, fs_win):
task_scheduler_exited_event = records[2]
job_task_event = records[58]

assert any(isinstance(record, type(SchedLgURecord())) for record in records)
assert task_scheduler_started_event.ts == datetime("2006-11-02 07:35:17+00:00")
assert task_scheduler_started_event.job == "Task Scheduler Service"
assert task_scheduler_started_event.status == "Started"
Expand Down

0 comments on commit 55b75d9

Please sign in to comment.