diff --git a/btrdb/stream.py b/btrdb/stream.py index b777bf5..caf95ab 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -917,7 +917,7 @@ def arrow_values( end : int or datetime like object The end time in nanoseconds for the range to be queried. (see :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) - version: int + version: int, default: 0 The version of the stream to be queried auto_retry: bool, default: False Whether to retry this request in the event of an error @@ -931,6 +931,7 @@ def arrow_values( Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False + Returns ------ pyarrow.Table @@ -1049,6 +1050,7 @@ def arrow_aligned_windows( start: int, end: int, pointwidth: int, + sort_time: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -1081,7 +1083,9 @@ def arrow_aligned_windows( :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) pointwidth : int, required Specify the number of ns between data points (2**pointwidth) - version : int + sort_time : bool, default: False + Should the table be sorted on the 'time' column? + version : int, default: 0 Version of the stream to query auto_retry: bool, default: False Whether to retry this request in the event of an error @@ -1123,7 +1127,10 @@ def arrow_aligned_windows( ) if len(tables) > 0: tabs, ver = zip(*tables) - return pa.concat_tables(tabs) + if sort_time: + return pa.concat_tables(tabs).sort_by("time") + else: + return pa.concat_tables(tabs) else: schema = pa.schema( [ @@ -1217,6 +1224,7 @@ def arrow_windows( start: int, end: int, width: int, + sort_time: bool = False, version: int = 0, auto_retry=False, retries=5, @@ -1235,6 +1243,8 @@ def arrow_windows( :func:`btrdb.utils.timez.to_nanoseconds` for valid input types) width : int, required The number of nanoseconds in each window. + sort_time : bool, default: False + Should the table be sorted on the 'time' column. version : int, default=0, optional The version of the stream to query. auto_retry: bool, default: False @@ -1285,7 +1295,10 @@ def arrow_windows( ) if len(tables) > 0: tabs, ver = zip(*tables) - return pa.concat_tables(tabs) + if sort_time: + return pa.concat_tables(tabs).sort_by("time") + else: + return pa.concat_tables(tabs) else: schema = pa.schema( [ @@ -2094,6 +2107,8 @@ def arrow_values( ): """Return a pyarrow table of stream values based on the streamset parameters. + This data will be sorted by the 'time' column. + Notes ----- This method is available for commercial customers with arrow-enabled servers. @@ -2138,6 +2153,7 @@ def arrow_values( data = tablex else: data = tablex + data = data.sort_by("time") elif self.width is not None and self.depth is not None: # create list of stream.windows data (the windows method should @@ -2168,6 +2184,7 @@ def arrow_values( data = tablex else: data = tablex + data = data.sort_by("time") else: sampling_freq = params.pop("sampling_frequency", 0) period_ns = 0