@@ -115,3 +115,52 @@ def test_arrow_streamset_to_dataframe(conn, tmp_collection):
115115 }
116116 expected_df = pd .DataFrame (expected_dat , index = pd .DatetimeIndex (expected_times ))
117117 assert values .equals (expected_df )
118+
119+
120+ def test_timesnap_backward_extends_range (conn , tmp_collection ):
121+ sec = 10 ** 9
122+ tv1 = [
123+ [int (0.5 * sec ), 0.5 ],
124+ [2 * sec , 2.0 ],
125+ ]
126+ tv2 = [
127+ [int (0.5 * sec ) - 1 , 0.5 ],
128+ [2 * sec , 2.0 ],
129+ ]
130+ tv3 = [
131+ [1 * sec , 1.0 ],
132+ [2 * sec , 2.0 ],
133+ ]
134+ s1 = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s1" })
135+ s2 = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s2" })
136+ s3 = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s3" })
137+ s1 .insert (tv1 )
138+ s2 .insert (tv2 )
139+ s3 .insert (tv3 )
140+ ss = btrdb .stream .StreamSet ([s1 , s2 , s3 ]).filter (
141+ start = 1 * sec , end = 3 * sec , sampling_frequency = 1
142+ )
143+ values = ss .arrow_values ()
144+ assert [1 * sec , 2 * sec ] == [t .value for t in values ["time" ]]
145+ assert [0.5 , 2.0 ] == [v .as_py () for v in values [tmp_collection + "/s1" ]]
146+ assert [None , 2.0 ] == [
147+ None if isnan (v .as_py ()) else v .as_py () for v in values [tmp_collection + "/s2" ]
148+ ]
149+ assert [1.0 , 2.0 ] == [v .as_py () for v in values [tmp_collection + "/s3" ]]
150+
151+
152+ def test_timesnap_forward_restricts_range (conn , tmp_collection ):
153+ sec = 10 ** 9
154+ tv = [
155+ [1 * sec , 1.0 ],
156+ [2 * sec , 2.0 ],
157+ [int (2.75 * sec ), 2.75 ],
158+ ]
159+ s = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s" })
160+ s .insert (tv )
161+ ss = btrdb .stream .StreamSet ([s ]).filter (start = 1 * sec , sampling_frequency = 1 )
162+ values = ss .filter (end = int (3.0 * sec )).arrow_values ()
163+ assert [1 * sec , 2 * sec ] == [t .value for t in values ["time" ]]
164+ assert [1.0 , 2.0 ] == [v .as_py () for v in values [tmp_collection + "/s" ]]
165+ # Same result if skipping past end instead of to end.
166+ assert values == ss .filter (end = int (2.9 * sec )).arrow_values ()
0 commit comments