datafusion: Create table provider for a snapshot.
The Iceberg table provider allows querying an Iceberg table via
datafusion. The initial implementation only allowed querying the latest
snapshot of the table. It is sometimes useful to query a specific
snapshot (time travel). This commit adds that capability: a new method
(`try_new_from_table_snapshot`) creates a provider for a specific
table snapshot.

All existing APIs should work as before.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
Leonid Ryzhyk committed Nov 20, 2024
1 parent b2fb803 commit 229030b
Showing 2 changed files with 88 additions and 6 deletions.
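For context, time travel with the new constructor looks roughly like this. A minimal sketch, assuming an already-configured catalog and a known snapshot id; the namespace, table name, snapshot id, and the anyhow error handling are all illustrative, not part of this commit:

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use iceberg::{Catalog, NamespaceIdent, TableIdent};
    use iceberg_datafusion::IcebergTableProvider;

    async fn query_at_snapshot(catalog: &dyn Catalog) -> anyhow::Result<()> {
        // Load the table from the catalog as before; nothing changes here.
        let ident = TableIdent::new(NamespaceIdent::new("ns".to_string()), "t".to_string());
        let table = catalog.load_table(&ident).await?;

        // Illustrative snapshot id; in practice it comes from the table's
        // snapshot history (e.g. table.metadata().snapshots()).
        let snapshot_id: i64 = 3051729675574597004;

        // New in this commit: pin the provider to that snapshot (time travel).
        let provider =
            IcebergTableProvider::try_new_from_table_snapshot(table, snapshot_id).await?;

        let ctx = SessionContext::new();
        ctx.register_table("t_at_snapshot", Arc::new(provider))?;
        ctx.sql("SELECT * FROM t_at_snapshot").await?.show().await?;
        Ok(())
    }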
crates/integrations/datafusion/src/physical_plan/scan.rs (13 additions, 2 deletions)
@@ -43,6 +43,8 @@ use crate::to_datafusion_error;
 pub(crate) struct IcebergTableScan {
     /// A table in the catalog.
     table: Table,
+    /// Snapshot of the table to scan.
+    snapshot_id: Option<i64>,
     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
     /// Stores certain, often expensive to compute,
@@ -58,6 +60,7 @@ impl IcebergTableScan {
     /// Creates a new [`IcebergTableScan`] object.
     pub(crate) fn new(
         table: Table,
+        snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
@@ -68,6 +71,7 @@ impl IcebergTableScan {

         Self {
             table,
+            snapshot_id,
             schema,
             plan_properties,
             projection,
@@ -119,6 +123,7 @@ impl ExecutionPlan for IcebergTableScan {
     ) -> DFResult<SendableRecordBatchStream> {
         let fut = get_batch_stream(
             self.table.clone(),
+            self.snapshot_id,
             self.projection.clone(),
             self.predicates.clone(),
         );
@@ -157,12 +162,18 @@ impl DisplayAs for IcebergTableScan {
 /// and then converts it into a stream of Arrow [`RecordBatch`]es.
 async fn get_batch_stream(
     table: Table,
+    snapshot_id: Option<i64>,
     column_names: Option<Vec<String>>,
     predicates: Option<Predicate>,
 ) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+    let scan_builder = match snapshot_id {
+        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
+        None => table.scan(),
+    };
+
     let mut scan_builder = match column_names {
-        Some(column_names) => table.scan().select(column_names),
-        None => table.scan().select_all(),
+        Some(column_names) => scan_builder.select(column_names),
+        None => scan_builder.select_all(),
     };
     if let Some(pred) = predicates {
         scan_builder = scan_builder.with_filter(pred);
crates/integrations/datafusion/src/table/mod.rs (75 additions, 4 deletions)
@@ -29,7 +29,7 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::table::Table;
-use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
+use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};

 use crate::physical_plan::scan::IcebergTableScan;

@@ -39,13 +39,19 @@ use crate::physical_plan::scan::IcebergTableScan;
 pub struct IcebergTableProvider {
     /// A table in the catalog.
     table: Table,
+    /// Table snapshot id that will be queried via this provider.
+    snapshot_id: Option<i64>,
     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
 }

 impl IcebergTableProvider {
     pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
-        IcebergTableProvider { table, schema }
+        IcebergTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        }
     }
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given client and table name to fetch an actual [`Table`]
@@ -60,14 +66,52 @@ impl IcebergTableProvider {

         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

-        Ok(IcebergTableProvider { table, schema })
+        Ok(IcebergTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        })
     }

     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
     pub async fn try_new_from_table(table: Table) -> Result<Self> {
         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
-        Ok(IcebergTableProvider { table, schema })
+        Ok(IcebergTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        })
     }
+
+    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
+    /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
+    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
+        let schema_id = table
+            .metadata()
+            .snapshot_by_id(snapshot_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "snapshot id {snapshot_id} not found in table {}",
+                        table.identifier().name()
+                    ),
+                )
+            })?
+            .schema_id()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, format!("cannot create a table provider: table snapshot {snapshot_id} does not have a schema id set")))?;
+        let schema = Arc::new(schema_to_arrow_schema(
+            table
+                .metadata()
+                .schema_by_id(schema_id)
+                .ok_or_else(|| Error::new(ErrorKind::DataInvalid, format!("snapshot {snapshot_id} has current schema id {schema_id}; however this schema id does not exist in table metadata")))?,
+        )?);
+        Ok(IcebergTableProvider {
+            table,
+            snapshot_id: Some(snapshot_id),
+            schema,
+        })
+    }
 }

@@ -94,6 +138,7 @@ impl TableProvider for IcebergTableProvider {
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(IcebergTableScan::new(
             self.table.clone(),
+            self.snapshot_id,
             self.schema.clone(),
             projection,
             filters,
@@ -162,4 +207,30 @@ mod tests {
         let has_column = df_schema.has_column(&Column::from_name("z"));
         assert!(has_column);
     }
+
+    #[tokio::test]
+    async fn test_try_new_from_table_snapshot() {
+        let table = get_test_table_from_metadata_file().await;
+        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
+        let table_provider =
+            IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
+                .await
+                .unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("mytable", Arc::new(table_provider))
+            .unwrap();
+        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
+        let df_schema = df.schema();
+        let df_columns = df_schema.fields();
+        assert_eq!(df_columns.len(), 3);
+        let x_column = df_columns.first().unwrap();
+        let column_data = format!(
+            "{:?}:{:?}",
+            x_column.name(),
+            x_column.data_type().to_string()
+        );
+        assert_eq!(column_data, "\"x\":\"Int64\"");
+        let has_column = df_schema.has_column(&Column::from_name("z"));
+        assert!(has_column);
+    }
 }
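Because the new constructor validates the snapshot id eagerly, a bad id fails fast rather than at scan time. A quick sketch of that error path (`table` as in the example above; the bogus id is hypothetical):

    // A nonexistent snapshot id is rejected when the provider is built.
    let err = IcebergTableProvider::try_new_from_table_snapshot(table, -1)
        .await
        .unwrap_err();
    assert!(err.to_string().contains("not found in table"));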
