From 46dc0e5b53542aa7c66b1141c45250f41292669f Mon Sep 17 00:00:00 2001 From: Justin Gilmer Date: Mon, 21 Aug 2023 16:25:37 -0400 Subject: [PATCH 1/3] Provide option to sort the arrow tables Previously, the arrow endpoints were not guaranteed to present their data in sorted order. This PR lets the user request sorting for single stream calls, and sorts by time for the streamset operations by default. Streamset transformers like streamset.to_dataframe in the old version of the API used a PointBuffer that would sort the values by time before returning to the user. The single stream arrow methods now have a boolean argument `sorted` to let the user specify if they want to sort the returned table on the 'time' column or not. This is False by default. The streamset methods, though, will be sorted by default and the user won't be able to switch that. We can change that later if needed. --- btrdb/stream.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/btrdb/stream.py b/btrdb/stream.py index b777bf5..b28de75 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -899,6 +899,7 @@ def arrow_values( self, start, end, + sorted: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -917,7 +918,9 @@ def arrow_values( end : int or datetime like object The end time in nanoseconds for the range to be queried. (see :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) - version: int + sorted : bool, default: False + Should the table be sorted by the 'time' column? + version: int, default: 0 The version of the stream to be queried auto_retry: bool, default: False Whether to retry this request in the event of an error @@ -931,6 +934,7 @@ def arrow_values( Exponential factor by which the backoff increases between retries. 
Will be ignored if auto_retry is False + Returns ------ pyarrow.Table @@ -957,7 +961,10 @@ def arrow_values( tables = list(arrow_and_versions) if len(tables) > 0: tabs, ver = zip(*tables) - return pa.concat_tables(tabs) + if sorted: + return pa.concat_tables(tabs).sort_by("time") + else: + return pa.concat_tables(tabs) else: schema = pa.schema( [ @@ -1049,6 +1056,7 @@ def arrow_aligned_windows( start: int, end: int, pointwidth: int, + sorted: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -1081,7 +1089,9 @@ def arrow_aligned_windows( :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) pointwidth : int, required Specify the number of ns between data points (2**pointwidth) - version : int + sorted : bool, default: False + Should the table be sorted on the 'time' column? + version : int, default: 0 Version of the stream to query auto_retry: bool, default: False Whether to retry this request in the event of an error @@ -1123,7 +1133,10 @@ def arrow_aligned_windows( ) if len(tables) > 0: tabs, ver = zip(*tables) - return pa.concat_tables(tabs) + if sorted: + return pa.concat_tables(tabs).sort_by("time") + else: + return pa.concat_tables(tabs) else: schema = pa.schema( [ @@ -1217,6 +1230,7 @@ def arrow_windows( start: int, end: int, width: int, + sorted: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -1235,6 +1249,8 @@ def arrow_windows( :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) width : int, required The number of nanoseconds in each window. + sorted : bool, default: False + Should the table be sorted on the 'time' column. version : int, default=0, optional The version of the stream to query. 
auto_retry: bool, default: False @@ -1285,7 +1301,10 @@ def arrow_windows( ) if len(tables) > 0: tabs, ver = zip(*tables) - return pa.concat_tables(tabs) + if sorted: + return pa.concat_tables(tabs).sort_by("time") + else: + return pa.concat_tables(tabs) else: schema = pa.schema( [ @@ -2094,6 +2113,8 @@ def arrow_values( ): """Return a pyarrow table of stream values based on the streamset parameters. + This data will be sorted by the 'time' column. + Notes ----- This method is available for commercial customers with arrow-enabled servers. @@ -2190,7 +2211,7 @@ def arrow_values( data = pa.Table.from_arrays( [pa.array([]) for i in range(1 + len(self._streams))], schema=schema ) - return data + return data.sort_by("time") def __repr__(self): token = "stream" if len(self) == 1 else "streams" From 786b3114820a71f4ef53893c9be8db10704eac6b Mon Sep 17 00:00:00 2001 From: Justin Gilmer Date: Tue, 22 Aug 2023 11:57:45 -0400 Subject: [PATCH 2/3] Only sort window queries. --- btrdb/stream.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/btrdb/stream.py b/btrdb/stream.py index b28de75..70c7a57 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -899,7 +899,6 @@ def arrow_values( self, start, end, - sorted: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -918,8 +917,6 @@ def arrow_values( end : int or datetime like object The end time in nanoseconds for the range to be queried. (see :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) - sorted : bool, default: False - Should the table be sorted by the 'time' column? 
version: int, default: 0 The version of the stream to be queried auto_retry: bool, default: False @@ -961,10 +958,7 @@ def arrow_values( tables = list(arrow_and_versions) if len(tables) > 0: tabs, ver = zip(*tables) - if sorted: - return pa.concat_tables(tabs).sort_by("time") - else: - return pa.concat_tables(tabs) + return pa.concat_tables(tabs) else: schema = pa.schema( [ @@ -2159,6 +2153,7 @@ def arrow_values( data = tablex else: data = tablex + data = data.sort_by("time") elif self.width is not None and self.depth is not None: # create list of stream.windows data (the windows method should @@ -2189,6 +2184,7 @@ def arrow_values( data = tablex else: data = tablex + data = data.sort_by("time") else: sampling_freq = params.pop("sampling_frequency", 0) period_ns = 0 @@ -2211,7 +2207,7 @@ def arrow_values( data = pa.Table.from_arrays( [pa.array([]) for i in range(1 + len(self._streams))], schema=schema ) - return data.sort_by("time") + return data def __repr__(self): token = "stream" if len(self) == 1 else "streams" From 58de6944080995ec2cb465b327885387c073a4ba Mon Sep 17 00:00:00 2001 From: Justin Gilmer Date: Wed, 23 Aug 2023 15:46:20 -0400 Subject: [PATCH 3/3] Update sorted parameter to sort_time. --- btrdb/stream.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/btrdb/stream.py b/btrdb/stream.py index 70c7a57..caf95ab 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -1050,7 +1050,7 @@ def arrow_aligned_windows( start: int, end: int, pointwidth: int, - sorted: bool = False, + sort_time: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -1083,7 +1083,7 @@ def arrow_aligned_windows( :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) pointwidth : int, required Specify the number of ns between data points (2**pointwidth) - sorted : bool, default: False + sort_time : bool, default: False Should the table be sorted on the 'time' column? 
version : int, default: 0 Version of the stream to query @@ -1127,7 +1127,7 @@ def arrow_aligned_windows( ) if len(tables) > 0: tabs, ver = zip(*tables) - if sorted: + if sort_time: return pa.concat_tables(tabs).sort_by("time") else: return pa.concat_tables(tabs) @@ -1224,7 +1224,7 @@ def arrow_windows( start: int, end: int, width: int, - sorted: bool = False, + sort_time: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -1243,7 +1243,7 @@ def arrow_windows( :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) width : int, required The number of nanoseconds in each window. - sorted : bool, default: False + sort_time : bool, default: False Should the table be sorted on the 'time' column. version : int, default=0, optional The version of the stream to query. @@ -1295,7 +1295,7 @@ def arrow_windows( ) if len(tables) > 0: tabs, ver = zip(*tables) - if sorted: + if sort_time: return pa.concat_tables(tabs).sort_by("time") else: return pa.concat_tables(tabs)