Add insert benchmarking methods #27

Merged · 1 commit · Jun 13, 2023
80 changes: 80 additions & 0 deletions benchmarks/benchmark_stream_inserts.py
@@ -0,0 +1,80 @@
from time import perf_counter
from typing import Dict, List, Tuple, Union

import pyarrow

import btrdb


def time_stream_insert(
stream: btrdb.stream.Stream,
data: List[Tuple[int, float]],
merge_policy: str = "never",
) -> Dict[str, Union[int, float, str]]:
"""Insert raw data into a single stream, where data is a List of tuples of int64 timestamps and float64 values.

Parameters
----------
stream : btrdb.stream.Stream, required
The stream to insert data into.
data : List[Tuple[int, float]], required
The data to insert into stream.
merge_policy : str, optional, default = 'never'
How should the platform handle duplicated data?
Valid policies:
`never`: the default, no points are merged
`equal`: points are deduplicated if the time and value are equal
`retain`: if two points have the same timestamp, the old one is kept
`replace`: if two points have the same timestamp, the new one is kept
"""
prev_ver = stream.version()
tic = perf_counter()
new_ver = stream.insert(data, merge=merge_policy)
toc = perf_counter()
run_time = toc - tic
n_points = len(data)
result = {
"uuid": stream.uuid,
"previous_version": prev_ver,
"new_version": new_ver,
"points_to_insert": n_points,
"total_time_seconds": run_time,
"merge_policy": merge_policy,
}
return result
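

def _example_time_stream_insert(conn_str: str, stream_uuid: str) -> None:
    # Hypothetical usage sketch, not part of the original commit: shows how
    # time_stream_insert might be driven end to end. The connection string,
    # stream UUID, point count, and start time are caller-supplied placeholders.
    conn = btrdb.connect(conn_str)
    stream = conn.stream_from_uuid(stream_uuid)
    start_ns = 1_600_000_000_000_000_000  # arbitrary start time in nanoseconds
    data = [(start_ns + i * 1_000_000_000, float(i)) for i in range(10_000)]
    result = time_stream_insert(stream, data, merge_policy="never")
    print(result)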


def time_stream_arrow_insert(
stream: btrdb.stream.Stream, data: pyarrow.Table, merge_policy: str = "never"
) -> Dict[str, Union[int, float, str]]:
"""Insert raw data into a single stream, where data is a pyarrow Table of timestamps and float values.

Parameters
----------
stream : btrdb.stream.Stream, required
The stream to insert data into.
data : pyarrow.Table, required
The table of data to insert into stream.
merge_policy : str, optional, default = 'never'
How should the platform handle duplicated data?
Valid policies:
`never`: the default, no points are merged
`equal`: points are deduplicated if the time and value are equal
`retain`: if two points have the same timestamp, the old one is kept
`replace`: if two points have the same timestamp, the new one is kept
"""
prev_ver = stream.version()
tic = perf_counter()
new_ver = stream.arrow_insert(data, merge=merge_policy)
toc = perf_counter()
run_time = toc - tic
n_points = data.num_rows
result = {
"uuid": stream.uuid,
"previous_version": prev_ver,
"new_version": new_ver,
"points_to_insert": n_points,
"total_time_seconds": run_time,
"merge_policy": merge_policy,
}
return result
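

def _example_time_stream_arrow_insert(conn_str: str, stream_uuid: str) -> None:
    # Hypothetical usage sketch, not part of the original commit: builds a small
    # pyarrow Table and feeds it to time_stream_arrow_insert. The "time"/"value"
    # column names and the nanosecond-timestamp/float64 types are assumptions
    # about the schema arrow_insert expects; check the btrdb docs for the exact
    # schema before benchmarking.
    conn = btrdb.connect(conn_str)
    stream = conn.stream_from_uuid(stream_uuid)
    start_ns = 1_600_000_000_000_000_000  # arbitrary start time in nanoseconds
    times = pyarrow.array(
        [start_ns + i * 1_000_000_000 for i in range(10_000)],
        type=pyarrow.timestamp("ns", tz="UTC"),
    )
    values = pyarrow.array([float(i) for i in range(10_000)], type=pyarrow.float64())
    table = pyarrow.Table.from_arrays([times, values], names=["time", "value"])
    result = time_stream_arrow_insert(stream, table, merge_policy="never")
    print(result)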
103 changes: 103 additions & 0 deletions benchmarks/benchmark_streamset_inserts.py
@@ -0,0 +1,103 @@
import uuid
from time import perf_counter
from typing import Dict, List, Tuple, Union

import pyarrow

import btrdb


def time_streamset_inserts(
streamset: btrdb.stream.StreamSet,
data_map: Dict[uuid.UUID, List[Tuple[int, float]]],
merge_policy: str = "never",
) -> Dict[str, Union[str, int, float, List, Dict]]:
"""Insert data into a streamset with a provided data map.

Parameters
----------
streamset : btrdb.stream.StreamSet, required
The streams to insert data into.
    data_map : Dict[uuid.UUID, List[Tuple[int, float]]], required
The mappings of streamset uuids to data to insert.
merge_policy : str, optional, default = 'never'
How should the platform handle duplicated data?
Valid policies:
`never`: the default, no points are merged
`equal`: points are deduplicated if the time and value are equal
`retain`: if two points have the same timestamp, the old one is kept
`replace`: if two points have the same timestamp, the new one is kept
"""
_ensure_all_streams_in_data_map(streamset, data_map)
prev_vers = streamset.versions()
tic = perf_counter()
new_vers = streamset.insert(data_map=data_map, merge=merge_policy)
toc = perf_counter()
n_streams = len(streamset)
total_points = sum([len(v) for v in data_map.values()])
points_insert_per_stream = {s.uuid: len(data_map[s.uuid]) for s in streamset}
total_time = toc - tic
result = {
"n_streams": n_streams,
"total_points": total_points,
"previous_versions": prev_vers,
"new_versions": new_vers,
"total_time_seconds": total_time,
"points_insert_per_stream": points_insert_per_stream,
"merge_policy": merge_policy,
}
return result
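

def _example_time_streamset_inserts(conn_str: str, stream_uuids: List[str]) -> None:
    # Hypothetical usage sketch, not part of the original commit: builds a
    # per-stream data map of (time, value) tuples and passes it to
    # time_streamset_inserts. The connection string and stream UUIDs are
    # caller-supplied placeholders.
    conn = btrdb.connect(conn_str)
    streamset = conn.streams(*stream_uuids)
    start_ns = 1_600_000_000_000_000_000  # arbitrary start time in nanoseconds
    data_map = {
        s.uuid: [(start_ns + i * 1_000_000_000, float(i)) for i in range(10_000)]
        for s in streamset
    }
    result = time_streamset_inserts(streamset, data_map, merge_policy="never")
    print(result)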


def time_streamset_arrow_inserts(
streamset: btrdb.stream.StreamSet,
data_map: Dict[uuid.UUID, pyarrow.Table],
merge_policy: str = "never",
) -> Dict[str, Union[str, int, float, List, Dict]]:
"""Insert data into streamset using pyarrow Tables.

Parameters
----------
streamset : btrdb.stream.StreamSet, required
The streams to insert data into.
data_map : Dict[uuid.UUID, pyarrow.Table], required
The mappings of streamset uuids to pyarrow tables to insert.
merge_policy : str, optional, default = 'never'
How should the platform handle duplicated data?
Valid policies:
`never`: the default, no points are merged
`equal`: points are deduplicated if the time and value are equal
`retain`: if two points have the same timestamp, the old one is kept
`replace`: if two points have the same timestamp, the new one is kept
"""
_ensure_all_streams_in_data_map(streamset, data_map)
prev_vers = streamset.versions()
tic = perf_counter()
new_vers = streamset.arrow_insert(data_map=data_map, merge=merge_policy)
toc = perf_counter()
n_streams = len(streamset)
total_points = sum([v.num_rows for v in data_map.values()])
points_insert_per_stream = {s.uuid: data_map[s.uuid].num_rows for s in streamset}
total_time = toc - tic
result = {
"n_streams": n_streams,
"total_points": total_points,
"previous_versions": prev_vers,
"new_versions": new_vers,
"total_time_seconds": total_time,
"points_insert_per_stream": points_insert_per_stream,
"merge_policy": merge_policy,
}
return result
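

def _example_time_streamset_arrow_inserts(conn_str: str, stream_uuids: List[str]) -> None:
    # Hypothetical usage sketch, not part of the original commit: builds one
    # small pyarrow Table per stream and passes the map to
    # time_streamset_arrow_inserts. The "time"/"value" schema is an assumption
    # about what arrow_insert expects; check the btrdb docs for the exact schema.
    conn = btrdb.connect(conn_str)
    streamset = conn.streams(*stream_uuids)
    start_ns = 1_600_000_000_000_000_000  # arbitrary start time in nanoseconds
    times = pyarrow.array(
        [start_ns + i * 1_000_000_000 for i in range(10_000)],
        type=pyarrow.timestamp("ns", tz="UTC"),
    )
    values = pyarrow.array([float(i) for i in range(10_000)], type=pyarrow.float64())
    table = pyarrow.Table.from_arrays([times, values], names=["time", "value"])
    data_map = {s.uuid: table for s in streamset}
    result = time_streamset_arrow_inserts(streamset, data_map, merge_policy="never")
    print(result)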


def _ensure_all_streams_in_data_map(
streamset: btrdb.stream.StreamSet,
data_map: Dict[uuid.UUID, Union[pyarrow.Table, List[Tuple[int, float]]]],
) -> None:
    """Raise a ValueError if any stream in the streamset has no entry in the data map."""
    stream_uuids = [s.uuid for s in streamset]
    missing = [u for u in stream_uuids if u not in data_map]
    if missing:
        raise ValueError(
            f"Not all streams in the streamset are present in the data map. "
            f"Missing stream uuids: {missing}; data_map keys: {list(data_map.keys())}"
        )