From 85b632e8d5867537d775905bc97b2d42e96934d1 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 27 Nov 2023 15:20:56 +0100 Subject: [PATCH 1/4] Convert-to-delta initial implementation --- src/catalog.rs | 2 +- src/context/delta.rs | 65 ++++++++++++++++++++++++------------- src/context/logical.rs | 31 ++++++++++++++---- src/context/physical.rs | 39 ++++++++++++++++++---- src/datafusion/parser.rs | 60 ++++++++++++++++++---------------- src/nodes.rs | 19 +++++++++++ src/repository/default.rs | 10 ++++-- src/repository/interface.rs | 38 ++++++++++------------ src/repository/postgres.rs | 3 +- src/repository/sqlite.rs | 3 +- 10 files changed, 183 insertions(+), 87 deletions(-) diff --git a/src/catalog.rs b/src/catalog.rs index d15ae551..f8262e6b 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use arrow_schema::Schema; use async_trait::async_trait; use datafusion::catalog::schema::MemorySchemaProvider; use datafusion::datasource::TableProvider; @@ -27,7 +28,6 @@ use crate::{ AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError, Repository, TableVersionsResult, }, - schema::Schema, }; pub const DEFAULT_DB: &str = "default"; diff --git a/src/context/delta.rs b/src/context/delta.rs index 80772b08..ea613af5 100644 --- a/src/context/delta.rs +++ b/src/context/delta.rs @@ -3,7 +3,6 @@ use crate::context::SeafowlContext; #[cfg(test)] use crate::frontend::http::tests::deterministic_uuid; use crate::object_store::wrapped::InternalObjectStore; -use crate::schema::Schema as SeafowlSchema; use bytes::BytesMut; use datafusion::error::{DataFusionError as Error, Result}; @@ -11,6 +10,7 @@ use datafusion::execution::context::SessionState; use datafusion::parquet::basic::{Compression, ZstdLevel}; use datafusion::{ arrow::datatypes::{Schema, SchemaRef}, + datasource::TableProvider, error::DataFusionError, execution::context::TaskContext, parquet::{arrow::ArrowWriter, file::properties::WriterProperties}, @@ -18,8 +18,9 @@ use datafusion::{ sql::TableReference, }; use deltalake::kernel::{Action, Add, Schema as DeltaSchema}; -use deltalake::operations::create::CreateBuilder; -use deltalake::operations::transaction::commit; +use deltalake::operations::{ + convert_to_delta::ConvertToDeltaBuilder, create::CreateBuilder, transaction::commit, +}; use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::writer::create_add; use deltalake::DeltaTable; @@ -272,20 +273,22 @@ pub async fn plan_to_object_store( .collect() } +pub(super) enum CreateDeltaTableDetails { + WithSchema(Schema), + FromFiles(Path), +} + impl SeafowlContext { pub(super) async fn create_delta_table<'a>( &self, name: impl Into>, - schema: &Schema, + details: CreateDeltaTableDetails, ) -> Result> { let table_ref: TableReference = name.into(); let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA); let schema_name = resolved_ref.schema.clone(); let table_name = resolved_ref.table.clone(); - let sf_schema = SeafowlSchema { - arrow_schema: Arc::new(schema.clone()), - }; let collection_id = self .table_catalog .get_collection_id_by_name(&self.database, &schema_name) @@ -294,8 +297,6 @@ impl SeafowlContext { Error::Plan(format!("Schema {schema_name:?} does not exist!")) })?; - let delta_schema = DeltaSchema::try_from(schema)?; - // TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()` // in Postgres and `... 
DEFAULT (uuid())` in SQLite) however we won't be able to do it until // sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has @@ -307,25 +308,45 @@ impl SeafowlContext { let table_log_store = self.internal_object_store.get_log_store(table_uuid); // NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would - // be nice if those two could match - let table = Arc::new( - CreateBuilder::new() - .with_log_store(table_log_store) - .with_table_name(&*table_name) - .with_columns(delta_schema.fields().clone()) - .with_comment(format!( - "Created by Seafowl version {}", - env!("CARGO_PKG_VERSION") - )) - .await?, - ); + // be nice if those two could match somehow + let table = Arc::new(match details { + CreateDeltaTableDetails::WithSchema(schema) => { + let delta_schema = DeltaSchema::try_from(&schema)?; + + CreateBuilder::new() + .with_log_store(table_log_store) + .with_table_name(&*table_name) + .with_columns(delta_schema.fields().clone()) + .with_comment(format!( + "Created by Seafowl {}", + env!("CARGO_PKG_VERSION") + )) + .await? + } + CreateDeltaTableDetails::FromFiles(_path) => { + // Now convert them to a Delta table + ConvertToDeltaBuilder::new() + .with_log_store(table_log_store) + .with_table_name(&*table_name) + .with_comment(format!( + "Converted by Seafowl {}", + env!("CARGO_PKG_VERSION") + )) + .await? + } + }); // We still persist the table into our own catalog, one reason is us being able to load all // tables and their schemas in bulk to satisfy information_schema queries. // Another is to keep track of table uuid's, which are used to construct the table uri. // We may look into doing this via delta-rs somehow eventually. self.table_catalog - .create_table(collection_id, &table_name, &sf_schema, table_uuid) + .create_table( + collection_id, + &table_name, + TableProvider::schema(table.as_ref()).as_ref(), + table_uuid, + ) .await?; self.inner.register_table(resolved_ref, table.clone())?; diff --git a/src/context/logical.rs b/src/context/logical.rs index 7934415c..23e806aa 100644 --- a/src/context/logical.rs +++ b/src/context/logical.rs @@ -1,11 +1,11 @@ use crate::catalog::DEFAULT_SCHEMA; use crate::context::SeafowlContext; -use crate::datafusion::parser::{DFParser, Statement as DFStatement}; +use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA}; use crate::datafusion::utils::build_schema; use crate::wasm_udf::data_types::CreateFunctionDetails; use crate::{ nodes::{ - CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable, + ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable, SeafowlExtensionNode, Vacuum, }, version::TableVersionProcessor, @@ -267,7 +267,11 @@ impl SeafowlContext { "Unsupported SQL statement: {s:?}" ))), }, - DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => { + DFStatement::CopyTo(CopyToStatement { + ref mut source, + options, + .. + }) if !options.contains(&CONVERT_TO_DELTA) => { let state = if let CopyToSource::Query(ref mut query) = source { self.rewrite_time_travel_query(query).await? 
} else { @@ -275,12 +279,27 @@ impl SeafowlContext { }; state.statement_to_plan(stmt).await } + DFStatement::CopyTo(CopyToStatement { + source: CopyToSource::Relation(table_name), + target, + options, + }) if options.contains(&CONVERT_TO_DELTA) => { + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable { + location: target.clone(), + name: table_name.to_string(), + output_schema: Arc::new(DFSchema::empty()), + })), + })) + } DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => { self.inner.state().statement_to_plan(stmt).await } - DFStatement::Explain(_) => Err(Error::NotImplemented(format!( - "Unsupported SQL statement: {statement:?}" - ))), + DFStatement::CopyTo(_) | DFStatement::Explain(_) => { + Err(Error::NotImplemented(format!( + "Unsupported SQL statement: {statement:?}" + ))) + } } } diff --git a/src/context/physical.rs b/src/context/physical.rs index a9b82eb9..9ed46fe4 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -1,3 +1,4 @@ +use super::delta::CreateDeltaTableDetails; use crate::catalog::{DEFAULT_SCHEMA, STAGING_SCHEMA}; use crate::config::context::build_object_store; use crate::config::schema; @@ -6,7 +7,7 @@ use crate::context::delta::plan_to_object_store; use crate::context::SeafowlContext; use crate::delta_rs::backports::parquet_scan_from_actions; use crate::nodes::{ - CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable, + ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable, SeafowlExtensionNode, Vacuum, }; use crate::object_store::http::try_prepare_http_url; @@ -49,6 +50,7 @@ use deltalake::operations::vacuum::VacuumBuilder; use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::DeltaTable; use log::info; +use object_store::path::Path; use std::borrow::Cow; use std::ops::Deref; use std::ops::Not; @@ -226,8 +228,11 @@ impl SeafowlContext { // TODO: this means we'll have 2 table versions at the end, 1st from the create // and 2nd from the insert, while it seems more reasonable that in this case we have // only one - self.create_delta_table(name, plan.schema().as_ref()) - .await?; + self.create_delta_table( + name, + CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()), + ) + .await?; self.plan_to_delta_table(name, &plan).await?; Ok(make_dummy_exec()) @@ -544,12 +549,31 @@ impl SeafowlContext { // Other custom nodes we made like CREATE TABLE/INSERT/ALTER match SeafowlExtensionNode::from_dynamic(node) { Some(sfe_node) => match sfe_node { + SeafowlExtensionNode::ConvertTable(ConvertTable { + location, + name, + .. + }) => { + self.create_delta_table( + name, + CreateDeltaTableDetails::FromFiles(Path::from( + location.as_str(), + )), + ) + .await?; + + Ok(make_dummy_exec()) + } SeafowlExtensionNode::CreateTable(CreateTable { schema, name, .. 
}) => { - self.create_delta_table(name.as_str(), schema).await?; + self.create_delta_table( + name.as_str(), + CreateDeltaTableDetails::WithSchema(schema.clone()), + ) + .await?; Ok(make_dummy_exec()) } @@ -870,8 +894,11 @@ impl SeafowlContext { }; if !table_exists { - self.create_delta_table(table_ref.clone(), plan.schema().as_ref()) - .await?; + self.create_delta_table( + table_ref.clone(), + CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()), + ) + .await?; } self.plan_to_delta_table(table_ref, &plan).await diff --git a/src/datafusion/parser.rs b/src/datafusion/parser.rs index 71c0459c..18cd04cb 100644 --- a/src/datafusion/parser.rs +++ b/src/datafusion/parser.rs @@ -30,6 +30,7 @@ use datafusion::sql::parser::{ CopyToSource, CopyToStatement, CreateExternalTable, DescribeTableStmt, }; use datafusion_common::parsers::CompressionTypeVariant; +use lazy_static::lazy_static; use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value}; use sqlparser::tokenizer::{TokenWithLocation, Word}; use sqlparser::{ @@ -41,7 +42,6 @@ use sqlparser::{ use std::collections::{HashMap, VecDeque}; use std::str::FromStr; use std::string::ToString; -use strum_macros::Display; // Use `Parser::expected` instead, if possible macro_rules! parser_err { @@ -63,10 +63,12 @@ pub struct DFParser<'a> { parser: Parser<'a>, } -#[derive(Debug, Clone, Display)] -#[strum(serialize_all = "UPPERCASE")] -enum KeywordExtensions { - Vacuum, +// Hacky way to distinguish `COPY TO` statement from `CONVERT TO DELTA`. +// We should really introduce our own Statement enum which encapsulates +// the DataFusion one and adds our custom variants (this one and `VACUUM`). +lazy_static! { + pub static ref CONVERT_TO_DELTA: (String, Value) = + ("CONVERT_TO_DELTA".to_string(), Value::Boolean(true)); } impl<'a> DFParser<'a> { @@ -136,39 +138,25 @@ impl<'a> DFParser<'a> { pub fn parse_statement(&mut self) -> Result { match self.parser.peek_token().token { Token::Word(w) => { - match w { - Word { - keyword: Keyword::CREATE, - .. - } => { - // move one token forward + match w.keyword { + Keyword::CREATE => { self.parser.next_token(); - // use custom parsing self.parse_create() } - Word { - keyword: Keyword::COPY, - .. - } => { + Keyword::CONVERT => { + self.parser.next_token(); + self.parse_convert() + } + Keyword::COPY => { self.parser.next_token(); self.parse_copy() } - Word { - keyword: Keyword::DESCRIBE, - .. - } => { - // move one token forward + Keyword::DESCRIBE => { self.parser.next_token(); - // use custom parsing self.parse_describe() } - Word { value, .. } - if value.to_uppercase() - == KeywordExtensions::Vacuum.to_string() => - { - // move one token forward + Keyword::VACUUM => { self.parser.next_token(); - // use custom parsing self.parse_vacuum() } _ => { @@ -195,6 +183,22 @@ impl<'a> DFParser<'a> { })) } + // Parse `CONVERT location TO DELTA table_name` type statement + pub fn parse_convert(&mut self) -> Result { + let location = self.parser.parse_literal_string()?; + self.parser + .expect_keywords(&[Keyword::TO, Keyword::DELTA])?; + let table_name = self.parser.parse_object_name()?; + + // We'll use the CopyToStatement struct to pass the location and table name + // as it's the closest match to what we need. 
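+    // The CONVERT_TO_DELTA marker option is what the logical planner later
+    // checks for in order to tell this apart from a genuine `COPY ... TO`.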
+ Ok(Statement::CopyTo(CopyToStatement { + source: CopyToSource::Relation(table_name), + target: location, + options: vec![CONVERT_TO_DELTA.clone()], + })) + } + pub fn parse_vacuum(&mut self) -> Result { // Since `VACUUM` is not a supported keyword by sqlparser, we abuse the semantically related // TRUNCATE to smuggle the info on whether we want GC of tables, partitions or the DB itself. diff --git a/src/nodes.rs b/src/nodes.rs index c5edead5..19e00c7a 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -8,6 +8,16 @@ use crate::wasm_udf::data_types::CreateFunctionDetails; use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use strum_macros::AsRefStr; +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct ConvertTable { + /// Location from which to convert + pub location: String, + /// Name of the table to convert to + pub name: String, + /// Dummy result schema for the plan (empty) + pub output_schema: DFSchemaRef, +} + #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct CreateTable { /// The table schema @@ -69,6 +79,7 @@ pub struct Vacuum { #[derive(AsRefStr, Debug, Clone, Hash, PartialEq, Eq)] pub enum SeafowlExtensionNode { + ConvertTable(ConvertTable), CreateTable(CreateTable), CreateFunction(CreateFunction), DropFunction(DropFunction), @@ -102,6 +113,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { // (& means it has to have been borrowed and we can't own anything, since this // function will exit soon) match self { + SeafowlExtensionNode::ConvertTable(ConvertTable { + output_schema, .. + }) => output_schema, SeafowlExtensionNode::CreateTable(CreateTable { output_schema, .. }) => { output_schema } @@ -131,6 +145,11 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + SeafowlExtensionNode::ConvertTable(ConvertTable { + location, name, .. + }) => { + write!(f, "Convert: {location} to {name}") + } SeafowlExtensionNode::CreateTable(CreateTable { name, .. 
}) => { write!(f, "Create: {name}") } diff --git a/src/repository/default.rs b/src/repository/default.rs index d01e931d..794b0f6a 100644 --- a/src/repository/default.rs +++ b/src/repository/default.rs @@ -239,10 +239,16 @@ impl Repository for $repo { // Create columns // TODO this breaks if we have more than (bind limit) columns - if !schema.arrow_schema.fields().is_empty() { + if !schema.fields().is_empty() { let mut builder: QueryBuilder<_> = QueryBuilder::new("INSERT INTO table_column(table_version_id, name, type) "); - builder.push_values(schema.to_column_names_types(), |mut b, col| { + + let fields: Vec<(String, String)> = schema.fields() + .iter() + .map(|f| (f.name().clone(), field_to_json(f).to_string())) + .collect(); + + builder.push_values(fields, |mut b, col| { b.push_bind(new_version_id) .push_bind(col.0) .push_bind(col.1); diff --git a/src/repository/interface.rs b/src/repository/interface.rs index 33f9d3fc..ce846402 100644 --- a/src/repository/interface.rs +++ b/src/repository/interface.rs @@ -1,18 +1,16 @@ use std::fmt::Debug; use std::str::FromStr; +use arrow_schema::Schema; use async_trait::async_trait; use strum::ParseError; use strum_macros::{Display, EnumString}; use uuid::Uuid; -use crate::wasm_udf::data_types::CreateFunctionDetails; -use crate::{ - data_types::{ - CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp, - }, - schema::Schema, +use crate::data_types::{ + CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp, }; +use crate::wasm_udf::data_types::CreateFunctionDetails; #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)] pub struct AllDatabaseColumnsResult { @@ -232,14 +230,11 @@ pub mod tests { .create_collection(database_id, DEFAULT_SCHEMA) .await .expect("Error creating default schema"); - let empty_schema = Schema { - arrow_schema: Arc::new(ArrowSchema::empty()), - }; repository .create_table( default_schema_id, "empty_table", - &empty_schema, + &ArrowSchema::empty(), Uuid::default(), ) .await @@ -254,12 +249,9 @@ pub mod tests { ArrowField::new("date", ArrowDataType::Date64, false), ArrowField::new("value", ArrowDataType::Float64, false), ]); - let schema = Schema { - arrow_schema: Arc::new(arrow_schema), - }; let (table_id, table_version_id) = repository - .create_table(collection_id, "testtable", &schema, Uuid::default()) + .create_table(collection_id, "testtable", &arrow_schema, Uuid::default()) .await .expect("Error creating table"); @@ -564,10 +556,6 @@ pub mod tests { )); // Make a new table in the existing collection with the same name - let schema = Schema { - arrow_schema: Arc::new(ArrowSchema::empty()), - }; - let collection_id_1 = repository .get_collection_id_by_name("testdb", "testcol") .await @@ -579,7 +567,12 @@ pub mod tests { assert!(matches!( repository - .create_table(collection_id_2, "testtable2", &schema, Uuid::default()) + .create_table( + collection_id_2, + "testtable2", + &ArrowSchema::empty(), + Uuid::default() + ) .await .unwrap_err(), Error::UniqueConstraintViolation(_) @@ -587,7 +580,12 @@ pub mod tests { // Make a new table in the previous collection, try renaming let (new_table_id, _) = repository - .create_table(collection_id_1, "testtable2", &schema, Uuid::default()) + .create_table( + collection_id_1, + "testtable2", + &ArrowSchema::empty(), + Uuid::default(), + ) .await .unwrap(); diff --git a/src/repository/postgres.rs b/src/repository/postgres.rs index 1f9adbe0..3dede0d0 100644 --- a/src/repository/postgres.rs +++ b/src/repository/postgres.rs @@ -1,5 +1,7 @@ use 
std::{fmt::Debug, time::Duration}; +use arrow_integration_test::field_to_json; +use arrow_schema::Schema; use async_trait::async_trait; use futures::TryStreamExt; use sqlx::{ @@ -12,7 +14,6 @@ use uuid::Uuid; use crate::{ data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, implement_repository, - schema::Schema, wasm_udf::data_types::CreateFunctionDetails, }; diff --git a/src/repository/sqlite.rs b/src/repository/sqlite.rs index a2738e90..8645d223 100644 --- a/src/repository/sqlite.rs +++ b/src/repository/sqlite.rs @@ -1,5 +1,7 @@ use std::{fmt::Debug, str::FromStr}; +use arrow_integration_test::field_to_json; +use arrow_schema::Schema; use async_trait::async_trait; use futures::TryStreamExt; use sqlx::sqlite::SqliteJournalMode; @@ -12,7 +14,6 @@ use uuid::Uuid; use crate::{ data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId}, - schema::Schema, wasm_udf::data_types::CreateFunctionDetails, }; From 5833d1b5b62f9865c872f42e5b944e5b559a0342 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 29 Nov 2023 13:33:39 +0100 Subject: [PATCH 2/4] Pivot to supporting only bare UUID paths for in-place conversion --- src/context/delta.rs | 51 +++++++++++++++++++++++++---------------- src/context/physical.rs | 2 +- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/context/delta.rs b/src/context/delta.rs index ea613af5..113cbdd0 100644 --- a/src/context/delta.rs +++ b/src/context/delta.rs @@ -275,7 +275,7 @@ pub async fn plan_to_object_store( pub(super) enum CreateDeltaTableDetails { WithSchema(Schema), - FromFiles(Path), + FromPath(Path), } impl SeafowlContext { @@ -297,23 +297,23 @@ impl SeafowlContext { Error::Plan(format!("Schema {schema_name:?} does not exist!")) })?; - // TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()` - // in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until - // sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has - // the `uuid()` function). - // Then we could create the table in our catalog first and try to create the delta table itself - // with the returned uuid (and delete the catalog entry if the object store creation fails). - // On the other hand that would complicate etag testing logic. - let table_uuid = get_uuid(); - let table_log_store = self.internal_object_store.get_log_store(table_uuid); - // NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would // be nice if those two could match somehow - let table = Arc::new(match details { + let (table_uuid, table) = match details { CreateDeltaTableDetails::WithSchema(schema) => { + // TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()` + // in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until + // sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has + // the `uuid()` function). + // Then we could create the table in our catalog first and try to create the delta table itself + // with the returned uuid (and delete the catalog entry if the object store creation fails). + // On the other hand that would complicate etag testing logic. 
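+                // The freshly generated UUID doubles as the table's prefix
+                // in the object store.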
+ let table_uuid = get_uuid(); + let table_log_store = + self.internal_object_store.get_log_store(table_uuid); let delta_schema = DeltaSchema::try_from(&schema)?; - CreateBuilder::new() + let table = CreateBuilder::new() .with_log_store(table_log_store) .with_table_name(&*table_name) .with_columns(delta_schema.fields().clone()) @@ -321,20 +321,30 @@ impl SeafowlContext { "Created by Seafowl {}", env!("CARGO_PKG_VERSION") )) - .await? + .await?; + (table_uuid, table) } - CreateDeltaTableDetails::FromFiles(_path) => { - // Now convert them to a Delta table - ConvertToDeltaBuilder::new() + CreateDeltaTableDetails::FromPath(path) => { + // For now interpret the path as containing only the final UUID table prefix, + // in accordance with Seafowl convention + let table_uuid = Uuid::try_parse(path.as_ref()).map_err(|e| { + DataFusionError::Execution(format!( + "Unable to parse the UUID path of the table: {e}" + )) + })?; + let table_log_store = + self.internal_object_store.get_log_store(table_uuid); + let table = ConvertToDeltaBuilder::new() .with_log_store(table_log_store) .with_table_name(&*table_name) .with_comment(format!( "Converted by Seafowl {}", env!("CARGO_PKG_VERSION") )) - .await? + .await?; + (table_uuid, table) } - }); + }; // We still persist the table into our own catalog, one reason is us being able to load all // tables and their schemas in bulk to satisfy information_schema queries. @@ -344,11 +354,12 @@ impl SeafowlContext { .create_table( collection_id, &table_name, - TableProvider::schema(table.as_ref()).as_ref(), + TableProvider::schema(&table).as_ref(), table_uuid, ) .await?; + let table = Arc::new(table); self.inner.register_table(resolved_ref, table.clone())?; debug!("Created new table {table}"); Ok(table) diff --git a/src/context/physical.rs b/src/context/physical.rs index 9ed46fe4..3c5238e1 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -556,7 +556,7 @@ impl SeafowlContext { }) => { self.create_delta_table( name, - CreateDeltaTableDetails::FromFiles(Path::from( + CreateDeltaTableDetails::FromPath(Path::from( location.as_str(), )), ) From d72f6e76f78bb280ec26725b24f1df7ab1f466dd Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 29 Nov 2023 14:59:41 +0100 Subject: [PATCH 3/4] Add integration test for the CONVERT TO DELTA statement --- tests/statements/convert.rs | 75 +++++++++++++++++++++++++++++++++++++ tests/statements/mod.rs | 3 ++ 2 files changed, 78 insertions(+) create mode 100644 tests/statements/convert.rs diff --git a/tests/statements/convert.rs b/tests/statements/convert.rs new file mode 100644 index 00000000..2601837d --- /dev/null +++ b/tests/statements/convert.rs @@ -0,0 +1,75 @@ +use crate::statements::*; + +#[tokio::test] +async fn test_convert_from_flat_parquet_table() -> Result<(), DataFusionError> { + let (context, maybe_test_dir) = make_context_with_pg(ObjectStoreType::Local).await; + + // Prepare a flat Parquet table + let table_uuid = Uuid::new_v4(); + let temp_dir = maybe_test_dir.expect("temporary data dir exists"); + let table_path = temp_dir.path().join(table_uuid.to_string()); + // Create the directory as otherwise the COPY will fail + create_dir(table_path.clone()).await?; + + // COPY some values multiple times to test converting flat table with more than one parquet file + context + .plan_query(&format!( + "COPY (VALUES (1, 'one'), (2, 'two')) TO '{}/file_1.parquet'", + table_path.display() + )) + .await?; + context + .plan_query(&format!( + "COPY (VALUES (3, 'three'), (4, 'four')) TO '{}/file_2.parquet'", + 
table_path.display() + )) + .await?; + context + .plan_query(&format!( + "COPY (VALUES (5, 'five'), (6, 'six')) TO '{}/file_3.parquet'", + table_path.display() + )) + .await?; + + // Now test the actual conversion + context + .plan_query(&format!("CONVERT '{table_uuid}' TO DELTA table_converted")) + .await?; + + // Finally test the contents of the converted table + let plan = context + .plan_query("SELECT * FROM table_converted ORDER BY column1") + .await?; + let results = context.collect(plan).await.unwrap(); + + let expected = [ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "| 5 | five |", + "| 6 | six |", + "+---------+---------+", + ]; + assert_batches_eq!(expected, &results); + + // Also check the final directory state + testutils::assert_uploaded_objects( + context + .internal_object_store + .get_log_store(table_uuid) + .object_store(), + vec![ + Path::from("_delta_log/00000000000000000000.json"), + Path::from("file_1.parquet"), + Path::from("file_2.parquet"), + Path::from("file_3.parquet"), + ], + ) + .await; + + Ok(()) +} diff --git a/tests/statements/mod.rs b/tests/statements/mod.rs index f15d0a43..04ed17d3 100644 --- a/tests/statements/mod.rs +++ b/tests/statements/mod.rs @@ -17,7 +17,9 @@ use serde_json::json; use sqlx::{any::install_default_drivers, AnyPool, Executor}; #[cfg(feature = "remote-tables")] use tempfile::{NamedTempFile, TempPath}; +use tokio::fs::create_dir; use tokio::time::sleep; +use uuid::Uuid; use rstest::rstest; use tempfile::TempDir; @@ -34,6 +36,7 @@ mod dml; mod query; // Hack because integration tests do not set cfg(test) // https://users.rust-lang.org/t/sharing-helper-function-between-unit-and-integration-tests/9941/2 +mod convert; #[allow(dead_code)] #[path = "../../src/testutils.rs"] mod testutils; From 2f7fb936770d810030f5a71ffef239d4fa057ecc Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 1 Dec 2023 17:19:16 +0100 Subject: [PATCH 4/4] Rename enum variant from WithSchema to EmptyTable --- src/context/delta.rs | 4 ++-- src/context/physical.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/context/delta.rs b/src/context/delta.rs index 113cbdd0..88b9343f 100644 --- a/src/context/delta.rs +++ b/src/context/delta.rs @@ -274,7 +274,7 @@ pub async fn plan_to_object_store( } pub(super) enum CreateDeltaTableDetails { - WithSchema(Schema), + EmptyTable(Schema), FromPath(Path), } @@ -300,7 +300,7 @@ impl SeafowlContext { // NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would // be nice if those two could match somehow let (table_uuid, table) = match details { - CreateDeltaTableDetails::WithSchema(schema) => { + CreateDeltaTableDetails::EmptyTable(schema) => { // TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()` // in Postgres and `... 
DEFAULT (uuid())` in SQLite) however we won't be able to do it until // sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has diff --git a/src/context/physical.rs b/src/context/physical.rs index 3c5238e1..96fe633e 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -230,7 +230,7 @@ impl SeafowlContext { // only one self.create_delta_table( name, - CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()), + CreateDeltaTableDetails::EmptyTable(plan.schema().as_ref().clone()), ) .await?; self.plan_to_delta_table(name, &plan).await?; @@ -571,7 +571,7 @@ impl SeafowlContext { }) => { self.create_delta_table( name.as_str(), - CreateDeltaTableDetails::WithSchema(schema.clone()), + CreateDeltaTableDetails::EmptyTable(schema.clone()), ) .await?; @@ -896,7 +896,7 @@ impl SeafowlContext { if !table_exists { self.create_delta_table( table_ref.clone(), - CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()), + CreateDeltaTableDetails::EmptyTable(plan.schema().as_ref().clone()), ) .await?; }
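---

Usage sketch for the new statement (a minimal example mirroring the integration
test in tests/statements/convert.rs; the UUID below is hypothetical and must in
practice name the bare UUID directory holding the flat Parquet files, per the
convention adopted in patch 2):

    -- Convert the Parquet files under the given UUID prefix in place into a
    -- Delta table, registering it in the Seafowl catalog under the given name
    CONVERT '67e55044-10b1-426f-9247-bb680e5fe0c8' TO DELTA converted_table;

    -- The converted table is then queryable like any other Seafowl table
    SELECT * FROM converted_table;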