Merged
25 changes: 21 additions & 4 deletions btrdb/stream.py
@@ -917,7 +917,7 @@ def arrow_values(
end : int or datetime like object
The end time in nanoseconds for the range to be queried. (see
:func:`btrdb.utils.timez.to_nanoseconds` for valid input types)
version: int
version: int, default: 0
The version of the stream to be queried
auto_retry: bool, default: False
Whether to retry this request in the event of an error
@@ -931,6 +931,7 @@ def arrow_values(
Exponential factor by which the backoff increases between retries.
Will be ignored if auto_retry is False


Returns
-------
pyarrow.Table
@@ -1049,6 +1050,7 @@ def arrow_aligned_windows(
start: int,
end: int,
pointwidth: int,
sort_time: bool = False,
version: int = 0,
auto_retry=False,
retries=5,
@@ -1081,7 +1083,9 @@
:func:`btrdb.utils.timez.to_nanoseconds` for valid input types)
pointwidth : int, required
Specify the number of ns between data points (2**pointwidth)
version : int
sort_time : bool, default: False
Whether to sort the returned table by the 'time' column
version : int, default: 0
Version of the stream to query
auto_retry: bool, default: False
Whether to retry this request in the event of an error
@@ -1123,7 +1127,10 @@ def arrow_aligned_windows(
)
if len(tables) > 0:
tabs, ver = zip(*tables)
return pa.concat_tables(tabs)
if sort_time:
return pa.concat_tables(tabs).sort_by("time")
else:
return pa.concat_tables(tabs)
else:
schema = pa.schema(
[
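For reference, a minimal sketch of how the new `sort_time` flag on `arrow_aligned_windows` might be used. The connection string, API key, and stream UUID are hypothetical placeholders, and the `btrdb.connect` / `stream_from_uuid` / `earliest` / `latest` calls are assumed from the existing btrdb-python API; treat this as an illustration under those assumptions, not part of the diff.

```python
import btrdb

# Hypothetical connection details and stream UUID, for illustration only.
conn = btrdb.connect("api.example.net:4411", apikey="MY_API_KEY")
stream = conn.stream_from_uuid("d4b1a7c0-0000-0000-0000-000000000000")

first_point, _ = stream.earliest()
last_point, _ = stream.latest()

# pointwidth=38 means each aligned window spans 2**38 ns (~4.6 minutes).
# With sort_time=True the concatenated pyarrow.Table is sorted on the
# 'time' column before being returned; with the default False the rows
# keep the order in which the per-range window queries completed.
table = stream.arrow_aligned_windows(
    start=first_point.time,
    end=last_point.time + 1,  # end is exclusive, so +1 includes the last point
    pointwidth=38,
    sort_time=True,
)
print(table.column_names)
```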
@@ -1217,6 +1224,7 @@ def arrow_windows(
start: int,
end: int,
width: int,
sort_time: bool = False,
version: int = 0,
auto_retry=False,
retries=5,
@@ -1235,6 +1243,8 @@ def arrow_windows(
:func:`btrdb.utils.timez.to_nanoseconds` for valid input types)
width : int, required
The number of nanoseconds in each window.
sort_time : bool, default: False
Whether to sort the returned table by the 'time' column.
version : int, default=0, optional
The version of the stream to query.
auto_retry: bool, default: False
@@ -1285,7 +1295,10 @@ def arrow_windows(
)
if len(tables) > 0:
tabs, ver = zip(*tables)
return pa.concat_tables(tabs)
if sort_time:
return pa.concat_tables(tabs).sort_by("time")
else:
return pa.concat_tables(tabs)
else:
schema = pa.schema(
[
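The `sort_time` branch above reduces to a plain pyarrow pattern. The toy tables below are not from the codebase; they only show what `concat_tables(...).sort_by("time")` adds over bare concatenation (Table.sort_by requires pyarrow 7.0 or newer).

```python
import pyarrow as pa

# Two per-range tables arriving out of time order, standing in for the
# `tabs` tuple produced by the parallel window queries.
t1 = pa.table({"time": [30, 40], "mean": [1.0, 2.0]})
t2 = pa.table({"time": [10, 20], "mean": [3.0, 4.0]})

combined = pa.concat_tables([t1, t2])   # rows keep arrival order
ordered = combined.sort_by("time")      # what sort_time=True adds

print(combined["time"].to_pylist())   # [30, 40, 10, 20]
print(ordered["time"].to_pylist())    # [10, 20, 30, 40]
```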
@@ -2094,6 +2107,8 @@ def arrow_values(
):
"""Return a pyarrow table of stream values based on the streamset parameters.

This data will be sorted by the 'time' column.

Notes
-----
This method is available for commercial customers with arrow-enabled servers.
@@ -2138,6 +2153,7 @@ def arrow_values(
data = tablex
else:
data = tablex
data = data.sort_by("time")

elif self.width is not None and self.depth is not None:
# create list of stream.windows data (the windows method should
@@ -2168,6 +2184,7 @@ def arrow_values(
data = tablex
else:
data = tablex
data = data.sort_by("time")
else:
sampling_freq = params.pop("sampling_frequency", 0)
period_ns = 0
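Finally, a hedged sketch of the StreamSet-level behavior changed in the last hunks: after this change, `StreamSet.arrow_values()` sorts the combined table by 'time' unconditionally. The connection details and UUIDs are placeholders, and the `conn.streams(...)` / `filter(...)` calls are assumed from the existing btrdb-python API.

```python
from datetime import datetime, timezone

import btrdb
from btrdb.utils.timez import to_nanoseconds

# Hypothetical connection and stream UUIDs, for illustration only.
conn = btrdb.connect("api.example.net:4411", apikey="MY_API_KEY")
streams = conn.streams(
    "11111111-1111-1111-1111-111111111111",
    "22222222-2222-2222-2222-222222222222",
)

start = to_nanoseconds(datetime(2023, 1, 1, tzinfo=timezone.utc))
end = to_nanoseconds(datetime(2023, 1, 2, tzinfo=timezone.utc))

# The returned pyarrow.Table is sorted by the 'time' column, regardless
# of the order in which the per-stream results arrived.
table = streams.filter(start=start, end=end).arrow_values()
times = table["time"].to_pylist()
assert times == sorted(times)
```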