diff --git a/Cargo.lock b/Cargo.lock index 510be94e802..49748d4a616 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5203,10 +5203,12 @@ dependencies = [ "futures", "google-cloud-auth", "lance", + "lance-arrow", "lance-core", "lance-index", "lance-io", "lance-namespace", + "lance-namespace-reqwest-client", "log", "object_store", "rand 0.9.2", @@ -5228,8 +5230,7 @@ dependencies = [ [[package]] name = "lance-namespace-reqwest-client" version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2acdba67f84190067532fce07b51a435dd390d7cdc1129a05003e5cb3274cf0" +source = "git+https://github.com/wojiaodoubao/lance-namespace?branch=rest-table-properties#a122997e5ed74122686c3a4bd228b839d0d025eb" dependencies = [ "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index fc1ea0b7ae8..dae7660f148 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ lance-io = { version = "=2.0.0-beta.10", path = "./rust/lance-io", default-featu lance-linalg = { version = "=2.0.0-beta.10", path = "./rust/lance-linalg" } lance-namespace = { version = "=2.0.0-beta.10", path = "./rust/lance-namespace" } lance-namespace-impls = { version = "=2.0.0-beta.10", path = "./rust/lance-namespace-impls" } -lance-namespace-reqwest-client = { version = "=0.4.5" } +lance-namespace-reqwest-client = { git = "https://github.com/wojiaodoubao/lance-namespace", branch = "rest-table-properties" } lance-table = { version = "=2.0.0-beta.10", path = "./rust/lance-table" } lance-test-macros = { version = "=2.0.0-beta.10", path = "./rust/lance-test-macros" } lance-testing = { version = "=2.0.0-beta.10", path = "./rust/lance-testing" } diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 560913786d3..e3b237c6daf 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3766,10 +3766,12 @@ dependencies = [ "chrono", "futures", "lance", + "lance-arrow", "lance-core", "lance-index", "lance-io", "lance-namespace", + "lance-namespace-reqwest-client", "log", "object_store", "rand 0.9.2", @@ -3786,8 +3788,7 @@ dependencies = [ [[package]] name = "lance-namespace-reqwest-client" version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2acdba67f84190067532fce07b51a435dd390d7cdc1129a05003e5cb3274cf0" +source = "git+https://github.com/wojiaodoubao/lance-namespace?branch=rest-table-properties#a122997e5ed74122686c3a4bd228b839d0d025eb" dependencies = [ "reqwest", "serde", diff --git a/python/Cargo.lock b/python/Cargo.lock index 6971525b3d5..eb084384c9b 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4289,10 +4289,12 @@ dependencies = [ "chrono", "futures", "lance", + "lance-arrow", "lance-core", "lance-index", "lance-io", "lance-namespace", + "lance-namespace-reqwest-client", "log", "object_store", "rand 0.9.2", @@ -4309,8 +4311,7 @@ dependencies = [ [[package]] name = "lance-namespace-reqwest-client" version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2acdba67f84190067532fce07b51a435dd390d7cdc1129a05003e5cb3274cf0" +source = "git+https://github.com/wojiaodoubao/lance-namespace?branch=rest-table-properties#a122997e5ed74122686c3a4bd228b839d0d025eb" dependencies = [ "reqwest", "serde", diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index b41e7f44e01..e55a1ec07ba 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -29,6 +29,7 @@ credential-vendor-azure = ["dep:azure_core", "dep:azure_identity", "dep:azure_st [dependencies] lance-namespace.workspace = true lance-core.workspace = true +lance-namespace-reqwest-client.workspace = true # REST implementation dependencies (optional, enabled by "rest" feature) reqwest = { version = "0.12", optional = true, default-features = false, features = [ @@ -82,6 +83,7 @@ azure_identity = { version = "0.21", optional = true } azure_storage = { version = "0.21", optional = true } azure_storage_blobs = { version = "0.21", optional = true } time = { version = "0.3", optional = true } +lance-arrow = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 875df33e580..ada92b3a230 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -8,6 +8,7 @@ pub mod manifest; +use crate::context::DynamicContextProvider; use arrow::record_batch::RecordBatchIterator; use arrow_ipc::reader::StreamReader; use async_trait::async_trait; @@ -15,13 +16,7 @@ use bytes::Bytes; use lance::dataset::{Dataset, WriteParams}; use lance::session::Session; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; -use object_store::path::Path; -use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions}; -use std::collections::HashMap; -use std::io::Cursor; -use std::sync::Arc; -use crate::context::DynamicContextProvider; use lance_namespace::models::{ CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest, @@ -30,6 +25,11 @@ use lance_namespace::models::{ DropTableRequest, DropTableResponse, Identity, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, TableExistsRequest, }; +use object_store::path::Path; +use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions}; +use std::collections::HashMap; +use std::io::Cursor; +use std::sync::Arc; use lance_core::{box_error, Error, Result}; use lance_namespace::schema::arrow_schema_to_json; diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 49d19712e26..cb9fd5c373e 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -6,16 +6,24 @@ //! This module provides a namespace implementation that uses a manifest table //! to track tables and nested namespaces. -use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow::array::{ + Array, ArrayRef, BooleanArray, BooleanBuilder, Date32Array, Date32Builder, Date64Array, + Date64Builder, Float32Array, Float32Builder, Float64Array, Float64Builder, Int32Array, + Int32Builder, Int64Array, Int64Builder, LargeStringArray, LargeStringBuilder, RecordBatch, + RecordBatchIterator, StringArray, StringBuilder, UInt32Array, UInt32Builder, UInt64Array, + UInt64Builder, +}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use arrow_ipc::reader::StreamReader; +use arrow_schema::{FieldRef, Schema}; use async_trait::async_trait; use bytes::Bytes; use futures::stream::StreamExt; use lance::dataset::optimize::{compact_files, CompactionOptions}; -use lance::dataset::{builder::DatasetBuilder, WriteParams}; +use lance::dataset::{builder::DatasetBuilder, NewColumnTransform, WriteParams}; use lance::session::Session; use lance::{dataset::scanner::Scanner, Dataset}; +use lance_arrow::RecordBatchExt; use lance_core::{box_error, Error, Result}; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; @@ -37,8 +45,10 @@ use lance_namespace::LanceNamespace; use object_store::path::Path; use snafu::location; use std::io::Cursor; +use std::str::FromStr; use std::{ collections::HashMap, + f32, f64, hash::{DefaultHasher, Hash, Hasher}, ops::{Deref, DerefMut}, sync::Arc, @@ -89,6 +99,7 @@ pub struct TableInfo { pub namespace: Vec, pub name: String, pub location: String, + pub properties: Option>, } /// Information about a namespace stored in the manifest @@ -235,6 +246,19 @@ impl DerefMut for DatasetWriteGuard<'_> { } } +/// Extended properties are special properties started with `lance.manifest.extended.` prefix, and +/// stored in the manifest table. +/// +/// For example, a namespace object contains metadata like: +/// ```json +/// { +/// "user_name": "Alice", +/// "lance.manifest.extended.user_id": "123456" +/// } +/// ``` +/// The first one is stored at column named "metadata", the second is stored at column named "user_id". +static EXTENDED_PREFIX: &str = "lance.manifest.extended."; + /// Manifest-based namespace implementation /// /// Uses a special `__manifest` Lance table to track tables and nested namespaces. @@ -311,6 +335,71 @@ impl ManifestNamespace { } } + /// Add extended properties to the manifest table. + pub async fn add_extended_properties( + &mut self, + properties: &Vec<(&str, DataType)>, + ) -> Result<()> { + let full_schema = self.full_manifest_schema().await?; + let fields: Vec = properties + .iter() + .map(|(name, data_type)| { + if !name.starts_with(EXTENDED_PREFIX) { + return Err(Error::io( + format!( + "Extended properties key {} must start with prefix: {}", + name, EXTENDED_PREFIX + ), + location!(), + )); + } + Ok(Field::new( + name.strip_prefix(EXTENDED_PREFIX).unwrap().to_string(), + data_type.clone(), + true, + )) + }) + .collect::>>()? + .into_iter() + .filter(|f| full_schema.column_with_name(f.name()).is_none()) + .collect(); + + let schema = Schema::new(fields); + let transform = NewColumnTransform::AllNulls(Arc::new(schema)); + + let mut ds = self.manifest_dataset.get_mut().await?; + ds.add_columns(transform, None, None).await?; + + Ok(()) + } + + /// Remove extended properties from the manifest table. + pub async fn remove_extended_properties(&mut self, properties: &Vec<&str>) -> Result<()> { + let full_schema = self.full_manifest_schema().await?; + let to_remove: Vec = properties + .iter() + .map(|name| { + if !name.starts_with(EXTENDED_PREFIX) { + return Err(Error::io( + format!( + "Extended properties key {} must start with prefix: {}", + name, EXTENDED_PREFIX + ), + location!(), + )); + } + Ok(name.strip_prefix(EXTENDED_PREFIX).unwrap().to_string()) + }) + .collect::>>()? + .into_iter() + .filter(|s| full_schema.column_with_name(s.as_str()).is_some()) + .collect(); + let remove: Vec<&str> = to_remove.iter().map(|s| s.as_str()).collect(); + + let mut ds = self.manifest_dataset.get_mut().await?; + ds.drop_columns(&remove).await + } + /// Split an object ID (table_id as vec of strings) into namespace and table name fn split_object_id(table_id: &[String]) -> (Vec, String) { if table_id.len() == 1 { @@ -516,8 +605,8 @@ impl ManifestNamespace { Ok(()) } - /// Get the manifest schema - fn manifest_schema() -> Arc { + /// Get the manifest schema of basic fields: object_id, object_type, location, metadata, base_objects + fn basic_manifest_schema() -> Arc { Arc::new(ArrowSchema::new(vec![ Field::new("object_id", DataType::Utf8, false), Field::new("object_type", DataType::Utf8, false), @@ -531,6 +620,13 @@ impl ManifestNamespace { ])) } + /// Get the full manifest schema, including basic fields and extended fields + async fn full_manifest_schema(&self) -> Result { + let dataset_guard = self.manifest_dataset.get().await?; + let schema = ArrowSchema::from(dataset_guard.schema()); + Ok(schema) + } + /// Get a scanner for the manifest dataset async fn manifest_scanner(&self) -> Result { let dataset_guard = self.manifest_dataset.get().await?; @@ -616,12 +712,6 @@ impl ManifestNamespace { source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))), location: location!(), })?; - scanner - .project(&["object_id", "location"]) - .map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!("Failed to project: {}", e))), - location: location!(), - })?; let batches = Self::execute_scanner(scanner).await?; let mut found_result: Option = None; @@ -643,6 +733,33 @@ impl ManifestNamespace { )); } + // Parse properties + let mut merged = self.batch_to_extended_props(&batch); + let metadata_array = Self::get_string_column(&batch, "metadata")?; + + if !metadata_array.is_null(0) { + let metadata_str = metadata_array.value(0); + match serde_json::from_str::>(metadata_str) { + Ok(map) => merged.extend(map), + Err(e) => { + return Err(Error::io( + format!( + "Failed to deserialize metadata for table '{}': {}", + object_id, e + ), + location!(), + )); + } + } + } + + let properties = if merged.is_empty() { + None + } else { + Some(merged) + }; + + // Parse object ID and location let object_id_array = Self::get_string_column(&batch, "object_id")?; let location_array = Self::get_string_column(&batch, "location")?; let location = location_array.value(0).to_string(); @@ -651,6 +768,7 @@ impl ManifestNamespace { namespace, name, location, + properties, }); } @@ -694,7 +812,7 @@ impl ManifestNamespace { object_type: ObjectType, location: Option, ) -> Result<()> { - self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None) + self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None, None) .await } @@ -706,10 +824,11 @@ impl ManifestNamespace { location: Option, metadata: Option, base_objects: Option>, + extended_batch: Option, ) -> Result<()> { use arrow::array::builder::{ListBuilder, StringBuilder}; - let schema = Self::manifest_schema(); + let basic_schema = Self::basic_manifest_schema(); // Create base_objects array from the provided list let string_builder = StringBuilder::new(); @@ -745,7 +864,7 @@ impl ManifestNamespace { }; let batch = RecordBatch::try_new( - schema.clone(), + basic_schema.clone(), vec![ Arc::new(StringArray::from(vec![object_id.as_str()])), Arc::new(StringArray::from(vec![object_type.as_str()])), @@ -761,7 +880,15 @@ impl ManifestNamespace { ) })?; - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + // Merge extended_batch with basic batch if provided + let batch = if let Some(extended_batch) = extended_batch { + batch.merge(&extended_batch)? + } else { + batch + }; + + let schema = batch.schema(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); // Use MergeInsert to ensure uniqueness on object_id let dataset_guard = self.manifest_dataset.get().await?; @@ -892,12 +1019,6 @@ impl ManifestNamespace { source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))), location: location!(), })?; - scanner - .project(&["object_id", "metadata"]) - .map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!("Failed to project: {}", e))), - location: location!(), - })?; let batches = Self::execute_scanner(scanner).await?; let mut found_result: Option = None; @@ -919,14 +1040,15 @@ impl ManifestNamespace { )); } + let mut metadata = self.batch_to_extended_props(&batch); let object_id_array = Self::get_string_column(&batch, "object_id")?; let metadata_array = Self::get_string_column(&batch, "metadata")?; let object_id_str = object_id_array.value(0); - let metadata = if !metadata_array.is_null(0) { + if !metadata_array.is_null(0) { let metadata_str = metadata_array.value(0); match serde_json::from_str::>(metadata_str) { - Ok(map) => Some(map), + Ok(map) => metadata.extend(map), Err(e) => { return Err(Error::io( format!( @@ -937,8 +1059,12 @@ impl ManifestNamespace { )); } } - } else { + }; + + let metadata = if metadata.is_empty() { None + } else { + Some(metadata) }; let (namespace, name) = Self::parse_object_id(object_id_str); @@ -975,7 +1101,7 @@ impl ManifestNamespace { Ok(DatasetConsistencyWrapper::new(dataset)) } else { log::info!("Creating new manifest table at {}", manifest_path); - let schema = Self::manifest_schema(); + let schema = Self::basic_manifest_schema(); let empty_batch = RecordBatch::new_empty(schema.clone()); let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone()); @@ -1011,6 +1137,41 @@ impl ManifestNamespace { Ok(DatasetConsistencyWrapper::new(dataset)) } } + + fn build_metadata_json(properties: &Option>) -> Option { + properties.as_ref().and_then(|props| { + if props.is_empty() { + None + } else { + let meta_props = props + .iter() + .filter(|(key, _)| !key.starts_with(EXTENDED_PREFIX)) + .collect::>(); + Some(serde_json::to_string(&meta_props).ok()?) + } + }) + } + + fn batch_to_extended_props(&self, batch: &RecordBatch) -> HashMap { + let basic_schema = Self::basic_manifest_schema(); + let mut excluded_col: Vec<&str> = vec![]; + for field in basic_schema.fields.iter() { + excluded_col.push(field.name()); + } + batch_to_extended_props(batch, excluded_col) + } + + async fn extended_props_to_batch( + &self, + props: &HashMap, + ) -> Result> { + let mut excluded_col: Vec<&str> = vec![]; + let basic_schema = Self::basic_manifest_schema(); + for field in basic_schema.fields.iter() { + excluded_col.push(field.name()); + } + extended_props_to_batch(props, &self.full_manifest_schema().await?, excluded_col) + } } #[async_trait] @@ -1114,6 +1275,7 @@ impl LanceNamespace for ManifestNamespace { location: Some(table_uri.clone()), table_uri: Some(table_uri), storage_options, + properties: info.properties, ..Default::default() }); } @@ -1139,6 +1301,7 @@ impl LanceNamespace for ManifestNamespace { table_uri: Some(table_uri), schema: Some(Box::new(json_schema)), storage_options, + properties: info.properties, ..Default::default() }) } @@ -1150,6 +1313,7 @@ impl LanceNamespace for ManifestNamespace { location: Some(table_uri.clone()), table_uri: Some(table_uri), storage_options, + properties: info.properties, ..Default::default() }) } @@ -1208,6 +1372,15 @@ impl LanceNamespace for ManifestNamespace { let (namespace, table_name) = Self::split_object_id(table_id); let object_id = Self::build_object_id(&namespace, &table_name); + // Serialize properties and compute extended batch if provided + let metadata = Self::build_metadata_json(&request.properties); + + let extended_batch = if let Some(props) = &request.properties { + self.extended_props_to_batch(props).await? + } else { + None + }; + // Check if table already exists in manifest if self.manifest_contains_object(&object_id).await? { return Err(Error::io( @@ -1270,13 +1443,21 @@ impl LanceNamespace for ManifestNamespace { })?; // Register in manifest (store dir_name, not full URI) - self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name)) - .await?; + self.insert_into_manifest_with_metadata( + object_id, + ObjectType::Table, + Some(dir_name), + metadata, + None, + extended_batch, + ) + .await?; Ok(CreateTableResponse { version: Some(1), location: Some(table_uri), storage_options: self.storage_options.clone(), + properties: request.properties.clone(), ..Default::default() }) } @@ -1448,14 +1629,14 @@ impl LanceNamespace for ManifestNamespace { }); } - // Serialize properties if provided - let metadata = request.properties.as_ref().and_then(|props| { - if props.is_empty() { - None - } else { - Some(serde_json::to_string(props).ok()?) - } - }); + // Serialize properties and compute extended batch if provided + let metadata = Self::build_metadata_json(&request.properties); + + let extended_batch = if let Some(props) = &request.properties { + self.extended_props_to_batch(props).await? + } else { + None + }; self.insert_into_manifest_with_metadata( object_id, @@ -1463,6 +1644,7 @@ impl LanceNamespace for ManifestNamespace { None, metadata, None, + extended_batch, ) .await?; @@ -1574,6 +1756,15 @@ impl LanceNamespace for ManifestNamespace { let (namespace, table_name) = Self::split_object_id(table_id); let object_id = Self::build_object_id(&namespace, &table_name); + // Serialize properties and compute extended batch if provided + let metadata = Self::build_metadata_json(&request.properties); + + let extended_batch = if let Some(props) = &request.properties { + self.extended_props_to_batch(props).await? + } else { + None + }; + // Check if table already exists in manifest let existing = self.query_manifest_for_table(&object_id).await?; if existing.is_some() { @@ -1637,8 +1828,15 @@ impl LanceNamespace for ManifestNamespace { })?; // Add entry to manifest marking this as an empty table (store dir_name, not full path) - self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name)) - .await?; + self.insert_into_manifest_with_metadata( + object_id, + ObjectType::Table, + Some(dir_name), + metadata, + None, + extended_batch, + ) + .await?; log::info!( "Created empty table '{}' in manifest at {}", @@ -1657,6 +1855,7 @@ impl LanceNamespace for ManifestNamespace { Ok(CreateEmptyTableResponse { location: Some(table_uri), storage_options, + properties: request.properties, ..Default::default() }) } @@ -1686,6 +1885,15 @@ impl LanceNamespace for ManifestNamespace { }); } + // Serialize properties and compute extended batch if provided + let metadata = Self::build_metadata_json(&request.properties); + + let extended_batch = if let Some(props) = &request.properties { + self.extended_props_to_batch(props).await? + } else { + None + }; + // Create table location path with hash-based naming // When dir_listing_enabled is true and it's a root table, use directory-style naming: {table_name}.lance // Otherwise, use hash-based naming: {hash}_{object_id} @@ -1740,8 +1948,15 @@ impl LanceNamespace for ManifestNamespace { })?; // Add entry to manifest marking this as a declared table (store dir_name, not full path) - self.insert_into_manifest(object_id, ObjectType::Table, Some(dir_name)) - .await?; + self.insert_into_manifest_with_metadata( + object_id, + ObjectType::Table, + Some(dir_name), + metadata, + None, + extended_batch, + ) + .await?; log::info!( "Declared table '{}' in manifest at {}", @@ -1760,6 +1975,7 @@ impl LanceNamespace for ManifestNamespace { Ok(DeclareTableResponse { location: Some(table_uri), storage_options, + properties: request.properties.clone(), ..Default::default() }) } @@ -1828,12 +2044,29 @@ impl LanceNamespace for ManifestNamespace { }); } + // Serialize properties and compute extended batch if provided + let metadata = Self::build_metadata_json(&request.properties); + + let extended_batch = if let Some(props) = &request.properties { + self.extended_props_to_batch(props).await? + } else { + None + }; + // Register the table with its location in the manifest - self.insert_into_manifest(object_id, ObjectType::Table, Some(location.clone())) - .await?; + self.insert_into_manifest_with_metadata( + object_id, + ObjectType::Table, + Some(location.clone()), + metadata, + None, + extended_batch, + ) + .await?; Ok(RegisterTableResponse { location: Some(location), + properties: request.properties.clone(), ..Default::default() }) } @@ -1882,17 +2115,219 @@ impl LanceNamespace for ManifestNamespace { } } +/// Parse the first row of a RecordBatch into a HashMap, excluding specified columns. +fn batch_to_extended_props(batch: &RecordBatch, excluded: Vec<&str>) -> HashMap { + let mut result = HashMap::new(); + + if batch.num_rows() == 0 { + return result; + } + + for (i, field) in batch.schema().fields().iter().enumerate() { + let col_name = field.name().to_string(); + if excluded.contains(&col_name.as_str()) { + continue; + } + + let array = batch.column(i); + + if array.is_null(0) { + // skip null properties. + continue; + } + + let value_str = match field.data_type() { + DataType::Utf8 => { + let str_array = array.as_any().downcast_ref::().unwrap(); + str_array.value(0).to_string() + } + DataType::LargeUtf8 => { + let str_array = array.as_any().downcast_ref::().unwrap(); + str_array.value(0).to_string() + } + DataType::Boolean => { + let bool_array = array.as_any().downcast_ref::().unwrap(); + bool_array.value(0).to_string() + } + DataType::Int32 => { + let int_array = array.as_any().downcast_ref::().unwrap(); + int_array.value(0).to_string() + } + DataType::Int64 => { + let int_array = array.as_any().downcast_ref::().unwrap(); + int_array.value(0).to_string() + } + DataType::UInt32 => { + let int_array = array.as_any().downcast_ref::().unwrap(); + int_array.value(0).to_string() + } + DataType::UInt64 => { + let int_array = array.as_any().downcast_ref::().unwrap(); + int_array.value(0).to_string() + } + DataType::Float32 => { + let float_array = array.as_any().downcast_ref::().unwrap(); + float_array.value(0).to_string() + } + DataType::Float64 => { + let float_array = array.as_any().downcast_ref::().unwrap(); + float_array.value(0).to_string() + } + DataType::Date32 => { + let date_array = array.as_any().downcast_ref::().unwrap(); + date_array.value(0).to_string() + } + DataType::Date64 => { + let date_array = array.as_any().downcast_ref::().unwrap(); + date_array.value(0).to_string() + } + _ => format!("Unsupported type: {:?}", field.data_type()), + }; + + result.insert(format!("{}{}", EXTENDED_PREFIX, col_name), value_str); + } + + result +} + +/// Convert a HashMap into a RecordBatch, excluding specified columns. +fn extended_props_to_batch( + map: &HashMap, + schema: &Schema, + excluded: Vec<&str>, +) -> Result> { + // All extended properties must be covered in schema. + for k in map.keys() { + if let Some(col_name) = k.strip_prefix(EXTENDED_PREFIX) { + if !excluded.contains(&col_name) && schema.column_with_name(col_name).is_none() { + return Err(Error::InvalidInput { + source: format!("Column {} does not exist in extended properties", col_name) + .into(), + location: location!(), + }); + } + } + } + + // Construct record batch + let mut array: Vec = vec![]; + let mut fields: Vec = vec![]; + for field in schema + .fields() + .iter() + .filter(|field| !excluded.contains(&field.name().as_str())) + { + let field_name = field.name().as_str(); + + match map.get(&format!("{}{}", EXTENDED_PREFIX, field_name)) { + Some(value) if value != "null" && !value.is_empty() => match field.data_type() { + DataType::Utf8 => { + let mut builder = StringBuilder::new(); + builder.append_value(value); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::LargeUtf8 => { + let mut builder = LargeStringBuilder::new(); + builder.append_value(value); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Boolean => { + let mut builder = BooleanBuilder::new(); + builder.append_value(bool::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Int32 => { + let mut builder = Int32Builder::new(); + builder.append_value(i32::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Int64 => { + let mut builder = Int64Builder::new(); + builder.append_value(i64::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::UInt32 => { + let mut builder = UInt32Builder::new(); + builder.append_value(u32::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::UInt64 => { + let mut builder = UInt64Builder::new(); + builder.append_value(u64::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Float32 => { + let mut builder = Float32Builder::new(); + builder.append_value(f32::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Float64 => { + let mut builder = Float64Builder::new(); + builder.append_value(f64::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Date32 => { + let mut builder = Date32Builder::new(); + builder.append_value(i32::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + DataType::Date64 => { + let mut builder = Date64Builder::new(); + builder.append_value(i64::from_str(value).unwrap()); + let v = builder.finish(); + array.push(Arc::new(v)); + fields.push(field.clone()); + } + _ => panic!("Unsupported data type: {:?}", field.data_type()), + }, + _ => {} + } + } + + if fields.is_empty() { + return Ok(None); + } + + let schema = Schema::new(fields); + Ok(Some(RecordBatch::try_new(Arc::new(schema), array)?)) +} + #[cfg(test)] mod tests { use crate::{DirectoryNamespaceBuilder, ManifestNamespace}; + use arrow_schema::DataType; use bytes::Bytes; use lance_core::utils::tempfile::TempStdDir; + use lance_io::object_store::ObjectStore; use lance_namespace::models::{ - CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest, - TableExistsRequest, + CreateEmptyTableRequest, CreateNamespaceRequest, CreateTableRequest, + DescribeNamespaceRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest, + RegisterTableRequest, TableExistsRequest, }; use lance_namespace::LanceNamespace; + use lance_namespace_reqwest_client::models::DeclareTableRequest; use rstest::rstest; + use std::collections::HashMap; fn create_test_ipc_data() -> Vec { use arrow::array::{Int32Array, StringArray}; @@ -2570,6 +3005,666 @@ mod tests { ); } + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_add_extended_properties_creates_columns_and_idempotent( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + let schema = manifest_ns.full_manifest_schema().await.unwrap(); + assert_eq!( + ManifestNamespace::basic_manifest_schema().fields().len(), + schema.fields().len() + ); + + // Adding extended properties should create new columns + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.user_id", DataType::Utf8), + ("lance.manifest.extended.score", DataType::Int32), + ]) + .await + .unwrap(); + + let schema = manifest_ns.full_manifest_schema().await.unwrap(); + let user_field = schema.field_with_name("user_id").unwrap(); + assert_eq!(user_field.data_type(), &DataType::Utf8); + let score_field = schema.field_with_name("score").unwrap(); + assert_eq!(score_field.data_type(), &DataType::Int32); + let initial_field_count = schema.fields().len(); + + // Adding the same properties again should be a no-op + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.user_id", DataType::Utf8), + ("lance.manifest.extended.score", DataType::Int32), + ]) + .await + .unwrap(); + let schema_after = manifest_ns.full_manifest_schema().await.unwrap(); + assert_eq!(schema_after.fields().len(), initial_field_count); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_add_extended_properties_rejects_missing_prefix( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + let result = manifest_ns + .add_extended_properties(&vec![("invalid_key", DataType::Utf8)]) + .await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("must start with prefix")); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_remove_extended_properties_drops_specified_columns( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.user_id", DataType::Utf8), + ("lance.manifest.extended.group", DataType::Utf8), + ]) + .await + .unwrap(); + let schema = manifest_ns.full_manifest_schema().await.unwrap(); + assert!(schema.field_with_name("user_id").is_ok()); + assert!(schema.field_with_name("group").is_ok()); + + manifest_ns + .remove_extended_properties(&vec!["lance.manifest.extended.user_id"]) + .await + .unwrap(); + let schema_after = manifest_ns.full_manifest_schema().await.unwrap(); + assert!(schema_after.field_with_name("user_id").is_err()); + assert!(schema_after.field_with_name("group").is_ok()); + + // Remove non-existent property should be a no-op + manifest_ns + .remove_extended_properties(&vec!["lance.manifest.extended.user_id"]) + .await + .unwrap(); + let schema_after = manifest_ns.full_manifest_schema().await.unwrap(); + assert!(schema_after.field_with_name("user_id").is_err()); + assert!(schema_after.field_with_name("group").is_ok()); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_remove_extended_properties_rejects_missing_prefix( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + let result = manifest_ns + .remove_extended_properties(&vec!["user_id"]) + .await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("must start with prefix")); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_create_namespace_with_extended_properties_without_columns_fails( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .inline_optimization_enabled(inline_optimization) + .build() + .await + .unwrap(); + + let mut properties = std::collections::HashMap::new(); + properties.insert( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ); + + let mut create_req = CreateNamespaceRequest::new(); + create_req.id = Some(vec!["ns1".to_string()]); + create_req.properties = Some(properties); + + let result = namespace.create_namespace(create_req).await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("Column user_id does not exist in extended properties")); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_create_namespace_with_extended_properties_succeeds_and_describe_unified( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + manifest_ns + .add_extended_properties(&vec![("lance.manifest.extended.user_id", DataType::Utf8)]) + .await + .unwrap(); + + let mut properties = std::collections::HashMap::new(); + properties.insert("owner".to_string(), "alice".to_string()); + properties.insert( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ); + let mut create_req = CreateNamespaceRequest::new(); + create_req.id = Some(vec!["ns1".to_string()]); + create_req.properties = Some(properties); + manifest_ns.create_namespace(create_req).await.unwrap(); + + let describe_req = DescribeNamespaceRequest { + id: Some(vec!["ns1".to_string()]), + ..Default::default() + }; + let response = manifest_ns.describe_namespace(describe_req).await.unwrap(); + let props = response.properties.expect("properties should be present"); + assert_eq!(props.get("owner"), Some(&"alice".to_string())); + assert_eq!( + props.get("lance.manifest.extended.user_id"), + Some(&"123".to_string()) + ); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_extended_properties_null_and_empty_values_omitted( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.null_prop", DataType::Utf8), + ("lance.manifest.extended.empty_prop", DataType::Utf8), + ("lance.manifest.extended.non_existed", DataType::Utf8), + ("lance.manifest.extended.valid_prop", DataType::Utf8), + ]) + .await + .unwrap(); + + let mut properties = std::collections::HashMap::new(); + properties.insert("owner".to_string(), "alice".to_string()); + properties.insert( + "lance.manifest.extended.null_prop".to_string(), + "null".to_string(), + ); + properties.insert( + "lance.manifest.extended.empty_prop".to_string(), + "".to_string(), + ); + properties.insert( + "lance.manifest.extended.valid_prop".to_string(), + "42".to_string(), + ); + let mut create_req = CreateNamespaceRequest::new(); + create_req.id = Some(vec!["ns1".to_string()]); + create_req.properties = Some(properties); + manifest_ns.create_namespace(create_req).await.unwrap(); + + let describe_req = DescribeNamespaceRequest { + id: Some(vec!["ns1".to_string()]), + ..Default::default() + }; + let response = manifest_ns.describe_namespace(describe_req).await.unwrap(); + let props = response.properties.expect("properties should be present"); + + assert_eq!(props.get("owner"), Some(&"alice".to_string())); + assert_eq!( + props.get("lance.manifest.extended.valid_prop"), + Some(&"42".to_string()) + ); + assert!(!props.contains_key("lance.manifest.extended.null_prop")); + assert!(!props.contains_key("lance.manifest.extended.empty_prop")); + assert!(!props.contains_key("lance.manifest.extended.non_existed")); + } + + async fn create_manifest_namespace_for_test( + root: &str, + inline_optimization: bool, + ) -> ManifestNamespace { + let (object_store, base_path) = ObjectStore::from_uri(root).await.unwrap(); + ManifestNamespace::from_directory( + root.to_string(), + None, + None, + object_store, + base_path, + true, + inline_optimization, + ) + .await + .unwrap() + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_create_table_with_properties_persisted(#[case] inline_optimization: bool) { + let (_temp_dir, manifest_ns, properties) = + create_manifest_and_persist_properties(inline_optimization).await; + + let buffer = create_test_ipc_data(); + let mut create_req = CreateTableRequest::new(); + create_req.id = Some(vec!["test_table".to_string()]); + create_req.properties = Some(properties); + + manifest_ns + .create_table(create_req, Bytes::from(buffer)) + .await + .unwrap(); + verify_persist_properties(&manifest_ns, "test_table").await; + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_declare_table_with_properties_persisted(#[case] inline_optimization: bool) { + let (_temp_dir, manifest_ns, properties) = + create_manifest_and_persist_properties(inline_optimization).await; + + let mut declare_req = DeclareTableRequest::new(); + declare_req.id = Some(vec!["test_table".to_string()]); + declare_req.properties = Some(properties); + + manifest_ns.declare_table(declare_req).await.unwrap(); + verify_persist_properties(&manifest_ns, "test_table").await; + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_register_table_with_properties_persisted(#[case] inline_optimization: bool) { + let (_temp_dir, manifest_ns, properties) = + create_manifest_and_persist_properties(inline_optimization).await; + + let mut register_req = RegisterTableRequest::new("registered_table.lance".to_string()); + register_req.id = Some(vec!["registered_table".to_string()]); + register_req.properties = Some(properties); + + LanceNamespace::register_table(&manifest_ns, register_req) + .await + .unwrap(); + verify_persist_properties(&manifest_ns, "registered_table").await; + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_create_empty_table_persist_properties(#[case] inline_optimization: bool) { + let (_temp_dir, manifest_ns, properties) = + create_manifest_and_persist_properties(inline_optimization).await; + + let mut request = CreateEmptyTableRequest::new(); + request.id = Some(vec!["empty_table".to_string()]); + request.properties = Some(properties); + + #[allow(deprecated)] + manifest_ns.create_empty_table(request).await.unwrap(); + verify_persist_properties(&manifest_ns, "empty_table").await; + } + + async fn create_manifest_and_persist_properties( + inline_optimization: bool, + ) -> (TempStdDir, ManifestNamespace, HashMap) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.user_id", DataType::Utf8), + ("lance.manifest.extended.score", DataType::Int32), + ]) + .await + .unwrap(); + + let properties = std::collections::HashMap::from([ + ("owner".to_string(), "alice".to_string()), + ( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ), + ( + "lance.manifest.extended.score".to_string(), + "42".to_string(), + ), + ]); + + (temp_dir, manifest_ns, properties) + } + + async fn verify_persist_properties(manifest_ns: &ManifestNamespace, table: &str) { + let object_id = ManifestNamespace::build_object_id(&[], table); + let mut scanner = manifest_ns.manifest_scanner().await.unwrap(); + let filter = format!("object_id = '{}'", object_id); + scanner.filter(&filter).unwrap(); + scanner.project(&["metadata", "user_id", "score"]).unwrap(); + let batches = ManifestNamespace::execute_scanner(scanner).await.unwrap(); + + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let metadata_array = ManifestNamespace::get_string_column(batch, "metadata").unwrap(); + let metadata_str = metadata_array.value(0); + let metadata_map: std::collections::HashMap = + serde_json::from_str(metadata_str).unwrap(); + assert_eq!(metadata_map.get("owner"), Some(&"alice".to_string())); + + let user_id_array = ManifestNamespace::get_string_column(batch, "user_id").unwrap(); + assert_eq!(user_id_array.value(0), "123"); + + let score_array = batch + .column_by_name("score") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(score_array.value(0), 42); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_create_table_with_extended_properties_without_columns_fails( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace_for_test(temp_path, inline_optimization).await; + + let mut properties = std::collections::HashMap::new(); + properties.insert( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ); + + let buffer = create_test_ipc_data(); + let mut create_req = CreateTableRequest::new(); + create_req.id = Some(vec!["test_table".to_string()]); + create_req.properties = Some(properties); + + let result = manifest_ns + .create_table(create_req, Bytes::from(buffer)) + .await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("Column user_id does not exist in extended properties")); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_declare_table_with_extended_properties_without_columns_fails( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace_for_test(temp_path, inline_optimization).await; + + let mut properties = std::collections::HashMap::new(); + properties.insert( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ); + + let mut declare_req = DeclareTableRequest::new(); + declare_req.id = Some(vec!["test_table".to_string()]); + declare_req.properties = Some(properties); + + let result = manifest_ns.declare_table(declare_req).await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("Column user_id does not exist in extended properties")); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_register_table_with_extended_properties_without_columns_fails( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace_for_test(temp_path, inline_optimization).await; + + let mut properties = std::collections::HashMap::new(); + properties.insert( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ); + + let mut register_req = RegisterTableRequest::new("registered_table.lance".to_string()); + register_req.id = Some(vec!["registered_table".to_string()]); + register_req.properties = Some(properties); + + let result = LanceNamespace::register_table(&manifest_ns, register_req).await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("Column user_id does not exist in extended properties")); + } + + #[rstest] + #[case::with_optimization(true)] + #[case::without_optimization(false)] + #[tokio::test] + async fn test_create_table_extended_properties_null_and_empty_values_omitted( + #[case] inline_optimization: bool, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = + create_manifest_namespace_for_test(temp_path, inline_optimization).await; + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.null_prop", DataType::Utf8), + ("lance.manifest.extended.empty_prop", DataType::Utf8), + ("lance.manifest.extended.valid_prop", DataType::Utf8), + ("lance.manifest.extended.non_existed_prop", DataType::Utf8), + ]) + .await + .unwrap(); + + let mut properties = std::collections::HashMap::new(); + properties.insert("owner".to_string(), "alice".to_string()); + properties.insert( + "lance.manifest.extended.null_prop".to_string(), + "null".to_string(), + ); + properties.insert( + "lance.manifest.extended.empty_prop".to_string(), + "".to_string(), + ); + properties.insert( + "lance.manifest.extended.valid_prop".to_string(), + "42".to_string(), + ); + + let buffer = create_test_ipc_data(); + let mut create_req = CreateTableRequest::new(); + create_req.id = Some(vec!["test_table".to_string()]); + create_req.properties = Some(properties); + manifest_ns + .create_table(create_req, Bytes::from(buffer)) + .await + .unwrap(); + + let object_id = ManifestNamespace::build_object_id(&[], "test_table"); + + let mut scanner = manifest_ns.manifest_scanner().await.unwrap(); + let filter = format!("object_id = '{}'", object_id); + scanner.filter(&filter).unwrap(); + let batches = ManifestNamespace::execute_scanner(scanner).await.unwrap(); + + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let extended_props = manifest_ns.batch_to_extended_props(batch); + + assert_eq!( + extended_props.get("lance.manifest.extended.valid_prop"), + Some(&"42".to_string()) + ); + assert!(!extended_props.contains_key("lance.manifest.extended.null_prop")); + assert!(!extended_props.contains_key("lance.manifest.extended.empty_prop")); + assert!(!extended_props.contains_key("lance.manifest.extended.non_existed_prop")); + } + + #[tokio::test] + async fn test_describe_table_unifies_properties() { + let (_temp_dir, manifest_ns, base_properties) = prepare_properties_env().await; + + // create_table scenario + let buffer = create_test_ipc_data(); + let mut create_req = CreateTableRequest::new(); + create_req.id = Some(vec!["created_table".to_string()]); + create_req.properties = Some(base_properties.clone()); + manifest_ns + .create_table(create_req, Bytes::from(buffer)) + .await + .unwrap(); + + verify_describe_table_props(&manifest_ns, "created_table", Some(true), &base_properties) + .await; + verify_describe_table_props(&manifest_ns, "created_table", Some(false), &base_properties) + .await; + + // declare_table scenario + let mut declare_req = DeclareTableRequest::new(); + declare_req.id = Some(vec!["declared_table".to_string()]); + declare_req.properties = Some(base_properties.clone()); + manifest_ns.declare_table(declare_req).await.unwrap(); + + verify_describe_table_props(&manifest_ns, "declared_table", Some(true), &base_properties) + .await; + verify_describe_table_props( + &manifest_ns, + "declared_table", + Some(false), + &base_properties, + ) + .await; + + // register_table scenario + let mut register_req = RegisterTableRequest::new("registered_table.lance".to_string()); + register_req.id = Some(vec!["registered_table".to_string()]); + register_req.properties = Some(base_properties.clone()); + LanceNamespace::register_table(&manifest_ns, register_req) + .await + .unwrap(); + + verify_describe_table_props( + &manifest_ns, + "registered_table", + Some(true), + &base_properties, + ) + .await; + verify_describe_table_props( + &manifest_ns, + "registered_table", + Some(false), + &base_properties, + ) + .await; + } + + async fn prepare_properties_env() -> ( + TempStdDir, + ManifestNamespace, + std::collections::HashMap, + ) { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let mut manifest_ns = create_manifest_namespace_for_test(temp_path, true).await; + manifest_ns + .add_extended_properties(&vec![ + ("lance.manifest.extended.user_id", DataType::Utf8), + ("lance.manifest.extended.score", DataType::Int32), + ]) + .await + .unwrap(); + + // prepare base properties + let mut base_properties = std::collections::HashMap::new(); + base_properties.insert("owner".to_string(), "alice".to_string()); + base_properties.insert( + "lance.manifest.extended.user_id".to_string(), + "123".to_string(), + ); + base_properties.insert( + "lance.manifest.extended.score".to_string(), + "42".to_string(), + ); + + (temp_dir, manifest_ns, base_properties) + } + + async fn verify_describe_table_props( + manifest_ns: &ManifestNamespace, + table_name: &str, + load_detailed_metadata: Option, + base_properties: &std::collections::HashMap, + ) { + let req = DescribeTableRequest { + id: Some(vec![table_name.to_string()]), + load_detailed_metadata, + ..Default::default() + }; + let response = manifest_ns.describe_table(req).await.unwrap(); + let props = response.properties.expect("properties should be present"); + for (k, v) in base_properties.iter() { + assert_eq!(props.get(k), Some(v)); + } + } + #[test] fn test_construct_full_uri_with_cloud_urls() { // Test S3-style URL with nested path (no trailing slash) diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index b63331c8a66..a190a1bcf81 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -2872,6 +2872,7 @@ mod tests { mode: Some("create".to_string()), identity: None, context: None, + properties: None, }; let result = namespace.create_table(create_table_req, table_data).await; assert!(result.is_ok(), "Failed to create table: {:?}", result); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e5e71887147..026b9d0da9f 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -845,6 +845,7 @@ impl Dataset { transaction_id: fallback_resp.transaction_id, location: fallback_resp.location, storage_options: fallback_resp.storage_options, + properties: fallback_resp.properties, } } Err(e) => {