Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NamespacePlugin #334

Merged
merged 20 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions dissect/target/helpers/docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def get_full_func_name(plugin_class: Type, func: Callable) -> str:
return func_name


FUNC_DOC_TEMPLATE = "{func_name} - {short_description} (output: {output_type})"


def get_func_description(func: Callable, with_docstrings: bool = False) -> str:
klass, func = get_real_func_obj(func)
func_output, func_doc = get_func_details(func)
Expand All @@ -88,7 +91,9 @@ def get_func_description(func: Callable, with_docstrings: bool = False) -> str:
desc = "\n".join([func_title, "", func_doc])
else:
docstring_first_line = func_doc.splitlines()[0].lstrip()
desc = f"{func_name} - {docstring_first_line} (output: {func_output})"
desc = FUNC_DOC_TEMPLATE.format(
func_name=func_name, short_description=docstring_first_line, output_type=func_output
)

return desc

Expand All @@ -97,9 +102,11 @@ def get_plugin_functions_desc(plugin_class: Type, with_docstrings: bool = False)
descriptions = []
for func_name in plugin_class.__exports__:
func_obj = getattr(plugin_class, func_name)
_, func = get_real_func_obj(func_obj)

func_desc = get_func_description(func, with_docstrings=with_docstrings)
if getattr(func_obj, "get_func_doc_spec", None):
func_desc = FUNC_DOC_TEMPLATE.format_map(func_obj.get_func_doc_spec())
else:
_, func = get_real_func_obj(func_obj)
func_desc = get_func_description(func, with_docstrings=with_docstrings)
descriptions.append(func_desc)

# sort functions in the plugin alphabetically
Expand Down
176 changes: 163 additions & 13 deletions dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import os
import sys
import traceback
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterator, Optional, Type

from dissect.target.exceptions import PluginError
from flow.record import Record, RecordDescriptor

from dissect.target.exceptions import PluginError, UnsupportedPluginError
from dissect.target.helpers import cache
from dissect.target.helpers.record import EmptyRecord

Expand All @@ -30,8 +33,6 @@
GENERATED = False

if TYPE_CHECKING:
from flow.record import Record, RecordDescriptor

from dissect.target import Target
from dissect.target.filesystem import Filesystem
from dissect.target.helpers.record import ChildTargetRecord
Expand Down Expand Up @@ -211,6 +212,8 @@ class attribute. Namespacing results in your plugin needing to be prefixed
produce redundant results when used with a wild card
(browser.* -> browser.history + browser.*.history).
"""
__skip__: bool = False
"""Prevents plugin functions from indexing this plugin at all."""

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
Expand Down Expand Up @@ -425,6 +428,9 @@ def register(plugincls: Type[Plugin]) -> None:
if isinstance(attr, property):
attr = attr.fget

if getattr(attr, "__autogen__", False) and plugincls != plugincls.__nsplugin__:
continue

if getattr(attr, "__exported__", False):
exports.append(attr.__name__)
functions.append(attr.__name__)
Expand Down Expand Up @@ -794,8 +800,138 @@ def _modulepath(cls) -> str:
return cls.__module__.replace(MODULE_PATH, "").lstrip(".")


# Needs to be at the bottom of the module because __init_subclass__ requires everything
# These need to be at the bottom of the module because __init_subclass__ requires everything
# in the parent class Plugin to be defined and resolved.
class NamespacePlugin(Plugin):
def __init__(self, target: Target):
"""A namespace plugin provides services to access functionality from a group of subplugins.

Support is currently limited to shared exported functions that yield records.
"""
super().__init__(target)

# The code below only applies to the direct subclass, indirect subclasses are finished here.
if self.__class__ != self.__nsplugin__:
return

self._subplugins = []
for entry in self.SUBPLUGINS:
try:
subplugin = getattr(self.target, entry)
self._subplugins.append(subplugin)
except Exception:
target.log.exception("Failed to load subplugin: %s", entry)

def check_compatible(self) -> None:
if not len(self._subplugins):
raise UnsupportedPluginError("No compatible subplugins found")

def __init_subclass_namespace__(cls, **kwargs):
# If this is a direct subclass of a Namespace plugin, create a reference to the current class for indirect
# subclasses. This is necessary to autogenerate aggregate methods there
cls.__nsplugin__ = cls
cls.__findable__ = False

def __init_subclass_subplugin__(cls, **kwargs):
cls.__findable__ = True

if not getattr(cls.__nsplugin__, "SUBPLUGINS", None):
cls.__nsplugin__.SUBPLUGINS = set()

# Register the current plugin class as a subplugin with
# the direct subclass of NamespacePlugin
cls.__nsplugin__.SUBPLUGINS.add(cls.__namespace__)

# Collect the public attrs of the subplugin
for subplugin_func_name in cls.__exports__:
subplugin_func = inspect.getattr_static(cls, subplugin_func_name)

# The attr need to be callable and exported
if not isinstance(subplugin_func, Callable):
continue

# The method needs to output records
if getattr(subplugin_func, "__output__", None) != "record":
continue

# The method needs to be part of the current subclass and not a parent
if not subplugin_func.__qualname__.startswith(cls.__name__):
continue

# If we already have an aggregate method, skip
if existing_aggregator := getattr(cls.__nsplugin__, subplugin_func_name, None):
existing_aggregator.__subplugins__.append(cls.__namespace__)
continue

# The generic template for the aggregator method
def generate_aggregator(method_name: str) -> Callable:
def aggregator(self) -> Iterator[Record]:
for entry in aggregator.__subplugins__:
try:
subplugin = getattr(self.target, entry)
for item in getattr(subplugin, method_name)():
yield item
except Exception:
continue

# Holds the subplugins that share this method
aggregator.__subplugins__ = []

return aggregator

# The generic template for the documentation method
def generate_documentor(cls, method_name: str, aggregator: Callable) -> str:
def documentor():
return defaultdict(
lambda: "???",
{
"func_name": f"{cls.__nsplugin__.__namespace__}.{method_name}",
"short_description": "".join(
[
f"Return {method_name} for: ",
",".join(aggregator.__subplugins__),
]
),
"output_type": "records",
},
)

return documentor

# Manifacture a method for the namespaced class
generated_aggregator = generate_aggregator(subplugin_func_name)
generated_documentor = generate_documentor(cls, subplugin_func_name, generated_aggregator)

# Add as an attribute to the namespace class
setattr(cls.__nsplugin__, subplugin_func_name, generated_aggregator)

# Copy the meta descriptors of the function attribute
for copy_attr in ["__output__", "__record__", "__doc__", "__exported__"]:
setattr(generated_aggregator, copy_attr, getattr(subplugin_func, copy_attr, None))

# Add subplugin to aggregator
generated_aggregator.__subplugins__.append(cls.__namespace__)

# Mark the function as being autogenerated
setattr(generated_aggregator, "__autogen__", True)

# Add the documentor function to the aggregator
setattr(generated_aggregator, "get_func_doc_spec", generated_documentor)

# Register the newly auto-created method
cls.__nsplugin__.__exports__.append(subplugin_func_name)
cls.__nsplugin__.__functions__.append(subplugin_func_name)

def __init_subclass__(cls, **kwargs):
# Upon subclassing, decide whether this is a direct subclass of NamespacePlugin
# If this is not the case, autogenerate aggregate methods for methods record output.
super().__init_subclass__(**kwargs)
if cls.__bases__[0] != NamespacePlugin:
cls.__init_subclass_subplugin__(cls, **kwargs)
else:
cls.__init_subclass_namespace__(cls, **kwargs)


class InternalPlugin(Plugin):
"""Parent class for internal plugins.

Expand All @@ -816,7 +952,7 @@ def __init_subclass__(cls, **kwargs):
class PluginFunction:
name: str
output_type: str
class_object: str
class_object: type[Plugin]
method_name: str
plugin_desc: dict = field(hash=False)

Expand Down Expand Up @@ -845,6 +981,7 @@ def all_plugins():
if "get_all_records" in available["exports"]:
available["exports"].remove("get_all_records")
modulepath = available["module"]

if modulepath.endswith("._os"):
if not target._os:
# if no target available add a namespaceless section
Expand All @@ -864,9 +1001,7 @@ def all_plugins():


def find_plugin_functions(
target: Target,
patterns: str,
compatibility: bool = False,
target: Target, patterns: str, compatibility: bool = False, **kwargs
) -> tuple[list[PluginFunction], set[str]]:
"""Finds plugins that match the target and the patterns.

Expand All @@ -875,9 +1010,15 @@ def find_plugin_functions(
a list of plugin function descriptors (including output types).
"""
result = []

def add_to_result(func: PluginFunction) -> None:
if func not in result and not func.class_object.__skip__:
result.append(func)

functions, rootset = plugin_function_index(target)

invalid_funcs = set()
show_hidden = kwargs.get("show_hidden", False)

for pattern in patterns.split(","):
# backward compatibility fix for namespace-level plugins (i.e. chrome)
Expand All @@ -887,8 +1028,17 @@ def find_plugin_functions(

wildcard = any(char in pattern for char in ["*", "!", "?", "[", "]"])
treematch = pattern.split(".")[0] in rootset and pattern != "os"
exact_match = pattern in functions

# Allow for exact matches, otherwise you cannot reach documented namespace plugins like
# browsers.browser.downloads. You can *always* run these using the namespace/classic-style like:
# browser.downloads (but -l lists them in the tree for documentation purposes so it would be misleading
# not to allow tree access as well). Note that these tree items will never respond to wildcards though
# (browsers.browser.* won't work) to avoid duplicate results.
if exact_match:
show_hidden = True

if treematch and not wildcard:
if treematch and not wildcard and not exact_match:
# Examples:
# -f browsers -> browsers* (the whole package)
# -f apps.webservers.iis -> apps.webservers.iis* (logs etc)
Expand All @@ -905,7 +1055,7 @@ def find_plugin_functions(
loaded_plugin_object = load(func)

# Skip plugins that don't want to be found by wildcards
if not loaded_plugin_object.__findable__:
if not show_hidden and not loaded_plugin_object.__findable__:
continue

fobject = inspect.getattr_static(loaded_plugin_object, method_name)
Expand All @@ -918,7 +1068,7 @@ def find_plugin_functions(
continue

matches = True
result.append(
add_to_result(
PluginFunction(
name=index_name,
class_object=loaded_plugin_object,
Expand Down Expand Up @@ -956,7 +1106,7 @@ def find_plugin_functions(
if compatibility and not loaded_plugin_object(target).is_compatible():
continue

result.append(
add_to_result(
PluginFunction(
name=f"{description['module']}.{pattern}",
class_object=loaded_plugin_object,
Expand All @@ -966,4 +1116,4 @@ def find_plugin_functions(
)
)

return list(set(result)), invalid_funcs
return result, invalid_funcs
11 changes: 7 additions & 4 deletions dissect/target/plugins/apps/remoteaccess/anydesk.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from datetime import datetime

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.plugin import Plugin, export
from dissect.target.plugins.apps.remoteaccess.remoteaccess import RemoteAccessRecord
from dissect.target.plugin import export
from dissect.target.plugins.apps.remoteaccess.remoteaccess import (
RemoteAccessPlugin,
RemoteAccessRecord,
)


class AnydeskPlugin(Plugin):
class AnydeskPlugin(RemoteAccessPlugin):
"""
Anydesk plugin.
"""
Expand Down Expand Up @@ -38,7 +41,7 @@ def check_compatible(self):
raise UnsupportedPluginError("No Anydesk logs found")

@export(record=RemoteAccessRecord)
def remoteaccess(self):
def logs(self):
"""Return the content of the AnyDesk logs.

AnyDesk is a remote desktop application and can be used by adversaries to get (persistent) access to a machine.
Expand Down
Loading