Cleanup InformationSchema plumbing (#4740)
tustvold authored Dec 27, 2022
1 parent 9046a8a commit 8c0c0ad
Showing 3 changed files with 31 additions and 152 deletions.
93 changes: 17 additions & 76 deletions datafusion/core/src/catalog/information_schema.rs
@@ -19,19 +19,14 @@
//!
//! Information Schema]<https://en.wikipedia.org/wiki/Information_schema>
use std::{
    any::Any,
    sync::{Arc, Weak},
};
use std::{any::Any, sync::Arc};

use arrow::{
    array::{StringBuilder, UInt64Builder},
    datatypes::{DataType, Field, Schema, SchemaRef},
    record_batch::RecordBatch,
};

use datafusion_common::Result;

use crate::config::ConfigOptions;
use crate::datasource::streaming::{PartitionStream, StreamingTable};
use crate::datasource::TableProvider;
@@ -40,87 +35,33 @@ use crate::logical_expr::TableType;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;

use super::{
    catalog::{CatalogList, CatalogProvider},
    schema::SchemaProvider,
};

const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
const DF_SETTINGS: &str = "df_settings";

/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
    catalog_list: Weak<dyn CatalogList>,
    /// wrapped provider
    inner: Arc<dyn CatalogProvider>,
}
use super::{catalog::CatalogList, schema::SchemaProvider};

impl CatalogWithInformationSchema {
    pub(crate) fn new(
        catalog_list: Weak<dyn CatalogList>,
        inner: Arc<dyn CatalogProvider>,
    ) -> Self {
        Self {
            catalog_list,
            inner,
        }
    }

    /// Return a reference to the wrapped provider
    pub(crate) fn inner(&self) -> Arc<dyn CatalogProvider> {
        self.inner.clone()
    }
}

impl CatalogProvider for CatalogWithInformationSchema {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.inner
            .schema_names()
            .into_iter()
            .chain(std::iter::once(INFORMATION_SCHEMA.to_string()))
            .collect::<Vec<String>>()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
            Weak::upgrade(&self.catalog_list).map(|catalog_list| {
                Arc::new(InformationSchemaProvider {
                    config: InformationSchemaConfig { catalog_list },
                }) as Arc<dyn SchemaProvider>
            })
        } else {
            self.inner.schema(name)
        }
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        let catalog = &self.inner;
        catalog.register_schema(name, schema)
    }
}
pub const INFORMATION_SCHEMA: &str = "information_schema";
pub const TABLES: &str = "tables";
pub const VIEWS: &str = "views";
pub const COLUMNS: &str = "columns";
pub const DF_SETTINGS: &str = "df_settings";

/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
struct InformationSchemaProvider {
pub struct InformationSchemaProvider {
    config: InformationSchemaConfig,
}

impl InformationSchemaProvider {
    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
    pub fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
        Self {
            config: InformationSchemaConfig { catalog_list },
        }
    }
}

#[derive(Clone)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogList>,
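
The doc comment above is the key design point: the `information_schema` tables are built on demand from whatever the `CatalogList` contains at query time, so nothing needs to be re-registered when catalogs change. Below is a minimal crate-internal sketch of using the now-public constructor; the helper names and the use of `MemoryCatalogList` are illustrative assumptions, and since the module becomes `pub(crate)` in the next file, this only compiles inside the datafusion core crate.

use std::sync::Arc;

use crate::catalog::catalog::{CatalogList, MemoryCatalogList};
use crate::catalog::information_schema::InformationSchemaProvider;
use crate::catalog::schema::SchemaProvider;

/// Build the virtual `information_schema` for an arbitrary catalog list.
fn information_schema(catalog_list: Arc<dyn CatalogList>) -> Arc<dyn SchemaProvider> {
    // The provider only captures the catalog list; the `tables`, `views`,
    // `columns` and `df_settings` tables are materialized lazily per query,
    // so catalogs registered later are still visible.
    Arc::new(InformationSchemaProvider::new(catalog_list))
}

fn example() -> Arc<dyn SchemaProvider> {
    // An empty in-memory catalog list is enough to construct the provider.
    information_schema(Arc::new(MemoryCatalogList::new()))
}
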
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/mod.rs
@@ -20,7 +20,7 @@
#![allow(clippy::module_inception)]
pub mod catalog;
pub mod information_schema;
pub(crate) mod information_schema;
pub mod listing_schema;
pub mod schema;

88 changes: 13 additions & 75 deletions datafusion/core/src/execution/context.rs
@@ -17,10 +17,7 @@

//! SessionContext contains methods for registering data sources and executing queries
use crate::{
    catalog::{
        catalog::{CatalogList, MemoryCatalogList},
        information_schema::CatalogWithInformationSchema,
    },
    catalog::catalog::{CatalogList, MemoryCatalogList},
    config::{
        OPT_COLLECT_STATISTICS, OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
        OPT_INFORMATION_SCHEMA, OPT_PARQUET_ENABLE_PRUNING, OPT_REPARTITION_AGGREGATIONS,
@@ -97,6 +94,7 @@ use datafusion_sql::{
use parquet::file::properties::WriterProperties;
use url::Url;

use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
@@ -422,10 +420,6 @@ impl SessionContext {
                    ))
                }
            }
            // Since information_schema config may have changed, revalidate
            if variable == OPT_INFORMATION_SCHEMA {
                state.update_information_schema();
            }
            drop(state);

            self.return_empty_dataframe()
@@ -877,18 +871,10 @@ impl SessionContext {
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        let name = name.into();
        let information_schema = self.copied_config().information_schema();
        let state = self.state.read();
        let catalog = if information_schema {
            Arc::new(CatalogWithInformationSchema::new(
                Arc::downgrade(&state.catalog_list),
                catalog,
            ))
        } else {
            catalog
        };

        state.catalog_list.register_catalog(name, catalog)
        self.state
            .read()
            .catalog_list
            .register_catalog(name, catalog)
    }
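
With the wrapper gone, `register_catalog` stores the provided catalog untouched; previously it would have been wrapped in `CatalogWithInformationSchema` whenever the flag was on. A short sketch of the call site from a user's perspective follows; the function name and catalog name are illustrative, and it assumes the public `datafusion` API of this release.

use std::sync::Arc;

use datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::prelude::{SessionConfig, SessionContext};

fn register_example() {
    let ctx = SessionContext::with_config(
        SessionConfig::new().with_information_schema(true),
    );
    // The catalog is stored as-is in the catalog list; information_schema
    // support no longer depends on wrapping it at registration time.
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());
    let _previous = ctx.register_catalog("my_catalog", catalog);
}
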

    /// Retrieves the list of available catalog names.
@@ -1587,7 +1573,7 @@ impl SessionState {
        // rule below performs this analysis and removes unnecessary `SortExec`s.
        physical_optimizers.push(Arc::new(OptimizeSorts::new()));

        let mut this = SessionState {
        SessionState {
            session_id,
            optimizer: Optimizer::new(),
            physical_optimizers,
@@ -1598,60 +1584,6 @@ impl SessionState {
            config,
            execution_props: ExecutionProps::new(),
            runtime_env: runtime,
        };
        this.update_information_schema();
        this
    }

    /// Enables/Disables information_schema support based on the value of
    /// config.information_schema()
    ///
    /// When enabled, all catalog providers are wrapped with
    /// [`CatalogWithInformationSchema`] if needed
    ///
    /// When disabled, any [`CatalogWithInformationSchema`] is unwrapped
    fn update_information_schema(&mut self) {
        let enabled = self.config.information_schema();
        let catalog_list = &self.catalog_list;

        let new_catalogs: Vec<_> = self
            .catalog_list
            .catalog_names()
            .into_iter()
            .map(|catalog_name| {
                // unwrap because the list of names came from catalog
                // list so it should still be there
                let catalog = catalog_list.catalog(&catalog_name).unwrap();

                let unwrapped = catalog
                    .as_any()
                    .downcast_ref::<CatalogWithInformationSchema>()
                    .map(|wrapped| wrapped.inner());

                let new_catalog = match (enabled, unwrapped) {
                    // already wrapped, no thing needed
                    (true, Some(_)) => catalog,
                    (true, None) => {
                        // wrap the catalog in information schema
                        Arc::new(CatalogWithInformationSchema::new(
                            Arc::downgrade(catalog_list),
                            catalog,
                        ))
                    }
                    // disabling, currently wrapped
                    (false, Some(unwrapped)) => unwrapped,
                    // disabling, currently unwrapped
                    (false, None) => catalog,
                };

                (catalog_name, new_catalog)
            })
            // collect to avoid concurrent modification
            .collect();

        // replace all catalogs
        for (catalog_name, new_catalog) in new_catalogs {
            catalog_list.register_catalog(catalog_name, new_catalog);
        }
    }

@@ -1721,6 +1653,12 @@ impl SessionState {
        table_ref: impl Into<TableReference<'a>>,
    ) -> Result<Arc<dyn SchemaProvider>> {
        let resolved_ref = self.resolve_table_ref(table_ref);
        if self.config.information_schema() && resolved_ref.schema == INFORMATION_SCHEMA {
            return Ok(Arc::new(InformationSchemaProvider::new(
                self.catalog_list.clone(),
            )));
        }

        self.catalog_list
            .catalog(resolved_ref.catalog)
            .ok_or_else(|| {
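
End result: `SessionState::schema_for_ref` builds an `InformationSchemaProvider` on the fly whenever the option is enabled and the resolved schema name is `information_schema`, so the flag is consulted at lookup time instead of being baked into the catalog list. A minimal end-to-end sketch of the user-visible behaviour follows; it assumes the `datafusion` and `tokio` crates of this release, and the query text is illustrative.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // Default configuration: information_schema starts out disabled.
    let ctx = SessionContext::new();

    // Flipping the option at runtime needs no revalidation pass anymore;
    // it is read on every schema lookup.
    let _ = ctx
        .sql("SET datafusion.catalog.information_schema = true")
        .await?;

    ctx.sql("SELECT table_catalog, table_schema, table_name FROM information_schema.tables")
        .await?
        .show()
        .await?;

    Ok(())
}
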
