Commit

Convert-to-delta initial implementation

gruuya committed Nov 29, 2023
1 parent 40b1158 commit 85b632e

Showing 10 changed files with 183 additions and 87 deletions.
2 changes: 1 addition & 1 deletion src/catalog.rs
@@ -1,6 +1,7 @@
use std::str::FromStr;
use std::{collections::HashMap, fmt::Debug, sync::Arc};

use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::datasource::TableProvider;
@@ -27,7 +28,6 @@ use crate::{
AllDatabaseColumnsResult, AllDatabaseFunctionsResult, Error as RepositoryError,
Repository, TableVersionsResult,
},
schema::Schema,
};

pub const DEFAULT_DB: &str = "default";
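
This import swap is what lets the catalog store plain Arrow schemas instead of the internal `schema::Schema` wrapper. At the call site in src/context/delta.rs below, the schema is read back off the freshly built table with fully qualified syntax; a minimal sketch of why (the helper function is hypothetical):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::TableProvider;
use deltalake::DeltaTable;

// `DeltaTable` also exposes its own `schema` method (the Delta kernel schema),
// so fully qualified syntax pins the call to the `TableProvider` impl, which
// returns the Arrow schema that the catalog now stores.
fn arrow_schema_of(table: &Arc<DeltaTable>) -> SchemaRef {
    TableProvider::schema(table.as_ref())
}
```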
65 changes: 43 additions & 22 deletions src/context/delta.rs
@@ -3,23 +3,24 @@ use crate::context::SeafowlContext;
#[cfg(test)]
use crate::frontend::http::tests::deterministic_uuid;
use crate::object_store::wrapped::InternalObjectStore;
use crate::schema::Schema as SeafowlSchema;

use bytes::BytesMut;
use datafusion::error::{DataFusionError as Error, Result};
use datafusion::execution::context::SessionState;
use datafusion::parquet::basic::{Compression, ZstdLevel};
use datafusion::{
arrow::datatypes::{Schema, SchemaRef},
datasource::TableProvider,
error::DataFusionError,
execution::context::TaskContext,
parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
physical_plan::ExecutionPlan,
sql::TableReference,
};
use deltalake::kernel::{Action, Add, Schema as DeltaSchema};
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::{
convert_to_delta::ConvertToDeltaBuilder, create::CreateBuilder, transaction::commit,
};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::create_add;
use deltalake::DeltaTable;
@@ -272,20 +273,22 @@ pub async fn plan_to_object_store(
.collect()
}

pub(super) enum CreateDeltaTableDetails {
WithSchema(Schema),
FromFiles(Path),
}

impl SeafowlContext {
pub(super) async fn create_delta_table<'a>(
&self,
name: impl Into<TableReference<'a>>,
schema: &Schema,
details: CreateDeltaTableDetails,
) -> Result<Arc<DeltaTable>> {
let table_ref: TableReference = name.into();
let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let schema_name = resolved_ref.schema.clone();
let table_name = resolved_ref.table.clone();

let sf_schema = SeafowlSchema {
arrow_schema: Arc::new(schema.clone()),
};
let collection_id = self
.table_catalog
.get_collection_id_by_name(&self.database, &schema_name)
@@ -294,8 +297,6 @@ impl SeafowlContext {
Error::Plan(format!("Schema {schema_name:?} does not exist!"))
})?;

let delta_schema = DeltaSchema::try_from(schema)?;

// TODO: we could be doing this inside the DB itself (i.e. `... DEFAULT gen_random_uuid()`
// in Postgres and `... DEFAULT (uuid())` in SQLite) however we won't be able to do it until
// sqlx 0.7 is released (which has libsqlite3-sys > 0.25, with the SQLite version that has
@@ -307,25 +308,45 @@
let table_log_store = self.internal_object_store.get_log_store(table_uuid);

// NB: there's also a uuid generated below for the table's `DeltaTableMetaData::id`, so it would
// be nice if those two could match
let table = Arc::new(
CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_comment(format!(
"Created by Seafowl version {}",
env!("CARGO_PKG_VERSION")
))
.await?,
);
// be nice if those two could match somehow
let table = Arc::new(match details {
CreateDeltaTableDetails::WithSchema(schema) => {
let delta_schema = DeltaSchema::try_from(&schema)?;

CreateBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_columns(delta_schema.fields().clone())
.with_comment(format!(
"Created by Seafowl {}",
env!("CARGO_PKG_VERSION")
))
.await?
}
CreateDeltaTableDetails::FromFiles(_path) => {
// Convert the Parquet files at the table location to a Delta table
ConvertToDeltaBuilder::new()
.with_log_store(table_log_store)
.with_table_name(&*table_name)
.with_comment(format!(
"Converted by Seafowl {}",
env!("CARGO_PKG_VERSION")
))
.await?
}
});

// We still persist the table into our own catalog; one reason is to be able to load all
// tables and their schemas in bulk to satisfy information_schema queries.
// Another is to keep track of table UUIDs, which are used to construct the table URI.
// We may eventually look into doing this via delta-rs.
self.table_catalog
.create_table(collection_id, &table_name, &sf_schema, table_uuid)
.create_table(
collection_id,
&table_name,
TableProvider::schema(table.as_ref()).as_ref(),
table_uuid,
)
.await?;

self.inner.register_table(resolved_ref, table.clone())?;
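
The `FromFiles` arm delegates to delta-rs, which scans the Parquet files already present at the table location and writes an initial Delta transaction log without rewriting any data. A minimal sketch of that path in isolation, using only the builder calls from the diff above (the helper name and signature are assumptions):

```rust
use deltalake::logstore::LogStoreRef;
use deltalake::operations::convert_to_delta::ConvertToDeltaBuilder;
use deltalake::{DeltaTable, DeltaTableError};

// Registers pre-existing Parquet files under `log_store` as a Delta table by
// synthesizing the initial transaction log; no data files are rewritten.
async fn convert_existing(
    log_store: LogStoreRef,
    table_name: &str,
) -> Result<DeltaTable, DeltaTableError> {
    ConvertToDeltaBuilder::new()
        .with_log_store(log_store)
        .with_table_name(table_name)
        .with_comment(format!("Converted by Seafowl {}", env!("CARGO_PKG_VERSION")))
        .await
}
```

Since conversion discovers the schema from the Parquet footers, the catalog entry is populated afterwards from the built table via `TableProvider::schema`, which works uniformly for both arms.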
31 changes: 25 additions & 6 deletions src/context/logical.rs
@@ -1,11 +1,11 @@
use crate::catalog::DEFAULT_SCHEMA;
use crate::context::SeafowlContext;
use crate::datafusion::parser::{DFParser, Statement as DFStatement};
use crate::datafusion::parser::{DFParser, Statement as DFStatement, CONVERT_TO_DELTA};
use crate::datafusion::utils::build_schema;
use crate::wasm_udf::data_types::CreateFunctionDetails;
use crate::{
nodes::{
CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
SeafowlExtensionNode, Vacuum,
},
version::TableVersionProcessor,
@@ -267,20 +267,39 @@ impl SeafowlContext {
"Unsupported SQL statement: {s:?}"
))),
},
DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => {
DFStatement::CopyTo(CopyToStatement {
ref mut source,
options,
..
}) if !options.contains(&CONVERT_TO_DELTA) => {
let state = if let CopyToSource::Query(ref mut query) = source {
self.rewrite_time_travel_query(query).await?
} else {
self.inner.state()
};
state.statement_to_plan(stmt).await
}
DFStatement::CopyTo(CopyToStatement {
source: CopyToSource::Relation(table_name),
target,
options,
}) if options.contains(&CONVERT_TO_DELTA) => {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::ConvertTable(ConvertTable {
location: target.clone(),
name: table_name.to_string(),
output_schema: Arc::new(DFSchema::empty()),
})),
}))
}
DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
self.inner.state().statement_to_plan(stmt).await
}
DFStatement::Explain(_) => Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
))),
DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
)))
}
}
}

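
Both `CopyTo` arms match the same statement type and are distinguished purely by the marker option the parser injects. The sketch below shows the shape `parse_convert` emits and the containment check the match guards rely on (field values are illustrative; types are as used in this diff):

```rust
use datafusion::sql::parser::{CopyToSource, CopyToStatement};
use sqlparser::ast::{Ident, ObjectName, Value};

fn main() {
    // The marker is an ordinary key/value option pair; planning only checks
    // for its presence, so a plain COPY TO (no marker) takes the first arm.
    let marker = ("CONVERT_TO_DELTA".to_string(), Value::Boolean(true));
    let stmt = CopyToStatement {
        source: CopyToSource::Relation(ObjectName(vec![Ident::new("my_table")])),
        target: "s3://bucket/raw_parquet".to_string(),
        options: vec![marker.clone()],
    };
    assert!(stmt.options.contains(&marker));
}
```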
39 changes: 33 additions & 6 deletions src/context/physical.rs
@@ -1,3 +1,4 @@
use super::delta::CreateDeltaTableDetails;
use crate::catalog::{DEFAULT_SCHEMA, STAGING_SCHEMA};
use crate::config::context::build_object_store;
use crate::config::schema;
@@ -6,7 +7,7 @@ use crate::context::delta::plan_to_object_store;
use crate::context::SeafowlContext;
use crate::delta_rs::backports::parquet_scan_from_actions;
use crate::nodes::{
CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
ConvertTable, CreateFunction, CreateTable, DropFunction, DropSchema, RenameTable,
SeafowlExtensionNode, Vacuum,
};
use crate::object_store::http::try_prepare_http_url;
@@ -49,6 +50,7 @@ use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::DeltaTable;
use log::info;
use object_store::path::Path;
use std::borrow::Cow;
use std::ops::Deref;
use std::ops::Not;
@@ -226,8 +228,11 @@ impl SeafowlContext {
// TODO: this means we'll end up with 2 table versions, the 1st from the create
// and the 2nd from the insert, whereas it seems more reasonable to have
// only one in this case
self.create_delta_table(name, plan.schema().as_ref())
.await?;
self.create_delta_table(
name,
CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()),
)
.await?;
self.plan_to_delta_table(name, &plan).await?;

Ok(make_dummy_exec())
@@ -544,12 +549,31 @@ impl SeafowlContext {
// Other custom nodes we made like CREATE TABLE/INSERT/ALTER
match SeafowlExtensionNode::from_dynamic(node) {
Some(sfe_node) => match sfe_node {
SeafowlExtensionNode::ConvertTable(ConvertTable {
location,
name,
..
}) => {
self.create_delta_table(
name,
CreateDeltaTableDetails::FromFiles(Path::from(
location.as_str(),
)),
)
.await?;

Ok(make_dummy_exec())
}
SeafowlExtensionNode::CreateTable(CreateTable {
schema,
name,
..
}) => {
self.create_delta_table(name.as_str(), schema).await?;
self.create_delta_table(
name.as_str(),
CreateDeltaTableDetails::WithSchema(schema.clone()),
)
.await?;

Ok(make_dummy_exec())
}
@@ -870,8 +894,11 @@ impl SeafowlContext {
};

if !table_exists {
self.create_delta_table(table_ref.clone(), plan.schema().as_ref())
.await?;
self.create_delta_table(
table_ref.clone(),
CreateDeltaTableDetails::WithSchema(plan.schema().as_ref().clone()),
)
.await?;
}

self.plan_to_delta_table(table_ref, &plan).await
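
In the `ConvertTable` arm, the raw location string is canonicalized into an `object_store` path before being handed over; note that the `FromFiles` arm in src/context/delta.rs currently ignores this path (`_path`) and derives the table location from the UUID-based log store instead. A small sketch of the canonicalization (the example path is illustrative):

```rust
use object_store::path::Path;

fn main() {
    // `Path::from` canonicalizes the raw string, e.g. dropping empty and
    // trailing `/` segments, before it reaches the conversion builder.
    let path = Path::from("warehouse/raw/parquet/");
    assert_eq!(path.to_string(), "warehouse/raw/parquet");
}
```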
60 changes: 32 additions & 28 deletions src/datafusion/parser.rs
@@ -30,6 +30,7 @@ use datafusion::sql::parser::{
CopyToSource, CopyToStatement, CreateExternalTable, DescribeTableStmt,
};
use datafusion_common::parsers::CompressionTypeVariant;
use lazy_static::lazy_static;
use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value};
use sqlparser::tokenizer::{TokenWithLocation, Word};
use sqlparser::{
@@ -41,7 +42,6 @@ use sqlparser::{
use std::collections::{HashMap, VecDeque};
use std::str::FromStr;
use std::string::ToString;
use strum_macros::Display;

// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
@@ -63,10 +63,12 @@ pub struct DFParser<'a> {
parser: Parser<'a>,
}

#[derive(Debug, Clone, Display)]
#[strum(serialize_all = "UPPERCASE")]
enum KeywordExtensions {
Vacuum,
// Hacky way to distinguish a `COPY TO` statement from `CONVERT TO DELTA`.
// We should really introduce our own Statement enum which encapsulates
// the DataFusion one and adds our custom variants (this one and `VACUUM`).
lazy_static! {
pub static ref CONVERT_TO_DELTA: (String, Value) =
("CONVERT_TO_DELTA".to_string(), Value::Boolean(true));
}

impl<'a> DFParser<'a> {
@@ -136,39 +138,25 @@
pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
match self.parser.peek_token().token {
Token::Word(w) => {
match w {
Word {
keyword: Keyword::CREATE,
..
} => {
// move one token forward
match w.keyword {
Keyword::CREATE => {
self.parser.next_token();
// use custom parsing
self.parse_create()
}
Word {
keyword: Keyword::COPY,
..
} => {
Keyword::CONVERT => {
self.parser.next_token();
self.parse_convert()
}
Keyword::COPY => {
self.parser.next_token();
self.parse_copy()
}
Word {
keyword: Keyword::DESCRIBE,
..
} => {
// move one token forward
Keyword::DESCRIBE => {
self.parser.next_token();
// use custom parsing
self.parse_describe()
}
Word { value, .. }
if value.to_uppercase()
== KeywordExtensions::Vacuum.to_string() =>
{
// move one token forward
Keyword::VACUUM => {
self.parser.next_token();
// use custom parsing
self.parse_vacuum()
}
_ => {
Expand All @@ -195,6 +183,22 @@ impl<'a> DFParser<'a> {
}))
}

// Parse a `CONVERT location TO DELTA table_name` statement
pub fn parse_convert(&mut self) -> Result<Statement, ParserError> {
let location = self.parser.parse_literal_string()?;
self.parser
.expect_keywords(&[Keyword::TO, Keyword::DELTA])?;
let table_name = self.parser.parse_object_name()?;

// We'll use the CopyToStatement struct to pass the location and table name
// as it's the closest match to what we need.
Ok(Statement::CopyTo(CopyToStatement {
source: CopyToSource::Relation(table_name),
target: location,
options: vec![CONVERT_TO_DELTA.clone()],
}))
}

pub fn parse_vacuum(&mut self) -> Result<Statement, ParserError> {
// Since `VACUUM` is not a supported keyword by sqlparser, we abuse the semantically related
// TRUNCATE to smuggle the info on whether we want GC of tables, partitions or the DB itself.
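
A usage sketch for the new grammar, assuming the vendored parser keeps DataFusion's `DFParser::parse_sql` entry point (the location and table name are illustrative). Note the location must be a quoted string literal, per `parse_literal_string`:

```rust
use crate::datafusion::parser::DFParser;
use sqlparser::parser::ParserError;

fn parse_convert_example() -> Result<(), ParserError> {
    // Produces a single statement: a CopyToStatement carrying the
    // CONVERT_TO_DELTA marker option, as constructed in `parse_convert`.
    let statements =
        DFParser::parse_sql("CONVERT 's3://bucket/raw_parquet' TO DELTA my_table")?;
    assert_eq!(statements.len(), 1);
    Ok(())
}
```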