From 625cdbf27932ac4d042d40cef616fdc00e5ac164 Mon Sep 17 00:00:00 2001 From: "yukkit.zhang" Date: Sat, 7 Sep 2024 15:14:39 +0800 Subject: [PATCH] add doc&ut --- .../src/table/table_provider_factory.rs | 141 ++++++++++++++++-- 1 file changed, 126 insertions(+), 15 deletions(-) diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index c53c8a5ed..7f4ce9d15 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -35,12 +36,56 @@ use crate::to_datafusion_error; /// /// # Example /// +/// The following example demonstrates how to create an Iceberg external table using SQL in +/// a DataFusion session with `IcebergTableProviderFactory`: +/// /// ``` -/// use datafusion::catalog::TableProviderFactory; +/// use std::sync::Arc; +/// +/// use datafusion::execution::session_state::SessionStateBuilder; +/// use datafusion::prelude::*; +/// use datafusion::sql::TableReference; /// use iceberg_datafusion::IcebergTableProviderFactory; /// -/// let factory = IcebergTableProviderFactory::new(); -/// // Use the factory to create a table provider for an Iceberg table +/// #[tokio::main] +/// async fn main() { +/// // Create a new session context +/// let mut state = SessionStateBuilder::new().with_default_features().build(); +/// +/// // Register the IcebergTableProviderFactory in the session +/// state.table_factories_mut().insert( +/// "ICEBERG".to_string(), +/// Arc::new(IcebergTableProviderFactory::new()), +/// ); +/// +/// let ctx = SessionContext::new_with_state(state); +/// +/// // Define the table reference and the location of the Iceberg metadata file +/// let table_ref = TableReference::bare("my_iceberg_table"); +/// // /path/to/iceberg/metadata +/// let metadata_file_path = format!( +/// "{}/testdata/table_metadata/{}", +/// env!("CARGO_MANIFEST_DIR"), +/// "TableMetadataV2.json" +/// ); +/// +/// // SQL command to create the Iceberg external table +/// let sql = format!( +/// "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'", +/// table_ref, metadata_file_path +/// ); +/// +/// // Execute the SQL to create the external table +/// ctx.sql(&sql).await.expect("Failed to create table"); +/// +/// // Verify the table was created by retrieving the table provider +/// let table_provider = ctx +/// .table_provider(table_ref) +/// .await +/// .expect("Table not found"); +/// +/// println!("Iceberg external table created successfully."); +/// } /// ``` /// /// # Note @@ -75,7 +120,9 @@ impl TableProviderFactory for IcebergTableProviderFactory { let metadata_file_path = &cmd.location; let options = &cmd.options; - let table = create_static_table(table_name, metadata_file_path, options) + let table_name_with_ns = complement_namespace_if_necessary(table_name); + + let table = create_static_table(table_name_with_ns, metadata_file_path, options) .await .map_err(to_datafusion_error)? .into_table(); @@ -88,6 +135,8 @@ impl TableProviderFactory for IcebergTableProviderFactory { } fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { + println!("Checking command: {:?}", cmd); + let CreateExternalTable { schema, table_partition_cols, @@ -111,8 +160,27 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { Ok(()) } +/// Complements the namespace of a table name if necessary. +/// +/// # Note +/// If the table name is a bare name, it will be complemented with the 'default' namespace. +/// Otherwise, it will be returned as is. Because Iceberg tables are always namespaced, but DataFusion +/// external table commands not include the namespace, this function ensures that the namespace is always present. +/// +/// # See also +/// - [`iceberg::NamespaceIdent`] +/// - [`datafusion::sql::planner::SqlToRel::external_table_to_plan`] +fn complement_namespace_if_necessary(table_name: &TableReference) -> Cow<'_, TableReference> { + match table_name { + TableReference::Bare { table } => { + Cow::Owned(TableReference::partial("default", table.as_ref())) + } + other => Cow::Borrowed(other), + } +} + async fn create_static_table( - table_name: &TableReference, + table_name: Cow<'_, TableReference>, metadata_file_path: &str, props: &HashMap, ) -> Result { @@ -129,6 +197,7 @@ mod tests { use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::catalog::TableProviderFactory; use datafusion::common::{Constraints, DFSchema}; + use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::CreateExternalTable; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use datafusion::prelude::SessionContext; @@ -153,17 +222,18 @@ mod tests { ]) } - #[tokio::test] - async fn test_schema_of_created_table() { - let factory = IcebergTableProviderFactory::new(); - - let metadata_file_path = format!( + fn table_metadata_location() -> String { + format!( "{}/testdata/table_metadata/{}", env!("CARGO_MANIFEST_DIR"), "TableMetadataV2.json" - ); + ) + } + + fn create_external_table_cmd() -> CreateExternalTable { + let metadata_file_path = table_metadata_location(); - let cmd = CreateExternalTable { + CreateExternalTable { name: TableReference::partial("static_ns", "static_table"), location: metadata_file_path, schema: Arc::new(DFSchema::empty()), @@ -176,10 +246,16 @@ mod tests { if_not_exists: Default::default(), definition: Default::default(), unbounded: Default::default(), - }; + } + } + + #[tokio::test] + async fn test_schema_of_created_table() { + let factory = IcebergTableProviderFactory::new(); + + let state = SessionStateBuilder::new().build(); + let cmd = create_external_table_cmd(); - let ctx = SessionContext::new(); - let state = ctx.state(); let table_provider = factory .create(&state, &cmd) .await @@ -190,4 +266,39 @@ mod tests { assert_eq!(actual_schema.as_ref(), &expected_schema); } + + #[tokio::test] + async fn test_schema_of_created_external_table_sql() { + let mut state = SessionStateBuilder::new().with_default_features().build(); + state.table_factories_mut().insert( + "ICEBERG".to_string(), + Arc::new(IcebergTableProviderFactory::new()), + ); + let ctx = SessionContext::new_with_state(state); + + // All external tables in DataFusion use bare names. + // See https://github.com/apache/datafusion/blob/main/datafusion/sql/src/statement.rs#L1038-#L1039 + let table_ref = TableReference::bare("static_table"); + + // Create the external table + let sql = format!( + "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'", + table_ref, + table_metadata_location() + ); + println!("{}", sql); + let _df = ctx.sql(&sql).await.expect("create table failed"); + + // Get the created external table + let table_provider = ctx + .table_provider(table_ref) + .await + .expect("table not found"); + + // Check the schema of the created table + let expected_schema = table_metadata_v2_schema(); + let actual_schema = table_provider.schema(); + + assert_eq!(actual_schema.as_ref(), &expected_schema); + } }