Skip to content

Commit

Permalink
Add JSON and YAML support to configutil (fox-it#528)
Browse files Browse the repository at this point in the history
(DIS-2160)
  • Loading branch information
Miauwkeru authored and Zawadidone committed Apr 5, 2024
1 parent 5ef2c96 commit 21c4d9d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 7 deletions.
10 changes: 6 additions & 4 deletions dissect/target/filesystems/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ def _convert_entry(
try:
config_parser = parse(entry, *args, **kwargs)
entry = ConfigurationEntry(self, entry.path, entry, config_parser)
except ConfigurationParsingError:
except ConfigurationParsingError as e:
# If a parsing error gets created, it should return the `entry`
log.debug("Error when parsing %s", entry.path)
log.debug("Error when parsing %s with message '%s'", entry.path, e)

return entry

Expand Down Expand Up @@ -220,13 +220,15 @@ def _write_value_mapping(self, values: dict[str, Any], indentation_nr: int = 0)
output_buffer = io.StringIO()

if isinstance(values, list):
output_buffer.write(textwrap.indent(text="\n".join(values), prefix=prefix))
# Explicitly convert the list values to strings
_text = "\n".join(str(val) for val in values)
output_buffer.write(textwrap.indent(text=_text, prefix=prefix))
elif hasattr(values, "keys"):
for key, value in values.items():
output_buffer.write(textwrap.indent(key, prefix=prefix) + "\n")
output_buffer.write(self._write_value_mapping(value, indentation_nr + 4))
else:
output_buffer.write(textwrap.indent(values, prefix=prefix) + "\n")
output_buffer.write(textwrap.indent(str(values), prefix=prefix) + "\n")

return output_buffer.getvalue()

Expand Down
85 changes: 83 additions & 2 deletions dissect/target/helpers/configutil.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import io
import json
import re
from collections import deque
from configparser import ConfigParser, MissingSectionHeaderError
Expand All @@ -26,6 +27,13 @@
from dissect.target.filesystem import FilesystemEntry
from dissect.target.helpers.fsutil import TargetPath

try:
import yaml

PY_YAML = True
except (AttributeError, ImportError):
PY_YAML = False


def _update_dictionary(current: dict[str, Any], key: str, value: Any) -> None:
if prev_value := current.get(key):
Expand Down Expand Up @@ -151,7 +159,7 @@ def read_file(self, fh: TextIO) -> None:
try:
self.parse_file(fh)
except Exception as e:
raise ConfigurationParsingError from e
raise ConfigurationParsingError(*e.args) from e

if self.collapse_all or self.collapse:
self.parsed_data = self._collapse_dict(self.parsed_data)
Expand Down Expand Up @@ -329,6 +337,77 @@ def parse_file(self, fh: TextIO) -> None:
self.parsed_data = tree


class ListUnwrapper:
"""Provides utility functions to unwrap dictionary objects out of lists."""

@staticmethod
def unwrap(data: Union[dict, list]) -> Union[dict, list]:
"""Transforms a list with dictionaries to a dictionary.
The order of the list is preserved. If no dictionary is found,
the list remains untouched:
["value1", "value2"] -> ["value1", "value2"]
{"data": "value"} -> {"data": "value"}
[{"data": "value"}] -> {
"list_item0": {
"data": "value"
}
}
"""
orig = ListUnwrapper._unwrap_dict_list(data)
return ListUnwrapper._unwrap_dict(orig)

@staticmethod
def _unwrap_dict(data: Union[dict, list]) -> Union[dict, list]:
"""Looks for dictionaries and unwraps its values."""

if not isinstance(data, dict):
return data

root = dict()
for key, value in data.items():
_value = ListUnwrapper._unwrap_dict_list(value)
if isinstance(_value, dict):
_value = ListUnwrapper._unwrap_dict(_value)
root[key] = _value

return root

@staticmethod
def _unwrap_dict_list(data: Union[dict, list]) -> Union[dict, list]:
"""Unwraps a list containing dictionaries."""
if not isinstance(data, list) or not any(isinstance(obj, dict) for obj in data):
return data

return_value = {}
for idx, elem in enumerate(data):
return_value[f"list_item{idx}"] = elem

return return_value


class Json(ConfigurationParser):
"""Parses a JSON file."""

def parse_file(self, fh: TextIO):
parsed_data = json.load(fh)
self.parsed_data = ListUnwrapper.unwrap(parsed_data)


class Yaml(ConfigurationParser):
"""Parses a Yaml file."""

def parse_file(self, fh: TextIO) -> None:
if PY_YAML:
parsed_data = yaml.load(fh, yaml.BaseLoader)
self.parsed_data = ListUnwrapper.unwrap(parsed_data)
else:
raise ConfigurationParsingError("Failed to parse file, please install PyYAML.")


class ScopeManager:
"""A (context)manager for dictionary scoping.
Expand Down Expand Up @@ -609,7 +688,9 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio
CONFIG_MAP: dict[tuple[str, ...], ParserConfig] = {
"ini": ParserConfig(Ini),
"xml": ParserConfig(Xml),
"json": ParserConfig(Txt),
"json": ParserConfig(Json),
"yml": ParserConfig(Yaml),
"yaml": ParserConfig(Yaml),
"cnf": ParserConfig(Default),
"conf": ParserConfig(Default, separator=(r"\s",)),
"sample": ParserConfig(Txt),
Expand Down
31 changes: 31 additions & 0 deletions tests/helpers/test_configutil.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import textwrap
from io import StringIO
from pathlib import Path
from typing import Union

import pytest

from dissect.target.helpers.configutil import (
ConfigurationParser,
Default,
Indentation,
Json,
ScopeManager,
SystemD,
)
Expand Down Expand Up @@ -227,3 +229,32 @@ def test_systemd_basic_syntax() -> None:
parser.parse_file(StringIO(data.read_text()))

assert parser.parsed_data == output


@pytest.mark.parametrize(
"data_string, expected_data",
[
(r'{"data" : "value"}', {"data": "value"}),
(r'[{"data" : "value"}]', {"list_item0": {"data": "value"}}),
(
r'[{"data" : "value"}, {"data" : "value2"}]',
{"list_item0": {"data": "value"}, "list_item1": {"data": "value2"}},
),
(
r'[{"data": [{"key1": "value1"}, {"key1": "value2"}]}]',
{
"list_item0": {
"data": {
"list_item0": {"key1": "value1"},
"list_item1": {"key1": "value2"},
},
},
},
),
],
)
def test_json_syntax(data_string: str, expected_data: Union[dict, list]) -> None:
parser = Json()
parser.parse_file(StringIO(data_string))

assert parser.parsed_data == expected_data
3 changes: 2 additions & 1 deletion tests/plugins/general/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def test_collapse_types(
[
("ini", b"[DEFAULT]\nkey=value"),
("xml", b"<a>currently_just_text</a>"),
("json", b"currently_just_text"),
("json", b'{"key": "value"}'),
("yaml", b"key: value"),
("cnf", b"key=value"),
("conf", b"key value"),
("sample", b"currently_just_text"),
Expand Down

0 comments on commit 21c4d9d

Please sign in to comment.