Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[task #8917] Implement information_schema.schemata #8993

Merged
merged 1 commit into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 132 additions & 2 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
&[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA];

/// Implements the `information_schema` virtual schema and tables
///
Expand Down Expand Up @@ -115,6 +117,27 @@ impl InformationSchemaConfig {
DF_SETTINGS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
SCHEMATA,
TableType::View,
);
}
}

/// Populates `builder` with one `schemata` row per schema found in every
/// registered catalog.
///
/// The `information_schema` schema itself is not reported, and schema names
/// that the catalog can no longer resolve are silently skipped.
///
/// # Panics
///
/// Panics if a name returned by `catalog_names()` cannot be resolved by
/// `catalog()`.
async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
    for catalog_name in self.catalog_list.catalog_names() {
        let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

        for schema_name in catalog.schema_names() {
            // The virtual schema does not list itself.
            if schema_name == INFORMATION_SCHEMA {
                continue;
            }
            if let Some(schema) = catalog.schema(&schema_name) {
                builder.add_schemata(&catalog_name, &schema_name, schema.owner_name());
            }
        }
    }
}

Expand Down Expand Up @@ -196,6 +219,7 @@ impl SchemaProvider for InformationSchemaProvider {
VIEWS.to_string(),
COLUMNS.to_string(),
DF_SETTINGS.to_string(),
SCHEMATA.to_string(),
]
}

Expand All @@ -209,6 +233,8 @@ impl SchemaProvider for InformationSchemaProvider {
Arc::new(InformationSchemaViews::new(config))
} else if name.eq_ignore_ascii_case("df_settings") {
Arc::new(InformationSchemaDfSettings::new(config))
} else if name.eq_ignore_ascii_case("schemata") {
Arc::new(InformationSchemata::new(config))
} else {
return None;
};
Expand All @@ -219,7 +245,10 @@ impl SchemaProvider for InformationSchemaProvider {
}

fn table_exist(&self, name: &str) -> bool {
matches!(name.to_ascii_lowercase().as_str(), TABLES | VIEWS | COLUMNS)
matches!(
name.to_ascii_lowercase().as_str(),
TABLES | VIEWS | COLUMNS | SCHEMATA
)
}
}

Expand Down Expand Up @@ -617,6 +646,107 @@ impl InformationSchemaColumnsBuilder {
}
}

/// Backing state for the `information_schema.schemata` view: the fixed
/// output schema plus the configuration used to enumerate catalogs.
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemata {
    /// Creates the view with its seven `Utf8` columns. Only `catalog_name`
    /// and `schema_name` are declared non-nullable.
    fn new(config: InformationSchemaConfig) -> Self {
        let utf8 = |name: &str, nullable: bool| Field::new(name, DataType::Utf8, nullable);
        let fields = vec![
            utf8("catalog_name", false),
            utf8("schema_name", false),
            utf8("schema_owner", true),
            utf8("default_character_set_catalog", true),
            utf8("default_character_set_schema", true),
            utf8("default_character_set_name", true),
            utf8("sql_path", true),
        ];
        let schema = Arc::new(Schema::new(fields));
        Self { schema, config }
    }

    /// Returns an empty row builder that shares this view's schema, with one
    /// string builder per output column.
    fn builder(&self) -> InformationSchemataBuilder {
        let schema = Arc::clone(&self.schema);
        InformationSchemataBuilder {
            schema,
            catalog_name: StringBuilder::new(),
            schema_name: StringBuilder::new(),
            schema_owner: StringBuilder::new(),
            default_character_set_catalog: StringBuilder::new(),
            default_character_set_schema: StringBuilder::new(),
            default_character_set_name: StringBuilder::new(),
            sql_path: StringBuilder::new(),
        }
    }
}

/// Accumulates rows for the `information_schema.schemata` view, one string
/// builder per output column.
struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
    /// Appends a single row describing `schema_name` in `catalog_name`.
    ///
    /// `schema_owner` is stored as NULL when `None`.
    fn add_schemata(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        schema_owner: Option<&str>,
    ) {
        self.catalog_name.append_value(catalog_name);
        self.schema_name.append_value(schema_name);
        self.schema_owner.append_option(schema_owner);

        // See https://www.postgresql.org/docs/current/infoschema-schemata.html:
        // the remaining columns describe features that are not implemented in
        // DataFusion, so they are always NULL.
        self.default_character_set_catalog.append_null();
        self.default_character_set_schema.append_null();
        self.default_character_set_name.append_null();
        self.sql_path.append_null();
    }

    /// Finishes every column builder and assembles the rows into a
    /// `RecordBatch` with this builder's schema.
    ///
    /// # Panics
    ///
    /// Panics if `RecordBatch::try_new` rejects the finished columns.
    fn finish(&mut self) -> RecordBatch {
        let schema = self.schema.clone();
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(self.catalog_name.finish()),
                Arc::new(self.schema_name.finish()),
                Arc::new(self.schema_owner.finish()),
                Arc::new(self.default_character_set_catalog.finish()),
                Arc::new(self.default_character_set_schema.finish()),
                Arc::new(self.default_character_set_name.finish()),
                Arc::new(self.sql_path.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemata {
    /// Returns the fixed schema of the `schemata` view.
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Produces a stream that yields the whole view as a single batch.
    ///
    /// The catalog enumeration runs inside the returned stream's future, not
    /// in this call.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();

        // TODO: Stream this
        let single_batch = futures::stream::once(async move {
            config.make_schemata(&mut builder).await;
            Ok(builder.finish())
        });

        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            single_batch,
        ))
    }
}

struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ use crate::error::{DataFusionError, Result};
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait SchemaProvider: Sync + Send {
/// Returns the owner of the schema, or `None` (the default) when the
/// provider does not track one. This value is reported in the
/// `schema_owner` column of `information_schema.schemata`.
fn owner_name(&self) -> Option<&str> {
None
}

/// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a
/// specific implementation.
fn as_any(&self) -> &dyn Any;
Expand Down
17 changes: 17 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
statement error DataFusion error: Error during planning: table 'datafusion.information_schema.tables' not found
SELECT * from information_schema.tables

# Verify the information schema does not exist by default
statement error DataFusion error: Error during planning: table 'datafusion.information_schema.schemata' not found
SELECT * from information_schema.schemata

statement error DataFusion error: Error during planning: SHOW \[VARIABLE\] is not supported unless information_schema is enabled
show all

Expand All @@ -35,9 +39,16 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

# Verify information_schema.schemata now exists and lists the default `public` schema
query TTTTTTT rowsort
SELECT * from information_schema.schemata;
----
datafusion public NULL NULL NULL NULL NULL

# Disable information_schema and verify it now errors again
statement ok
set datafusion.catalog.information_schema = false
Expand Down Expand Up @@ -66,6 +77,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion public t BASE TABLE
Expand All @@ -79,6 +91,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion public t BASE TABLE
Expand All @@ -89,6 +102,7 @@ SELECT * from information_schema.tables WHERE tables.table_schema='information_s
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand All @@ -97,6 +111,7 @@ SELECT * from information_schema.tables WHERE information_schema.tables.table_sc
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand All @@ -105,6 +120,7 @@ SELECT * from information_schema.tables WHERE datafusion.information_schema.tabl
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand Down Expand Up @@ -391,6 +407,7 @@ SHOW TABLES
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand Down Expand Up @@ -79,16 +80,19 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
my_catalog information_schema columns VIEW
my_catalog information_schema df_settings VIEW
my_catalog information_schema schemata VIEW
my_catalog information_schema tables VIEW
my_catalog information_schema views VIEW
my_catalog my_schema t1 BASE TABLE
my_catalog my_schema t2 BASE TABLE
my_other_catalog information_schema columns VIEW
my_other_catalog information_schema df_settings VIEW
my_other_catalog information_schema schemata VIEW
my_other_catalog information_schema tables VIEW
my_other_catalog information_schema views VIEW
my_other_catalog my_other_schema t3 BASE TABLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion public physical BASE TABLE
Expand Down
Loading