Skip to content

Commit

Permalink
DIS-1681 Namespace Plugin with autogenerated aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
cecinestpasunepipe committed Aug 2, 2023
1 parent 29b54d7 commit d558a11
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 204 deletions.
15 changes: 11 additions & 4 deletions dissect/target/helpers/docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def get_full_func_name(plugin_class: Type, func: Callable) -> str:
return func_name


FUNC_DOC_TEMPLATE = "{func_name} - {short_description} (output: {output_type})"


def get_func_description(func: Callable, with_docstrings: bool = False) -> str:
klass, func = get_real_func_obj(func)
func_output, func_doc = get_func_details(func)
Expand All @@ -88,7 +91,9 @@ def get_func_description(func: Callable, with_docstrings: bool = False) -> str:
desc = "\n".join([func_title, "", func_doc])
else:
docstring_first_line = func_doc.splitlines()[0].lstrip()
desc = f"{func_name} - {docstring_first_line} (output: {func_output})"
desc = FUNC_DOC_TEMPLATE.format(
func_name=func_name, short_description=docstring_first_line, output_type=func_output
)

return desc

Expand All @@ -97,9 +102,11 @@ def get_plugin_functions_desc(plugin_class: Type, with_docstrings: bool = False)
descriptions = []
for func_name in plugin_class.__exports__:
func_obj = getattr(plugin_class, func_name)
_, func = get_real_func_obj(func_obj)

func_desc = get_func_description(func, with_docstrings=with_docstrings)
if getattr(func_obj, "__documentor__", None):
func_desc = func_obj.__documentor__(FUNC_DOC_TEMPLATE)
else:
_, func = get_real_func_obj(func_obj)
func_desc = get_func_description(func, with_docstrings=with_docstrings)
descriptions.append(func_desc)

# sort functions in the plugin alphabetically
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class StrEnum(str, Enum):

def list_to_frozen_set(function):
def wrapper(*args):
args = [frozenset(x) if type(x) == list else x for x in args]
args = [frozenset(x) if isinstance(x, list) else x for x in args]
return function(*args)

return wrapper
Expand Down
158 changes: 154 additions & 4 deletions dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import os
import sys
import traceback
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterator, Optional, Type

from dissect.target.exceptions import PluginError
from flow.record import Record, RecordDescriptor

from dissect.target.exceptions import PluginError, UnsupportedPluginError
from dissect.target.helpers import cache
from dissect.target.helpers.record import EmptyRecord

Expand All @@ -30,8 +33,6 @@
GENERATED = False

if TYPE_CHECKING:
from flow.record import Record, RecordDescriptor

from dissect.target import Target
from dissect.target.filesystem import Filesystem
from dissect.target.helpers.record import ChildTargetRecord
Expand Down Expand Up @@ -424,6 +425,9 @@ def register(plugincls: Type[Plugin]) -> None:
if isinstance(attr, property):
attr = attr.fget

if getattr(attr, "__autogen__", False) and plugincls != plugincls.__nsplugin__:
continue

if getattr(attr, "__exported__", False):
exports.append(attr.__name__)
functions.append(attr.__name__)
Expand Down Expand Up @@ -793,8 +797,154 @@ def _modulepath(cls) -> str:
return cls.__module__.replace(MODULE_PATH, "").lstrip(".")


# Needs to be at the bottom of the module because __init_subclass__ requires everything
# These need to be at the bottom of the module because __init_subclass__ requires everything
# in the parent class Plugin to be defined and resolved.
class NamespacePlugin(Plugin):
def __init__(self, target: Target):
"""A Namespace plugin provides services to access functionality
from a group of subplugins through an internal function _func.
Upon initialisation, subplugins are collected.
"""
super().__init__(target)

# The code below only applies to the direct subclass
# indirect subclasses are finished here.
if self.__class__ != self.__nsplugin__:
return

self._subplugins = []
for entry in self.SUBPLUGINS:
try:
subplugin = getattr(self.target, entry)
self._subplugins.append(subplugin)
except Exception: # noqa
target.log.exception("Failed to load subplugin: %s", entry)

def _func(self, func_name: str, subplugins: list) -> Iterator[Record]:
"""Return the supported subplugin records.
Args:
func_name: Exported function of the subplugin to find.
Yields:
Record from the sub function.
"""
for entry in subplugins:
try:
subplugin = getattr(self.target, entry)
for item in getattr(subplugin, func_name)():
yield item
except Exception:
continue

def check_compatible(self) -> None:
if not len(self._subplugins):
raise UnsupportedPluginError("No compatible subplugins found")

def __init_subclass_namespace__(cls, **kwargs):
# If this is a direct subclass of a Namespace Plugin,
# create a reference to the current class for indirect subclasses
# so that the can autogenerate aggregate methods there
cls.__nsplugin__ = cls

def __init_subclass_subplugin__(cls, **kwargs):
# Does the direct subclass already have a SUBPLUGINS attribute?
# if not, create this attribute on the direct subclass
if not getattr(cls.__nsplugin__, "SUBPLUGINS", None):
cls.__nsplugin__.SUBPLUGINS = set()

# Register the current plugin class as a subplugin with
# the direct subclass of NamespacePlugin
cls.__nsplugin__.SUBPLUGINS.add(cls.__namespace__)

# Collect the public attrs of the subplugin
for subplugin_func_name in get_nonprivate_attribute_names(cls):
subplugin_func = inspect.getattr_static(cls, subplugin_func_name)

# The attr need to be callable and exported
if not isinstance(subplugin_func, Callable) or not getattr(subplugin_func, "__exported__", False):
continue

# The method needs to have a single record descriptor as output
if not isinstance(getattr(subplugin_func, "__record__", None), RecordDescriptor):
continue

# The method needs to be part of the current subclass and not a parent
if not subplugin_func.__qualname__.startswith(cls.__name__):
continue

# If we already have an aggregate method, skip
if existing_aggregator := getattr(cls.__nsplugin__, subplugin_func_name, None):
existing_aggregator.__subplugins__.append(cls.__namespace__)
continue

# The generic template for the aggregator method
def generate_aggregator(method_name):
def aggregator(self) -> Iterator[Record]:
yield from self._func(method_name, aggregator.__subplugins__)

# Holds the subplugins that share this method
aggregator.__subplugins__ = []

return aggregator

# The generic template for the documentation method
def generate_documentor(cls, method_name: str, aggregator: Callable) -> str:
def documentor(format_spec: str):
return format_spec.format_map(
defaultdict(
lambda: "???",
{
"func_name": f"{cls.__nsplugin__.__namespace__}.{method_name}",
"short_description": "".join(
["Return aggregated records for: ", ",".join(aggregator.__subplugins__)]
),
"output_type": "records",
},
)
)

return documentor

# Manifacture a method for the namespaced class
generated_aggregator = generate_aggregator(subplugin_func_name)
generated_documentor = generate_documentor(cls, subplugin_func_name, generated_aggregator)

# Add as a attribute to the namespace class
setattr(cls.__nsplugin__, subplugin_func_name, generated_aggregator)

# Copy the meta descriptors of the function attribute
copy_attrs = ["__output__", "__record__", "__doc__", "__exported__"]
for copy_attr in copy_attrs:
setattr(generated_aggregator, copy_attr, getattr(subplugin_func, copy_attr, None))

# Add subplugin to aggregator
generated_aggregator.__subplugins__.append(cls.__namespace__)

# Mark the function as being autogenerated
setattr(generated_aggregator, "__autogen__", True)

# Add the documentor function to the aggregator
setattr(generated_aggregator, "__documentor__", generated_documentor)

# Register the newly auto-created method
cls.__nsplugin__.__exports__.append(subplugin_func_name)
cls.__nsplugin__.__functions__.append(subplugin_func_name)

def __init_subclass__(cls, **kwargs):
# Upon subclassing,
# decide whether this is a direct subclass of NamespacePlugin
# If this is not the case, autogenerate aggregate methods for
# methods with similar record types and signatures in the
# direct subclass of NamespacePlugin

super().__init_subclass__(**kwargs)
if cls.__bases__[0] != NamespacePlugin:
cls.__init_subclass_subplugin__(cls, **kwargs)
else:
cls.__init_subclass_namespace__(cls, **kwargs)


class InternalPlugin(Plugin):
"""Parent class for internal plugins.
Expand Down
11 changes: 7 additions & 4 deletions dissect/target/plugins/apps/remoteaccess/anydesk.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from datetime import datetime

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.plugin import Plugin, export
from dissect.target.plugins.apps.remoteaccess.remoteaccess import RemoteAccessRecord
from dissect.target.plugin import export
from dissect.target.plugins.apps.remoteaccess.remoteaccess import (
RemoteAccessPlugin,
RemoteAccessRecord,
)


class AnydeskPlugin(Plugin):
class AnydeskPlugin(RemoteAccessPlugin):
"""
Anydesk plugin.
"""
Expand Down Expand Up @@ -38,7 +41,7 @@ def check_compatible(self):
raise UnsupportedPluginError("No Anydesk logs found")

@export(record=RemoteAccessRecord)
def remoteaccess(self):
def logs(self):
"""Return the content of the AnyDesk logs.
AnyDesk is a remote desktop application and can be used by adversaries to get (persistent) access to a machine.
Expand Down
52 changes: 2 additions & 50 deletions dissect/target/plugins/apps/remoteaccess/remoteaccess.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.descriptor_extensions import UserRecordDescriptorExtension
from dissect.target.helpers.record import create_extended_descriptor
from dissect.target.plugin import Plugin, export
from dissect.target.plugin import NamespacePlugin

RemoteAccessRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"application/log/remoteaccess",
Expand All @@ -14,7 +13,7 @@
)


class RemoteAccessPlugin(Plugin):
class RemoteAccessPlugin(NamespacePlugin):
"""General Remote Access plugin.
This plugin groups the functions of all remote access plugins. For example,
Expand All @@ -23,50 +22,3 @@ class RemoteAccessPlugin(Plugin):
"""

__namespace__ = "remoteaccess"
__findable__ = False

TOOLS = [
"teamviewer",
"anydesk",
]

def __init__(self, target):
super().__init__(target)
self._plugins = []
for entry in self.TOOLS:
try:
self._plugins.append(getattr(self.target, entry))
except Exception: # noqa
target.log.exception("Failed to load tool plugin: %s", entry)

def check_compatible(self):
if not len(self._plugins):
raise UnsupportedPluginError("No compatible tool plugins found")

def _func(self, f):
for p in self._plugins:
try:
for entry in getattr(p, f)():
yield entry
except Exception:
self.target.log.exception("Failed to execute tool plugin: {}.{}", p._name, f)

@export(record=RemoteAccessRecord)
def remoteaccess(self):
"""Return Remote Access records from all Remote Access Tools.
This plugin groups the functions of all remote access plugins. For example, instead of having to run both
teamviewer.remoteaccess and anydesk.remoteaccess, you only have to run remoteaccess.remoteaccess to get output
from both tools.
Yields RemoteAccessRecords with the following fields:
('string', 'hostname'),
('string', 'domain'),
('datetime', 'ts'),
('string', 'user'),
('string', 'tool'),
('uri', 'logfile'),
('string', 'description')
"""
for e in self._func("remoteaccess"):
yield e
11 changes: 7 additions & 4 deletions dissect/target/plugins/apps/remoteaccess/teamviewer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from datetime import datetime

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.plugin import Plugin, export
from dissect.target.plugins.apps.remoteaccess.remoteaccess import RemoteAccessRecord
from dissect.target.plugin import export
from dissect.target.plugins.apps.remoteaccess.remoteaccess import (
RemoteAccessPlugin,
RemoteAccessRecord,
)


class TeamviewerPlugin(Plugin):
class TeamviewerPlugin(RemoteAccessPlugin):
"""
Teamviewer plugin.
"""
Expand Down Expand Up @@ -39,7 +42,7 @@ def check_compatible(self):
raise UnsupportedPluginError("No Teamviewer logs found")

@export(record=RemoteAccessRecord)
def remoteaccess(self):
def logs(self):
"""Return the content of the TeamViewer logs.
TeamViewer is a commercial remote desktop application. An adversary may use it to gain persistence on a
Expand Down
Loading

0 comments on commit d558a11

Please sign in to comment.