diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 2df1aada479..38894e6b2c6 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -4118,6 +4118,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "smallvec", "snafu", "tantivy", "tempfile", diff --git a/python/Cargo.lock b/python/Cargo.lock index 7b73a36aa6a..555308d942e 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4557,6 +4557,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "smallvec", "snafu", "tantivy", "tempfile", diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index 85ee4a6989f..b1df32a0c31 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true [features] default = ["dir-aws", "dir-azure", "dir-gcp", "dir-oss", "dir-huggingface"] rest = ["dep:reqwest"] -rest-adapter = ["dep:axum", "dep:tower", "dep:tower-http", "dep:serde"] +rest-adapter = ["dep:axum", "dep:tower", "dep:tower-http"] # Cloud storage features for directory implementation - align with lance-io dir-gcp = ["lance-io/gcp", "lance/gcp"] dir-aws = ["lance-io/aws", "lance/aws"] @@ -23,7 +23,7 @@ dir-oss = ["lance-io/oss", "lance/oss"] dir-huggingface = ["lance-io/huggingface", "lance/huggingface"] # Credential vending features credential-vendor-aws = ["dep:aws-sdk-sts", "dep:aws-config", "dep:aws-credential-types", "dep:sha2", "dep:base64"] -credential-vendor-gcp = ["dep:google-cloud-auth", "dep:reqwest", "dep:serde", "dep:sha2", "dep:base64"] +credential-vendor-gcp = ["dep:google-cloud-auth", "dep:reqwest", "dep:sha2", "dep:base64"] credential-vendor-azure = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs", "dep:time", "dep:sha2", "dep:base64", "dep:reqwest"] [dependencies] @@ -53,7 +53,7 @@ arrow-schema = { workspace = true } axum = { workspace = true, optional = true } tower = { workspace = true, optional = true 
} tower-http = { workspace = true, optional = true, features = ["trace", "cors", "normalize-path"] } -serde = { workspace = true, optional = true } +serde = { workspace = true } # Common dependencies async-trait.workspace = true diff --git a/rust/lance-namespace-impls/src/json.rs b/rust/lance-namespace-impls/src/json.rs new file mode 100644 index 00000000000..9cee068feab --- /dev/null +++ b/rust/lance-namespace-impls/src/json.rs @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use lance_namespace::models::JsonArrowDataType; +use serde::{Deserialize, Serialize}; + +// TODO: remove this after https://github.com/lance-format/lance-namespace/pull/297 is merged +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JsonPartitionField { + field_id: String, + source_ids: Vec, + transform: JsonTransform, + expression: String, + result_type: JsonArrowDataType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JsonTransform { + #[serde(rename = "type")] + r#type: String, // The transform type + #[serde(rename = "num_buckets", skip_serializing_if = "Option::is_none")] + num_buckets: Option, // Number of buckets N + #[serde(rename = "width", skip_serializing_if = "Option::is_none")] + width: Option, // Truncation width W +} diff --git a/rust/lance-namespace-impls/src/lib.rs b/rust/lance-namespace-impls/src/lib.rs index 88248841bcb..2f825cd3d37 100644 --- a/rust/lance-namespace-impls/src/lib.rs +++ b/rust/lance-namespace-impls/src/lib.rs @@ -77,6 +77,8 @@ pub mod rest; #[cfg(feature = "rest-adapter")] pub mod rest_adapter; +pub mod partition; +pub mod json; // Re-export connect builder pub use connect::ConnectBuilder; diff --git a/rust/lance-namespace-impls/src/partition.rs b/rust/lance-namespace-impls/src/partition.rs new file mode 100644 index 00000000000..931b9db9664 --- /dev/null +++ b/rust/lance-namespace-impls/src/partition.rs @@ -0,0 +1,546 @@ +// SPDX-License-Identifier: Apache-2.0 
+// SPDX-FileCopyrightText: Copyright The Lance Authors +#![allow(unused)] + +use crate::json::JsonPartitionField; +use crate::DirectoryNamespace; +use arrow::array::RecordBatch; +use arrow_schema::DataType; +use async_trait::async_trait; +use bytes::Bytes; +use lance::deps::datafusion::logical_expr::Expr; +use lance_core::datatypes::{Field, Schema}; +use lance_namespace::models::{ + AlterTableAddColumnsRequest, AlterTableAddColumnsResponse, AlterTableAlterColumnsRequest, + AlterTableAlterColumnsResponse, AlterTableDropColumnsRequest, AlterTableDropColumnsResponse, + AlterTransactionRequest, AlterTransactionResponse, AnalyzeTableQueryPlanRequest, + CountTableRowsRequest, CreateEmptyTableRequest, CreateEmptyTableResponse, + CreateNamespaceRequest, CreateNamespaceResponse, CreateTableIndexRequest, + CreateTableIndexResponse, CreateTableRequest, CreateTableResponse, + CreateTableScalarIndexResponse, CreateTableTagRequest, CreateTableTagResponse, + DeclareTableRequest, DeclareTableResponse, DeleteFromTableRequest, DeleteFromTableResponse, + DeleteTableTagRequest, DeleteTableTagResponse, DeregisterTableRequest, DeregisterTableResponse, + DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableIndexStatsRequest, + DescribeTableIndexStatsResponse, DescribeTableRequest, DescribeTableResponse, + DescribeTransactionRequest, DescribeTransactionResponse, DropNamespaceRequest, + DropNamespaceResponse, DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, + DropTableResponse, ExplainTableQueryPlanRequest, GetTableStatsRequest, GetTableStatsResponse, + GetTableTagVersionRequest, GetTableTagVersionResponse, InsertIntoTableRequest, + InsertIntoTableResponse, ListNamespacesRequest, ListNamespacesResponse, + ListTableIndicesRequest, ListTableIndicesResponse, ListTableTagsRequest, ListTableTagsResponse, + ListTableVersionsRequest, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, + MergeInsertIntoTableRequest, MergeInsertIntoTableResponse, 
NamespaceExistsRequest, + QueryTableRequest, RegisterTableRequest, RegisterTableResponse, RenameTableRequest, + RenameTableResponse, RestoreTableRequest, RestoreTableResponse, TableExistsRequest, + UpdateTableRequest, UpdateTableResponse, UpdateTableSchemaMetadataRequest, + UpdateTableSchemaMetadataResponse, UpdateTableTagRequest, UpdateTableTagResponse, +}; +use lance_namespace::LanceNamespace; +use std::fmt::{Debug, Formatter}; + +/// A PartitionedNamespace is a directory namespace containing a collection of tables that share a +/// common schema. These tables are physically separated and independent, but logically related +/// through partition fields definition. +pub struct PartitionedNamespace { + dir: DirectoryNamespace, +} + +impl Debug for PartitionedNamespace { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl PartitionedNamespace { + /// Partition pruning for the given filter expression. + /// + /// # Arguments + /// + /// * `filter` - The filter expression to be applied. + /// + /// Returns the list of (partition table, refine expr) that are required to scan. + pub fn plan_scan(&self, filter: &Expr) -> lance_core::Result> { + todo!() + } + + /// Resolve the target partition table for the input row. Create it (empty table) if not exists. + /// + /// # Arguments + /// + /// * `record` - The record batch to be resolved, it should contain only one row. + /// + /// Returns the partition table that the input row belongs to. + pub fn resolve_or_create_partition_table( + &self, + record: &RecordBatch, + ) -> lance_core::Result { + todo!() + } + + /// Commit the partition table changes. + /// + /// If ACID is disabled, commit does nothing. + /// Otherwise, if the partition namespace is changed after read version, this method will + /// auto-detect the conflicts. + /// + /// # Arguments + /// + /// * `read_version` - The partition tables that are read in the transaction. 
+ /// * `new_version` - The partition tables that are written in the transaction. + /// + /// Returns the new version of the partitioned namespace. + pub fn commit( + &self, + read_version: Option>, + new_version: Option>, + ) -> lance_core::Result>> { + todo!() + } + + /// Schema of the partitioned namespace. + pub fn schema(&self) -> Schema { + todo!() + } + + /// All partition tables of the partitioned namespace. + pub fn tables(&self) -> Vec { + todo!() + } + + /// Partitioning of the partitioned namespace. + pub fn partitioning(&self) -> Partitioning { + todo!() + } + + // Partition Evolution. + + // TODO: Based on `update_partition_spec`, we can provide a `PartitionUpdater` API, e.g.: + // ``` + // partition_ns.partition_updater() + // .remove_field("old_partition_field") + // .add_field("new_partition_field", Expressions.day("ts")) + // .commit(); + // ``` + /// Update the partition spec. + /// + /// # Arguments + /// + /// * `partition_spec` - The new partition spec. + /// + /// Returns the new partition spec. + fn update_partition_spec( + &self, + partition_spec: Vec, + ) -> lance_core::Result { + todo!() + } + + // Schema Evolution. + + /// Add a new column to the partitioned namespace. + /// + /// # Arguments + /// + /// * `column` - The column to be added. + /// + /// Returns the new schema. + fn add_column(&self, column: &Field) -> lance_core::Result { + todo!() + } + + /// Drop the given column from the partitioned namespace. + /// + /// # Arguments + /// + /// * `column` - The column to be dropped. + /// + /// Returns the new schema. + fn drop_column(&self, column: &str) -> lance_core::Result { + todo!() + } + + /// Rename the given column in the partitioned namespace. + /// + /// # Arguments + /// + /// * `old_name` - The old name of the column. + /// * `new_name` - The new name of the column. + /// + /// Returns the new schema. 
+ fn rename_column(&self, old_name: &str, new_name: &str) -> lance_core::Result { + todo!() + } + + /// Promote the type of the given column to the new type in the partitioned namespace. + /// + /// # Arguments + /// + /// * `column` - The column to be promoted. + /// * `new_type` - The new type of the column. + /// + /// Returns the new schema. + fn type_promotion(&self, column: &str, new_type: &DataType) -> lance_core::Result { + todo!() + } +} + +#[async_trait] +impl LanceNamespace for PartitionedNamespace { + async fn list_namespaces( + &self, + _request: ListNamespacesRequest, + ) -> lance_core::Result { + todo!() + } + + async fn describe_namespace( + &self, + _request: DescribeNamespaceRequest, + ) -> lance_core::Result { + todo!() + } + + async fn create_namespace( + &self, + _request: CreateNamespaceRequest, + ) -> lance_core::Result { + todo!() + } + + async fn drop_namespace( + &self, + _request: DropNamespaceRequest, + ) -> lance_core::Result { + todo!() + } + + async fn namespace_exists(&self, _request: NamespaceExistsRequest) -> lance_core::Result<()> { + todo!() + } + + async fn list_tables( + &self, + _request: ListTablesRequest, + ) -> lance_core::Result { + todo!() + } + + async fn describe_table( + &self, + _request: DescribeTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn register_table( + &self, + _request: RegisterTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn table_exists(&self, _request: TableExistsRequest) -> lance_core::Result<()> { + todo!() + } + + async fn drop_table( + &self, + _request: DropTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn deregister_table( + &self, + _request: DeregisterTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn count_table_rows(&self, _request: CountTableRowsRequest) -> lance_core::Result { + todo!() + } + + async fn create_table( + &self, + _request: CreateTableRequest, + _request_data: Bytes, + ) -> lance_core::Result { + 
todo!() + } + + async fn declare_table( + &self, + _request: DeclareTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn create_empty_table( + &self, + _request: CreateEmptyTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn insert_into_table( + &self, + _request: InsertIntoTableRequest, + _request_data: Bytes, + ) -> lance_core::Result { + todo!() + } + + async fn merge_insert_into_table( + &self, + _request: MergeInsertIntoTableRequest, + _request_data: Bytes, + ) -> lance_core::Result { + todo!() + } + + async fn update_table( + &self, + _request: UpdateTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn delete_from_table( + &self, + _request: DeleteFromTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn query_table(&self, _request: QueryTableRequest) -> lance_core::Result { + todo!() + } + + async fn create_table_index( + &self, + _request: CreateTableIndexRequest, + ) -> lance_core::Result { + todo!() + } + + async fn list_table_indices( + &self, + _request: ListTableIndicesRequest, + ) -> lance_core::Result { + todo!() + } + + async fn describe_table_index_stats( + &self, + _request: DescribeTableIndexStatsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn describe_transaction( + &self, + _request: DescribeTransactionRequest, + ) -> lance_core::Result { + todo!() + } + + async fn alter_transaction( + &self, + _request: AlterTransactionRequest, + ) -> lance_core::Result { + todo!() + } + + async fn create_table_scalar_index( + &self, + _request: CreateTableIndexRequest, + ) -> lance_core::Result { + todo!() + } + + async fn drop_table_index( + &self, + _request: DropTableIndexRequest, + ) -> lance_core::Result { + todo!() + } + + async fn list_all_tables( + &self, + _request: ListTablesRequest, + ) -> lance_core::Result { + todo!() + } + + async fn restore_table( + &self, + _request: RestoreTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn rename_table( + &self, 
+ _request: RenameTableRequest, + ) -> lance_core::Result { + todo!() + } + + async fn list_table_versions( + &self, + _request: ListTableVersionsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn update_table_schema_metadata( + &self, + _request: UpdateTableSchemaMetadataRequest, + ) -> lance_core::Result { + todo!() + } + + async fn get_table_stats( + &self, + _request: GetTableStatsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn explain_table_query_plan( + &self, + _request: ExplainTableQueryPlanRequest, + ) -> lance_core::Result { + todo!() + } + + async fn analyze_table_query_plan( + &self, + _request: AnalyzeTableQueryPlanRequest, + ) -> lance_core::Result { + todo!() + } + + async fn alter_table_add_columns( + &self, + _request: AlterTableAddColumnsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn alter_table_alter_columns( + &self, + _request: AlterTableAlterColumnsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn alter_table_drop_columns( + &self, + _request: AlterTableDropColumnsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn list_table_tags( + &self, + _request: ListTableTagsRequest, + ) -> lance_core::Result { + todo!() + } + + async fn get_table_tag_version( + &self, + _request: GetTableTagVersionRequest, + ) -> lance_core::Result { + todo!() + } + + async fn create_table_tag( + &self, + _request: CreateTableTagRequest, + ) -> lance_core::Result { + todo!() + } + + async fn delete_table_tag( + &self, + _request: DeleteTableTagRequest, + ) -> lance_core::Result { + todo!() + } + + async fn update_table_tag( + &self, + _request: UpdateTableTagRequest, + ) -> lance_core::Result { + todo!() + } + + fn namespace_id(&self) -> String { + todo!() + } +} + +/// Create a new partitioned namespace with the given location, schema, and partition. +/// +/// # Arguments +/// +/// * `location` - The location of the partitioned namespace. +/// * `schema` - The schema of the partitioned namespace. 
+/// * `partition` - The initial partition of the partitioned namespace. +/// +/// Returns the created partitioned namespace. +pub fn create_partitioned_namespace( + location: &str, + schema: Schema, + partition: PartitionSpec, +) -> lance_core::Result { + todo!() +} + +/// Partition table of the partitioned namespace. +pub struct PartitionTable { + id: Vec, // namespace id + read_version: Option, // read version +} + +/// Partitioning contains all partition specs of the partitioned namespace. +pub struct Partitioning {} + +impl Partitioning { + pub fn current() -> PartitionSpec { + todo!() + } + pub fn id(id: i32) -> PartitionSpec { + todo!() + } + pub fn all() -> Vec { + todo!() + } +} + +/// Partition specification defines how to derive partition values from a record in a partitioned +/// namespace. +pub struct PartitionSpec { + id: i32, + fields: Vec, +} + +/// Partition field definition. +pub struct PartitionField { + field_id: String, // Unique identifier for this partition field + source_ids: Vec, // Field IDs of the source columns in the schema + expression: Expr, // DataFusion expression using `col0`, `col1`, ... as column references + result_type: DataType, // Result type of the partition value +} + +impl PartitionField { + pub fn to_json(self) -> JsonPartitionField { + todo!() + } + pub fn from_json(json: &str) -> Self { + todo!() + } +}