Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a plugin to parse notifications from Windows appdb.dat #394

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ tests/data/plugins/browsers/edge/History.sqlite filter=lfs diff=lfs merge=lfs -t
tests/data/plugins/browsers/chromium/History.sqlite filter=lfs diff=lfs merge=lfs -text
tests/data/plugins/os/windows/dpapi/** filter=lfs diff=lfs merge=lfs -text
tests/data/volumes/md-nested.bin.gz filter=lfs diff=lfs merge=lfs -text
tests/data/plugins/os/windows/notifications/appdb.dat.v3.gz filter=lfs diff=lfs merge=lfs -text
tests/data/plugins/os/windows/notifications/wpndatabase.db filter=lfs diff=lfs merge=lfs -text
25 changes: 23 additions & 2 deletions dissect/target/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import gzip
import io
import logging
import os
Expand Down Expand Up @@ -960,6 +961,21 @@ def lattr(self) -> dict[str, bytes]:
return fsutil.fs_attrs(self.entry, follow_symlinks=False)


class MappedCompressedFile(MappedFile):
"""Virtual file backed by a compressed file on the host machine."""

_compressors = {"gzip": gzip}

def __init__(self, fs: Filesystem, path: str, entry: Any, algo: str = "gzip"):
super().__init__(fs, path, entry)
self._compressor = self._compressors.get(algo)
if self._compressor is None:
raise ValueError(f"Unsupported compression algorithm {algo}")

def open(self) -> BinaryIO:
return self._compressor.open(self.entry, "rb")


class VirtualSymlink(FilesystemEntry):
"""Virtual symlink implementation."""

Expand Down Expand Up @@ -1115,13 +1131,18 @@ def map_dir(self, vfspath: str, realpath: str) -> None:
real_file_path = os.path.join(root, file_)
directory.add(file_, MappedFile(self, vfs_file_path, real_file_path))

def map_file(self, vfspath: str, realpath: str) -> None:
def map_file(self, vfspath: str, realpath: str, compression: Optional[str] = None) -> None:
"""Map a file from the host machine into the VFS."""
vfspath = fsutil.normalize(vfspath, alt_separator=self.alt_separator)
if vfspath[-1] == "/":
raise AttributeError(f"Can't map a file onto a directory: {vfspath}")
file_path = vfspath.lstrip("/")
self.map_file_entry(vfspath, MappedFile(self, file_path, realpath))

if compression is not None:
mapped_file = MappedCompressedFile(self, file_path, realpath, algo=compression)
else:
mapped_file = MappedFile(self, file_path, realpath)
self.map_file_entry(vfspath, mapped_file)

def map_file_fh(self, vfspath: str, fh: BinaryIO) -> None:
"""Map a file handle into the VFS."""
Expand Down
14 changes: 8 additions & 6 deletions dissect/target/plugins/general/users.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from collections import namedtuple
from typing import Generator, Optional

from flow.record import RecordDescriptor
from typing import Generator, NamedTuple, Optional, Union

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.helpers.record import UnixUserRecord, WindowsUserRecord
from dissect.target.plugin import InternalPlugin

UserDetails = namedtuple("UserDetails", "user home_path")

class UserDetails(NamedTuple):
user: Union[UnixUserRecord, WindowsUserRecord]
home_path: Optional[TargetPath]


class UsersPlugin(InternalPlugin):
Expand Down Expand Up @@ -44,7 +46,7 @@ def is_name_matching(name: str) -> bool:
):
return self.get(user)

def get(self, user: RecordDescriptor) -> UserDetails:
def get(self, user: Union[UnixUserRecord, WindowsUserRecord]) -> UserDetails:
"""Return additional details about the user"""
# Resolving the user home can not use the user's environment variables,
# as those depend on the user's home to be known first. So we resolve
Expand Down
Loading