Skip to content

Commit

Permalink
drop schema refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
jaylmiller committed Apr 29, 2023
1 parent fd785b2 commit be19d7b
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 13 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod parsers;
#[cfg(feature = "pyarrow")]
mod pyarrow;
pub mod scalar;
mod schema_reference;
pub mod stats;
mod table_reference;
pub mod test_util;
Expand All @@ -39,6 +40,7 @@ pub use error::{
SharedResult,
};
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};

Expand Down
64 changes: 64 additions & 0 deletions datafusion/common/src/schema_reference.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::borrow::Cow;

/// A reference to a schema, optionally qualified by a catalog name.
///
/// Each name is held as a [`Cow`], so a reference may either borrow its
/// strings or own them.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SchemaReference<'a> {
/// An unqualified reference made up of a schema name only.
Bare {
/// The schema name.
schema: Cow<'a, str>,
},
/// A qualified reference carrying both a catalog name and a schema name.
Full {
/// The schema name.
schema: Cow<'a, str>,
/// The catalog name.
catalog: Cow<'a, str>,
},
}

impl SchemaReference<'_> {
/// Get only the schema name that this references.
pub fn schema_name(&self) -> &str {
match self {
SchemaReference::Bare { schema } => schema,
SchemaReference::Full { schema, catalog: _ } => schema,
}
}
}

/// A [`SchemaReference`] whose lifetime parameter is `'static`, so it is not
/// tied to any shorter-lived borrow.
pub type OwnedSchemaReference = SchemaReference<'static>;

impl std::fmt::Display for SchemaReference<'_> {
    /// Writes `schema` for a bare reference and `catalog.schema` for a fully
    /// qualified one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (qualifier, schema) = match self {
            Self::Bare { schema } => (None, schema),
            Self::Full { schema, catalog } => (Some(catalog), schema),
        };
        // Emit the catalog prefix first, when there is one.
        if let Some(catalog) = qualifier {
            write!(f, "{catalog}.")?;
        }
        f.write_str(schema)
    }
}

impl<'a> From<&'a OwnedSchemaReference> for SchemaReference<'a> {
fn from(value: &'a OwnedSchemaReference) -> Self {
match value {
SchemaReference::Bare { schema } => SchemaReference::Bare {
schema: Cow::Borrowed(schema),
},
SchemaReference::Full { schema, catalog } => SchemaReference::Full {
schema: Cow::Borrowed(schema),
catalog: Cow::Borrowed(catalog),
},
}
}
}
88 changes: 86 additions & 2 deletions datafusion/core/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ pub trait CatalogProvider: Sync + Send {
"Registering new schemas is not supported".to_string(),
))
}

/// Removes the schema called `name` from this catalog.
///
/// Implementations of this method should return an error if the schema
/// exists but cannot be dropped. For example, in DataFusion's default
/// in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema will only
/// be successfully dropped when `cascade` is true. This is equivalent to how
/// DROP SCHEMA works in PostgreSQL.
///
/// Implementations of this method should return `Ok(None)` if no schema
/// called `name` can be found.
///
/// The default implementation returns a "Not Implemented" error.
fn deregister_schema(
    &self,
    _name: &str,
    _cascade: bool,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
    let message = String::from("Deregistering new schemas is not supported");
    Err(DataFusionError::NotImplemented(message))
}
}

/// Simple in-memory implementation of a catalog.
Expand Down Expand Up @@ -160,13 +180,38 @@ impl CatalogProvider for MemoryCatalogProvider {
) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.schemas.insert(name.into(), schema))
}

/// Removes the schema called `name` from this in-memory catalog.
///
/// A schema with no tables is always removed. A schema that still has tables
/// is removed only when `cascade` is true.
///
/// Returns `Ok(Some(provider))` with the removed schema provider, or
/// `Ok(None)` when no schema called `name` is registered.
///
/// # Errors
///
/// Returns [`DataFusionError::Execution`], listing the remaining table names,
/// when the schema is non-empty and `cascade` is false.
fn deregister_schema(
    &self,
    name: &str,
    cascade: bool,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
    let schema = match self.schema(name) {
        Some(schema) => schema,
        None => return Ok(None),
    };

    let table_names = schema.table_names();
    if !table_names.is_empty() && !cascade {
        return Err(DataFusionError::Execution(format!(
            "Cannot drop schema {} because other tables depend on it: {}",
            name,
            itertools::join(table_names.iter(), ", ")
        )));
    }

    // Nothing here keeps the entry in place between the lookup above and this
    // removal, so a missing entry is reported as "not found" rather than
    // unwrapped into a panic.
    Ok(self.schemas.remove(name).map(|(_, removed)| removed))
}
}

#[cfg(test)]
mod tests {
use crate::catalog::schema::MemorySchemaProvider;

use super::*;
use crate::catalog::schema::MemorySchemaProvider;
use crate::datasource::empty::EmptyTable;
use crate::datasource::TableProvider;
use arrow::datatypes::Schema;

#[test]
fn default_register_schema_not_supported() {
Expand Down Expand Up @@ -194,4 +239,43 @@ mod tests {
Err(e) => assert_eq!(e.to_string(), "This feature is not implemented: Registering new schemas is not supported"),
};
}

#[test]
fn memory_catalog_dereg_nonempty_schema() {
    let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;

    // Register one table so the schema counts as non-empty.
    let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
    let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
        as Arc<dyn TableProvider>;
    schema.register_table("t".into(), test_table).unwrap();

    cat.register_schema("foo", schema.clone()).unwrap();

    // Without cascade the drop must be refused.
    assert!(
        cat.deregister_schema("foo", false).is_err(),
        "dropping non-empty schema without cascade should error"
    );
    // With cascade the very same provider instance is handed back.
    let removed = cat.deregister_schema("foo", true).unwrap().unwrap();
    assert!(
        Arc::ptr_eq(&removed, &schema),
        "unexpected provider returned by deregister_schema"
    );
}

#[test]
fn memory_catalog_dereg_empty_schema() {
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());
    let empty_schema: Arc<dyn SchemaProvider> = Arc::new(MemorySchemaProvider::new());
    catalog
        .register_schema("foo", Arc::clone(&empty_schema))
        .unwrap();

    // A schema without tables is dropped even when cascade is off, and the
    // registered provider instance is returned.
    let dropped = catalog.deregister_schema("foo", false).unwrap().unwrap();
    assert!(Arc::ptr_eq(&dropped, &empty_schema));
}

#[test]
fn memory_catalog_dereg_missing() {
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());
    // Deregistering a name that was never registered yields `Ok(None)`.
    let outcome = catalog.deregister_schema("foo", false).unwrap();
    assert!(outcome.is_none());
}
}
47 changes: 44 additions & 3 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ use crate::datasource::{
use crate::error::{DataFusionError, Result};
use crate::logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
SetVariable, TableSource, TableType, UNNAMED_TABLE,
CreateView, DropCatalogSchema, DropTable, DropView, Explain, LogicalPlan,
LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE,
};
use crate::optimizer::OptimizerRule;
use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference};
Expand All @@ -86,7 +86,7 @@ use crate::physical_plan::PhysicalPlanner;
use crate::variable::{VarProvider, VarType};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::OwnedTableReference;
use datafusion_common::{OwnedTableReference, SchemaReference};
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
Expand Down Expand Up @@ -383,6 +383,7 @@ impl SessionContext {
DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await,
DdlStatement::DropTable(cmd) => self.drop_table(cmd).await,
DdlStatement::DropView(cmd) => self.drop_view(cmd).await,
DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await,
},
// TODO what about the other statements (like TransactionStart and TransactionEnd)
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
Expand Down Expand Up @@ -653,6 +654,46 @@ impl SessionContext {
}
}

/// Handles a [`DropCatalogSchema`] command by deregistering the named schema
/// from its catalog.
///
/// A bare schema reference is looked up in the configured default catalog; a
/// full reference names its catalog explicitly. When the catalog or the
/// schema cannot be found, `if_exists` decides the outcome: true gives the
/// same empty result as a successful drop, false gives a "doesn't exist"
/// execution error. Errors from the catalog's `deregister_schema` are
/// propagated unchanged.
async fn drop_schema(&self, cmd: DropCatalogSchema) -> Result<DataFrame> {
    let DropCatalogSchema {
        name,
        if_exists,
        cascade,
        schema: _,
    } = cmd;

    // `state` is confined to this block, so it is dropped before the catalog
    // is asked to deregister anything.
    let lookup = {
        let state = self.state.read();
        let catalog_name = match &name {
            SchemaReference::Bare { .. } => {
                state.config_options().catalog.default_catalog.to_string()
            }
            SchemaReference::Full { catalog, .. } => catalog.to_string(),
        };
        state.catalog_list.catalog(&catalog_name)
    };

    let catalog = match lookup {
        Some(catalog) => catalog,
        None if if_exists => return self.return_empty_dataframe(),
        None => return self.schema_doesnt_exist_err(name),
    };

    let removed = catalog.deregister_schema(name.schema_name(), cascade)?;
    if removed.is_none() && !if_exists {
        self.schema_doesnt_exist_err(name)
    } else {
        self.return_empty_dataframe()
    }
}

/// Builds the execution error reported when `schemaref` does not name an
/// existing schema. Always returns `Err`.
fn schema_doesnt_exist_err(
    &self,
    schemaref: SchemaReference<'_>,
) -> Result<DataFrame> {
    let message = format!("Schema '{schemaref}' doesn't exist.");
    Err(DataFusionError::Execution(message))
}

async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
let SetVariable {
variable, value, ..
Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,25 @@ select * from t;

statement ok
drop table t;

##########
# Dropping schemas
##########

statement error DataFusion error: Execution error: Cannot drop schema foo_schema because other tables depend on it: bar
DROP SCHEMA foo_schema;

statement ok
DROP SCHEMA foo_schema CASCADE;

statement error DataFusion error: Execution error: Schema 'doesnt_exist' doesn't exist.
DROP SCHEMA doesnt_exist;

statement ok
DROP SCHEMA IF EXISTS doesnt_exist;

statement ok
CREATE SCHEMA empty_schema;

statement ok
DROP SCHEMA empty_schema;
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ SHOW CREATE TABLE test.xyz
----
datafusion test xyz CREATE VIEW test.xyz AS SELECT * FROM abc

statement error DataFusion error: This feature is not implemented: Only `DROP TABLE/VIEW
statement error DataFusion error: Execution error: Cannot drop schema test because other tables depend on it: xyz
DROP SCHEMA test;

statement ok
Expand Down
28 changes: 27 additions & 1 deletion datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::Column;
use datafusion_common::{
parsers::CompressionTypeVariant, DFSchemaRef, OwnedTableReference,
};
use datafusion_common::{Column, OwnedSchemaReference};
use std::collections::HashMap;
use std::sync::Arc;
use std::{
Expand All @@ -45,6 +45,8 @@ pub enum DdlStatement {
DropTable(DropTable),
/// Drops a view.
DropView(DropView),
/// Drops a catalog schema
DropCatalogSchema(DropCatalogSchema),
}

impl DdlStatement {
Expand All @@ -62,6 +64,7 @@ impl DdlStatement {
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
DdlStatement::DropView(DropView { schema, .. }) => schema,
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
}
}

Expand All @@ -76,6 +79,7 @@ impl DdlStatement {
DdlStatement::CreateCatalog(_) => "CreateCatalog",
DdlStatement::DropTable(_) => "DropTable",
DdlStatement::DropView(_) => "DropView",
DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
}
}

Expand All @@ -91,6 +95,7 @@ impl DdlStatement {
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
DdlStatement::DropTable(_) => vec![],
DdlStatement::DropView(_) => vec![],
DdlStatement::DropCatalogSchema(_) => vec![],
}
}

Expand Down Expand Up @@ -147,6 +152,14 @@ impl DdlStatement {
}) => {
write!(f, "DropView: {name:?} if not exist:={if_exists}")
}
DdlStatement::DropCatalogSchema(DropCatalogSchema {
name,
if_exists,
cascade,
..
}) => {
write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}")
}
}
}
}
Expand Down Expand Up @@ -273,3 +286,16 @@ pub struct DropView {
/// Dummy schema
pub schema: DFSchemaRef,
}

/// Logical plan node describing a request to drop a catalog schema.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DropCatalogSchema {
/// Reference to the schema to drop, optionally qualified by a catalog name
pub name: OwnedSchemaReference,
/// Whether a missing schema should be tolerated (`IF EXISTS`) rather than
/// reported as an error
pub if_exists: bool,
/// Whether the drop should cascade
pub cascade: bool,
/// Dummy schema
pub schema: DFSchemaRef,
}
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use builder::{
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, DdlStatement, DropTable, DropView,
CreateView, DdlStatement, DropCatalogSchema, DropTable, DropView,
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Ddl(DdlStatement::DropView(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropView",
)),
LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropCatalogSchema",
)),
LogicalPlan::Statement(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Statement",
)),
Expand Down
Loading

0 comments on commit be19d7b

Please sign in to comment.