Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threaded arrow #23

Merged
merged 40 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ca5346c
Merge pull request #96 from PingThingsIO/sc-17506-update-depth-docume…
davidkonigsberg Jan 24, 2022
ef9c35f
Merge pull request #97 from PingThingsIO/sc-17504-what-is-time
davidkonigsberg Jan 24, 2022
6362963
Release v5.15.0
davidkonigsberg Jan 25, 2022
73a9ec8
Merge branch 'master' of https://github.com/PingThingsIO/btrdb-python
jleifnf Mar 17, 2023
c8caa51
update protobuf to v4.22.3
jleifnf Apr 14, 2023
6696adc
Add threaded streamset calls
justinGilmer May 18, 2023
82bcf5e
Blacken code
justinGilmer May 18, 2023
d5ff32d
Update for failing tests
justinGilmer May 18, 2023
5300276
Ignore flake8 as part of testing
justinGilmer May 18, 2023
fdaba46
Update .gitignore
justinGilmer May 18, 2023
56d8c19
Update proto definitions.
justinGilmer May 18, 2023
6ef0a67
Update endpoint to support arrow methods
justinGilmer May 18, 2023
7a40b18
Support arrow endpoints
justinGilmer May 19, 2023
dd512d3
Additional arrow updates
justinGilmer May 19, 2023
22aa9cf
Update transformers, add polars conversion
justinGilmer May 20, 2023
5e96339
Update .gitignore
justinGilmer May 20, 2023
2c986fa
Update ignore and remove extra print.
justinGilmer May 20, 2023
5dbfd8f
Remove idea folder (pycharm)
justinGilmer May 20, 2023
67d7a44
Merge branch 'threadpool_executor' into threaded_arrow
justinGilmer May 20, 2023
10239c8
Update requirements.txt
justinGilmer May 20, 2023
d193556
Update btrdb/transformers.py
justinGilmer May 20, 2023
84175b5
Update the way to check for arrow-enabled btrdb
justinGilmer May 24, 2023
0522cc0
Use IPC streams to send the arrow bytes for insert
justinGilmer May 24, 2023
038f79c
Create arrow specific stream methods.
justinGilmer May 25, 2023
ce5b410
Update test conn object to support minor version
justinGilmer May 25, 2023
8836d26
Update tests and migrate arrow code.
justinGilmer May 25, 2023
ba49fe6
Arrow and standard streamset insert
justinGilmer May 25, 2023
94e167a
Create basic arrow to dataframe transformer
justinGilmer May 25, 2023
a0d6244
Support multirawvalues, arrow transformers
justinGilmer May 26, 2023
90920eb
Multivalue arrow queries, in progress
justinGilmer May 30, 2023
ea09373
Update stream filter to properly filter for sampling frequency
justinGilmer May 30, 2023
10122fe
Update arrow values queries for multivalues
justinGilmer May 30, 2023
b48e301
Update param passing for sampling frequency
justinGilmer May 30, 2023
7424acf
Update index passing, and ignore depth
justinGilmer May 31, 2023
9a5e69f
benchmark raw values queries for arrow and current api
justinGilmer Jun 2, 2023
36bf46c
Add aligned windows and run func
justinGilmer Jun 2, 2023
d116740
Streamset read benchmarks (WIP)
justinGilmer Jun 2, 2023
2039df1
Update mock return value for versionMajor
justinGilmer Jun 2, 2023
12e6546
In progress validation of stream benchs
justinGilmer Jun 2, 2023
24d8847
Merge branch 'staging' into threaded_arrow
justinGilmer Jun 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ dmypy.json

# Pyre type checker
.pyre/

# arrow parquet files
*.parquet

.idea
.idea/misc.xml
.idea/vcs.xml
.idea/inspectionProfiles/profiles_settings.xml
Expand Down
276 changes: 276 additions & 0 deletions benchmarks/benchmark_stream_reads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
import uuid
from time import perf_counter
from typing import Dict, Union

import btrdb


def time_single_stream_raw_values(
stream: btrdb.stream.Stream, start: int, end: int, version: int = 0
) -> Dict[str, Union[int, str]]:
"""Return the elapsed time for the stream raw values query

Parameters
----------
stream : btrdb.Stream, required
The data stream to return raw values.
start : int, required
The start time (in nanoseconds) to return raw values (inclusive).
end : int, required
The end time (in nanoseconds) to return raw values (exclusive)
version : int, optional, default : 0
The version of the stream to query for points.

Notes
-----
The data points returned will be [start, end)

Returns
-------
results : dict
The performance results of the stream method
"""
expected_count = stream.count(start, end, version=version, precise=True)
tic = perf_counter()
vals = stream.values(start, end, version=version)
toc = perf_counter()
# minus 1 to account for the exclusive end time
queried_points = len(vals)
assert queried_points == expected_count
# time in seconds to run
run_time = toc - tic
results = _create_stream_result_dict(
stream.uuid, point_count=queried_points, total_time=run_time, version=version
)
return results


def time_single_stream_arrow_raw_values(
stream: btrdb.stream.Stream, start: int, end: int, version: int = 0
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the stream arrow raw values query

Parameters
----------
stream : btrdb.Stream, required
The data stream to return the raw data as an arrow table.
start : int, required
The start time (in nanoseconds) to return raw values (inclusive).
end : int, required
The end time (in nanoseconds) to return raw values (exclusive)
version : int, optional, default : 0
The version of the stream to query for points.

Notes
-----
The data points returned will be [start, end)

Returns
-------
results : dict
The performance results of the stream method
"""
# minus 1 to account for the exclusive end time for the values query
expected_count = stream.count(start, end, version=version, precise=True)
tic = perf_counter()
vals = stream.arrow_values(start, end, version=version)
toc = perf_counter()
# num of rows
queried_points = vals.num_rows
assert queried_points == expected_count
# time in seconds to run
run_time = toc - tic
results = _create_stream_result_dict(
stream.uuid, point_count=queried_points, total_time=run_time, version=version
)
return results


def time_single_stream_windows_values(
stream: btrdb.stream.Stream, start: int, end: int, width_ns: int, version: int = 0
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the stream windows values query

Parameters
----------
stream : btrdb.Stream, required
The data stream to return the windowed data as a list of statpoints
start : int, required
The start time (in nanoseconds) to return statpoint values
end : int, required
The end time (in nanoseconds) to return statpoint values
width_ns : int, required
The window width (in nanoseconds) for the statpoints
version : int, optional, default : 0
The version of the stream to query for points.

Returns
-------
results : dict
The performance results of the stream method
"""
tic = perf_counter()
vals = stream.windows(start, end, width=width_ns, version=version)
toc = perf_counter()
# num of statpoints
queried_points = len(vals)
assert queried_points != 0
# time in seconds to run
run_time = toc - tic
results = _create_stream_result_dict(
stream.uuid, point_count=queried_points, total_time=run_time, version=version
)
return results


def time_single_stream_arrow_windows_values(
stream: btrdb.stream.Stream, start: int, end: int, width_ns: int, version: int = 0
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the stream arrow window values query

Parameters
----------
stream : btrdb.Stream, required
The data stream to return the windowed data as an arrow table.
start : int, required
The start time (in nanoseconds) to return statpoint values
end : int, required
The end time (in nanoseconds) to return statpoint values
width_ns : int, required
The window width (in nanoseconds) for the statpoints
version : int, optional, default : 0
The version of the stream to query for points.

Notes
-----
The data points returned will be [start, end)

Returns
-------
results : dict
The performance results of the stream method
"""
tic = perf_counter()
vals = stream.arrow_windows(start, end, width=width_ns, version=version)
toc = perf_counter()
# num of statpoints
queried_points = vals.num_rows
assert queried_points != 0
# time in seconds to run
run_time = toc - tic
results = _create_stream_result_dict(
stream.uuid, point_count=queried_points, total_time=run_time, version=version
)
return results


def time_single_stream_aligned_windows_values(
stream: btrdb.stream.Stream, start: int, end: int, pointwidth: int, version: int = 0
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the stream window values query

Parameters
----------
stream : btrdb.Stream, required
The data stream to return the windowed data as a list of statpoints
start : int, required
The start time (in nanoseconds) to return statpoint values
end : int, required
The end time (in nanoseconds) to return statpoint values
pointwidth : int, required
The level of the tree to return statpoints (the exponent k in 2**k)
version : int, optional, default : 0
The version of the stream to query for points.

Returns
-------
results : dict
The performance results of the stream method
"""
tic = perf_counter()
vals = stream.aligned_windows(start, end, pointwidth=pointwidth, version=version)
toc = perf_counter()
# num of statpoints
queried_points = len(vals)
assert queried_points != 0
# time in seconds to run
run_time = toc - tic
results = _create_stream_result_dict(
stream.uuid, point_count=queried_points, total_time=run_time, version=version
)
return results


def time_single_stream_arrow_aligned_windows_values(
stream: btrdb.stream.Stream, start: int, end: int, pointwidth: int, version: int = 0
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the stream arrow aligned window values query

Parameters
----------
stream : btrdb.Stream, required
The data stream to return the windowed data as an arrow table.
start : int, required
The start time (in nanoseconds) to return statpoint values
end : int, required
The end time (in nanoseconds) to return statpoint values
pointwidth : int, required
The level of the tree to return statpoints (the exponent k in 2**k)
version : int, optional, default : 0
The version of the stream to query for points.

Returns
-------
results : dict
The performance results of the stream method
"""
tic = perf_counter()
vals = stream.arrow_aligned_windows(
start, end, pointwidth=pointwidth, version=version
)
toc = perf_counter()
# num of statpoints
queried_points = vals.num_rows
assert queried_points != 0
# time in seconds to run
run_time = toc - tic
results = _create_stream_result_dict(
stream.uuid, point_count=queried_points, total_time=run_time, version=version
)
return results


def _create_stream_result_dict(
uu: uuid.UUID,
point_count: int,
total_time: float,
version: int,
) -> Dict[str, Union[str, int, float]]:
return {
"uuid": str(uu),
"total_points": point_count,
"total_time_seconds": total_time,
"stream_version": version,
}


def main():
"""Run a single run of the benchmarks"""
conn = btrdb.connect(profile="andy")
stream1 = conn.stream_from_uuid(
list(conn.streams_in_collection("andy/7064-6684-5e6e-9e14-ff9ca7bae46e"))[0].uuid
)
start = stream1.earliest()[0].time
end = stream1.latest()[0].time
width_ns = btrdb.utils.timez.ns_delta(minutes=1)
pointwidth = btrdb.utils.general.pointwidth(38)
print(f"pointwidth of: {pointwidth}")
for f in [time_single_stream_arrow_raw_values, time_single_stream_raw_values]:
print(f(stream1, start, end, 0))
for f in [time_single_stream_arrow_windows_values, time_single_stream_windows_values]:
print(f(stream1, start, end, width_ns=width_ns, version=0))
for f in [time_single_stream_arrow_aligned_windows_values, time_single_stream_aligned_windows_values]:
print(f(stream1, start, end, pointwidth=pointwidth, version=0))

if __name__=="__main__":
main()
Loading