From eece7b111cb97352ffafa095dbfb6440b843907b Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Wed, 29 Nov 2023 11:18:35 +0000 Subject: [PATCH 1/5] Add -Lv/--line-verbose mode to rdump This adds a verbose option to the LineWriter adapter which also prints the field type. --- flow/record/adapter/line.py | 21 ++++++++++++++++++--- flow/record/base.py | 2 +- flow/record/tools/rdump.py | 14 +++++++++++++- tests/test_rdump.py | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 5 deletions(-) diff --git a/flow/record/adapter/line.py b/flow/record/adapter/line.py index 40350c9..5ed823c 100644 --- a/flow/record/adapter/line.py +++ b/flow/record/adapter/line.py @@ -5,8 +5,11 @@ __usage__ = """ Line output format adapter (writer only) --- -Write usage: rdump -w line://[PATH] +Write usage: rdump -w line://[PATH]?verbose=[VERBOSE] [PATH]: path to file. Leave empty or "-" to output to stdout + +Optional arguments: + [VERBOSE]: Also show fieldtype in line output (default: False) """ @@ -15,11 +18,12 @@ class LineWriter(AbstractWriter): fp = None - def __init__(self, path, fields=None, exclude=None, **kwargs): + def __init__(self, path, *, fields=None, exclude=None, verbose=False, **kwargs): self.fp = open_path_or_stream(path, "wb") self.count = 0 self.fields = fields self.exclude = exclude + self.verbose = verbose if isinstance(self.fields, str): self.fields = self.fields.split(",") if isinstance(self.exclude, str): @@ -27,11 +31,22 @@ def __init__(self, path, fields=None, exclude=None, **kwargs): def write(self, rec): rdict = rec._asdict(fields=self.fields, exclude=self.exclude) + rdict_types = None + if self.verbose: + rdict_types = {fname: fieldset.typename for fname, fieldset in rec._desc.get_all_fields().items()} + self.count += 1 self.fp.write("--[ RECORD {} ]--\n".format(self.count).encode()) if rdict: - fmt = "{{:>{width}}} = {{}}\n".format(width=max(len(k) for k in rdict)) + if rdict_types: + # also account for extra characters for fieldtype and whitespace + parenthesis + width = max(len(k + rdict_types[k]) for k in rdict) + 3 + else: + width = max(len(k) for k in rdict) + fmt = "{{:>{width}}} = {{}}\n".format(width=width) for key, value in rdict.items(): + if rdict_types: + key = f"{key} ({rdict_types[key]})" self.fp.write(fmt.format(key, value).encode()) def flush(self): diff --git a/flow/record/base.py b/flow/record/base.py index 93736c0..cd9688a 100644 --- a/flow/record/base.py +++ b/flow/record/base.py @@ -499,7 +499,7 @@ def get_all_fields(self) -> Mapping[str, RecordField]: "_source": RecordField("_source", "string"), "_classification": RecordField("_classification", "datetime"), "_generated": RecordField("_generated", "datetime"), - "_version": RecordField("_version", "vaeint"), + "_version": RecordField("_version", "varint"), } Returns: diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index e12cef7..c97ae22 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -98,7 +98,9 @@ def main(argv=None): output.add_argument("-c", "--count", type=int, help="Exit after COUNT records") output.add_argument("--skip", metavar="COUNT", type=int, default=0, help="Skip the first COUNT records") output.add_argument("-w", "--writer", metavar="OUTPUT", default=None, help="Write records to output") - output.add_argument("-m", "--mode", default=None, choices=("csv", "json", "jsonlines", "line"), help="Output mode") + output.add_argument( + "-m", "--mode", default=None, choices=("csv", "json", "jsonlines", "line", "line-verbose"), help="Output mode" + ) output.add_argument( "--split", metavar="COUNT", default=None, type=int, help="Write record files smaller than COUNT records" ) @@ -155,6 +157,15 @@ def main(argv=None): default=argparse.SUPPRESS, help="Short for --mode=line", ) + aliases.add_argument( + "-Lv", + "--line-verbose", + action="store_const", + const="line-verbose", + dest="mode", + default=argparse.SUPPRESS, + help="Short for --mode=line-verbose", + ) args = parser.parse_args(argv) @@ -176,6 +187,7 @@ def main(argv=None): "json": "jsonfile://?indent=2&descriptors=false", "jsonlines": "jsonfile://?descriptors=false", "line": "line://", + "line-verbose": "line://?verbose=true", } uri = mode_to_uri.get(args.mode, uri) qparams = { diff --git a/tests/test_rdump.py b/tests/test_rdump.py index 2e547e9..e7c9c24 100644 --- a/tests/test_rdump.py +++ b/tests/test_rdump.py @@ -624,3 +624,39 @@ def test_flow_record_invalid_tz(tmp_path, capsys): # restore DISPLAY_TZINFO just in case flow.record.fieldtypes.DISPLAY_TZINFO = flow_record_tz(default_tz="UTC") + + +@pytest.mark.parametrize( + "rdump_params", + [ + ["--mode=line-verbose"], + ["--line-verbose"], + ["-Lv"], + ["-w line://?verbose=true"], + ["-w line://?verbose=1"], + ["-w line://?verbose=True"], + ], +) +def test_rdump_line_verbose(tmp_path, capsys, rdump_params): + TestRecord = RecordDescriptor( + "test/rdump/line_verbose", + [ + ("datetime", "stamp"), + ("bytes", "data"), + ("uint32", "counter"), + ("string", "foo"), + ], + ) + record_path = tmp_path / "test.records" + + with RecordWriter(record_path) as writer: + writer.write(TestRecord()) + + rdump.main([str(record_path)] + rdump_params) + captured = capsys.readouterr() + + assert captured.err == "" + assert "stamp (datetime) =" in captured.out + assert "data (bytes) =" in captured.out + assert "counter (uint32) =" in captured.out + assert "foo (string) =" in captured.out From 7918da6b544a4ce92c5582f6adbc440e494a3712 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Thu, 4 Jan 2024 20:24:45 +0000 Subject: [PATCH 2/5] Fix rdump cli arguments --- tests/test_rdump.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_rdump.py b/tests/test_rdump.py index e7c9c24..104c22a 100644 --- a/tests/test_rdump.py +++ b/tests/test_rdump.py @@ -632,9 +632,9 @@ def test_flow_record_invalid_tz(tmp_path, capsys): ["--mode=line-verbose"], ["--line-verbose"], ["-Lv"], - ["-w line://?verbose=true"], - ["-w line://?verbose=1"], - ["-w line://?verbose=True"], + ["-w", "line://?verbose=true"], + ["-w", "line://?verbose=1"], + ["-w", "line://?verbose=True"], ], ) def test_rdump_line_verbose(tmp_path, capsys, rdump_params): From 361452aa63d8718f7bc950158cfde6c76738debb Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Thu, 4 Jan 2024 21:36:34 +0000 Subject: [PATCH 3/5] Add lru_cache for getting fieldtypes --- flow/record/adapter/line.py | 20 ++++++++++++++++---- tests/test_rdump.py | 13 +++++++++++-- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/flow/record/adapter/line.py b/flow/record/adapter/line.py index 5ed823c..6ff3c9c 100644 --- a/flow/record/adapter/line.py +++ b/flow/record/adapter/line.py @@ -1,4 +1,6 @@ -from flow.record import open_path_or_stream +from functools import lru_cache + +from flow.record import RecordDescriptor, open_path_or_stream from flow.record.adapter import AbstractWriter from flow.record.utils import is_stdout @@ -13,6 +15,18 @@ """ +@lru_cache(maxsize=1024) +def field_types_for_record_descriptor(desc: RecordDescriptor) -> dict[str, str]: + """Return a dictionary of fieldname -> fieldtype for given RecordDescriptor. + + Args: + desc: RecordDescriptor to get fieldtypes for + Returns: + Dictionary of fieldname -> fieldtype + """ + return {fname: fieldset.typename for fname, fieldset in desc.get_all_fields().items()} + + class LineWriter(AbstractWriter): """Prints all fields and values of the Record on a separate line.""" @@ -31,9 +45,7 @@ def __init__(self, path, *, fields=None, exclude=None, verbose=False, **kwargs): def write(self, rec): rdict = rec._asdict(fields=self.fields, exclude=self.exclude) - rdict_types = None - if self.verbose: - rdict_types = {fname: fieldset.typename for fname, fieldset in rec._desc.get_all_fields().items()} + rdict_types = field_types_for_record_descriptor(rec._desc) if self.verbose else None self.count += 1 self.fp.write("--[ RECORD {} ]--\n".format(self.count).encode()) diff --git a/tests/test_rdump.py b/tests/test_rdump.py index 104c22a..1bcbf5c 100644 --- a/tests/test_rdump.py +++ b/tests/test_rdump.py @@ -650,11 +650,20 @@ def test_rdump_line_verbose(tmp_path, capsys, rdump_params): record_path = tmp_path / "test.records" with RecordWriter(record_path) as writer: - writer.write(TestRecord()) + writer.write(TestRecord(counter=1)) + writer.write(TestRecord(counter=2)) + writer.write(TestRecord(counter=3)) + from flow.record.adapter.line import field_types_for_record_descriptor + + field_types_for_record_descriptor.cache_clear() + assert field_types_for_record_descriptor.cache_info().currsize == 0 rdump.main([str(record_path)] + rdump_params) - captured = capsys.readouterr() + assert field_types_for_record_descriptor.cache_info().misses == 1 + assert field_types_for_record_descriptor.cache_info().hits == 2 + assert field_types_for_record_descriptor.cache_info().currsize == 1 + captured = capsys.readouterr() assert captured.err == "" assert "stamp (datetime) =" in captured.out assert "data (bytes) =" in captured.out From 16e75550aa989454726941d4b021c821999f2982 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Thu, 4 Jan 2024 21:38:28 +0000 Subject: [PATCH 4/5] Import annotations from the future --- flow/record/adapter/line.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flow/record/adapter/line.py b/flow/record/adapter/line.py index 6ff3c9c..3a3105e 100644 --- a/flow/record/adapter/line.py +++ b/flow/record/adapter/line.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from functools import lru_cache from flow.record import RecordDescriptor, open_path_or_stream From 7e1c360d7fe9d907a2841552742c9245be6354e4 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Thu, 4 Jan 2024 22:44:47 +0000 Subject: [PATCH 5/5] Add more type hinting --- flow/record/adapter/line.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/flow/record/adapter/line.py b/flow/record/adapter/line.py index 3a3105e..3765fb0 100644 --- a/flow/record/adapter/line.py +++ b/flow/record/adapter/line.py @@ -2,7 +2,7 @@ from functools import lru_cache -from flow.record import RecordDescriptor, open_path_or_stream +from flow.record import Record, RecordDescriptor, open_path_or_stream from flow.record.adapter import AbstractWriter from flow.record.utils import is_stdout @@ -19,7 +19,7 @@ @lru_cache(maxsize=1024) def field_types_for_record_descriptor(desc: RecordDescriptor) -> dict[str, str]: - """Return a dictionary of fieldname -> fieldtype for given RecordDescriptor. + """Return dictionary of fieldname -> fieldtype for given RecordDescriptor. Args: desc: RecordDescriptor to get fieldtypes for @@ -34,7 +34,15 @@ class LineWriter(AbstractWriter): fp = None - def __init__(self, path, *, fields=None, exclude=None, verbose=False, **kwargs): + def __init__( + self, + path: str, + *, + fields: list[str] | str | None = None, + exclude: list[str] | str | None = None, + verbose: bool = False, + **kwargs, + ): self.fp = open_path_or_stream(path, "wb") self.count = 0 self.fields = fields @@ -45,12 +53,12 @@ def __init__(self, path, *, fields=None, exclude=None, verbose=False, **kwargs): if isinstance(self.exclude, str): self.exclude = self.exclude.split(",") - def write(self, rec): + def write(self, rec: Record) -> None: rdict = rec._asdict(fields=self.fields, exclude=self.exclude) rdict_types = field_types_for_record_descriptor(rec._desc) if self.verbose else None self.count += 1 - self.fp.write("--[ RECORD {} ]--\n".format(self.count).encode()) + self.fp.write(f"--[ RECORD {self.count} ]--\n".encode()) if rdict: if rdict_types: # also account for extra characters for fieldtype and whitespace + parenthesis @@ -63,11 +71,11 @@ def write(self, rec): key = f"{key} ({rdict_types[key]})" self.fp.write(fmt.format(key, value).encode()) - def flush(self): + def flush(self) -> None: if self.fp: self.fp.flush() - def close(self): + def close(self) -> None: if self.fp and not is_stdout(self.fp): self.fp.close() self.fp = None