@@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock};
2222
2323use arrow_array:: RecordBatch ;
2424use futures:: channel:: mpsc:: { channel, Sender } ;
25- use futures:: stream:: BoxStream ;
25+ use futures:: stream:: { self , BoxStream } ;
2626use futures:: { SinkExt , StreamExt , TryFutureExt , TryStreamExt } ;
2727use serde:: { Deserialize , Serialize } ;
2828
@@ -160,29 +160,33 @@ impl<'a> TableScanBuilder<'a> {
160160 /// Build the table scan.
161161 pub fn build ( self ) -> Result < TableScan > {
162162 let snapshot = match self . snapshot_id {
163- Some ( snapshot_id) => self
164- . table
165- . metadata ( )
166- . snapshot_by_id ( snapshot_id)
167- . ok_or_else ( || {
168- Error :: new (
169- ErrorKind :: DataInvalid ,
170- format ! ( "Snapshot with id {} not found" , snapshot_id) ,
171- )
172- } ) ?
173- . clone ( ) ,
174- None => self
175- . table
176- . metadata ( )
177- . current_snapshot ( )
178- . ok_or_else ( || {
179- Error :: new (
180- ErrorKind :: FeatureUnsupported ,
181- "Can't scan table without snapshots" ,
182- )
183- } ) ?
184- . clone ( ) ,
163+ Some ( snapshot_id) => Some (
164+ self . table
165+ . metadata ( )
166+ . snapshot_by_id ( snapshot_id)
167+ . ok_or_else ( || {
168+ Error :: new (
169+ ErrorKind :: DataInvalid ,
170+ format ! ( "Snapshot with id {} not found" , snapshot_id) ,
171+ )
172+ } ) ?,
173+ ) ,
174+ None => self . table . metadata ( ) . current_snapshot ( ) ,
185175 } ;
176+ if snapshot. is_none ( ) {
177+ return Ok ( TableScan {
178+ plan_context : None ,
179+ batch_size : None ,
180+ file_io : self . table . file_io ( ) . clone ( ) ,
181+ column_names : self . column_names ,
182+ concurrency_limit_data_files : self . concurrency_limit_data_files ,
183+ concurrency_limit_manifest_entries : self . concurrency_limit_manifest_entries ,
184+ concurrency_limit_manifest_files : self . concurrency_limit_manifest_files ,
185+ row_group_filtering_enabled : self . row_group_filtering_enabled ,
186+ } ) ;
187+ }
188+
189+ let snapshot = snapshot. unwrap ( ) ;
186190
187191 let schema = snapshot. schema ( self . table . metadata ( ) ) ?;
188192
@@ -246,7 +250,7 @@ impl<'a> TableScanBuilder<'a> {
246250 } ;
247251
248252 let plan_context = PlanContext {
249- snapshot,
253+ snapshot : snapshot . clone ( ) ,
250254 table_metadata : self . table . metadata_ref ( ) ,
251255 snapshot_schema : schema,
252256 case_sensitive : self . case_sensitive ,
@@ -263,7 +267,7 @@ impl<'a> TableScanBuilder<'a> {
263267 batch_size : self . batch_size ,
264268 column_names : self . column_names ,
265269 file_io : self . table . file_io ( ) . clone ( ) ,
266- plan_context,
270+ plan_context : Some ( plan_context ) ,
267271 concurrency_limit_data_files : self . concurrency_limit_data_files ,
268272 concurrency_limit_manifest_entries : self . concurrency_limit_manifest_entries ,
269273 concurrency_limit_manifest_files : self . concurrency_limit_manifest_files ,
@@ -275,7 +279,7 @@ impl<'a> TableScanBuilder<'a> {
275279/// Table scan.
276280#[ derive( Debug ) ]
277281pub struct TableScan {
278- plan_context : PlanContext ,
282+ plan_context : Option < PlanContext > ,
279283 batch_size : Option < usize > ,
280284 file_io : FileIO ,
281285 column_names : Vec < String > ,
@@ -316,6 +320,12 @@ struct PlanContext {
316320impl TableScan {
317321 /// Returns a stream of [`FileScanTask`]s.
318322 pub async fn plan_files ( & self ) -> Result < FileScanTaskStream > {
323+ if self . plan_context . is_none ( ) {
324+ return Ok ( stream:: empty ( ) . boxed ( ) ) ;
325+ } ;
326+
327+ let plan_context = & self . plan_context . as_ref ( ) . unwrap ( ) ;
328+
319329 let concurrency_limit_manifest_files = self . concurrency_limit_manifest_files ;
320330 let concurrency_limit_manifest_entries = self . concurrency_limit_manifest_entries ;
321331
@@ -325,14 +335,13 @@ impl TableScan {
325335 // used to stream the results back to the caller
326336 let ( file_scan_task_tx, file_scan_task_rx) = channel ( concurrency_limit_manifest_entries) ;
327337
328- let manifest_list = self . plan_context . get_manifest_list ( ) . await ?;
338+ let manifest_list = plan_context. get_manifest_list ( ) . await ?;
329339
330340 // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
331341 // whose content type is not Data or whose partitions cannot match this
332342 // scan's filter
333- let manifest_file_contexts = self
334- . plan_context
335- . build_manifest_file_contexts ( manifest_list, manifest_entry_ctx_tx) ?;
343+ let manifest_file_contexts =
344+ plan_context. build_manifest_file_contexts ( manifest_list, manifest_entry_ctx_tx) ?;
336345
337346 let mut channel_for_manifest_error = file_scan_task_tx. clone ( ) ;
338347
@@ -392,8 +401,11 @@ impl TableScan {
392401 & self . column_names
393402 }
394403 /// Returns a reference to the snapshot of the table scan.
395- pub fn snapshot ( & self ) -> & SnapshotRef {
396- & self . plan_context . snapshot
404+ pub fn snapshot ( & self ) -> Option < & SnapshotRef > {
405+ match & self . plan_context {
406+ Some ( plan_context) => Some ( & plan_context. snapshot ) ,
407+ None => None ,
408+ }
397409 }
398410
399411 async fn process_manifest_entry (
@@ -1175,7 +1187,7 @@ mod tests {
11751187 let table_scan = table. scan ( ) . build ( ) . unwrap ( ) ;
11761188 assert_eq ! (
11771189 table. metadata( ) . current_snapshot( ) . unwrap( ) . snapshot_id( ) ,
1178- table_scan. snapshot( ) . snapshot_id( )
1190+ table_scan. snapshot( ) . unwrap ( ) . snapshot_id( )
11791191 ) ;
11801192 }
11811193
@@ -1196,7 +1208,10 @@ mod tests {
11961208 . snapshot_id ( 3051729675574597004 )
11971209 . build ( )
11981210 . unwrap ( ) ;
1199- assert_eq ! ( table_scan. snapshot( ) . snapshot_id( ) , 3051729675574597004 ) ;
1211+ assert_eq ! (
1212+ table_scan. snapshot( ) . unwrap( ) . snapshot_id( ) ,
1213+ 3051729675574597004
1214+ ) ;
12001215 }
12011216
12021217 #[ tokio:: test]
0 commit comments