diff --git a/bindings/python/Cargo.lock b/bindings/python/Cargo.lock index 8249414b8d..814c9afb35 100644 --- a/bindings/python/Cargo.lock +++ b/bindings/python/Cargo.lock @@ -2313,6 +2313,7 @@ dependencies = [ "chrono", "derive_builder", "expect-test", + "flate2", "fnv", "futures", "itertools 0.13.0", diff --git a/bindings/python/src/datafusion_table_provider.rs b/bindings/python/src/datafusion_table_provider.rs index b5e1bf952e..8db7223b34 100644 --- a/bindings/python/src/datafusion_table_provider.rs +++ b/bindings/python/src/datafusion_table_provider.rs @@ -23,7 +23,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider; use iceberg::TableIdent; use iceberg::io::FileIO; use iceberg::table::StaticTable; -use iceberg_datafusion::table::IcebergTableProvider; +use iceberg_datafusion::table::IcebergStaticTableProvider; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use pyo3::types::PyCapsule; @@ -32,7 +32,7 @@ use crate::runtime::runtime; #[pyclass(name = "IcebergDataFusionTable")] pub struct PyIcebergDataFusionTable { - inner: Arc, + inner: Arc, } #[pymethods] @@ -69,7 +69,7 @@ impl PyIcebergDataFusionTable { let table = static_table.into_table(); - IcebergTableProvider::try_new_from_table(table) + IcebergStaticTableProvider::try_new_from_table(table) .await .map_err(|e| { PyRuntimeError::new_err(format!("Failed to create table provider: {e}")) diff --git a/crates/integration_tests/tests/shared_tests/datafusion.rs b/crates/integration_tests/tests/shared_tests/datafusion.rs index 81bbb5f54c..60dd9f36c8 100644 --- a/crates/integration_tests/tests/shared_tests/datafusion.rs +++ b/crates/integration_tests/tests/shared_tests/datafusion.rs @@ -26,7 +26,7 @@ use datafusion::error::DataFusionError; use datafusion::prelude::SessionContext; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; -use iceberg_datafusion::IcebergTableProvider; +use iceberg_datafusion::IcebergStaticTableProvider; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::get_shared_containers; @@ -47,7 +47,7 @@ async fn test_basic_queries() -> Result<(), DataFusionError> { let ctx = SessionContext::new(); let table_provider = Arc::new( - IcebergTableProvider::try_new_from_table(table) + IcebergStaticTableProvider::try_new_from_table(table) .await .unwrap(), ); diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 3920ee73ca..31bbdbd67f 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -28,6 +28,7 @@ use iceberg::inspect::MetadataTableType; use iceberg::{Catalog, NamespaceIdent, Result}; use crate::table::IcebergTableProvider; +use crate::to_datafusion_error; /// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing /// access to table providers within a specific namespace. @@ -113,7 +114,10 @@ impl SchemaProvider for IcebergSchemaProvider { let metadata_table_type = MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?; if let Some(table) = self.tables.get(table_name) { - let metadata_table = table.metadata_table(metadata_table_type); + let metadata_table = table + .metadata_table(metadata_table_type) + .await + .map_err(to_datafusion_error)?; return Ok(Some(Arc::new(metadata_table))); } else { return Ok(None); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 42a3baad3b..8527668d6c 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -15,6 +15,16 @@ // specific language governing permissions and limitations // under the License. +//! Iceberg table providers for DataFusion. +//! +//! This module provides two table provider implementations: +//! +//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh. +//! Use for write operations and when you need to see the latest table state. +//! +//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific +//! table snapshot. Use for consistent analytical queries or time-travel scenarios. + pub mod metadata_table; pub mod table_provider_factory; @@ -38,98 +48,61 @@ use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use metadata_table::IcebergMetadataTableProvider; +use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::project::project_with_partition; use crate::physical_plan::repartition::repartition; use crate::physical_plan::scan::IcebergTableScan; use crate::physical_plan::write::IcebergWriteExec; -/// Represents a [`TableProvider`] for the Iceberg [`Catalog`], -/// managing access to a [`Table`]. +/// Catalog-backed table provider with automatic metadata refresh. +/// +/// This provider loads fresh table metadata from the catalog on every scan and write +/// operation, ensuring you always see the latest table state. Use this when you need +/// write operations or want to see the most up-to-date data. +/// +/// For read-only access to a specific snapshot without catalog overhead, use +/// [`IcebergStaticTableProvider`] instead. #[derive(Debug, Clone)] pub struct IcebergTableProvider { - /// A table in the catalog. - table: Table, - /// Table snapshot id that will be queried via this provider. - snapshot_id: Option, - /// A reference-counted arrow `Schema`. + /// The catalog that manages this table + catalog: Arc, + /// The table identifier (namespace + name) + table_ident: TableIdent, + /// A reference-counted arrow `Schema` (cached at construction) schema: ArrowSchemaRef, - /// The catalog that the table belongs to. - catalog: Option>, } impl IcebergTableProvider { - pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { - IcebergTableProvider { - table, - snapshot_id: None, - schema, - catalog: None, - } - } - /// Asynchronously tries to construct a new [`IcebergTableProvider`] - /// using the given client and table name to fetch an actual [`Table`] - /// in the provided namespace. + /// Creates a new catalog-backed table provider. + /// + /// Loads the table once to get the initial schema, then stores the catalog + /// reference for future metadata refreshes on each operation. pub(crate) async fn try_new( - client: Arc, + catalog: Arc, namespace: NamespaceIdent, name: impl Into, ) -> Result { - let ident = TableIdent::new(namespace, name.into()); - let table = client.load_table(&ident).await?; + let table_ident = TableIdent::new(namespace, name.into()); + // Load table once to get initial schema + let table = catalog.load_table(&table_ident).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { - table, - snapshot_id: None, - schema, - catalog: Some(client), - }) - } - - /// Asynchronously tries to construct a new [`IcebergTableProvider`] - /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. - pub async fn try_new_from_table(table: Table) -> Result { - let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - Ok(IcebergTableProvider { - table, - snapshot_id: None, - schema, - catalog: None, - }) - } - - /// Asynchronously tries to construct a new [`IcebergTableProvider`] - /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. - pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result { - let snapshot = table - .metadata() - .snapshot_by_id(snapshot_id) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - format!( - "snapshot id {snapshot_id} not found in table {}", - table.identifier().name() - ), - ) - })?; - let schema = snapshot.schema(table.metadata())?; - let schema = Arc::new(schema_to_arrow_schema(&schema)?); - Ok(IcebergTableProvider { - table, - snapshot_id: Some(snapshot_id), + catalog, + table_ident, schema, - catalog: None, }) } - pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> IcebergMetadataTableProvider { - IcebergMetadataTableProvider { - table: self.table.clone(), - r#type, - } + pub(crate) async fn metadata_table( + &self, + r#type: MetadataTableType, + ) -> Result { + // Load fresh table metadata for metadata table access + let table = self.catalog.load_table(&self.table_ident).await?; + Ok(IcebergMetadataTableProvider { table, r#type }) } } @@ -154,9 +127,17 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { + // Load fresh table metadata from catalog + let table = self + .catalog + .load_table(&self.table_ident) + .await + .map_err(to_datafusion_error)?; + + // Create scan with fresh metadata (always use current snapshot) Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), - self.snapshot_id, + table, + None, // Always use current snapshot for catalog-backed provider self.schema.clone(), projection, filters, @@ -177,17 +158,18 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { - let Some(catalog) = self.catalog.clone() else { - return Err(DataFusionError::Execution( - "Catalog cannot be none for insert_into".to_string(), - )); - }; + // Load fresh table metadata from catalog + let table = self + .catalog + .load_table(&self.table_ident) + .await + .map_err(to_datafusion_error)?; - let partition_spec = self.table.metadata().default_partition_spec(); + let partition_spec = table.metadata().default_partition_spec(); // Step 1: Project partition values for partitioned tables let plan_with_partition = if !partition_spec.is_unpartitioned() { - project_with_partition(input, &self.table)? + project_with_partition(input, &table)? } else { input }; @@ -200,14 +182,11 @@ impl TableProvider for IcebergTableProvider { ) })?; - let repartitioned_plan = repartition( - plan_with_partition, - self.table.metadata_ref(), - target_partitions, - )?; + let repartitioned_plan = + repartition(plan_with_partition, table.metadata_ref(), target_partitions)?; let write_plan = Arc::new(IcebergWriteExec::new( - self.table.clone(), + table.clone(), repartitioned_plan, self.schema.clone(), )); @@ -216,21 +195,139 @@ impl TableProvider for IcebergTableProvider { let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan)); Ok(Arc::new(IcebergCommitExec::new( - self.table.clone(), - catalog, + table, + self.catalog.clone(), coalesce_partitions, self.schema.clone(), ))) } } +/// Static table provider for read-only snapshot access. +/// +/// This provider holds a cached table instance and does not refresh metadata or support +/// write operations. Use this for consistent analytical queries, time-travel scenarios, +/// or when you want to avoid catalog overhead. +/// +/// For catalog-backed tables with write support and automatic refresh, use +/// [`IcebergTableProvider`] instead. +#[derive(Debug, Clone)] +pub struct IcebergStaticTableProvider { + /// The static table instance (never refreshed) + table: Table, + /// Optional snapshot ID for this static view + snapshot_id: Option, + /// A reference-counted arrow `Schema` + schema: ArrowSchemaRef, +} + +impl IcebergStaticTableProvider { + /// Creates a static provider from a table instance. + /// + /// Uses the table's current snapshot for all queries. Does not support write operations. + pub async fn try_new_from_table(table: Table) -> Result { + let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); + Ok(IcebergStaticTableProvider { + table, + snapshot_id: None, + schema, + }) + } + + /// Creates a static provider for a specific table snapshot. + /// + /// Queries the specified snapshot for all operations. Useful for time-travel queries. + /// Does not support write operations. + pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result { + let snapshot = table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "snapshot id {snapshot_id} not found in table {}", + table.identifier().name() + ), + ) + })?; + let table_schema = snapshot.schema(table.metadata())?; + let schema = Arc::new(schema_to_arrow_schema(&table_schema)?); + Ok(IcebergStaticTableProvider { + table, + snapshot_id: Some(snapshot_id), + schema, + }) + } +} + +#[async_trait] +impl TableProvider for IcebergStaticTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> DFResult> { + // Use cached table (no refresh) + Ok(Arc::new(IcebergTableScan::new( + self.table.clone(), + self.snapshot_id, + self.schema.clone(), + projection, + filters, + ))) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } + + async fn insert_into( + &self, + _state: &dyn Session, + _input: Arc, + _insert_op: InsertOp, + ) -> DFResult> { + Err(to_datafusion_error(Error::new( + ErrorKind::FeatureUnsupported, + "Write operations are not supported on IcebergStaticTableProvider. \ + Use IcebergTableProvider with a catalog for write support." + .to_string(), + ))) + } +} + #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use datafusion::common::Column; use datafusion::prelude::SessionContext; - use iceberg::TableIdent; use iceberg::io::FileIO; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::table::{StaticTable, Table}; + use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; + use tempfile::TempDir; use super::*; @@ -253,10 +350,59 @@ mod tests { static_table.into_table() } + async fn get_test_catalog_and_table() -> (Arc, NamespaceIdent, String, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let warehouse_path = temp_dir.path().to_str().unwrap().to_string(); + + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]), + ) + .await + .unwrap(); + + let namespace = NamespaceIdent::new("test_ns".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("test_table".to_string()) + .location(format!("{}/test_table", warehouse_path)) + .schema(schema) + .properties(HashMap::new()) + .build(); + + catalog + .create_table(&namespace, table_creation) + .await + .unwrap(); + + ( + Arc::new(catalog), + namespace, + "test_table".to_string(), + temp_dir, + ) + } + + // Tests for IcebergStaticTableProvider + #[tokio::test] - async fn test_try_new_from_table() { + async fn test_static_provider_from_table() { let table = get_test_table_from_metadata_file().await; - let table_provider = IcebergTableProvider::try_new_from_table(table.clone()) + let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone()) .await .unwrap(); let ctx = SessionContext::new(); @@ -278,11 +424,11 @@ mod tests { } #[tokio::test] - async fn test_try_new_from_table_snapshot() { + async fn test_static_provider_from_snapshot() { let table = get_test_table_from_metadata_file().await; let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id(); let table_provider = - IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id) + IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id) .await .unwrap(); let ctx = SessionContext::new(); @@ -304,16 +450,152 @@ mod tests { } #[tokio::test] - async fn test_physical_input_schema_consistent_with_logical_input_schema() { + async fn test_static_provider_rejects_writes() { + let table = get_test_table_from_metadata_file().await; + let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone()) + .await + .unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("mytable", Arc::new(table_provider)) + .unwrap(); + + // Attempt to insert into the static provider should fail + let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await; + + // The error should occur during planning or execution + // We expect an error indicating write operations are not supported + assert!( + result.is_err() || { + let df = result.unwrap(); + df.collect().await.is_err() + } + ); + } + + #[tokio::test] + async fn test_static_provider_scan() { let table = get_test_table_from_metadata_file().await; - let table_provider = IcebergTableProvider::try_new_from_table(table.clone()) + let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone()) .await .unwrap(); let ctx = SessionContext::new(); ctx.register_table("mytable", Arc::new(table_provider)) .unwrap(); + + // Test that scan operations work correctly let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap(); let physical_plan = df.create_physical_plan().await; - assert!(physical_plan.is_ok()) + assert!(physical_plan.is_ok()); + } + + // Tests for IcebergTableProvider + + #[tokio::test] + async fn test_catalog_backed_provider_creation() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + // Test creating a catalog-backed provider + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + // Verify the schema is loaded correctly + let schema = provider.schema(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(1).name(), "name"); + } + + #[tokio::test] + async fn test_catalog_backed_provider_scan() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_table("test_table", Arc::new(provider)) + .unwrap(); + + // Test that scan operations work correctly + let df = ctx.sql("SELECT * FROM test_table").await.unwrap(); + + // Verify the schema in the query result + let df_schema = df.schema(); + assert_eq!(df_schema.fields().len(), 2); + assert_eq!(df_schema.field(0).name(), "id"); + assert_eq!(df_schema.field(1).name(), "name"); + + let physical_plan = df.create_physical_plan().await; + assert!(physical_plan.is_ok()); + } + + #[tokio::test] + async fn test_catalog_backed_provider_insert() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_table("test_table", Arc::new(provider)) + .unwrap(); + + // Test that insert operations work correctly + let result = ctx.sql("INSERT INTO test_table VALUES (1, 'test')").await; + + // Insert should succeed (or at least not fail during planning) + assert!(result.is_ok()); + + // Try to execute the insert plan + let df = result.unwrap(); + let execution_result = df.collect().await; + + // The execution should succeed + assert!(execution_result.is_ok()); + } + + #[tokio::test] + async fn test_physical_input_schema_consistent_with_logical_input_schema() { + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_table("test_table", Arc::new(provider)) + .unwrap(); + + // Create a query plan + let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap(); + + // Get logical schema before consuming df + let logical_schema = df.schema().clone(); + + // Get physical plan (this consumes df) + let physical_plan = df.create_physical_plan().await.unwrap(); + let physical_schema = physical_plan.schema(); + + // Verify that logical and physical schemas are consistent + assert_eq!( + logical_schema.fields().len(), + physical_schema.fields().len() + ); + + for (logical_field, physical_field) in logical_schema + .fields() + .iter() + .zip(physical_schema.fields().iter()) + { + assert_eq!(logical_field.name(), physical_field.name()); + assert_eq!(logical_field.data_type(), physical_field.data_type()); + } } } diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index e8e87dd318..8c0c8e90de 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -24,12 +24,11 @@ use datafusion::catalog::{Session, TableProvider, TableProviderFactory}; use datafusion::error::Result as DFResult; use datafusion::logical_expr::CreateExternalTable; use datafusion::sql::TableReference; -use iceberg::arrow::schema_to_arrow_schema; use iceberg::io::FileIO; use iceberg::table::StaticTable; use iceberg::{Error, ErrorKind, Result, TableIdent}; -use super::IcebergTableProvider; +use super::IcebergStaticTableProvider; use crate::to_datafusion_error; /// A factory that implements DataFusion's `TableProviderFactory` to create `IcebergTableProvider` instances. @@ -126,10 +125,11 @@ impl TableProviderFactory for IcebergTableProviderFactory { .map_err(to_datafusion_error)? .into_table(); - let schema = schema_to_arrow_schema(table.metadata().current_schema()) + let provider = IcebergStaticTableProvider::try_new_from_table(table) + .await .map_err(to_datafusion_error)?; - Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema)))) + Ok(Arc::new(provider)) } } diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index fdf5b17d18..3ad84f383e 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -492,10 +492,6 @@ async fn test_insert_into() -> Result<()> { .unwrap(); assert_eq!(rows_inserted.value(0), 2); - // Refresh context to avoid getting stale table - let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); - ctx.register_catalog("catalog", catalog); - // Query the table to verify the inserted data let df = ctx .sql("SELECT * FROM catalog.test_insert_into.my_table") @@ -650,10 +646,6 @@ async fn test_insert_into_nested() -> Result<()> { .unwrap(); assert_eq!(rows_inserted.value(0), 2); - // Refresh context to avoid getting stale table - let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); - ctx.register_catalog("catalog", catalog); - // Query the table to verify the inserted data let df = ctx .sql("SELECT * FROM catalog.test_insert_nested.nested_table ORDER BY id") @@ -880,10 +872,6 @@ async fn test_insert_into_partitioned() -> Result<()> { .unwrap(); assert_eq!(rows_inserted.value(0), 5); - // Refresh catalog to get updated table - let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?); - ctx.register_catalog("catalog", catalog); - // Query the table to verify data let df = ctx .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table ORDER BY id")