Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multistream read bench insert bench #26

Merged
merged 2 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions benchmarks/benchmark_streamset_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,92 @@ def time_streamset_arrow_aligned_windows_values(
return results


def time_streamset_arrow_multistream_raw_values_non_timesnapped(
streamset: btrdb.stream.StreamSet,
start: int,
end: int,
version: int = 0,
sampling_frequency: int = None,
) -> Dict[str, Union[str, int, float]]:
"""Use the arrow multistream endpoint that joins the stream data on-server before sending to the client.

We make sure to set a sampling rate of 0 to ensure that we do not time snap and just perform a full-outer join on
the streams.

Parameters
----------
streamset : btrdb.stream.StreamSet, required
The streamset to perform the multistream query on.
start : int, required
The start time (in nanoseconds) to query raw data from.
end : int, required
The end time (in nanoseconds) non-exclusive, to query raw data from.
version : int, optional, default=0
The version of the stream to pin against, currently this is unused.
sampling_frequency : int, optional, ignored
The sampling frequency of the data stream in Hz

Notes
-----
Sampling_frequency is not used here, it will be manually set.
"""
streamset = streamset.filter(start=start, end=end, sampling_frequency=0)
versions = {s.uuid: 0 for s in streamset}
streamset = streamset.pin_versions(versions)
tic = perf_counter()
vals = streamset.arrow_values()
toc = perf_counter()
queried_points = vals.num_rows * len(streamset)
# print(vals)
# print(vals.to_pandas().describe())
run_time = toc - tic
results = _create_streamset_result_dict(
streamset=streamset, total_time=run_time, point_count=queried_points, version=0
)
return results


def time_streamset_arrow_multistream_raw_values_timesnapped(
streamset: btrdb.stream.StreamSet,
start: int,
end: int,
sampling_frequency: int,
version: int = 0,
) -> Dict[str, Union[str, int, float]]:
"""Use the arrow multistream endpoint that joins the stream data on-server before sending to the client.

We make sure to set a sampling rate to ensure that we time snap the returned data.

Parameters
----------
streamset : btrdb.stream.StreamSet, required
The streamset to perform the multistream query on.
start : int, required
The start time (in nanoseconds) to query raw data from.
end : int, required
The end time (in nanoseconds) non-exclusive, to query raw data from.
sampling_frequency : int, required
The common sampling frequency (in Hz) of the data to snap the data points to.
version : int, optional, default=0
The version of the stream to pin against, currently this is unused.
"""
streamset = streamset.filter(
start=start, end=end, sampling_frequency=sampling_frequency
)
versions = {s.uuid: 0 for s in streamset}
streamset = streamset.pin_versions(versions)
tic = perf_counter()
vals = streamset.arrow_values()
toc = perf_counter()
queried_points = vals.num_rows * len(streamset)
# print(vals)
run_time = toc - tic
results = _create_streamset_result_dict(
streamset=streamset, total_time=run_time, point_count=queried_points, version=0
)
return results


def _create_streamset_result_dict(
streamset: btrdb.stream.StreamSet,
point_count: int,
Expand Down Expand Up @@ -337,6 +423,13 @@ def main():
res = f(streamset, start, end, pointwidth=pointwidth, version=0)
res["func"] = f.__name__
res_list.append(res)
for f in [
time_streamset_arrow_multistream_raw_values_non_timesnapped,
time_streamset_arrow_multistream_raw_values_timesnapped,
]:
res = f(streamset, start, end, sampling_frequency=2, version=0)
res["func"] = f.__name__
res_list.append(res)

return res_list

Expand Down
2 changes: 1 addition & 1 deletion btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def arrowMultiValues(self, uu_list, start, end, version_list, snap_periodNS):
)
for result in self.stub.ArrowMultiValues(params):
check_proto_stat(result.stat)
yield result.arrowBytes, result.versionMajor
yield result.arrowBytes, None

@error_handler
def arrowInsertValues(self, uu: uuid.UUID, values: bytearray, policy: str):
Expand Down
4 changes: 3 additions & 1 deletion btrdb/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1837,8 +1837,10 @@ def _arrow_multivalues(self, period_ns: int):
params = self._params_from_filters()
versions = self.versions()
params["uu_list"] = [s.uuid for s in self._streams]
params["versions"] = [versions[s.uuid] for s in self._streams]
params["version_list"] = [versions[s.uuid] for s in self._streams]
params["snap_periodNS"] = period_ns
# dict.pop(key, default_return_value_if_no_key)
_ = params.pop("sampling_frequency", None)
arr_bytes = self._btrdb.ep.arrowMultiValues(**params)
# exhausting the generator from above
bytes_materialized = list(arr_bytes)
Expand Down