-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
DRIFT-660: Implement export time series data as .dp and csv files (#2)
- Loading branch information
Showing
10 changed files
with
478 additions
and
78 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# Export Data | ||
|
||
In this section, we will show how to export data from a Drift instance by using the `drift-cli export` command. | ||
|
||
## Export Raw Data | ||
|
||
|
||
The `drift-cli export raw` command allows you to export data from a Drift instance to a local folder | ||
on your computer. This can be useful if you want to make a copy of your data for backup or process them locally. | ||
|
||
The `drift-cli export raw` command has the following syntax: | ||
|
||
``` | ||
drift-cli export raw [OPTIONS] SRC DEST | ||
``` | ||
|
||
`SRC` should be an alias of a Drift instance you want to export data from. | ||
|
||
`DEST` should be the destination folder on your computer where you want to save the exported data. | ||
|
||
Here is an example of how you might use the `drift-cli export raw` command: | ||
|
||
``` | ||
drift-cli export raw drift-device ./exported-data --start 2021-01-23 --end 2021-01-24 | ||
``` | ||
|
||
This will export all the raw data from the `drift-device` Drift instance to the `./exported-data` folder on your computer. | ||
For each topic the CLI will create a separate folder. Each package will be saved as a separate file with the name | ||
`<timestamp>.dp`. | ||
|
||
## Available options | ||
|
||
Here is a list of the options that you can use with the `drift-cli export` commands: | ||
|
||
* `--start`: This option allows you to specify a starting time point for the data that you want to export. Data with | ||
timestamps newer than this time point will be included in the export. The time point should be in ISO format (e.g., | ||
2022-01-01T00:00:00Z). | ||
|
||
* `--stop`: This option allows you to specify an ending time point for the data that you want to export. Data with | ||
timestamps older than this time point will be included in the export. The time point should be in ISO format (e.g., | ||
2022-01-01T00:00:00Z). | ||
|
||
* `--csv`: This option allows you to export data in csv format. It creates a separate csv file for each topic in the | ||
exported data and save time series data in a single column with meta information in first row. The meta information | ||
has the following format: `topic,package count, first timestamp, last timestamp`. The timestamp format is Unix time | ||
in milliseconds. | ||
|
||
|
||
You also can use the global `--parallel` option to specify the number of entries that you want to export in parallel: | ||
|
||
``` | ||
drift-cli --parallel 10 export raw drift-device ./exported-data --start 2021-01-01T00:00:00Z --end 2021-01-02T00:00:00Z | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
"""Export Command""" | ||
import asyncio | ||
|
||
import click | ||
from click import Abort | ||
from drift_client import DriftClient | ||
|
||
from drift_cli.config import Alias | ||
from drift_cli.config import read_config | ||
from drift_cli.export_impl.raw import export_raw | ||
from drift_cli.utils.consoles import error_console | ||
from drift_cli.utils.error import error_handle | ||
from drift_cli.utils.helpers import ( | ||
parse_path, | ||
) | ||
|
||
start_option = click.option( | ||
"--start", | ||
help="Export records with timestamps newer than this time point in ISO format e.g. 2023-01-01T00:00:00.000Z", | ||
) | ||
|
||
stop_option = click.option( | ||
"--stop", | ||
help="Export records with timestamps older than this time point in ISO format e.g 2023-01-01T00:00:00.000Z", | ||
) | ||
|
||
|
||
@click.group() | ||
def export(): | ||
"""Export data from a bucket somewhere else""" | ||
|
||
|
||
@export.command() | ||
@click.argument("src") | ||
@click.argument("dest") | ||
@stop_option | ||
@start_option | ||
@click.option( | ||
"--csv", | ||
help="Export data as CSV instead of raw data (only for timeseries)", | ||
default=False, | ||
is_flag=True, | ||
) | ||
@click.pass_context | ||
def raw( | ||
ctx, | ||
src: str, | ||
dest: str, | ||
start: str, | ||
stop: str, | ||
csv: bool, | ||
): # pylint: disable=too-many-arguments | ||
"""Export data from SRC bucket to DST folder | ||
SRC should be in the format of ALIAS/BUCKET_NAME. | ||
DST should be a path to a folder. | ||
As result, the folder will contain a folder for each entry in the bucket. | ||
Each entry folder will contain a file for each record | ||
in the entry with the timestamp as the name. | ||
""" | ||
if start is None or stop is None: | ||
error_console.print("Error: --start and --stop are required") | ||
raise Abort() | ||
|
||
alias_name, _ = parse_path(src) | ||
alias: Alias = read_config(ctx.obj["config_path"]).aliases[alias_name] | ||
|
||
loop = asyncio.get_event_loop() | ||
run = loop.run_until_complete | ||
client = DriftClient(alias.address, alias.password, loop=loop) | ||
|
||
with error_handle(): | ||
run( | ||
export_raw( | ||
client, | ||
dest, | ||
parallel=ctx.obj["parallel"], | ||
start=start, | ||
stop=stop, | ||
csv=csv, | ||
) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
"""Export data""" | ||
import asyncio | ||
from concurrent.futures import ThreadPoolExecutor, Executor | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
from drift_client import DriftClient | ||
from drift_protocol.meta import MetaInfo | ||
from rich.progress import Progress | ||
|
||
from drift_cli.utils.helpers import read_topic | ||
|
||
|
||
async def _export_topic( | ||
pool: Executor, | ||
client: DriftClient, | ||
topic: str, | ||
dest: str, | ||
progress: Progress, | ||
sem, | ||
**kwargs, | ||
): | ||
async for package, task in read_topic(pool, client, topic, progress, sem, **kwargs): | ||
Path.mkdir(Path(dest) / topic, exist_ok=True, parents=True) | ||
with open(Path(dest) / topic / f"{package.package_id}.dp", "wb") as file: | ||
file.write(package.blob) | ||
|
||
|
||
async def _export_csv( | ||
pool: Executor, | ||
client: DriftClient, | ||
curren_topic: str, | ||
dest: str, | ||
progress: Progress, | ||
sem, | ||
**kwargs, | ||
): | ||
Path.mkdir(Path(dest), exist_ok=True, parents=True) | ||
|
||
filename = Path(dest) / f"{curren_topic}.csv" | ||
started = False | ||
first_timestamp = 0 | ||
last_timestamp = 0 | ||
count = 0 | ||
async for package, task in read_topic( | ||
pool, client, curren_topic, progress, sem, **kwargs | ||
): | ||
meta = package.meta | ||
if meta.type != MetaInfo.TIME_SERIES: | ||
progress.update( | ||
task, | ||
description=f"[SKIPPED] Topic {curren_topic} is not a time series", | ||
completed=True, | ||
) | ||
break | ||
|
||
if not started: | ||
with open(Path(dest) / f"{curren_topic}.csv", "w") as file: | ||
file.write(" " * 256 + "\n") | ||
started = True | ||
first_timestamp = meta.time_series_info.start_timestamp.ToMilliseconds() | ||
else: | ||
if last_timestamp != meta.time_series_info.start_timestamp.ToMilliseconds(): | ||
progress.update( | ||
task, | ||
description=f"[ERROR] Topic {curren_topic} has gaps", | ||
completed=True, | ||
) | ||
break | ||
|
||
if package.status_code != 0: | ||
progress.update( | ||
task, | ||
description=f"[ERROR] Topic {curren_topic} has a bad package", | ||
completed=True, | ||
) | ||
break | ||
|
||
last_timestamp = package.meta.time_series_info.stop_timestamp.ToMilliseconds() | ||
with open(filename, "a") as file: | ||
np.savetxt(file, package.as_np(), delimiter=",", fmt="%.5f") | ||
|
||
count += 1 | ||
|
||
if started: | ||
with open(filename, "r+") as file: | ||
file.seek(0) | ||
file.write( | ||
",".join( | ||
[ | ||
curren_topic, | ||
str(count), | ||
str(first_timestamp), | ||
str(last_timestamp), | ||
] | ||
) | ||
) | ||
|
||
|
||
async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs): | ||
"""Export data from Drift instance to DST folder | ||
Args: | ||
client: Drift client | ||
dest: Path to a folder | ||
parallel: Number of parallel tasks | ||
KArgs: | ||
start: Export records with timestamps newer than this time point in ISO format | ||
stop: Export records with timestamps older than this time point in ISO format | ||
csv: Export data as CSV instead of raw data | ||
""" | ||
sem = asyncio.Semaphore(parallel) | ||
with Progress() as progress: | ||
with ThreadPoolExecutor() as pool: | ||
topics = client.get_topics() | ||
csv = kwargs.pop("csv", False) | ||
task = _export_csv if csv else _export_topic | ||
|
||
tasks = [ | ||
task(pool, client, topic, dest, progress, sem, topics=topics, **kwargs) | ||
for topic in topics | ||
] | ||
await asyncio.gather(*tasks) |
Oops, something went wrong.