Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Linux locate plugin #505

Merged
merged 26 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4582282
add locate plugin
JSCU-CNI Jan 15, 2024
4c07897
Merge branch 'main' into feature/add-linux-locate-plugin
JSCU-CNI Feb 5, 2024
60a1cde
Apply suggestions from code review
JSCU-CNI Feb 5, 2024
cc8cc64
rename helper
JSCU-CNI Feb 5, 2024
b242c32
Add as_posix() to test_gnulocate
Horofic Feb 7, 2024
6e18a74
Add .as_posix() to test_mlocate.py
Horofic Feb 7, 2024
d69cbef
Add .as_posix() to test_plocate.py
Horofic Feb 7, 2024
51d2ebd
Update test_gnulocate.py
Horofic Feb 7, 2024
c5dd8cb
Update test_mlocate.py
Horofic Feb 7, 2024
483a0bf
Update test_plocate.py
Horofic Feb 7, 2024
d789d0f
Revert changes of test_gnulocate.py
Horofic Feb 8, 2024
874151f
Revert changes in test_mlocate.py
Horofic Feb 8, 2024
b3e30e2
Revert changes in test_plocate.py
Horofic Feb 8, 2024
1041bc8
Fix for Windows tests in mlocate.py plugin
Horofic Feb 8, 2024
63ab896
Fix Windows tests for mlocate.py helper
Horofic Feb 8, 2024
57d402b
Linting fixes
Horofic Feb 8, 2024
e94d138
Linting fixes
Horofic Feb 8, 2024
d8c2c39
Change errors mode for path decoding
Horofic Feb 14, 2024
b57b576
Add workaround for pypy3.9
Horofic Feb 14, 2024
7886a21
Merge branch 'main' into feature/add-linux-locate-plugin
JSCU-CNI Feb 15, 2024
f646d0d
Pypy magic attempt
Schamper Feb 16, 2024
67970ff
Pypy magic attempt again
Schamper Feb 16, 2024
4bce8e4
Cleaner dirty fix
Schamper Feb 16, 2024
493f67b
Merge branch 'main' into feature/add-linux-locate-plugin
Horofic Feb 16, 2024
62903d0
Move locate helper to plugins
Horofic Feb 19, 2024
976c4b3
Merge branch 'main' into feature/add-linux-locate-plugin
Horofic Feb 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ tests/_data/volumes/bde/enc-volume.bin filter=lfs diff=lfs merge=lfs -text
tests/_data/volumes/md/md-nested.bin.gz filter=lfs diff=lfs merge=lfs -text
tests/_data/loaders/tar/test-anon-filesystems.tar filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/apps/browser/firefox/cookies.sqlite filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/unix/locate/locatedb filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/unix/locate/mlocate.db filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/unix/locate/plocate.db filter=lfs diff=lfs merge=lfs -text
Empty file.
53 changes: 53 additions & 0 deletions dissect/target/helpers/locate/locate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import IO, Iterator

from dissect.cstruct import cstruct

# cstruct definition used by LocateFileParser.
# MAGIC is compared against the first 10 bytes of the database, and ``file``
# reads a single path suffix (presumably NUL-terminated, as the entries are
# described as null-byte separated).
locate_def = """
#define MAGIC 0x004c4f43415445303200 /* b'/x00LOCATE02/x00' */

struct file {
char path_ending[];
}
"""

# Parser instance holding the definitions above.
c_locate = cstruct()
c_locate.load(locate_def)


class LocateFileParser:
    """locate file parser

    Multiple formats exist for the locatedb file. This class only supports the most recent version ``LOCATE02``.

    The file is encoded with front compression (incremental encoding). Every entry stores a signed
    differential count followed by a null byte terminated path suffix. The running count is the number
    of leading bytes the entry shares with the previous path.

    Resources:
        - https://manpages.ubuntu.com/manpages/trusty/en/man5/locatedb.5.html
    """

    def __init__(self, file_handler: IO):
        """Validate the ``LOCATE02`` magic at the start of ``file_handler``.

        Args:
            file_handler: Seekable binary file-like object of the locatedb file.

        Raises:
            ValueError: If the first 10 bytes are not the ``LOCATE02`` magic.
        """
        self.fh = file_handler
        self.fh.seek(0)

        magic = int.from_bytes(self.fh.read(10), byteorder="big")
        if magic != c_locate.MAGIC:
            raise ValueError("is not a valid locate file")

    def __iter__(self) -> Iterator[str]:
        """Yield every path stored in the database, in file order.

        Paths are reconstructed as bytes and decoded only when yielded, because the shared-prefix count
        is expressed in bytes, not in decoded characters. Bytes that are not valid UTF-8 are rendered
        as backslash escapes instead of aborting the iteration.
        """
        count = 0
        previous_path = b""

        try:
            while True:
                raw_offset = self.fh.read(1)
                if not raw_offset:
                    # Clean end of file, no further entries.
                    return

                if raw_offset == b"\x80":
                    # 0x80 is an escape: the differential count did not fit in a signed byte
                    # and follows as a 2-byte big endian signed word.
                    offset = int.from_bytes(self.fh.read(2), byteorder="big", signed=True)
                else:
                    # NOTE: The offset could be negative, which indicates that we
                    # decrease the number of bytes taken from the previous path.
                    offset = int.from_bytes(raw_offset, byteorder="big", signed=True)
                count += offset

                path_ending = c_locate.file(self.fh).path_ending
                path = previous_path[:count] + path_ending
                yield path.decode(errors="backslashreplace")
                previous_path = path
        except EOFError:
            # Truncated final entry: stop iterating.
            return
96 changes: 96 additions & 0 deletions dissect/target/helpers/locate/mlocate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import os.path
from typing import IO, Iterator, Union

from dissect.cstruct import cstruct
from dissect.util.ts import from_unix

# Resources: https://linux.die.net/man/5/locate.db
#
# The file header consists of the 8 byte magic (checked separately by the parser) followed by
# ``header_config``. The root path of the database is a NUL-terminated string of variable length
# and is directly followed by the configuration block of ``conf_size`` bytes.
mlocate_def = """
#define MAGIC 0x006d6c6f63617465 /* b'/x00mlocate' */

struct header_config {
    int32 conf_size;               /* size of the configuration block in bytes */
    int8 version;                  /* file format version */
    int8 require_visibility;
    int8 pad[2];                   /* 32-bit total alignment */
    char root_database[];          /* NUL-terminated root path of the database */
    char config_block[conf_size];
};

enum DBE_TYPE: uint8 { /* database entry type */
    FILE = 0x0,      /* file */
    DIRECTORY = 0x1, /* directory */
    END = 0x2        /* end of directory */
};

struct directory {
    /* time is the 'maximum of st_ctime and
       st_mtime of the directory' according to docs */
    int64 time_seconds;
    int32 time_nanoseconds;
    int32 padding;
    char path[];
};

struct entry {
    char path[];
};
"""

# All multi-byte integers in the definition are parsed as big endian.
c_mlocate = cstruct(endian=">")
c_mlocate.load(mlocate_def)


class MLocateDirectory:
    """Directory header read from an mlocate database.

    ``time_seconds`` is converted with ``from_unix`` and exposed as ``ts``;
    ``path`` is stored as given.
    """

    def __init__(self, time_seconds, path):
        self.path = path
        self.ts = from_unix(time_seconds)


class MLocateEntry:
    """Single entry of a directory in an mlocate database.

    Holds the entry ``path`` and its ``dbe_type`` exactly as passed in.
    """

    def __init__(self, path, dbe_type):
        self.dbe_type = dbe_type
        self.path = path


class MLocateFileParser:
    """mlocate file parser

    Iterating the parser yields an ``MLocateDirectory`` for every directory header in the database,
    followed by an ``MLocateEntry`` for every entry listed in that directory.

    Resources:
        - https://manpages.debian.org/testing/mlocate/mlocate.db.5.en.html
    """

    def __init__(self, file_handler: IO):
        """Validate the magic and read the file header.

        Args:
            file_handler: Binary file-like object positioned at the start of the mlocate database.

        Raises:
            ValueError: If the first 8 bytes are not the mlocate magic.
        """
        self.fh = file_handler

        magic = int.from_bytes(self.fh.read(8), byteorder="big")
        if magic != c_mlocate.MAGIC:
            raise ValueError("is not a valid mlocate file")

        self.header = c_mlocate.header_config(self.fh)

    def _parse_directory_entries(self) -> Iterator:
        """Yield ``(type, entry)`` tuples for the current directory until its end marker is read.

        ``type`` is the string ``"file"`` for file entries and ``"directory"`` for every other
        non-end entry type.
        """
        while True:
            dbe_type = c_mlocate.DBE_TYPE(self.fh)
            if dbe_type == c_mlocate.DBE_TYPE.END:
                break

            entry = c_mlocate.entry(self.fh)
            dbe_type = "file" if dbe_type == c_mlocate.DBE_TYPE.FILE else "directory"
            yield dbe_type, entry

    def __iter__(self) -> Iterator[Union[MLocateDirectory, MLocateEntry]]:
        """Yield all directories and their entries with decoded paths.

        The file handle is closed once the end of the database is reached.
        """
        # The database describes a POSIX file system, so entries are always joined with forward
        # slashes, regardless of the platform the parser itself runs on.
        import posixpath

        while True:
            try:
                directory = c_mlocate.directory(self.fh)
                # Names that are not valid UTF-8 are rendered with backslash escapes
                # instead of aborting the iteration.
                directory_path = directory.path.decode(errors="backslashreplace")
            except EOFError:
                self.fh.close()
                return

            # Pass the decoded path, so directories and entries expose the same (str) path type.
            yield MLocateDirectory(time_seconds=directory.time_seconds, path=directory_path)

            for dbe_type, entry in self._parse_directory_entries():
                entry_name = entry.path.decode(errors="backslashreplace")
                yield MLocateEntry(path=posixpath.join(directory_path, entry_name), dbe_type=dbe_type)
117 changes: 117 additions & 0 deletions dissect/target/helpers/locate/plocate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from typing import IO, Iterator

import zstandard
from dissect.cstruct import cstruct

# Resource: https://git.sesse.net/?p=plocate @ db.h
#
# cstruct definition used by PLocateFileParser.
# MAGIC is compared against the first 8 bytes of the database; ``header`` is
# read directly after the magic and ``file`` reads one path from the
# decompressed filename data.
plocate_def = """
#define MAGIC 0x00706c6f63617465 /* b'/x00plocate' */

struct header {
uint32_t version;
uint32_t hashtable_size;
uint32_t extra_ht_slots;
uint32_t num_docids;
uint64_t hash_table_offset_bytes;
uint64_t filename_index_offset_bytes;

/* Version 1 and up only. */
uint32_t max_version;
uint32_t zstd_dictionary_length_bytes;
uint64_t zstd_dictionary_offset_bytes;

/* Only if max_version >= 2, and only relevant for updatedb. */
uint64_t directory_data_length_bytes;
uint64_t directory_data_offset_bytes;
uint64_t next_zstd_dictionary_length_bytes;
uint64_t next_zstd_dictionary_offset_bytes;
uint64_t conf_block_length_bytes;
uint64_t conf_block_offset_bytes;

uint8_t check_visibility;
char padding[7]; /* padding for alignment */
};

struct file {
char path[];
}
"""

# Parser instance holding the definitions above.
c_plocate = cstruct()
c_plocate.load(plocate_def)


class PLocateFileParser:
    """plocate file parser

    The ``plocate.db`` file contains a hashtable and trigrams to enable quick lookups of filenames.

    We've implemented a few methods to gather those for possible future use, but for the PLocatePlugin
    we're only interested in the filepaths stored in the database. Hence we don't use these methods.

    Roughly speaking, the plocate.db file has the following structure:
        - ``header`` (0x70 bytes)
        - zstd compressed ``filename``s (until start of ``filename_index_offset_bytes``),
          possibly including a dictionary
        - hashtables (offset and length in ``header``)
        - directory data (offset and length in ``header``)
        - possible zstd dictionary (offset and length in ``header``)
        - configuration block (offset and length in ``header``)

    No documentation other than the source code is available on the format of this file.

    Resources:
        - https://git.sesse.net/?p=plocate
    """

    HEADER_SIZE = 0x70  # 0x8 bytes magic + 0x68 bytes header
    NUM_OVERFLOW_SLOTS = 16
    TRIGRAM_SIZE_BYTES = 16
    DOCID_SIZE_BYTES = 8

    def __init__(self, file_handler: IO):
        """Validate the magic and read the file header.

        Args:
            file_handler: Seekable binary file-like object positioned at the start of the plocate database.

        Raises:
            ValueError: If the first 8 bytes are not the plocate magic.
        """
        self.fh = file_handler

        magic = int.from_bytes(self.fh.read(8), byteorder="big")
        if magic != c_plocate.MAGIC:
            raise ValueError("is not a valid plocate file")

        self.header = c_plocate.header(self.fh)

    def paths(self) -> Iterator[str]:
        """Yield all paths stored in the database.

        A zstd compressed blob with null byte separated paths is located after the file header.
        The compression was done either with or without a dictionary. This is specified by the
        zstd_dictionary_length_bytes / zstd_dictionary_offset_bytes values in the header. If there is no dictionary,
        they are both 0.

        Path bytes that are not valid UTF-8 are rendered with backslash escapes instead of aborting the iteration.
        """
        self.fh.seek(self.HEADER_SIZE)
        if self.header.zstd_dictionary_offset_bytes == 0:
            dict_data = None
        else:
            dict_data = zstandard.ZstdCompressionDict(self.fh.read(self.header.zstd_dictionary_length_bytes))

        # Everything between the header (plus optional dictionary) and the filename index is compressed path data.
        compressed_length_bytes = (
            self.header.filename_index_offset_bytes - self.HEADER_SIZE - self.header.zstd_dictionary_length_bytes
        )
        compressed_buf = self.fh.read(compressed_length_bytes)
        ctx = zstandard.ZstdDecompressor(dict_data=dict_data)

        with ctx.stream_reader(compressed_buf) as reader:
            while True:
                try:
                    file = c_plocate.file(reader)
                except EOFError:
                    # End of the decompressed data.
                    return
                yield file.path.decode(errors="backslashreplace")

    def filename_index(self) -> bytes:
        """Return the raw filename index: ``num_docids`` entries of ``DOCID_SIZE_BYTES`` bytes each."""
        self.fh.seek(self.header.filename_index_offset_bytes)
        num_docids = self.header.num_docids
        filename_index_size = num_docids * self.DOCID_SIZE_BYTES
        return self.fh.read(filename_index_size)

    def hashtable(self) -> bytes:
        """Return the raw hashtable, including the overflow slots and one additional slot."""
        self.fh.seek(self.header.hash_table_offset_bytes)
        hashtable_size = (self.header.hashtable_size + self.NUM_OVERFLOW_SLOTS + 1) * self.TRIGRAM_SIZE_BYTES
        return self.fh.read(hashtable_size)
2 changes: 2 additions & 0 deletions dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,8 @@ def __init__(self, target: Target):
try:
subplugin = getattr(self.target, entry)
self._subplugins.append(subplugin)
except UnsupportedPluginError:
target.log.warning("Subplugin %s is not compatible with target.", entry)
except Exception:
target.log.exception("Failed to load subplugin: %s", entry)

Expand Down
Empty file.
36 changes: 36 additions & 0 deletions dissect/target/plugins/os/unix/locate/gnulocate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Iterator

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.locate.locate import LocateFileParser
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import export
from dissect.target.plugins.os.unix.locate.locate import BaseLocatePlugin

# Record emitted for every path read from the locatedb file.
# ``source`` is filled with the path of the database file on the target.
LocateRecord = TargetRecordDescriptor(
    "linux/locate/locate",
    [
        ("string", "path"),
        ("string", "source"),
    ],
)


class GNULocatePlugin(BaseLocatePlugin):
    # Plugin exposing the contents of the GNU findutils locatedb file under the ``gnulocate`` namespace.
    __namespace__ = "gnulocate"

    path = "/var/cache/locate/locatedb"

    def check_compatible(self) -> None:
        """Raise ``UnsupportedPluginError`` if the locatedb file does not exist on the target."""
        if not self.target.fs.path(self.path).exists():
            raise UnsupportedPluginError(f"No locatedb file found at {self.path}")

    @export(record=LocateRecord)
    def locate(self) -> Iterator[LocateRecord]:
        """Yield file and directory names from GNU findutils' locatedb file.

        Resources:
            - https://manpages.debian.org/testing/locate/locatedb.5.en.html
        """
        locate_fh = self.target.fs.path(self.path).open()
        locate_file = LocateFileParser(locate_fh)

        for path in locate_file:
            # Attach the target to the record, consistent with the records of the mlocate plugin.
            yield LocateRecord(path=path, source=self.path, _target=self.target)
5 changes: 5 additions & 0 deletions dissect/target/plugins/os/unix/locate/locate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dissect.target.plugin import NamespacePlugin


class BaseLocatePlugin(NamespacePlugin):
    """Base class of the locate database plugins, registered under the ``locate`` namespace."""

    __namespace__ = "locate"
63 changes: 63 additions & 0 deletions dissect/target/plugins/os/unix/locate/mlocate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import Iterator, Union

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.locate.mlocate import (
MLocateDirectory,
MLocateEntry,
MLocateFileParser,
)
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import export
from dissect.target.plugins.os.unix.locate.locate import BaseLocatePlugin

# Record emitted for every directory item of the mlocate database;
# ``ts`` carries the timestamp of the directory item.
MLocateDirectoryRecord = TargetRecordDescriptor(
    "linux/locate/mlocate_directory",
    [
        ("datetime", "ts"),
        ("string", "path"),
        ("string", "source"),
    ],
)

# Record emitted for every entry item of the mlocate database;
# ``type`` carries the entry type reported by the parser.
MLocateEntryRecord = TargetRecordDescriptor(
    "linux/locate/mlocate_entry",
    [
        ("string", "path"),
        ("string", "type"),
        ("string", "source"),
    ],
)

# Union of both record types, used as the record type of the exported ``locate`` function.
MLocateRecord = Union[
    MLocateEntryRecord,
    MLocateDirectoryRecord,
]


class MLocatePlugin(BaseLocatePlugin):
    # Plugin exposing the contents of mlocate.db under the ``mlocate`` namespace.
    __namespace__ = "mlocate"

    path = "/var/lib/mlocate/mlocate.db"

    def check_compatible(self) -> None:
        # Compatible as soon as the database file is present on the target.
        if self.target.fs.path(self.path).exists():
            return

        raise UnsupportedPluginError(f"No mlocate.db file found at {self.path}")

    @export(record=MLocateRecord)
    def locate(self) -> Iterator[MLocateRecord]:
        """Yield file and directory names from mlocate.db file.

        ``mlocate`` is a new implementation of GNU locate,
        but has been deprecated since Ubuntu 22.

        Resources:
            - https://manpages.debian.org/testing/mlocate/mlocate.db.5.en.html
        """
        parser = MLocateFileParser(self.target.fs.path(self.path).open())

        for item in parser:
            # Directory items and entry items map onto their own record type;
            # anything else is ignored.
            if isinstance(item, MLocateDirectory):
                yield MLocateDirectoryRecord(
                    ts=item.ts,
                    path=item.path,
                    source=self.path,
                    _target=self.target,
                )
                continue

            if isinstance(item, MLocateEntry):
                yield MLocateEntryRecord(
                    path=item.path,
                    type=item.dbe_type,
                    source=self.path,
                    _target=self.target,
                )
Loading