diff --git a/src/catalog.rs b/src/catalog.rs
index d15ae551..f8262e6b 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -1,6 +1,7 @@
 use std::str::FromStr;
 use std::{collections::HashMap, fmt::Debug, sync::Arc};
 
+use arrow_schema::Schema;
 use async_trait::async_trait;
 use datafusion::catalog::schema::MemorySchemaProvider;
 use datafusion::datasource::TableProvider;
@@ -27,7 +28,6 @@ use crate::{
         AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError,
         Repository, TableVersionsResult,
     },
-    schema::Schema,
 };
 
 pub const DEFAULT_DB: &str = "default";
diff --git a/src/context/delta.rs b/src/context/delta.rs
index 80772b08..ea613af5 100644
--- a/src/context/delta.rs
+++ b/src/context/delta.rs
@@ -3,7 +3,6 @@ use crate::context::SeafowlContext;
 #[cfg(test)]
 use crate::frontend::http::tests::deterministic_uuid;
 use crate::object_store::wrapped::InternalObjectStore;
-use crate::schema::Schema as SeafowlSchema;
 use bytes::BytesMut;
 use datafusion::error::{DataFusionError as Error, Result};
 
@@ -11,6 +10,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::parquet::basic::{Compression, ZstdLevel};
 use datafusion::{
     arrow::datatypes::{Schema, SchemaRef},
+    datasource::TableProvider,
     error::DataFusionError,
     execution::context::TaskContext,
     parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
@@ -18,8 +18,9 @@
     sql::TableReference,
 };
 use deltalake::kernel::{Action, Add, Schema as DeltaSchema};
-use deltalake::operations::create::CreateBuilder;
-use deltalake::operations::transaction::commit;
+use deltalake::operations::{
+    convert_to_delta::ConvertToDeltaBuilder, create::CreateBuilder, transaction::commit,
+};
 use deltalake::protocol::{DeltaOperation, SaveMode};
 use deltalake::writer::create_add;
 use deltalake::DeltaTable;
@@ -272,20 +273,22 @@ pub async fn plan_to_object_store(
         .collect()
 }
 
+pub(super) enum CreateDeltaTableDetails {
+    WithSchema(Schema),
+    FromFiles(Path),
+}
+
 impl SeafowlContext {
     pub(super) async fn create_delta_table<'a>(
         &self,
         name: impl Into<TableReference<'a>>,
-        schema: &Schema,
+        details: CreateDeltaTableDetails,
     ) -> Result<Arc<DeltaTable>> {
         let table_ref: TableReference = name.into();
         let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);
         let schema_name = resolved_ref.schema.clone();
         let table_name = resolved_ref.table.clone();
 
-        let sf_schema = SeafowlSchema {
-            arrow_schema: Arc::new(schema.clone()),
-        };
         let collection_id = self
             .table_catalog
             .get_collection_id_by_name(&self.database, &schema_name)
             .await?
             .ok_or_else(|| {
@@ -294,8 +297,6 @@
                 Error::Plan(format!("Schema {schema_name:?} does not exist!"))
             })?;
 
-        let delta_schema = DeltaSchema::try_from(schema)?;
-
         // TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()`
         // in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until
         // sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has
@@ -307,25 +308,45 @@
         let table_log_store = self.internal_object_store.get_log_store(table_uuid);
 
         // NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would
-        // be nice if those two could match
-        let table = Arc::new(
-            CreateBuilder::new()
-                .with_log_store(table_log_store)
-                .with_table_name(&*table_name)
-                .with_columns(delta_schema.fields().clone())
-                .with_comment(format!(
-                    "Created by Seafowl version {}",
-                    env!("CARGO_PKG_VERSION")
-                ))
-                .await?,
-        );
+        // be nice if those two could match somehow
+        let table = Arc::new(match details {
+            CreateDeltaTableDetails::WithSchema(schema) => {
+                let delta_schema = DeltaSchema::try_from(&schema)?;
+
+                CreateBuilder::new()
+                    .with_log_store(table_log_store)
+                    .with_table_name(&*table_name)
+                    .with_columns(delta_schema.fields().clone())
+                    .with_comment(format!(
+                        "Created by Seafowl {}",
+                        env!("CARGO_PKG_VERSION")
+                    ))
+                    .await?
+            }
+            CreateDeltaTableDetails::FromFiles(_path) => {
+                // Convert the pre-existing Parquet files into a Delta table
+                ConvertToDeltaBuilder::new()
+                    .with_log_store(table_log_store)
+                    .with_table_name(&*table_name)
+                    .with_comment(format!(
+                        "Converted by Seafowl {}",
+                        env!("CARGO_PKG_VERSION")
+                    ))
+                    .await?
+            }
+        });
 
         // We still persist the table into our own catalog, one reason is us being able to load all
         // tables and their schemas in bulk to satisfy information_schema queries.
         // Another is to keep track of table uuid's, which are used to construct the table uri.
         // We may look into doing this via delta-rs somehow eventually.
         self.table_catalog
-            .create_table(collection_id, &table_name, &sf_schema, table_uuid)
+            .create_table(
+                collection_id,
+                &table_name,
+                TableProvider::schema(table.as_ref()).as_ref(),
+                table_uuid,
+            )
             .await?;
 
         self.inner.register_table(resolved_ref, table.clone())?;
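A note on the catalog call above: `TableProvider::schema(table.as_ref())` uses fully qualified syntax because `DeltaTable` appears to also expose an inherent `schema()` accessor for the Delta kernel schema, and inherent methods win during method resolution. A minimal sketch of the disambiguation (function name hypothetical, assuming the `datafusion` feature of `deltalake`, which implements `TableProvider` for `DeltaTable`):

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::TableProvider;
    use deltalake::DeltaTable;

    // `table.schema()` would resolve to the inherent method on DeltaTable, so the
    // trait method is named explicitly to obtain the Arrow schema that the Seafowl
    // catalog persists for information_schema queries.
    fn arrow_schema_of(table: &Arc<DeltaTable>) -> SchemaRef {
        TableProvider::schema(table.as_ref())
    }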
diff --git a/src/context/logical.rs b/src/context/logical.rs
index 7934415c..23e806aa 100644
--- a/src/context/logical.rs
+++ b/src/context/logical.rs
@@ -1,11 +1,11 @@
 use crate::catalog::DEFAULT_SCHEMA;
 use crate::context::SeafowlContext;
-use crate::datafusion::parser::{DFParser, Statement as DFStatement};
+use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA};
 use crate::datafusion::utils::build_schema;
 use crate::wasm_udf::data_types::CreateFunctionDetails;
 use crate::{
     nodes::{
-        CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
+        ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
         SeafowlExtensionNode, Vacuum,
     },
     version::TableVersionProcessor,
@@ -267,7 +267,11 @@
                     "Unsupported SQL statement: {s:?}"
                 ))),
             },
-            DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => {
+            DFStatement::CopyTo(CopyToStatement {
+                ref mut source,
+                options,
+                ..
+            }) if !options.contains(&CONVERT_TO_DELTA) => {
                 let state = if let CopyToSource::Query(ref mut query) = source {
                     self.rewrite_time_travel_query(query).await?
                 } else {
@@ -275,12 +279,27 @@
                 };
                 state.statement_to_plan(stmt).await
             }
+            DFStatement::CopyTo(CopyToStatement {
+                source: CopyToSource::Relation(table_name),
+                target,
+                options,
+            }) if options.contains(&CONVERT_TO_DELTA) => {
+                Ok(LogicalPlan::Extension(Extension {
+                    node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable {
+                        location: target.clone(),
+                        name: table_name.to_string(),
+                        output_schema: Arc::new(DFSchema::empty()),
+                    })),
+                }))
+            }
             DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
                 self.inner.state().statement_to_plan(stmt).await
             }
-            DFStatement::Explain(_) => Err(Error::NotImplemented(format!(
-                "Unsupported SQL statement: {statement:?}"
-            ))),
+            DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
+                Err(Error::NotImplemented(format!(
+                    "Unsupported SQL statement: {statement:?}"
+                )))
+            }
         }
     }
 }
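The final `DFStatement::CopyTo(_) | DFStatement::Explain(_)` arm above is load-bearing: both `CopyTo` arms before it carry guards, and rustc's exhaustiveness check ignores guards, so an unguarded `CopyTo` pattern must still appear somewhere. A toy model (all names hypothetical) of why the catch-all is required:

    // Stand-in for DFStatement: one guarded pair plus an unsupported variant.
    enum Stmt {
        CopyTo { convert_to_delta: bool },
        Explain,
    }

    fn plan(s: Stmt) -> &'static str {
        match s {
            Stmt::CopyTo { convert_to_delta } if !convert_to_delta => "plain COPY TO",
            Stmt::CopyTo { convert_to_delta } if convert_to_delta => "CONVERT TO DELTA",
            // Guards are opaque to the exhaustiveness checker: without this arm the
            // match fails to compile even though the two guards cover every CopyTo.
            Stmt::CopyTo { .. } | Stmt::Explain => "not implemented",
        }
    }

    fn main() {
        assert_eq!(plan(Stmt::CopyTo { convert_to_delta: true }), "CONVERT TO DELTA");
    }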
diff --git a/src/context/physical.rs b/src/context/physical.rs
index a9b82eb9..9ed46fe4 100644
--- a/src/context/physical.rs
+++ b/src/context/physical.rs
@@ -1,3 +1,4 @@
+use super::delta::CreateDeltaTableDetails;
 use crate::catalog::{DEFAULT_SCHEMA, STAGING_SCHEMA};
 use crate::config::context::build_object_store;
 use crate::config::schema;
@@ -6,7 +7,7 @@ use crate::context::delta::plan_to_object_store;
 use crate::context::SeafowlContext;
 use crate::delta_rs::backports::parquet_scan_from_actions;
 use crate::nodes::{
-    CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
+    ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
     SeafowlExtensionNode, Vacuum,
 };
 use crate::object_store::http::try_prepare_http_url;
@@ -49,6 +50,7 @@ use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::protocol::{DeltaOperation, SaveMode};
 use deltalake::DeltaTable;
 use log::info;
+use object_store::path::Path;
 use std::borrow::Cow;
 use std::ops::Deref;
 use std::ops::Not;
@@ -226,8 +228,11 @@
                 // TODO: this means we'll have 2 table versions at the end, 1st from the create
                 // and 2nd from the insert, while it seems more reasonable that in this case we have
                 // only one
-                self.create_delta_table(name, plan.schema().as_ref())
-                    .await?;
+                self.create_delta_table(
+                    name,
+                    CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()),
+                )
+                .await?;
                 self.plan_to_delta_table(name, &plan).await?;
 
                 Ok(make_dummy_exec())
@@ -544,12 +549,31 @@
                 // Other custom nodes we made like CREATE TABLE/INSERT/ALTER
                 match SeafowlExtensionNode::from_dynamic(node) {
                     Some(sfe_node) => match sfe_node {
+                        SeafowlExtensionNode::ConvertTable(ConvertTable {
+                            location,
+                            name,
+                            ..
+                        }) => {
+                            self.create_delta_table(
+                                name,
+                                CreateDeltaTableDetails::FromFiles(Path::from(
+                                    location.as_str(),
+                                )),
+                            )
+                            .await?;
+
+                            Ok(make_dummy_exec())
+                        }
                        SeafowlExtensionNode::CreateTable(CreateTable {
                            schema,
                            name,
                            ..
                        }) => {
-                            self.create_delta_table(name.as_str(), schema).await?;
+                            self.create_delta_table(
+                                name.as_str(),
+                                CreateDeltaTableDetails::WithSchema(schema.clone()),
+                            )
+                            .await?;
 
                             Ok(make_dummy_exec())
                         }
@@ -870,8 +894,11 @@
         };
 
         if !table_exists {
-            self.create_delta_table(table_ref.clone(), plan.schema().as_ref())
-                .await?;
+            self.create_delta_table(
+                table_ref.clone(),
+                CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()),
+            )
+            .await?;
         }
 
         self.plan_to_delta_table(table_ref, &plan).await
diff --git a/src/datafusion/parser.rs b/src/datafusion/parser.rs
index 71c0459c..18cd04cb 100644
--- a/src/datafusion/parser.rs
+++ b/src/datafusion/parser.rs
@@ -30,6 +30,7 @@ use datafusion::sql::parser::{
     CopyToSource, CopyToStatement, CreateExternalTable, DescribeTableStmt,
 };
 use datafusion_common::parsers::CompressionTypeVariant;
+use lazy_static::lazy_static;
 use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value};
 use sqlparser::tokenizer::{TokenWithLocation, Word};
 use sqlparser::{
@@ -41,7 +42,6 @@
 use std::collections::{HashMap, VecDeque};
 use std::str::FromStr;
 use std::string::ToString;
-use strum_macros::Display;
 
 // Use `Parser::expected` instead, if possible
 macro_rules! parser_err {
@@ -63,10 +63,12 @@ pub struct DFParser<'a> {
     parser: Parser<'a>,
 }
 
-#[derive(Debug, Clone, Display)]
-#[strum(serialize_all = "UPPERCASE")]
-enum KeywordExtensions {
-    Vacuum,
+// Hacky way to distinguish a `COPY TO` statement from `CONVERT TO DELTA`.
+// We should really introduce our own Statement enum which encapsulates
+// the DataFusion one and adds our custom variants (this one and `VACUUM`).
+lazy_static! {
+    pub static ref CONVERT_TO_DELTA: (String, Value) =
+        ("CONVERT_TO_DELTA".to_string(), Value::Boolean(true));
 }
 
 impl<'a> DFParser<'a> {
@@ -136,39 +138,25 @@
     pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
         match self.parser.peek_token().token {
             Token::Word(w) => {
-                match w {
-                    Word {
-                        keyword: Keyword::CREATE,
-                        ..
-                    } => {
-                        // move one token forward
+                match w.keyword {
+                    Keyword::CREATE => {
                         self.parser.next_token();
-                        // use custom parsing
                         self.parse_create()
                     }
-                    Word {
-                        keyword: Keyword::COPY,
-                        ..
-                    } => {
+                    Keyword::CONVERT => {
+                        self.parser.next_token();
+                        self.parse_convert()
+                    }
+                    Keyword::COPY => {
                         self.parser.next_token();
                         self.parse_copy()
                     }
-                    Word {
-                        keyword: Keyword::DESCRIBE,
-                        ..
-                    } => {
-                        // move one token forward
+                    Keyword::DESCRIBE => {
                         self.parser.next_token();
-                        // use custom parsing
                         self.parse_describe()
                     }
-                    Word { value, .. }
-                        if value.to_uppercase()
-                            == KeywordExtensions::Vacuum.to_string() =>
-                    {
-                        // move one token forward
+                    Keyword::VACUUM => {
                         self.parser.next_token();
-                        // use custom parsing
                         self.parse_vacuum()
                     }
                     _ => {
@@ -195,6 +183,22 @@
         }))
     }
 
+    // Parse `CONVERT location TO DELTA table_name` type statement
+    pub fn parse_convert(&mut self) -> Result<Statement, ParserError> {
+        let location = self.parser.parse_literal_string()?;
+        self.parser
+            .expect_keywords(&[Keyword::TO, Keyword::DELTA])?;
+        let table_name = self.parser.parse_object_name()?;
+
+        // We'll use the CopyToStatement struct to pass the location and table name
+        // as it's the closest match to what we need.
+        Ok(Statement::CopyTo(CopyToStatement {
+            source: CopyToSource::Relation(table_name),
+            target: location,
+            options: vec![CONVERT_TO_DELTA.clone()],
+        }))
+    }
+
     pub fn parse_vacuum(&mut self) -> Result<Statement, ParserError> {
         // Since `VACUUM` is not a supported keyword by sqlparser, we abuse the semantically related
         // TRUNCATE to smuggle the info on whether we want GC of tables, partitions or the DB itself.
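For reference, the statement this parser change accepts looks as follows (location and table name are hypothetical); `parse_convert` smuggles it through as a `CopyToStatement` tagged with the `CONVERT_TO_DELTA` option, which the logical planner above then unpacks into a `ConvertTable` extension node:

    -- Hypothetical object store prefix and table name: registers the Parquet
    -- files already present at that location as a new Delta table.
    CONVERT 'some/prefix/with/parquet' TO DELTA my_schema.converted_table;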
diff --git a/src/nodes.rs b/src/nodes.rs
index c5edead5..19e00c7a 100644
--- a/src/nodes.rs
+++ b/src/nodes.rs
@@ -8,6 +8,16 @@ use crate::wasm_udf::data_types::CreateFunctionDetails;
 use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
 use strum_macros::AsRefStr;
 
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+pub struct ConvertTable {
+    /// Location from which to convert
+    pub location: String,
+    /// Name of the table to convert to
+    pub name: String,
+    /// Dummy result schema for the plan (empty)
+    pub output_schema: DFSchemaRef,
+}
+
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub struct CreateTable {
     /// The table schema
@@ -69,6 +79,7 @@ pub struct Vacuum {
 
 #[derive(AsRefStr, Debug, Clone, Hash, PartialEq, Eq)]
 pub enum SeafowlExtensionNode {
+    ConvertTable(ConvertTable),
     CreateTable(CreateTable),
     CreateFunction(CreateFunction),
     DropFunction(DropFunction),
@@ -102,6 +113,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode {
         // (& means it has to have been borrowed and we can't own anything, since this
         // function will exit soon)
         match self {
+            SeafowlExtensionNode::ConvertTable(ConvertTable {
+                output_schema, ..
+            }) => output_schema,
             SeafowlExtensionNode::CreateTable(CreateTable { output_schema, .. }) => {
                 output_schema
             }
@@ -131,6 +145,11 @@
     fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
+            SeafowlExtensionNode::ConvertTable(ConvertTable {
+                location, name, ..
+            }) => {
+                write!(f, "Convert: {location} to {name}")
+            }
             SeafowlExtensionNode::CreateTable(CreateTable {
                 name, ..
             }) => {
                 write!(f, "Create: {name}")
             }
diff --git a/src/repository/default.rs b/src/repository/default.rs
index d01e931d..794b0f6a 100644
--- a/src/repository/default.rs
+++ b/src/repository/default.rs
@@ -239,10 +239,16 @@ impl Repository for $repo {
 
         // Create columns
         // TODO this breaks if we have more than (bind limit) columns
-        if !schema.arrow_schema.fields().is_empty() {
+        if !schema.fields().is_empty() {
             let mut builder: QueryBuilder<_> =
                 QueryBuilder::new("INSERT INTO table_column(table_version_id, name, type) ");
-            builder.push_values(schema.to_column_names_types(), |mut b, col| {
+
+            let fields: Vec<(String, String)> = schema.fields()
+                .iter()
+                .map(|f| (f.name().clone(), field_to_json(f).to_string()))
+                .collect();
+
+            builder.push_values(fields, |mut b, col| {
                 b.push_bind(new_version_id)
                     .push_bind(col.0)
                     .push_bind(col.1);
diff --git a/src/repository/interface.rs b/src/repository/interface.rs
index 33f9d3fc..ce846402 100644
--- a/src/repository/interface.rs
+++ b/src/repository/interface.rs
@@ -1,18 +1,16 @@
 use std::fmt::Debug;
 use std::str::FromStr;
 
+use arrow_schema::Schema;
 use async_trait::async_trait;
 use strum::ParseError;
 use strum_macros::{Display, EnumString};
 use uuid::Uuid;
 
-use crate::wasm_udf::data_types::CreateFunctionDetails;
-use crate::{
-    data_types::{
-        CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp,
-    },
-    schema::Schema,
+use crate::data_types::{
+    CollectionId, DatabaseId, FunctionId, TableId, TableVersionId, Timestamp,
 };
+use crate::wasm_udf::data_types::CreateFunctionDetails;
 
 #[derive(sqlx::FromRow, Default, Debug, PartialEq, Eq)]
 pub struct AllDatabaseColumnsResult {
@@ -232,14 +230,11 @@
             .create_collection(database_id, DEFAULT_SCHEMA)
             .await
             .expect("Error creating default schema");
-        let empty_schema = Schema {
-            arrow_schema: Arc::new(ArrowSchema::empty()),
-        };
         repository
             .create_table(
                 default_schema_id,
                 "empty_table",
-                &empty_schema,
+                &ArrowSchema::empty(),
                 Uuid::default(),
             )
             .await
@@ -254,12 +249,9 @@
             ArrowField::new("date", ArrowDataType::Date64, false),
             ArrowField::new("value", ArrowDataType::Float64, false),
         ]);
-        let schema = Schema {
-            arrow_schema: Arc::new(arrow_schema),
-        };
 
         let (table_id, table_version_id) = repository
-            .create_table(collection_id, "testtable", &schema, Uuid::default())
+            .create_table(collection_id, "testtable", &arrow_schema, Uuid::default())
             .await
             .expect("Error creating table");
 
@@ -564,10 +556,6 @@
         ));
 
         // Make a new table in the existing collection with the same name
-        let schema = Schema {
-            arrow_schema: Arc::new(ArrowSchema::empty()),
-        };
-
         let collection_id_1 = repository
             .get_collection_id_by_name("testdb", "testcol")
             .await
@@ -579,7 +567,12 @@
 
         assert!(matches!(
             repository
-                .create_table(collection_id_2, "testtable2", &schema, Uuid::default())
+                .create_table(
+                    collection_id_2,
+                    "testtable2",
+                    &ArrowSchema::empty(),
+                    Uuid::default()
+                )
                 .await
                 .unwrap_err(),
             Error::UniqueConstraintViolation(_)
@@ -587,7 +580,12 @@
 
         // Make a new table in the previous collection, try renaming
         let (new_table_id, _) = repository
-            .create_table(collection_id_1, "testtable2", &schema, Uuid::default())
+            .create_table(
+                collection_id_1,
+                "testtable2",
+                &ArrowSchema::empty(),
+                Uuid::default(),
+            )
             .await
             .unwrap();
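On the repository change above: column types are now persisted as the Arrow integration-test JSON representation of each field, replacing the old `Schema::to_column_names_types()` output. A rough sketch of what ends up in `table_column.type` (the exact JSON shape is whatever `arrow-integration-test` emits, so the inline example is approximate):

    use arrow_integration_test::field_to_json;
    use arrow_schema::{DataType, Field};

    fn main() {
        let field = Field::new("value", DataType::Float64, false);
        // Prints a JSON object describing the field, along the lines of
        // {"name":"value","nullable":false,"type":{"name":"floatingpoint",...}};
        // its string form is what gets bound into the table_column INSERT.
        println!("{}", field_to_json(&field));
    }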
diff --git a/src/repository/postgres.rs b/src/repository/postgres.rs
index 1f9adbe0..3dede0d0 100644
--- a/src/repository/postgres.rs
+++ b/src/repository/postgres.rs
@@ -1,5 +1,7 @@
 use std::{fmt::Debug, time::Duration};
 
+use arrow_integration_test::field_to_json;
+use arrow_schema::Schema;
 use async_trait::async_trait;
 use futures::TryStreamExt;
 use sqlx::{
@@ -12,7 +14,6 @@ use uuid::Uuid;
 
 use crate::{
     data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId},
     implement_repository,
-    schema::Schema,
     wasm_udf::data_types::CreateFunctionDetails,
 };
diff --git a/src/repository/sqlite.rs b/src/repository/sqlite.rs
index a2738e90..8645d223 100644
--- a/src/repository/sqlite.rs
+++ b/src/repository/sqlite.rs
@@ -1,5 +1,7 @@
 use std::{fmt::Debug, str::FromStr};
 
+use arrow_integration_test::field_to_json;
+use arrow_schema::Schema;
 use async_trait::async_trait;
 use futures::TryStreamExt;
 use sqlx::sqlite::SqliteJournalMode;
@@ -12,7 +14,6 @@ use uuid::Uuid;
 
 use crate::{
     data_types::{CollectionId, DatabaseId, FunctionId, TableId, TableVersionId},
-    schema::Schema,
     wasm_udf::data_types::CreateFunctionDetails,
 };
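Taken together, the intended end-to-end flow looks roughly like this (names hypothetical; the Parquet files must already be reachable through the configured object store), with the new plan node rendering as `Convert: <location> to <name>` in EXPLAIN output per `fmt_for_explain`:

    -- Convert pre-existing Parquet files into a Delta table tracked by
    -- Seafowl's catalog, then query it like any other table:
    CONVERT 'some/prefix/with/parquet' TO DELTA converted_table;
    SELECT COUNT(*) FROM converted_table;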