diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index b64f8fb8e..b7b927fdd 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -24,4 +24,5 @@ pub use error::*;
 mod physical_plan;
 mod schema;
 mod table;
+pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table/mod.rs
similarity index 97%
rename from crates/integrations/datafusion/src/table.rs
rename to crates/integrations/datafusion/src/table/mod.rs
index bb24713aa..82f29bb52 100644
--- a/crates/integrations/datafusion/src/table.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod table_provider_factory;
+
 use std::any::Any;
 use std::sync::Arc;
 
@@ -41,6 +43,10 @@ pub struct IcebergTableProvider {
 }
 
 impl IcebergTableProvider {
+    /// Creates a provider from an already loaded [`Table`] and its Arrow schema.
+    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+        IcebergTableProvider { table, schema }
+    }
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given client and table name to fetch an actual [`Table`]
     /// in the provided namespace.
diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs
new file mode 100644
index 000000000..b8e66bd30
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::CreateExternalTable;
+use datafusion::sql::TableReference;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::io::FileIO;
+use iceberg::table::StaticTable;
+use iceberg::{Error, ErrorKind, Result, TableIdent};
+
+use super::IcebergTableProvider;
+use crate::to_datafusion_error;
+
+/// A factory that implements DataFusion's `TableProviderFactory` to create `IcebergTableProvider` instances.
+///
+/// # Example
+///
+/// The following example demonstrates how to create an Iceberg external table using SQL in
+/// a DataFusion session with `IcebergTableProviderFactory`:
+///
+/// ```
+/// use std::sync::Arc;
+///
+/// use datafusion::execution::session_state::SessionStateBuilder;
+/// use datafusion::prelude::*;
+/// use datafusion::sql::TableReference;
+/// use iceberg_datafusion::IcebergTableProviderFactory;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     // Create a new session context
+///     let mut state = SessionStateBuilder::new().with_default_features().build();
+///
+///     // Register the IcebergTableProviderFactory in the session
+///     state.table_factories_mut().insert(
+///         "ICEBERG".to_string(),
+///         Arc::new(IcebergTableProviderFactory::new()),
+///     );
+///
+///     let ctx = SessionContext::new_with_state(state);
+///
+///     // Define the table reference and the location of the Iceberg metadata file
+///     let table_ref = TableReference::bare("my_iceberg_table");
+///     // /path/to/iceberg/metadata
+///     let metadata_file_path = format!(
+///         "{}/testdata/table_metadata/{}",
+///         env!("CARGO_MANIFEST_DIR"),
+///         "TableMetadataV2.json"
+///     );
+///
+///     // SQL command to create the Iceberg external table
+///     let sql = format!(
+///         "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
+///         table_ref, metadata_file_path
+///     );
+///
+///     // Execute the SQL to create the external table
+///     ctx.sql(&sql).await.expect("Failed to create table");
+///
+///     // Verify the table was created by retrieving the table provider
+///     let table_provider = ctx
+///         .table_provider(table_ref)
+///         .await
+///         .expect("Table not found");
+///
+///     println!("Iceberg external table created successfully.");
+/// }
+/// ```
+///
+/// # Note
+/// This factory is designed to work with the DataFusion query engine,
+/// specifically for handling Iceberg tables in external table commands.
+/// Currently, this implementation supports only reading Iceberg tables;
+/// creating new tables is not yet supported.
+///
+/// # Errors
+/// An error will be returned if any unsupported feature, such as an explicit
+/// schema, partition columns, order expressions, constraints, or column
+/// defaults, is detected in the table creation command.
+#[derive(Default)]
+pub struct IcebergTableProviderFactory {}
+
+impl IcebergTableProviderFactory {
+    /// Creates a new [`IcebergTableProviderFactory`].
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for IcebergTableProviderFactory {
+    async fn create(
+        &self,
+        _state: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> DFResult<Arc<dyn TableProvider>> {
+        check_cmd(cmd).map_err(to_datafusion_error)?;
+
+        let table_name = &cmd.name;
+        let metadata_file_path = &cmd.location;
+        let options = &cmd.options;
+
+        let table_name_with_ns = complement_namespace_if_necessary(table_name);
+
+        let table = create_static_table(table_name_with_ns, metadata_file_path, options)
+            .await
+            .map_err(to_datafusion_error)?
+            .into_table();
+
+        let schema = schema_to_arrow_schema(table.metadata().current_schema())
+            .map_err(to_datafusion_error)?;
+
+        Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
+    }
+}
+
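+/// Validates the `CREATE EXTERNAL TABLE` command before a provider is built.
+///
+/// As an illustrative sketch (table names and paths below are placeholders),
+/// only commands that specify nothing beyond a location and `OPTIONS` pass
+/// this check:
+///
+/// ```sql
+/// -- accepted: only a location is given
+/// CREATE EXTERNAL TABLE t STORED AS ICEBERG LOCATION '/path/to/metadata.json';
+///
+/// -- rejected: an explicit schema or partition columns are unsupported,
+/// -- as are order expressions, constraints, and column defaults
+/// CREATE EXTERNAL TABLE t (x BIGINT) STORED AS ICEBERG LOCATION '/path/to/metadata.json';
+/// CREATE EXTERNAL TABLE t STORED AS ICEBERG PARTITIONED BY (x) LOCATION '/path/to/metadata.json';
+/// ```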
+fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
+    let CreateExternalTable {
+        schema,
+        table_partition_cols,
+        order_exprs,
+        constraints,
+        column_defaults,
+        ..
+    } = cmd;
+
+    // Check whether any of the fields violate the constraints in a single condition
+    let is_invalid = !schema.fields().is_empty()
+        || !table_partition_cols.is_empty()
+        || !order_exprs.is_empty()
+        || !constraints.is_empty()
+        || !column_defaults.is_empty();
+
+    if is_invalid {
+        return Err(Error::new(ErrorKind::FeatureUnsupported, "Currently we only support reading existing Iceberg tables in the external table command. To create a new table, please use a catalog provider."));
+    }
+
+    Ok(())
+}
+
+/// Complements the namespace of a table name if necessary.
+///
+/// # Note
+/// If the table name is a bare name, it will be complemented with the 'default' namespace.
+/// Otherwise, it will be returned as is. Iceberg tables are always namespaced, but DataFusion
+/// external table commands may omit the namespace, so this function ensures that a namespace
+/// is always present.
+///
+/// # See also
+/// - [`iceberg::NamespaceIdent`]
+/// - [`datafusion::sql::planner::SqlToRel::external_table_to_plan`]
+fn complement_namespace_if_necessary(table_name: &TableReference) -> Cow<'_, TableReference> {
+    match table_name {
+        TableReference::Bare { table } => {
+            Cow::Owned(TableReference::partial("default", table.as_ref()))
+        }
+        other => Cow::Borrowed(other),
+    }
+}
+
+async fn create_static_table(
+    table_name: Cow<'_, TableReference>,
+    metadata_file_path: &str,
+    props: &HashMap<String, String>,
+) -> Result<StaticTable> {
+    let table_ident = TableIdent::from_strs(table_name.to_vec())?;
+    let file_io = FileIO::from_path(metadata_file_path)?
+        .with_props(props)
+        .build()?;
+    StaticTable::from_metadata_file(metadata_file_path, table_ident, file_io).await
+}
+
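+// An illustrative unit test for the namespace defaulting above (a sketch that
+// only mirrors the behavior of `complement_namespace_if_necessary`): a bare
+// table name gains the `default` namespace, while an already-qualified name
+// passes through unchanged.
+#[cfg(test)]
+mod namespace_tests {
+    use super::*;
+
+    #[test]
+    fn test_complement_namespace_if_necessary() {
+        // Bare name: the 'default' namespace is added
+        let bare = TableReference::bare("static_table");
+        assert_eq!(
+            complement_namespace_if_necessary(&bare).as_ref(),
+            &TableReference::partial("default", "static_table")
+        );
+
+        // Qualified name: returned as-is
+        let qualified = TableReference::partial("static_ns", "static_table");
+        assert_eq!(
+            complement_namespace_if_necessary(&qualified).as_ref(),
+            &qualified
+        );
+    }
+}
+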
+#[cfg(test)]
+mod tests {
+
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::catalog::TableProviderFactory;
+    use datafusion::common::{Constraints, DFSchema};
+    use datafusion::execution::session_state::SessionStateBuilder;
+    use datafusion::logical_expr::CreateExternalTable;
+    use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use datafusion::prelude::SessionContext;
+    use datafusion::sql::TableReference;
+
+    use super::*;
+
+    fn table_metadata_v2_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("x", DataType::Int64, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("y", DataType::Int64, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("z", DataType::Int64, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+        ])
+    }
+
+    fn table_metadata_location() -> String {
+        format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV2.json"
+        )
+    }
+
+    fn create_external_table_cmd() -> CreateExternalTable {
+        let metadata_file_path = table_metadata_location();
+
+        CreateExternalTable {
+            name: TableReference::partial("static_ns", "static_table"),
+            location: metadata_file_path,
+            schema: Arc::new(DFSchema::empty()),
+            file_type: "iceberg".to_string(),
+            options: Default::default(),
+            table_partition_cols: Default::default(),
+            order_exprs: Default::default(),
+            constraints: Constraints::empty(),
+            column_defaults: Default::default(),
+            if_not_exists: Default::default(),
+            definition: Default::default(),
+            unbounded: Default::default(),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schema_of_created_table() {
+        let factory = IcebergTableProviderFactory::new();
+
+        let state = SessionStateBuilder::new().build();
+        let cmd = create_external_table_cmd();
+
+        let table_provider = factory
+            .create(&state, &cmd)
+            .await
+            .expect("create table failed");
+
+        let expected_schema = table_metadata_v2_schema();
+        let actual_schema = table_provider.schema();
+
+        assert_eq!(actual_schema.as_ref(), &expected_schema);
+    }
+
+    #[tokio::test]
+    async fn test_schema_of_created_external_table_sql() {
+        let mut state = SessionStateBuilder::new().with_default_features().build();
+        state.table_factories_mut().insert(
+            "ICEBERG".to_string(),
+            Arc::new(IcebergTableProviderFactory::new()),
+        );
+        let ctx = SessionContext::new_with_state(state);
+
+        // All external tables in DataFusion use bare names.
+        // See https://github.com/apache/datafusion/blob/main/datafusion/sql/src/statement.rs#L1038-L1039
+        let table_ref = TableReference::bare("static_table");
+
+        // Create the external table
+        let sql = format!(
+            "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
+            table_ref,
+            table_metadata_location()
+        );
+        let _df = ctx.sql(&sql).await.expect("create table failed");
+
+        // Get the created external table
+        let table_provider = ctx
+            .table_provider(table_ref)
+            .await
+            .expect("table not found");
+
+        // Check the schema of the created table
+        let expected_schema = table_metadata_v2_schema();
+        let actual_schema = table_provider.schema();
+
+        assert_eq!(actual_schema.as_ref(), &expected_schema);
+    }
+}
diff --git a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
new file mode 100644
index 000000000..a7b47217f
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
@@ -0,0 +1,121 @@
+{
+  "format-version": 2,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "s3://bucket/test/location",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 3,
+  "current-schema-id": 1,
+  "schemas": [
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    },
+    {
+      "type": "struct",
+      "schema-id": 1,
+      "identifier-field-ids": [
+        1,
+        2
+      ],
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    }
+  ],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    }
+  ],
+  "last-partition-id": 1000,
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {
+          "transform": "identity",
+          "source-id": 2,
+          "direction": "asc",
+          "null-order": "nulls-first"
+        },
+        {
+          "transform": "bucket[4]",
+          "source-id": 3,
+          "direction": "desc",
+          "null-order": "nulls-last"
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": 3055729675574597004,
+  "snapshots": [
+    {
+      "snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1515100955770,
+      "sequence-number": 0,
+      "summary": {
+        "operation": "append"
+      },
+      "manifest-list": "s3://a/b/1.avro"
+    },
+    {
+      "snapshot-id": 3055729675574597004,
+      "parent-snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1555100955770,
+      "sequence-number": 1,
+      "summary": {
+        "operation": "append"
+      },
+      "manifest-list": "s3://a/b/2.avro",
+      "schema-id": 1
+    }
+  ],
+  "snapshot-log": [
+    {
"snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770 + }, + { + "snapshot-id": 3055729675574597004, + "timestamp-ms": 1555100955770 + } + ], + "metadata-log": [] +} \ No newline at end of file