diff --git a/benchmarks/benchmark_stream_inserts.py b/benchmarks/benchmark_stream_inserts.py
new file mode 100644
index 0000000..003c386
--- /dev/null
+++ b/benchmarks/benchmark_stream_inserts.py
@@ -0,0 +1,80 @@
+from time import perf_counter
+from typing import Dict, List, Tuple, Union
+
+import pyarrow
+
+import btrdb
+
+
+def time_stream_insert(
+    stream: btrdb.stream.Stream,
+    data: List[Tuple[int, float]],
+    merge_policy: str = "never",
+) -> Dict[str, Union[int, float, str]]:
+    """Insert raw data into a single stream, where data is a List of tuples of int64 timestamps and float64 values.
+
+    Parameters
+    ----------
+    stream : btrdb.stream.Stream, required
+        The stream to insert data into.
+    data : List[Tuple[int, float]], required
+        The data to insert into stream.
+    merge_policy : str, optional, default = 'never'
+        How should the platform handle duplicated data?
+        Valid policies:
+        `never`: the default, no points are merged
+        `equal`: points are deduplicated if the time and value are equal
+        `retain`: if two points have the same timestamp, the old one is kept
+        `replace`: if two points have the same timestamp, the new one is kept
+    """
+    prev_ver = stream.version()
+    tic = perf_counter()
+    new_ver = stream.insert(data, merge=merge_policy)
+    toc = perf_counter()
+    run_time = toc - tic
+    n_points = len(data)
+    result = {
+        "uuid": stream.uuid,
+        "previous_version": prev_ver,
+        "new_version": new_ver,
+        "points_to_insert": n_points,
+        "total_time_seconds": run_time,
+        "merge_policy": merge_policy,
+    }
+    return result
+
+
+def time_stream_arrow_insert(
+    stream: btrdb.stream.Stream, data: pyarrow.Table, merge_policy: str = "never"
+) -> Dict[str, Union[int, float, str]]:
+    """Insert raw data into a single stream, where data is a pyarrow Table of timestamps and float values.
+
+    Parameters
+    ----------
+    stream : btrdb.stream.Stream, required
+        The stream to insert data into.
+    data : pyarrow.Table, required
+        The table of data to insert into stream.
+    merge_policy : str, optional, default = 'never'
+        How should the platform handle duplicated data?
+        Valid policies:
+        `never`: the default, no points are merged
+        `equal`: points are deduplicated if the time and value are equal
+        `retain`: if two points have the same timestamp, the old one is kept
+        `replace`: if two points have the same timestamp, the new one is kept
+    """
+    prev_ver = stream.version()
+    tic = perf_counter()
+    new_ver = stream.arrow_insert(data, merge=merge_policy)
+    toc = perf_counter()
+    run_time = toc - tic
+    n_points = data.num_rows
+    result = {
+        "uuid": stream.uuid,
+        "previous_version": prev_ver,
+        "new_version": new_ver,
+        "points_to_insert": n_points,
+        "total_time_seconds": run_time,
+        "merge_policy": merge_policy,
+    }
+    return result
diff --git a/benchmarks/benchmark_streamset_inserts.py b/benchmarks/benchmark_streamset_inserts.py
new file mode 100644
index 0000000..4a03e1f
--- /dev/null
+++ b/benchmarks/benchmark_streamset_inserts.py
@@ -0,0 +1,103 @@
+import uuid
+from time import perf_counter
+from typing import Dict, List, Tuple, Union
+
+import pyarrow
+
+import btrdb
+
+
+def time_streamset_inserts(
+    streamset: btrdb.stream.StreamSet,
+    data_map: Dict[uuid.UUID, List[Tuple[int, float]]],
+    merge_policy: str = "never",
+) -> Dict[str, Union[str, int, float, List, Dict]]:
+    """Insert data into a streamset with a provided data map.
+
+    Parameters
+    ----------
+    streamset : btrdb.stream.StreamSet, required
+        The streams to insert data into.
+    data_map : Dict[uuid.UUID, List[Tuple[int, float]]], required
+        The mappings of streamset uuids to data to insert.
+    merge_policy : str, optional, default = 'never'
+        How should the platform handle duplicated data?
+        Valid policies:
+        `never`: the default, no points are merged
+        `equal`: points are deduplicated if the time and value are equal
+        `retain`: if two points have the same timestamp, the old one is kept
+        `replace`: if two points have the same timestamp, the new one is kept
+    """
+    _ensure_all_streams_in_data_map(streamset, data_map)
+    prev_vers = streamset.versions()
+    tic = perf_counter()
+    new_vers = streamset.insert(data_map=data_map, merge=merge_policy)
+    toc = perf_counter()
+    n_streams = len(streamset)
+    total_points = sum([len(v) for v in data_map.values()])
+    points_insert_per_stream = {s.uuid: len(data_map[s.uuid]) for s in streamset}
+    total_time = toc - tic
+    result = {
+        "n_streams": n_streams,
+        "total_points": total_points,
+        "previous_versions": prev_vers,
+        "new_versions": new_vers,
+        "total_time_seconds": total_time,
+        "points_insert_per_stream": points_insert_per_stream,
+        "merge_policy": merge_policy,
+    }
+    return result
+
+
+def time_streamset_arrow_inserts(
+    streamset: btrdb.stream.StreamSet,
+    data_map: Dict[uuid.UUID, pyarrow.Table],
+    merge_policy: str = "never",
+) -> Dict[str, Union[str, int, float, List, Dict]]:
+    """Insert data into streamset using pyarrow Tables.
+
+    Parameters
+    ----------
+    streamset : btrdb.stream.StreamSet, required
+        The streams to insert data into.
+    data_map : Dict[uuid.UUID, pyarrow.Table], required
+        The mappings of streamset uuids to pyarrow tables to insert.
+    merge_policy : str, optional, default = 'never'
+        How should the platform handle duplicated data?
+        Valid policies:
+        `never`: the default, no points are merged
+        `equal`: points are deduplicated if the time and value are equal
+        `retain`: if two points have the same timestamp, the old one is kept
+        `replace`: if two points have the same timestamp, the new one is kept
+    """
+    _ensure_all_streams_in_data_map(streamset, data_map)
+    prev_vers = streamset.versions()
+    tic = perf_counter()
+    new_vers = streamset.arrow_insert(data_map=data_map, merge=merge_policy)
+    toc = perf_counter()
+    n_streams = len(streamset)
+    total_points = sum([v.num_rows for v in data_map.values()])
+    points_insert_per_stream = {s.uuid: data_map[s.uuid].num_rows for s in streamset}
+    total_time = toc - tic
+    result = {
+        "n_streams": n_streams,
+        "total_points": total_points,
+        "previous_versions": prev_vers,
+        "new_versions": new_vers,
+        "total_time_seconds": total_time,
+        "points_insert_per_stream": points_insert_per_stream,
+        "merge_policy": merge_policy,
+    }
+    return result
+
+
+def _ensure_all_streams_in_data_map(
+    streamset: btrdb.stream.StreamSet,
+    data_map: Dict[uuid.UUID, Union[pyarrow.Table, List[Tuple[int, float]]]],
+) -> None:
+    truthy = [s.uuid in data_map for s in streamset]
+    uus = [s.uuid for s in streamset]
+    if not all(truthy):
+        raise ValueError(
+            f"Streams in streamset are not present in the data map. Data_map keys: {data_map.keys()}, streamset uuids: {uus}"
+        )
diff --git a/btrdb/stream.py b/btrdb/stream.py
index 8ded4a8..094ddde 100644
--- a/btrdb/stream.py
+++ b/btrdb/stream.py
@@ -10,6 +10,8 @@
 """
 Module for Stream and related classes
 """
+import io
+
 ##########################################################################
 ## Imports
 ##########################################################################
@@ -512,8 +514,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
         logger.debug(f"tmp_table schema: {tmp_table.schema}")
         new_schema = pa.schema(
             [
-                (pa.field("time", pa.timestamp(unit="ns", tz="UTC"))),
-                (pa.field("value", pa.float64())),
+                (pa.field("time", pa.timestamp(unit="ns", tz="UTC"), nullable=False)),
+                (pa.field("value", pa.float64(), nullable=False)),
             ]
         )
         tmp_table = tmp_table.cast(new_schema)
@@ -1930,10 +1932,12 @@ def _materialize_stream_as_table(arrow_bytes):
 
 
 def _table_slice_to_feather_bytes(table_slice: pa.Table) -> bytes:
-    sink = pa.BufferOutputStream()
+    # sink = pa.BufferOutputStream()
+    sink = io.BytesIO()
     with pa.ipc.new_stream(sink=sink, schema=table_slice.schema) as writer:
-        writer.write_table(table_slice)
-        return sink.readall()
+        writer.write(table_slice)
+    buf = sink.getvalue()
+    return buf
 
 
 def _coalesce_table_deque(tables: deque):
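The benchmark modules above only define timing helpers; they do not connect to a server or build any data themselves. The following is a minimal, hypothetical driver sketch, not part of this changeset: the endpoint, API key, and stream UUIDs are placeholders, the synthetic points are built with the standard library, and it assumes the benchmarks/ modules are importable from the repository root alongside the public btrdb.connect and stream_from_uuid calls.

    import time
    import uuid

    import btrdb
    from btrdb.stream import StreamSet

    from benchmarks.benchmark_stream_inserts import time_stream_insert
    from benchmarks.benchmark_streamset_inserts import time_streamset_inserts

    # Placeholder connection details and stream UUIDs; replace with real values.
    db = btrdb.connect("localhost:4410", apikey="changeme")
    uu1 = uuid.UUID("00000000-0000-0000-0000-000000000001")
    uu2 = uuid.UUID("00000000-0000-0000-0000-000000000002")
    stream1 = db.stream_from_uuid(uu1)
    stream2 = db.stream_from_uuid(uu2)

    # 10,000 synthetic points per stream: nanosecond timestamps spaced 1 ms apart.
    start = time.time_ns()
    data = [(start + i * 1_000_000, float(i)) for i in range(10_000)]

    # Single-stream benchmark.
    print(time_stream_insert(stream1, data, merge_policy="never"))

    # StreamSet benchmark: the data map is keyed by each stream's UUID.
    streamset = StreamSet([stream1, stream2])
    data_map = {uu1: data, uu2: data}
    print(time_streamset_inserts(streamset, data_map, merge_policy="never"))

The arrow variants (time_stream_arrow_insert, time_streamset_arrow_inserts) are driven the same way, passing pyarrow.Table objects with "time" and "value" columns instead of lists of tuples.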