Skip to content

Commit be0b757

Browse files
authored
Provide option to sort the arrow tables (#47)
* Provide option to sort the arrow tables Previously, the arrow endpoints are not guaranteed to present their data in a sorted order, this PR lets the user set that for single stream calls, and sorts by time for the streamset operations by default. Streamset transformers like streamset.to_dataframe in the old version of the api used a PointBuffer that would sort the values by time before returning to the user. The single stream arrow methods now have a boolean argument `sorted` to let the user specify if they want to sort the returned table on the 'time' column or not. This is False by default. The streamset methods though, will be sorted by default and the user wont be able to switch that. We can change that later if needed. * Only sort window queries. * Update sorted parameter to sort_time.
1 parent 0d12d58 commit be0b757

File tree

1 file changed

+21
-4
lines changed

1 file changed

+21
-4
lines changed

btrdb/stream.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ def arrow_values(
917917
end : int or datetime like object
918918
The end time in nanoseconds for the range to be queried. (see
919919
:func:`btrdb.utils.timez.to_nanoseconds` for valid input types)
920-
version: int
920+
version: int, default: 0
921921
The version of the stream to be queried
922922
auto_retry: bool, default: False
923923
Whether to retry this request in the event of an error
@@ -931,6 +931,7 @@ def arrow_values(
931931
Exponential factor by which the backoff increases between retries.
932932
Will be ignored if auto_retry is False
933933
934+
934935
Returns
935936
------
936937
pyarrow.Table
@@ -1049,6 +1050,7 @@ def arrow_aligned_windows(
10491050
start: int,
10501051
end: int,
10511052
pointwidth: int,
1053+
sort_time: bool = False,
10521054
version: int = 0,
10531055
auto_retry=False,
10541056
retries=5,
@@ -1081,7 +1083,9 @@ def arrow_aligned_windows(
10811083
:func:`btrdb.utils.timez.to_nanoseconds` for valid input types)
10821084
pointwidth : int, required
10831085
Specify the number of ns between data points (2**pointwidth)
1084-
version : int
1086+
sort_time : bool, default: False
1087+
Should the table be sorted on the 'time' column?
1088+
version : int, default: 0
10851089
Version of the stream to query
10861090
auto_retry: bool, default: False
10871091
Whether to retry this request in the event of an error
@@ -1123,7 +1127,10 @@ def arrow_aligned_windows(
11231127
)
11241128
if len(tables) > 0:
11251129
tabs, ver = zip(*tables)
1126-
return pa.concat_tables(tabs)
1130+
if sort_time:
1131+
return pa.concat_tables(tabs).sort_by("time")
1132+
else:
1133+
return pa.concat_tables(tabs)
11271134
else:
11281135
schema = pa.schema(
11291136
[
@@ -1217,6 +1224,7 @@ def arrow_windows(
12171224
start: int,
12181225
end: int,
12191226
width: int,
1227+
sort_time: bool = False,
12201228
version: int = 0,
12211229
auto_retry=False,
12221230
retries=5,
@@ -1235,6 +1243,8 @@ def arrow_windows(
12351243
:func:`btrdb.utils.timez.to_nanoseconds` for valid input types)
12361244
width : int, required
12371245
The number of nanoseconds in each window.
1246+
sort_time : bool, default: False
1247+
Should the table be sorted on the 'time' column.
12381248
version : int, default=0, optional
12391249
The version of the stream to query.
12401250
auto_retry: bool, default: False
@@ -1285,7 +1295,10 @@ def arrow_windows(
12851295
)
12861296
if len(tables) > 0:
12871297
tabs, ver = zip(*tables)
1288-
return pa.concat_tables(tabs)
1298+
if sort_time:
1299+
return pa.concat_tables(tabs).sort_by("time")
1300+
else:
1301+
return pa.concat_tables(tabs)
12891302
else:
12901303
schema = pa.schema(
12911304
[
@@ -2094,6 +2107,8 @@ def arrow_values(
20942107
):
20952108
"""Return a pyarrow table of stream values based on the streamset parameters.
20962109
2110+
This data will be sorted by the 'time' column.
2111+
20972112
Notes
20982113
-----
20992114
This method is available for commercial customers with arrow-enabled servers.
@@ -2138,6 +2153,7 @@ def arrow_values(
21382153
data = tablex
21392154
else:
21402155
data = tablex
2156+
data = data.sort_by("time")
21412157

21422158
elif self.width is not None and self.depth is not None:
21432159
# create list of stream.windows data (the windows method should
@@ -2168,6 +2184,7 @@ def arrow_values(
21682184
data = tablex
21692185
else:
21702186
data = tablex
2187+
data = data.sort_by("time")
21712188
else:
21722189
sampling_freq = params.pop("sampling_frequency", 0)
21732190
period_ns = 0

0 commit comments

Comments
 (0)