Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON and YAML support to configutil #528

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dissect/target/filesystems/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@
try:
config_parser = parse(entry, *args, **kwargs)
entry = ConfigurationEntry(self, entry.path, entry, config_parser)
except ConfigurationParsingError:
except ConfigurationParsingError as e:
# If a parsing error gets created, it should return the `entry`
log.debug("Error when parsing %s", entry.path)
log.debug("Error when parsing %s with message '%s'", entry.path, e)

Check warning on line 125 in dissect/target/filesystems/config.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/config.py#L125

Added line #L125 was not covered by tests

return entry

Expand Down Expand Up @@ -220,13 +220,15 @@
output_buffer = io.StringIO()

if isinstance(values, list):
output_buffer.write(textwrap.indent(text="\n".join(values), prefix=prefix))
# Explicitly convert the list values to strings
_text = "\n".join(str(val) for val in values)
output_buffer.write(textwrap.indent(text=_text, prefix=prefix))

Check warning on line 225 in dissect/target/filesystems/config.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/config.py#L224-L225

Added lines #L224 - L225 were not covered by tests
elif hasattr(values, "keys"):
for key, value in values.items():
output_buffer.write(textwrap.indent(key, prefix=prefix) + "\n")
output_buffer.write(self._write_value_mapping(value, indentation_nr + 4))
else:
output_buffer.write(textwrap.indent(values, prefix=prefix) + "\n")
output_buffer.write(textwrap.indent(str(values), prefix=prefix) + "\n")

return output_buffer.getvalue()

Expand Down
85 changes: 83 additions & 2 deletions dissect/target/helpers/configutil.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import io
import json
import re
from collections import deque
from configparser import ConfigParser, MissingSectionHeaderError
Expand All @@ -26,6 +27,13 @@
from dissect.target.filesystem import FilesystemEntry
from dissect.target.helpers.fsutil import TargetPath

try:
import yaml

PY_YAML = True
except (AttributeError, ImportError):
PY_YAML = False

Check warning on line 35 in dissect/target/helpers/configutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/configutil.py#L34-L35

Added lines #L34 - L35 were not covered by tests


def _update_dictionary(current: dict[str, Any], key: str, value: Any) -> None:
if prev_value := current.get(key):
Expand Down Expand Up @@ -151,7 +159,7 @@
try:
self.parse_file(fh)
except Exception as e:
raise ConfigurationParsingError from e
raise ConfigurationParsingError(*e.args) from e

Check warning on line 162 in dissect/target/helpers/configutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/configutil.py#L162

Added line #L162 was not covered by tests

if self.collapse_all or self.collapse:
self.parsed_data = self._collapse_dict(self.parsed_data)
Expand Down Expand Up @@ -329,6 +337,77 @@
self.parsed_data = tree


class ListUnwrapper:
"""Provides utility functions to unwrap dictionary objects out of lists."""

@staticmethod
def unwrap(data: Union[dict, list]) -> Union[dict, list]:
"""Transforms a list with dictionaries to a dictionary.

The order of the list is preserved. If no dictionary is found,
the list remains untouched:

["value1", "value2"] -> ["value1", "value2"]

{"data": "value"} -> {"data": "value"}

[{"data": "value"}] -> {
"list_item0": {
"data": "value"
}
}
"""
orig = ListUnwrapper._unwrap_dict_list(data)
return ListUnwrapper._unwrap_dict(orig)

@staticmethod
def _unwrap_dict(data: Union[dict, list]) -> Union[dict, list]:
"""Looks for dictionaries and unwraps its values."""

if not isinstance(data, dict):
return data

Check warning on line 368 in dissect/target/helpers/configutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/configutil.py#L368

Added line #L368 was not covered by tests

root = dict()
for key, value in data.items():
_value = ListUnwrapper._unwrap_dict_list(value)
if isinstance(_value, dict):
_value = ListUnwrapper._unwrap_dict(_value)
root[key] = _value

return root

@staticmethod
def _unwrap_dict_list(data: Union[dict, list]) -> Union[dict, list]:
"""Unwraps a list containing dictionaries."""
if not isinstance(data, list) or not any(isinstance(obj, dict) for obj in data):
return data

return_value = {}
for idx, elem in enumerate(data):
return_value[f"list_item{idx}"] = elem

return return_value


class Json(ConfigurationParser):
"""Parses a JSON file."""

def parse_file(self, fh: TextIO):
parsed_data = json.load(fh)
self.parsed_data = ListUnwrapper.unwrap(parsed_data)


class Yaml(ConfigurationParser):
"""Parses a Yaml file."""

def parse_file(self, fh: TextIO) -> None:
if PY_YAML:
parsed_data = yaml.load(fh, yaml.BaseLoader)
self.parsed_data = ListUnwrapper.unwrap(parsed_data)
else:
raise ConfigurationParsingError("Failed to parse file, please install PyYAML.")

Check warning on line 408 in dissect/target/helpers/configutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/configutil.py#L408

Added line #L408 was not covered by tests


class ScopeManager:
"""A (context)manager for dictionary scoping.

Expand Down Expand Up @@ -609,7 +688,9 @@
CONFIG_MAP: dict[tuple[str, ...], ParserConfig] = {
"ini": ParserConfig(Ini),
"xml": ParserConfig(Xml),
"json": ParserConfig(Txt),
"json": ParserConfig(Json),
"yml": ParserConfig(Yaml),
"yaml": ParserConfig(Yaml),
"cnf": ParserConfig(Default),
"conf": ParserConfig(Default, separator=(r"\s",)),
"sample": ParserConfig(Txt),
Expand Down
31 changes: 31 additions & 0 deletions tests/helpers/test_configutil.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import textwrap
from io import StringIO
from pathlib import Path
from typing import Union

import pytest

from dissect.target.helpers.configutil import (
ConfigurationParser,
Default,
Indentation,
Json,
ScopeManager,
SystemD,
)
Expand Down Expand Up @@ -227,3 +229,32 @@ def test_systemd_basic_syntax() -> None:
parser.parse_file(StringIO(data.read_text()))

assert parser.parsed_data == output


@pytest.mark.parametrize(
"data_string, expected_data",
[
(r'{"data" : "value"}', {"data": "value"}),
(r'[{"data" : "value"}]', {"list_item0": {"data": "value"}}),
(
r'[{"data" : "value"}, {"data" : "value2"}]',
{"list_item0": {"data": "value"}, "list_item1": {"data": "value2"}},
),
(
r'[{"data": [{"key1": "value1"}, {"key1": "value2"}]}]',
{
"list_item0": {
"data": {
"list_item0": {"key1": "value1"},
"list_item1": {"key1": "value2"},
},
},
},
),
],
)
def test_json_syntax(data_string: str, expected_data: Union[dict, list]) -> None:
parser = Json()
parser.parse_file(StringIO(data_string))

assert parser.parsed_data == expected_data
3 changes: 2 additions & 1 deletion tests/plugins/general/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def test_collapse_types(
[
("ini", b"[DEFAULT]\nkey=value"),
("xml", b"<a>currently_just_text</a>"),
("json", b"currently_just_text"),
("json", b'{"key": "value"}'),
("yaml", b"key: value"),
("cnf", b"key=value"),
("conf", b"key value"),
("sample", b"currently_just_text"),
Expand Down
Loading