Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core, plugin] : Virtual mappings dumping and caching #1237

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions volatility3/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,31 +417,33 @@ def run(self):
if os.path.exists(virtmap_metadata_filename):
with open(virtmap_metadata_filename, "r") as f:
map_metadata = json.loads(f.read())
layers_classes = map_metadata["layers_classes"]
layers_identifiers = map_metadata["layers_identifiers"]
sections_per_layer = map_metadata["sections_per_layer"]
else:
vollog.debug("Saving virtmap cache file metadata to Volatility3 cache")
raw_json = lzma.decompress(virtmap_cache_content)
json_val: dict = json.loads(raw_json)
layers_classes = list(json_val.keys())
layers_identifiers = list(json_val.keys())

sections_per_layer = {}
for layer_class, sections in json_val.items():
sections_per_layer[layer_class] = list(sections.keys())
for layer_identifier, sections in json_val.items():
sections_per_layer[layer_identifier] = list(sections.keys())

# Save metadata in the Vol3 cache, to avoid the costly
# decompression and deserialization process on each run.
with open(virtmap_metadata_filename, "w+") as f:
json.dump(
{
"layers_classes": list(json_val.keys()),
"layers_identifiers": list(json_val.keys()),
"sections_per_layer": sections_per_layer,
},
f,
)

ctx.config[path_join("virtmap_cache", "filepath")] = args.virtmap_cache_path
ctx.config[path_join("virtmap_cache", "layers_classes")] = layers_classes
ctx.config[path_join("virtmap_cache", "layers_identifiers")] = (
layers_identifiers
)
ctx.config.splice(
path_join("virtmap_cache", "sections_per_layer"),
interfaces.configuration.HierarchicalDict(sections_per_layer),
Expand Down
22 changes: 10 additions & 12 deletions volatility3/framework/interfaces/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,23 +482,21 @@ def _access_virtmap_cache(self, section: Tuple[int, int]) -> Optional[list]:
A list containing mappings for a specific section of this layer"""

# Check if layer is fully constructed first
if self.config.get("class") and self.context.config.get(
if self.context.config.get(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What object has the requirement on this? It should be that the layer that uses it has an optional requirement on it, so that it'll get saved into any config files that get constructed. The layer identifier isn't even close to unique (almost all plugins use the same layer name, and class) so this will go badly when you have multiple layers you want to use this on (or a config you want to work for multiple images).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The layer_identifier should be unique? Example layer_identifiers:

  • volatility3.framework.layers.intel.WindowsIntel32e.layer_name -> identifies the kernel layer (TranslationLayerRequirement name)
  • volatility3.framework.layers.intel.WindowsIntel32e.layer_name_Process5948 -> identifies the process 5948

I might have missed something, but it shouldn't be possible to have a duplicate layer string identifier in the layers pool?

This specific "config"/"cache" is intended to be used for a unique memory capture, as even a dump from the same kernel a few seconds later would have different mappings.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Layer name is unique for a run of volatility, but they'll likely all say primary1 or memory_layer1 or something. Across runs they're unlikely to even be different.

The process layers, similarly, won't be different across different images that have processes with the same pid... I don't think a dump from a few seconds later would have a different cache? The layer name would likely be the same, and many of the process layer names would too, but also, it could match a wildly different image...

path_join("virtmap_cache", "filepath")
):
) and self.config.get("class"):
filepath = self.context.config[path_join("virtmap_cache", "filepath")]
layers_classes = self.context.config[
path_join("virtmap_cache", "layers_classes")
layer_identifier = path_join(self.config["class"], self.name)
layers_identifiers = self.context.config[
path_join("virtmap_cache", "layers_identifiers")
]

# Exact match only, even if a requested section would *fit*
# Exact section match only, even if a requested section would *fit*
# inside one available in the cache.
if (
self.config["class"] in layers_classes
layer_identifier in layers_identifiers
and str(section)
in self.context.config[
path_join(
"virtmap_cache", "sections_per_layer", self.config["class"]
)
path_join("virtmap_cache", "sections_per_layer", layer_identifier)
]
):
# Avoid decompressing and deserializing the file
Expand All @@ -511,9 +509,9 @@ def _access_virtmap_cache(self, section: Tuple[int, int]) -> Optional[list]:

vollog.log(
constants.LOGLEVEL_VVV,
f"Applying virtmap cache to section {section} of layer {self.config['class']}",
f'Applying virtmap cache to section "{section}" of layer "{layer_identifier}"',
)
return self._virtmap_cache_dict[self.config["class"]][str(section)]
return self._virtmap_cache_dict[layer_identifier][str(section)]
return None

@functools.lru_cache(maxsize=512)
Expand Down
135 changes: 108 additions & 27 deletions volatility3/framework/plugins/windows/virtmapscanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
import functools
import json
import lzma
import traceback

from typing import Iterable, Type, Tuple
from volatility3.framework import renderers, interfaces
from typing import Iterable, Type, Tuple, Dict
from volatility3.framework import renderers, interfaces, constants, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.layers.scanners import BytesScanner

from volatility3.framework.interfaces.configuration import path_join
from volatility3.plugins.windows import pslist

vollog = logging.getLogger(__name__)


class VirtMapScanner(interfaces.plugins.PluginInterface):
"""Scans the entire kernel virtual memory space, and dumps its content to the disk. Allows to speed-up mapping operations afterwards, by specifying the output file as an argument to --virtmap-cache-path."""
"""Scans by default the entire kernel virtual memory space, and dumps its content to the disk. Allows to speed-up mapping operations afterwards, by specifying the output file as an argument to --virtmap-cache-path."""

_required_framework_version = (2, 0, 0)
_version = (1, 0, 0)
ikelos marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -30,30 +32,24 @@ def get_requirements(cls):
description="Windows kernel",
architectures=["Intel32", "Intel64"],
),
requirements.PluginRequirement(
name="pslist", plugin=pslist.PsList, version=(2, 0, 0)
),
requirements.BooleanRequirement(
name="scan-processes",
description="Scan each process address space",
default=False,
optional=True,
),
]

@classmethod
def virtmap_cache_file_producer(
cls,
sections: Iterable[Tuple[int, int]],
layer: interfaces.layers.DataLayerInterface,
results: dict,
open_method: Type[interfaces.plugins.FileHandlerInterface],
filename: str = "virtmapcache.json.xz",
):
results = {}
for section in sections:
scan_iterator = functools.partial(
layer._scan_iterator, BytesScanner(""), [section]
)
scan_values = list(scan_iterator())
results[str(section)] = scan_values

results = {layer.config["class"]: results}

# Prefer a simpler filename for convenience when passing as an argument
# Leave the task of distinguishing between multiple virtmapcache files to the user
# formatted_sections = "_".join([f"{hex(s[0])}-{hex(s[0] + s[1])}" for s in sections])
# filename = f"virtmapcache_{layer.name}_{formatted_sections}.json.xz"
filename = "virtmapcache.json.xz"
file_handle = open_method(filename)
json_data = json.dumps(results).encode()
xz_data = lzma.compress(json_data)
Expand All @@ -62,29 +58,114 @@ def virtmap_cache_file_producer(

return file_handle.preferred_filename

@classmethod
def virtmap_cache_scanner(
    cls,
    layer: interfaces.layers.DataLayerInterface,
    sections: Iterable[Tuple[int, int]],
    progress_callback: constants.ProgressCallback = None,
):
    """Runs the layer's internal scan iterator over each requested section
    and collects the produced mapping values.

    Args:
        layer: the (constructed) layer whose virtual mappings should be dumped
        sections: iterable of (start, length) tuples describing the regions to map
        progress_callback: optional callback fired with a completion metric per mapping

    Returns:
        A dict keyed by str(section), each value being the list of mapping
        values yielded by the layer's scan iterator for that section.
    """
    results = {}
    # An empty BytesScanner is enough here: we only want the iterator's
    # mapping output, not actual pattern matches.
    scanner = BytesScanner("")
    for section in sections:
        iterate = functools.partial(layer._scan_iterator, scanner, [section])
        metric = layer._scan_metric(scanner, [section])
        mappings = []
        try:
            for mapping in iterate():
                mappings.append(mapping)
                if progress_callback:
                    # mapping[1] is fed to the metric to report progress
                    progress_callback(
                        metric(mapping[1]),
                        f"Scanning {layer.name} using {scanner.__class__.__name__}",
                    )
        except Exception as e:
            # Best-effort: a failing section is logged, and whatever was
            # gathered so far for it is still recorded.
            vollog.debug(f"Scan Failure: {str(e)}")
            vollog.log(
                constants.LOGLEVEL_VVV,
                "\n".join(
                    traceback.TracebackException.from_exception(e).format(
                        chain=True
                    )
                ),
            )

        results[str(section)] = mappings

    return results

@classmethod
def virtmap_cache_producer(
    cls,
    layers_sections: Dict[
        interfaces.layers.DataLayerInterface, Iterable[Tuple[int, int]]
    ],
    progress_callback: constants.ProgressCallback = None,
):
    """Scans every (layer, sections) pair and gathers the per-section
    mappings under a unique identifier for each layer.

    Args:
        layers_sections: maps each layer to the sections to scan on it
        progress_callback: optional progress callback forwarded to the scanner

    Returns:
        A dict keyed by layer identifier, each value being that layer's
        per-section scan results.
    """
    results = {}
    for layer, layer_sections in layers_sections.items():
        # A layer is clearly identified by joining its class path and its
        # name, so results from different layers cannot collide.
        identifier = path_join(layer.config["class"], layer.name)
        results[identifier] = cls.virtmap_cache_scanner(
            layer, layer_sections, progress_callback
        )

    return results

def _generator(self):
    """Builds the set of layers/sections to scan, produces the virtual
    mappings cache and dumps it to a compressed file on disk.

    Yields:
        A single TreeGrid row containing the output cache filename.
    """
    # NOTE(review): this span contained interleaved removed diff lines
    # (e.g. "sections = [") that made it syntactically invalid; this is the
    # reconstructed added-side implementation of the hunk.
    kernel = self.context.modules[self.config["kernel"]]
    kernel_layer = self.context.layers[kernel.layer_name]

    # Always scan the entire kernel virtual address space.
    layers_sections = {}
    layers_sections[kernel_layer] = [
        (
            kernel_layer.minimum_address,
            kernel_layer.maximum_address - kernel_layer.minimum_address,
        )
    ]

    # Optionally include each process address space as well.
    if self.config["scan-processes"]:
        for proc in pslist.PsList.list_processes(
            context=self.context,
            layer_name=kernel.layer_name,
            symbol_table=kernel.symbol_table_name,
        ):
            proc_id = "Unknown"
            try:
                proc_id = proc.UniqueProcessId
                proc_layer_name = proc.add_process_layer()
            except exceptions.InvalidAddressException as excp:
                # Skip processes whose layer cannot be constructed.
                vollog.debug(
                    "Process {}: invalid address {} in layer {}".format(
                        proc_id, excp.invalid_address, excp.layer_name
                    )
                )
                continue

            proc_layer = self.context.layers[proc_layer_name]
            layers_sections[proc_layer] = [
                (
                    proc_layer.minimum_address,
                    proc_layer.maximum_address - proc_layer.minimum_address,
                )
            ]

    layers_results = self.virtmap_cache_producer(
        layers_sections, self._progress_callback
    )
    virtmapcache_filename = self.virtmap_cache_file_producer(
        layers_results, self.open
    )

    res = (
        0,
        (virtmapcache_filename,),
    )
    yield res

def run(self):
return renderers.TreeGrid(
[
("Sections", str),
("Virtual mappings cache file output", str),
],
self._generator(),
Expand Down
Loading