Skip to content

Commit

Permalink
Add -p/--progress bar option to rdump (#166)
Browse files Browse the repository at this point in the history
Co-authored-by: Yun Zheng Hu <hu@fox-it.com>
  • Loading branch information
JSCU-CNI and yunzheng authored Feb 20, 2025
1 parent 6033534 commit f4eba70
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 3 deletions.
4 changes: 2 additions & 2 deletions flow/record/adapter/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ class DefaultMissing(dict):
Example:
>>> d = DefaultMissing({"foo": "bar"})
>>> d['foo']
>>> d["foo"]
'bar'
>>> d['missing_key']
>>> d["missing_key"]
'{missing_key}'
"""

Expand Down
24 changes: 23 additions & 1 deletion flow/record/tools/rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
except ImportError:
version = "unknown"

try:
import tqdm

HAS_TQDM = True

except ImportError:
HAS_TQDM = False

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -112,6 +120,12 @@ def main(argv: list[str] | None = None) -> int:
help="Generate suffixes of length LEN for splitted output files",
)
output.add_argument("--multi-timestamp", action="store_true", help="Create records for datetime fields")
output.add_argument(
"-p",
"--progress",
action="store_true",
help="Show progress bar (requires tqdm)",
)

advanced = parser.add_argument_group("advanced")
advanced.add_argument(
Expand Down Expand Up @@ -217,7 +231,14 @@ def main(argv: list[str] | None = None) -> int:
seen_desc = set()
islice_stop = (args.count + args.skip) if args.count else None
record_iterator = islice(record_stream(args.src, selector), args.skip, islice_stop)

if args.progress:
if not HAS_TQDM:
parser.error("tqdm is required for progress bar")
record_iterator = tqdm.tqdm(record_iterator, unit=" records", delay=sys.float_info.min)

count = 0
record_writer = None

try:
record_writer = RecordWriter(uri)
Expand Down Expand Up @@ -246,7 +267,8 @@ def main(argv: list[str] | None = None) -> int:
record_writer.write(rec)

finally:
record_writer.__exit__()
if record_writer:
record_writer.__exit__()

if args.list:
print(f"Processed {count} records")
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ test = [
"flow.record[elastic]",
"duckdb; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb
"pytz; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb
"tqdm",
]
full = [
"flow.record[compression]",
"tqdm",
]

[project.scripts]
Expand Down
26 changes: 26 additions & 0 deletions tests/test_rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,3 +696,29 @@ def test_rdump_line_verbose(tmp_path: Path, capsys: pytest.CaptureFixture, rdump
assert "data (bytes) =" in captured.out
assert "counter (uint32) =" in captured.out
assert "foo (string) =" in captured.out


def test_rdump_list_progress(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
TestRecord = RecordDescriptor(
"test/rdump/progress",
[
("uint32", "counter"),
],
)
record_path = tmp_path / "test.records"

with RecordWriter(record_path) as writer:
for i in range(100):
writer.write(TestRecord(counter=i))

rdump.main(["--list", "--progress", str(record_path)])
captured = capsys.readouterr()

# stderr should contain tqdm progress bar
# 100 records [00:00, 64987.67 records/s]
assert "\r100 records [" in captured.err
assert " records/s]" in captured.err

# stdout should contain the RecordDescriptor definition and count
assert "# <RecordDescriptor test/rdump/progress, hash=eeb21156>" in captured.out
assert "Processed 100 records" in captured.out

0 comments on commit f4eba70

Please sign in to comment.