From e5efda9008fd410bdf36ca4b20d0f8e6385f1b64 Mon Sep 17 00:00:00 2001
From: Alexey Timin
Date: Mon, 20 Feb 2023 14:36:21 +0100
Subject: [PATCH] DRIFT-660: Implement export time series data as .dp and csv files (#2)

---
 CHANGELOG.md                 |   2 +
 README.md                    |  13 +++-
 docs/export.md               |  53 +++++++++++++
 drift_cli/cli.py             |  14 +---
 drift_cli/export.py          |  83 ++++++++++++++++++++
 drift_cli/export_impl/raw.py | 122 ++++++++++++++++++++++++++++++
 drift_cli/utils/helpers.py   | 120 ++++++++++++++---------------
 mkdocs.yml                   |   1 +
 pyproject.toml               |   5 +-
 tests/export_test.py         | 143 +++++++++++++++++++++++++++++++++++
 10 files changed, 478 insertions(+), 78 deletions(-)
 create mode 100644 docs/export.md
 create mode 100644 drift_cli/export.py
 create mode 100644 drift_cli/export_impl/raw.py
 create mode 100644 tests/export_test.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b8979b..cdf0207 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,3 +10,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - `drift-cli alias` command to manage aliases, [PR-1](https://github.com/panda-official/DriftCLI/pull/1)
+- `drift-cli export raw` command to export data from blob
+  storage, [PR-2](https://github.com/panda-official/DriftCLI/pull/2)
diff --git a/README.md b/README.md
index 266567a..d4942f7 100644
--- a/README.md
+++ b/README.md
@@ -8,8 +8,7 @@ Drift CLI is a command line client for [PANDA|Drift](https://driftpythonclient.r
 
 ## Features
 
-*
-
+* Export data from Drift Blob Storage
 
 ## Requirements
@@ -23,3 +22,13 @@ To install the Drift CLI, simply use pip:
 ```
 pip install drift-cli
 ```
+## Usage
+
+```
+drift-cli --help
+drift-cli alias add drift-device --address 127.0.0.1 --password SOME_PASSWORD
+drift-cli export raw drift-device ./tmp --start 2021-01-01 --stop 2021-01-02
+```
+## Links
+
+* [Documentation](https://driftcli.readthedocs.io/en/latest/)
diff --git a/docs/export.md b/docs/export.md
new file mode 100644
index 0000000..c5245d1
--- /dev/null
+++ b/docs/export.md
@@ -0,0 +1,53 @@
+# Export Data
+
+In this section, we will show how to export data from a Drift instance by using the `drift-cli export` command.
+
+## Export Raw Data
+
+The `drift-cli export raw` command allows you to export data from a Drift instance to a local folder
+on your computer. This can be useful if you want to make a copy of your data for backup or to process it locally.
+
+The `drift-cli export raw` command has the following syntax:
+
+```
+drift-cli export raw [OPTIONS] SRC DEST
+```
+
+`SRC` should be an alias of the Drift instance you want to export data from.
+
+`DEST` should be the destination folder on your computer where you want to save the exported data.
+
+Here is an example of how you might use the `drift-cli export raw` command:
+
+```
+drift-cli export raw drift-device ./exported-data --start 2021-01-23 --stop 2021-01-24
+```
+
+This will export all the raw data from the `drift-device` Drift instance to the `./exported-data` folder on your
+computer. For each topic, the CLI creates a separate folder, and each package is saved as a separate file named
+`<PACKAGE_ID>.dp`.
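+
+For example, exporting two topics might produce a layout like this (the topic names and package IDs are
+illustrative):
+
+```
+exported-data/
+├── acc-1/
+│   ├── 1674468943000.dp
+│   └── 1674468944000.dp
+└── acc-2/
+    ├── 1674468943000.dp
+    └── 1674468944000.dp
+```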
+
+## Available options
+
+Here is a list of the options that you can use with the `drift-cli export` commands:
+
+* `--start`: This option allows you to specify a starting time point for the data that you want to export. Data with
+  timestamps newer than this time point will be included in the export. The time point should be in ISO format (e.g.,
+  2022-01-01T00:00:00Z).
+
+* `--stop`: This option allows you to specify an ending time point for the data that you want to export. Data with
+  timestamps older than this time point will be included in the export. The time point should be in ISO format (e.g.,
+  2022-01-01T00:00:00Z).
+
+* `--csv`: This option allows you to export data in CSV format. It creates a separate CSV file for each topic in the
+  exported data and saves the time series data in a single column, with meta information in the first row. The meta
+  information has the format `topic,package count,first timestamp,last timestamp`, where the timestamps are Unix time
+  in milliseconds (see the example at the end of this page).
+
+You can also use the global `--parallel` option to specify the number of topics that you want to export in parallel:
+
+```
+drift-cli --parallel 10 export raw drift-device ./exported-data --start 2021-01-01T00:00:00Z --stop 2021-01-02T00:00:00Z
+```
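+
+For example, a CSV file produced with `--csv` might look like this (the numbers are illustrative; the first row is
+padded with spaces to a fixed width so that the CLI can rewrite it once the export is finished):
+
+```
+acc-1,120,1674468943000,1674469943000
+0.10000
+0.20000
+0.50000
+...
+```
+
+To read such a file back in Python, you could parse the meta row and the samples separately. This is a minimal
+sketch, assuming NumPy is installed; the file path is hypothetical:
+
+```
+import numpy as np
+
+with open("exported-data/acc-1.csv") as file:
+    # First row: "topic,package count,first timestamp,last timestamp" plus padding
+    topic, count, first_ts, last_ts = file.readline().strip().split(",")
+    # Remaining rows: one sample per line
+    samples = np.loadtxt(file, delimiter=",")
+
+print(topic, count, first_ts, last_ts, samples.shape)
+```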
diff --git a/drift_cli/cli.py b/drift_cli/cli.py
index 229a618..5d620cd 100644
--- a/drift_cli/cli.py
+++ b/drift_cli/cli.py
@@ -6,6 +6,8 @@ import click
 
 from drift_cli.alias import alias
+from drift_cli.export import export
+
 from drift_cli.config import write_config, Config
 
@@ -17,12 +19,6 @@
     type=Path,
     help="Path to config file. Default ${HOME}/drift-cli/config.toml",
 )
-@click.option(
-    "--timeout",
-    "-t",
-    type=int,
-    help="Timeout for requests in seconds. Default 5",
-)
 @click.option(
     "--parallel",
     "-p",
@@ -33,16 +29,12 @@
 def cli(
     ctx,
     config: Optional[Path] = None,
-    timeout: Optional[int] = None,
     parallel: Optional[int] = None,
 ):
     """CLI client for PANDA | Drift Platform"""
     if config is None:
         config = Path.home() / ".drift-cli" / "config.toml"
 
-    if timeout is None:
-        timeout = 5
-
     if parallel is None:
         parallel = 10
 
@@ -50,8 +42,8 @@ def cli(
         write_config(config, Config(aliases={}))
 
     ctx.obj["config_path"] = config
-    ctx.obj["timeout"] = timeout
     ctx.obj["parallel"] = parallel
 
 
 cli.add_command(alias, "alias")
+cli.add_command(export, "export")
diff --git a/drift_cli/export.py b/drift_cli/export.py
new file mode 100644
index 0000000..4f37858
--- /dev/null
+++ b/drift_cli/export.py
@@ -0,0 +1,83 @@
+"""Export Command"""
+import asyncio
+
+import click
+from click import Abort
+from drift_client import DriftClient
+
+from drift_cli.config import Alias
+from drift_cli.config import read_config
+from drift_cli.export_impl.raw import export_raw
+from drift_cli.utils.consoles import error_console
+from drift_cli.utils.error import error_handle
+from drift_cli.utils.helpers import (
+    parse_path,
+)
+
+start_option = click.option(
+    "--start",
+    help="Export records with timestamps newer than this time point in ISO format, e.g. 2023-01-01T00:00:00.000Z",
+)
+
+stop_option = click.option(
+    "--stop",
+    help="Export records with timestamps older than this time point in ISO format, e.g. 2023-01-01T00:00:00.000Z",
+)
+
+
+@click.group()
+def export():
+    """Export data from a Drift instance somewhere else"""
+
+
+@export.command()
+@click.argument("src")
+@click.argument("dest")
+@stop_option
+@start_option
+@click.option(
+    "--csv",
+    help="Export data as CSV instead of raw data (only for time series)",
+    default=False,
+    is_flag=True,
+)
+@click.pass_context
+def raw(
+    ctx,
+    src: str,
+    dest: str,
+    start: str,
+    stop: str,
+    csv: bool,
+):  # pylint: disable=too-many-arguments
+    """Export data from SRC to the DEST folder
+
+    SRC should be an alias of a Drift instance.
+    DEST should be a path to a folder.
+
+    As a result, the folder will contain a subfolder for each topic,
+    and each topic folder will contain a file for each package,
+    named after the package ID.
+    """
+    if start is None or stop is None:
+        error_console.print("Error: --start and --stop are required")
+        raise Abort()
+
+    alias_name, _ = parse_path(src)
+    alias: Alias = read_config(ctx.obj["config_path"]).aliases[alias_name]
+
+    loop = asyncio.get_event_loop()
+    run = loop.run_until_complete
+    client = DriftClient(alias.address, alias.password, loop=loop)
+
+    with error_handle():
+        run(
+            export_raw(
+                client,
+                dest,
+                parallel=ctx.obj["parallel"],
+                start=start,
+                stop=stop,
+                csv=csv,
+            )
+        )
diff --git a/drift_cli/export_impl/raw.py b/drift_cli/export_impl/raw.py
new file mode 100644
index 0000000..ba873bd
--- /dev/null
+++ b/drift_cli/export_impl/raw.py
@@ -0,0 +1,122 @@
+"""Export data"""
+import asyncio
+from concurrent.futures import ThreadPoolExecutor, Executor
+from pathlib import Path
+
+import numpy as np
+from drift_client import DriftClient
+from drift_protocol.meta import MetaInfo
+from rich.progress import Progress
+
+from drift_cli.utils.helpers import read_topic
+
+
+async def _export_topic(
+    pool: Executor,
+    client: DriftClient,
+    topic: str,
+    dest: str,
+    progress: Progress,
+    sem,
+    **kwargs,
+):
+    async for package, _task in read_topic(pool, client, topic, progress, sem, **kwargs):
+        Path.mkdir(Path(dest) / topic, exist_ok=True, parents=True)
+        with open(Path(dest) / topic / f"{package.package_id}.dp", "wb") as file:
+            file.write(package.blob)
+
+
+async def _export_csv(
+    pool: Executor,
+    client: DriftClient,
+    current_topic: str,
+    dest: str,
+    progress: Progress,
+    sem,
+    **kwargs,
+):
+    Path.mkdir(Path(dest), exist_ok=True, parents=True)
+
+    filename = Path(dest) / f"{current_topic}.csv"
+    started = False
+    first_timestamp = 0
+    last_timestamp = 0
+    count = 0
+    async for package, task in read_topic(
+        pool, client, current_topic, progress, sem, **kwargs
+    ):
+        meta = package.meta
+        if meta.type != MetaInfo.TIME_SERIES:
+            progress.update(
+                task,
+                description=f"[SKIPPED] Topic {current_topic} is not a time series",
+                completed=True,
+            )
+            break
+
+        if not started:
+            # Reserve a fixed-width first row for the meta information;
+            # it is rewritten when the export finishes
+            with open(filename, "w") as file:
+                file.write(" " * 256 + "\n")
+            started = True
+            first_timestamp = meta.time_series_info.start_timestamp.ToMilliseconds()
+        else:
+            if last_timestamp != meta.time_series_info.start_timestamp.ToMilliseconds():
+                progress.update(
+                    task,
+                    description=f"[ERROR] Topic {current_topic} has gaps",
+                    completed=True,
+                )
+                break
+
+        if package.status_code != 0:
+            progress.update(
+                task,
+                description=f"[ERROR] Topic {current_topic} has a bad package",
+                completed=True,
+            )
+            break
+
+        last_timestamp = package.meta.time_series_info.stop_timestamp.ToMilliseconds()
+        with open(filename, "a") as file:
+            np.savetxt(file, package.as_np(), delimiter=",", fmt="%.5f")
+
+        count += 1
+
+    if started:
+        # Overwrite the reserved first row with the actual meta information
+        with open(filename, "r+") as file:
+            file.seek(0)
+            file.write(
+                ",".join(
+                    [
+                        current_topic,
+                        str(count),
+                        str(first_timestamp),
+                        str(last_timestamp),
+                    ]
+                )
+            )
+
+
+async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs):
+    """Export data from a Drift instance to the DEST folder
+    Args:
+        client: Drift client
+        dest: Path to a folder
+        parallel: Number of parallel tasks
+    Keyword Args:
+        start: Export records with timestamps newer than this time point in ISO format
+        stop: Export records with timestamps older than this time point in ISO format
+        csv: Export data as CSV instead of raw data
+    """
+    sem = asyncio.Semaphore(parallel)
+    with Progress() as progress:
+        with ThreadPoolExecutor() as pool:
+            topics = client.get_topics()
+            csv = kwargs.pop("csv", False)
+            task = _export_csv if csv else _export_topic
+
+            tasks = [
+                task(pool, client, topic, dest, progress, sem, topics=topics, **kwargs)
+                for topic in topics
+            ]
+            await asyncio.gather(*tasks)
diff --git a/drift_cli/utils/helpers.py b/drift_cli/utils/helpers.py
index 99bdd86..8920e98 100644
--- a/drift_cli/utils/helpers.py
+++ b/drift_cli/utils/helpers.py
@@ -3,12 +3,13 @@
 import signal
 import time
 from asyncio import Semaphore, Queue
+from concurrent.futures import Executor
 from datetime import datetime
 from pathlib import Path
-from typing import Tuple, List
+from typing import Tuple
 
 from click import Abort
-from reduct import EntryInfo, Bucket
+from drift_client import DriftClient
 from rich.progress import Progress
 
 from drift_cli.config import read_config, Alias
@@ -32,24 +33,29 @@ def get_alias(config_path: Path, name: str) -> Alias:
 def parse_path(path) -> Tuple[str, str]:
     """Parse path ALIAS/RESOURCE"""
     args = path.split("/")
-    if len(args) != 2:
+    if len(args) > 2:
         raise RuntimeError(
-            f"Path {path} has wrong format. It must be 'ALIAS/BUCKET_NAME'"
+            f"Path {path} has wrong format. It must be 'ALIAS' or 'ALIAS/BUCKET_NAME'"
         )
+
+    if len(args) == 1:
+        args.append("panda")
+
     return tuple(args)
 
 
-async def read_records_with_progress(
-    entry: EntryInfo,
-    bucket: Bucket,
+async def read_topic(
+    pool: Executor,
+    client: DriftClient,
+    topic: str,
     progress: Progress,
     sem: Semaphore,
     **kwargs,
 ):  # pylint: disable=too-many-locals
-    """Read records from entry and show progress
+    """Read packages from a topic and show progress
     Args:
-        entry (EntryInfo): Entry to read records from
-        bucket (Bucket): Bucket to read records from
+        pool: Executor to run blocking client calls in
+        client: Drift client
+        topic: Topic name
         progress (Progress): Progress bar to show progress
         sem (Semaphore): Semaphore to limit parallelism
     Keyword Args:
@@ -60,94 +66,82 @@
-        Record: Record from entry
+        (package, task): Drift data package and its progress bar task
     """
 
-    def _to_timestamp(date: str) -> int:
-        return int(
-            datetime.fromisoformat(date.replace("Z", "+00:00")).timestamp() * 1000_000
-        )
+    def _to_timestamp(date: str) -> float:
+        return datetime.fromisoformat(date.replace("Z", "+00:00")).timestamp()
 
-    start = _to_timestamp(kwargs["start"]) if kwargs["start"] else entry.oldest_record
-    stop = _to_timestamp(kwargs["stop"]) if kwargs["stop"] else entry.latest_record
+    start = _to_timestamp(kwargs["start"])
+    stop = _to_timestamp(kwargs["stop"])
 
-    include = {}
-    for item in kwargs["include"]:
-        if item:
-            key, value = item.split("=")
-            include[key] = value
-
-    exclude = {}
-    for item in kwargs["exclude"]:
-        if item:
-            key, value = item.split("=")
-            exclude[key] = value
 
     last_time = start
-    task = progress.add_task(f"Entry '{entry.name}' waiting", total=stop - start)
+    task = progress.add_task(f"Topic '{topic}' waiting", total=stop - start)
 
     async with sem:
         exported_size = 0
         count = 0
         stats = []
         speed = 0
 
+        loop = asyncio.get_running_loop()
+
         def stop_signal():
             signal_queue.put_nowait("stop")
 
-        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, stop_signal)
-        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, stop_signal)
-
-        async for record in bucket.query(
-            entry.name,
-            start=start,
-            stop=stop,
-            include=include,
-            exclude=exclude,
-            ttl=kwargs["timeout"] * sem._value,  # pylint: disable=protected-access
-        ):
+        loop.add_signal_handler(signal.SIGINT, stop_signal)
+        loop.add_signal_handler(signal.SIGTERM, stop_signal)
+
+        packages = await loop.run_in_executor(
+            pool, client.get_package_names, topic, start, stop
+        )
+        for package in sorted(packages):
+            drift_pkg = await loop.run_in_executor(pool, client.get_item, package)
             if signal_queue.qsize() > 0:
                 # stop signal received
                 progress.update(
                     task,
-                    description=f"Entry '{entry.name}' "
-                    f"(copied {count} records ({pretty_size(exported_size)}), stopped",
+                    description=f"Topic '{topic}' "
+                    f"(copied {count} packages ({pretty_size(exported_size)}), stopped)",
                     refresh=True,
                 )
                 return
 
-            exported_size += record.size
-            stats.append((record.size, time.time()))
+            pkg_size = len(drift_pkg.blob)
+            timestamp = float(drift_pkg.package_id) / 1000
+            exported_size += pkg_size
+            stats.append((pkg_size, time.time()))
             if len(stats) > 10:
                 stats.pop(0)
 
             if len(stats) > 1:
                 speed = sum(s[0] for s in stats) / (stats[-1][1] - stats[0][1])
 
-            yield record
+            yield drift_pkg, task
+            count += 1
 
             progress.update(
                 task,
-                description=f"Entry '{entry.name}' "
-                f"(copied {count} records ({pretty_size(exported_size)}), "
+                description=f"Topic '{topic}' "
+                f"(copied {count} packages ({pretty_size(exported_size)}), "
                 f"speed {pretty_size(speed)}/s)",
-                advance=record.timestamp - last_time,
+                advance=timestamp - last_time,
                 refresh=True,
             )
-            last_time = record.timestamp
-            count += 1
 
-    progress.update(task, total=1, completed=True)
+            last_time = timestamp
+
+        progress.update(task, total=1, completed=True)
 
 
+# def filter_entries(entries: List[EntryInfo], names: List[str]) -> List[EntryInfo]:
+#     """Filter entries by names"""
+#     if not names or len(names) == 0:
+#         return entries
+#
+#     if len(names) == 1 and names[0] == "":
+#         return entries
+#
+#     def _filter(entry):
+#         for name in names:
+#             if name == entry.name:
+#                 return True
+#         return False
+#
+#     return list(filter(_filter, entries))
diff --git a/mkdocs.yml b/mkdocs.yml
index 47c39e0..31c8282 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -7,6 +7,7 @@ nav:
   - CLI Reference: docs/cli.md
   - Usage:
       - docs/aliases.md
+      - docs/export.md
 
 repo_name: panda-official/DriftCLI
 repo_url: https://github.com/panda-official/DriftCLI
diff --git a/pyproject.toml b/pyproject.toml
index 45093cd..fb4258a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,7 +29,7 @@ classifiers = [
 ]
 
 dependencies = [
-    "drift-python-client~=0.3.1",
+    "drift-python-client~=0.4.0",
     "click~=8.1",
     "tomlkit~=0.11",
     "rich~=12.6",
@@ -42,7 +42,7 @@ test = [
     "pytest-asyncio~=0.18"
 ]
 
-lint = ["pylint~=2.14"]
+lint = ["pylint~=2.14", "pylint-protobuf==0.20.2"]
 format = ["black~=22.6"]
 docs = [
     "mkdocs~=1.3",
@@ -66,3 +66,4 @@ drift-cli = "drift_cli:main"
 good-names = "ls,rm"
 disable = ["fixme", "duplicate-code"]
 extension-pkg-whitelist = "pydantic"
+load-plugins = "pylint_protobuf"
diff --git a/tests/export_test.py b/tests/export_test.py
new file mode 100644
index 0000000..43a1ce3
--- /dev/null
+++ b/tests/export_test.py
@@ -0,0 +1,143 @@
+"""Tests for the `drift-cli export raw` command"""
+import shutil
+from pathlib import Path
+from tempfile import gettempdir
+
+import numpy as np
+import pytest
+from drift_client import DriftClient, DriftDataPackage
+from drift_protocol.common import (
+    DriftPackage,
+    DataPayload,
+)
+from drift_protocol.meta import TimeSeriesInfo, MetaInfo
+from google.protobuf.any_pb2 import Any  # pylint: disable=no-name-in-module
+from wavelet_buffer import WaveletBuffer, WaveletType, denoise
+
+
+@pytest.fixture(name="topics")
+def _make_topics():
+    """Make topics"""
+    return ["topic1", "topic2"]
+
+
+@pytest.fixture(name="packages")
+def _make_packages():
+    """Make packages"""
+    packages = []
+    signal = np.array(
+        [0.1, 0.2, 0.5, 0.1, 0.2, 0.1, 0.6, 0.1, 0.1, 0.2], dtype=np.float32
+    )
+
+    buffer = WaveletBuffer(
+        signal_shape=[len(signal)],
+        signal_number=1,
+        decomposition_steps=2,
+        wavelet_type=WaveletType.DB1,
+    )
+    buffer.decompose(signal, denoise.Null())
+
+    # Prepare payload
+    payload = DataPayload()
+    payload.data = buffer.serialize(compression_level=16)
+
+    msg = Any()
+    msg.Pack(payload)
+
+    for package_id in range(1, 3):
+        pkg = DriftPackage()
+        pkg.id = package_id
+        pkg.status = 0
+        pkg.data.append(msg)
+
+        info = TimeSeriesInfo()
+        info.start_timestamp.FromMilliseconds(package_id)
+        info.stop_timestamp.FromMilliseconds(package_id + 1)
+
+        pkg.meta.type = MetaInfo.TIME_SERIES
+        pkg.meta.time_series_info.CopyFrom(info)
+
+        packages.append(DriftDataPackage(pkg.SerializeToString()))
+    return packages
+
+
+@pytest.fixture(name="package_names")
+def _make_package_names(topics):
+    """Make package names for each topic"""
+    return [[f"{topic}/1.dp", f"{topic}/2.dp"] for topic in topics]
+
+
+@pytest.fixture(name="client")
+def _make_client(mocker, topics, package_names, packages) -> DriftClient:
+    """Mock the DriftClient used by the export command"""
+    kls = mocker.patch("drift_cli.export.DriftClient")
+    client = mocker.Mock(spec=DriftClient)
+    kls.return_value = client
+
+    client.get_topics.return_value = topics
+    client.get_package_names.side_effect = package_names
+    client.get_item.side_effect = packages * len(topics)
+    return client
+
+
+@pytest.fixture(name="export_path")
+def _make_export_path() -> Path:
+    """Create a temporary export folder and clean it up afterwards"""
+    path = Path(gettempdir()) / "drift_export"
+    try:
+        yield path
+    finally:
+        shutil.rmtree(path, ignore_errors=True)
+
+
+@pytest.mark.usefixtures("set_alias", "client")
+def test__export_raw_data(runner, conf, export_path, topics):
+    """Test export raw data"""
+    result = runner(
+        f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02"
+    )
+    assert f"Topic '{topics[0]}' (copied 2 packages (403 B)" in result.output
+    assert f"Topic '{topics[1]}' (copied 2 packages (403 B)" in result.output
+
+    assert result.exit_code == 0
+    assert (export_path / topics[0] / "1.dp").exists()
+    assert (export_path / topics[0] / "2.dp").exists()
+    assert (export_path / topics[1] / "1.dp").exists()
+    assert (export_path / topics[1] / "2.dp").exists()
+
+
+@pytest.mark.usefixtures("set_alias", "client")
+def test__export_raw_data_as_csv(runner, conf, export_path, topics):
+    """Test export raw data as csv"""
+    result = runner(
+        f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
+    )
+
+    assert f"Topic '{topics[0]}' (copied 2 packages (403 B)" in result.output
+    assert f"Topic '{topics[1]}' (copied 2 packages (403 B)" in result.output
+
+    assert result.exit_code == 0
+
+    assert (export_path / f"{topics[0]}.csv").exists()
+    assert (export_path / f"{topics[1]}.csv").exists()
+
+    with open(export_path / f"{topics[0]}.csv", encoding="utf-8") as file:
+        assert file.readline().strip() == "topic1,2,1,3"  # topic, count, start, stop
+
+
+@pytest.mark.usefixtures("set_alias", "client")
+def test__export_raw_data_start_stop_required(runner, conf, export_path):
+    """Test export raw data start stop required"""
+    result = runner(f"-c {conf} -p 1 export raw test {export_path}")
+    assert "Error: --start and --stop are required" in result.output
+    assert result.exit_code == 1
+
+
+@pytest.mark.usefixtures("set_alias")
+def test__export_raw_data_no_timeseries(runner, client, conf, export_path):
+    """Should skip topics that are not time series"""
+    pkg = DriftPackage()
+    pkg.meta.type = MetaInfo.IMAGE
+    client.get_item.side_effect = [DriftDataPackage(pkg.SerializeToString())] * 2
+
+    result = runner(
+        f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
+    )
+    assert "[SKIPPED] Topic topic1 is not a time series" in result.output