Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup InformationSchema plumbing #4740

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 17 additions & 76 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,14 @@
//!
//! [Information Schema](https://en.wikipedia.org/wiki/Information_schema)

use std::{
any::Any,
sync::{Arc, Weak},
};
use std::{any::Any, sync::Arc};

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};

use datafusion_common::Result;

use crate::config::ConfigOptions;
use crate::datasource::streaming::{PartitionStream, StreamingTable};
use crate::datasource::TableProvider;
Expand All @@ -40,87 +35,33 @@ use crate::logical_expr::TableType;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;

use super::{
catalog::{CatalogList, CatalogProvider},
schema::SchemaProvider,
};

/// Name of the `information_schema` virtual schema.
const INFORMATION_SCHEMA: &str = "information_schema";
// NOTE(review): the four names below are presumably the tables exposed inside
// `information_schema`; their use sites are outside this excerpt -- confirm.
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
const DF_SETTINGS: &str = "df_settings";

/// Wraps another [`CatalogProvider`] and adds an "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
/// Weak handle to the catalog list; `schema` upgrades it when the
/// "information_schema" schema is requested and returns `None` if the
/// upgrade fails
catalog_list: Weak<dyn CatalogList>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}
use super::{catalog::CatalogList, schema::SchemaProvider};

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
inner,
}
}

/// Return a reference to the wrapped provider
pub(crate) fn inner(&self) -> Arc<dyn CatalogProvider> {
self.inner.clone()
}
}

impl CatalogProvider for CatalogWithInformationSchema {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.inner
.schema_names()
.into_iter()
.chain(std::iter::once(INFORMATION_SCHEMA.to_string()))
.collect::<Vec<String>>()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
Arc::new(InformationSchemaProvider {
config: InformationSchemaConfig { catalog_list },
}) as Arc<dyn SchemaProvider>
})
} else {
self.inner.schema(name)
}
}

fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
let catalog = &self.inner;
catalog.register_schema(name, schema)
}
}
/// Name of the `information_schema` virtual schema.
pub const INFORMATION_SCHEMA: &str = "information_schema";
// NOTE(review): the four names below are presumably the tables exposed inside
// `information_schema`; their use sites are outside this excerpt -- confirm.
pub const TABLES: &str = "tables";
pub const VIEWS: &str = "views";
pub const COLUMNS: &str = "columns";
pub const DF_SETTINGS: &str = "df_settings";

/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
struct InformationSchemaProvider {
pub struct InformationSchemaProvider {
config: InformationSchemaConfig,
}

impl InformationSchemaProvider {
    /// Builds an [`InformationSchemaProvider`] whose configuration holds the
    /// given `catalog_list`
    pub fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
        let config = InformationSchemaConfig { catalog_list };
        Self { config }
    }
}

#[derive(Clone)]
struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogList>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

#![allow(clippy::module_inception)]
pub mod catalog;
pub mod information_schema;
pub(crate) mod information_schema;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There wasn't anything public in the module, so we might as well make it crate-private.

https://docs.rs/datafusion/latest/datafusion/catalog/information_schema/index.html

pub mod listing_schema;
pub mod schema;

Expand Down
88 changes: 13 additions & 75 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

//! SessionContext contains methods for registering data sources and executing queries
use crate::{
catalog::{
catalog::{CatalogList, MemoryCatalogList},
information_schema::CatalogWithInformationSchema,
},
catalog::catalog::{CatalogList, MemoryCatalogList},
config::{
OPT_COLLECT_STATISTICS, OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
OPT_INFORMATION_SCHEMA, OPT_PARQUET_ENABLE_PRUNING, OPT_REPARTITION_AGGREGATIONS,
Expand Down Expand Up @@ -97,6 +94,7 @@ use datafusion_sql::{
use parquet::file::properties::WriterProperties;
use url::Url;

use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
Expand Down Expand Up @@ -422,10 +420,6 @@ impl SessionContext {
))
}
}
// Since information_schema config may have changed, revalidate
if variable == OPT_INFORMATION_SCHEMA {
state.update_information_schema();
}
drop(state);

self.return_empty_dataframe()
Expand Down Expand Up @@ -877,18 +871,10 @@ impl SessionContext {
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
let name = name.into();
let information_schema = self.copied_config().information_schema();
let state = self.state.read();
let catalog = if information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
catalog,
))
} else {
catalog
};

state.catalog_list.register_catalog(name, catalog)
self.state
.read()
.catalog_list
.register_catalog(name, catalog)
}

/// Retrieves the list of available catalog names.
Expand Down Expand Up @@ -1587,7 +1573,7 @@ impl SessionState {
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));

let mut this = SessionState {
SessionState {
session_id,
optimizer: Optimizer::new(),
physical_optimizers,
Expand All @@ -1598,60 +1584,6 @@ impl SessionState {
config,
execution_props: ExecutionProps::new(),
runtime_env: runtime,
};
this.update_information_schema();
this
}

/// Enables/Disables information_schema support based on the value of
/// config.information_schema()
///
/// When enabled, all catalog providers are wrapped with
/// [`CatalogWithInformationSchema`] if needed
///
/// When disabled, any [`CatalogWithInformationSchema`] is unwrapped
fn update_information_schema(&mut self) {
let enabled = self.config.information_schema();
let catalog_list = &self.catalog_list;

let new_catalogs: Vec<_> = self
.catalog_list
.catalog_names()
.into_iter()
.map(|catalog_name| {
// unwrap because the list of names came from catalog
// list so it should still be there
let catalog = catalog_list.catalog(&catalog_name).unwrap();

let unwrapped = catalog
.as_any()
.downcast_ref::<CatalogWithInformationSchema>()
.map(|wrapped| wrapped.inner());

let new_catalog = match (enabled, unwrapped) {
// already wrapped, no thing needed
(true, Some(_)) => catalog,
(true, None) => {
// wrap the catalog in information schema
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(catalog_list),
catalog,
))
}
// disabling, currently wrapped
(false, Some(unwrapped)) => unwrapped,
// disabling, currently unwrapped
(false, None) => catalog,
};

(catalog_name, new_catalog)
})
// collect to avoid concurrent modification
.collect();

// replace all catalogs
for (catalog_name, new_catalog) in new_catalogs {
catalog_list.register_catalog(catalog_name, new_catalog);
}
}

Expand Down Expand Up @@ -1721,6 +1653,12 @@ impl SessionState {
table_ref: impl Into<TableReference<'a>>,
) -> Result<Arc<dyn SchemaProvider>> {
let resolved_ref = self.resolve_table_ref(table_ref);
if self.config.information_schema() && resolved_ref.schema == INFORMATION_SCHEMA {
return Ok(Arc::new(InformationSchemaProvider::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cool -- it makes the information schema provider on demand 👍

self.catalog_list.clone(),
)));
}

self.catalog_list
.catalog(resolved_ref.catalog)
.ok_or_else(|| {
Expand Down