Skip to content

Commit

Permalink
introduce information_schema.parameters table (apache#13341)
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal authored and jayzhan211 committed Nov 12, 2024
1 parent 43713ba commit 1d7f487
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 3 deletions.
243 changes: 240 additions & 3 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use arrow_array::builder::BooleanBuilder;
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, WindowUDF};
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};
Expand All @@ -50,10 +50,18 @@ pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
&[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA, ROUTINES];
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
TABLES,
VIEWS,
COLUMNS,
DF_SETTINGS,
SCHEMATA,
ROUTINES,
PARAMETERS,
];

/// Implements the `information_schema` virtual schema and tables
///
Expand Down Expand Up @@ -286,6 +294,102 @@ impl InformationSchemaConfig {
fn is_deterministic(signature: &Signature) -> bool {
signature.volatility == Volatility::Immutable
}
fn make_parameters(
&self,
udfs: &HashMap<String, Arc<ScalarUDF>>,
udafs: &HashMap<String, Arc<AggregateUDF>>,
udwfs: &HashMap<String, Arc<WindowUDF>>,
config_options: &ConfigOptions,
builder: &mut InformationSchemaParametersBuilder,
) -> Result<()> {
let catalog_name = &config_options.catalog.default_catalog;
let schema_name = &config_options.catalog.default_schema;
let mut add_parameters = |func_name: &str,
args: Option<&Vec<(String, String)>>,
arg_types: Vec<String>,
return_type: Option<String>,
is_variadic: bool| {
for (position, type_name) in arg_types.iter().enumerate() {
let param_name =
args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
builder.add_parameter(
catalog_name,
schema_name,
func_name,
position as u64 + 1,
"IN",
param_name,
type_name,
None::<&str>,
is_variadic,
);
}
if let Some(return_type) = return_type {
builder.add_parameter(
catalog_name,
schema_name,
func_name,
1,
"OUT",
None::<&str>,
return_type.as_str(),
None::<&str>,
false,
);
}
};

for (func_name, udf) in udfs {
let args = udf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udf_args_and_return_types(udf)?;
for (arg_types, return_type) in combinations {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udf.signature()),
);
}
}

for (func_name, udaf) in udafs {
let args = udaf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udaf_args_and_return_types(udaf)?;
for (arg_types, return_type) in combinations {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udaf.signature()),
);
}
}

for (func_name, udwf) in udwfs {
let args = udwf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udwf_args_and_return_types(udwf)?;
for (arg_types, return_type) in combinations {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udwf.signature()),
);
}
}

Ok(())
}

fn is_variadic(signature: &Signature) -> bool {
matches!(
signature.type_signature,
TypeSignature::Variadic(_) | TypeSignature::VariadicAny
)
}
}

/// get the arguments and return types of a UDF
Expand Down Expand Up @@ -384,6 +488,7 @@ impl SchemaProvider for InformationSchemaProvider {
DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
SCHEMATA => Arc::new(InformationSchemata::new(config)),
ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
_ => return Ok(None),
};

Expand Down Expand Up @@ -1098,3 +1203,135 @@ impl PartitionStream for InformationSchemaRoutines {
))
}
}

#[derive(Debug)]
struct InformationSchemaParameters {
schema: SchemaRef,
config: InformationSchemaConfig,
}

impl InformationSchemaParameters {
fn new(config: InformationSchemaConfig) -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("specific_catalog", DataType::Utf8, false),
Field::new("specific_schema", DataType::Utf8, false),
Field::new("specific_name", DataType::Utf8, false),
Field::new("ordinal_position", DataType::UInt64, false),
Field::new("parameter_mode", DataType::Utf8, false),
Field::new("parameter_name", DataType::Utf8, true),
Field::new("data_type", DataType::Utf8, false),
Field::new("parameter_default", DataType::Utf8, true),
Field::new("is_variadic", DataType::Boolean, false),
]));

Self { schema, config }
}

fn builder(&self) -> InformationSchemaParametersBuilder {
InformationSchemaParametersBuilder {
schema: self.schema.clone(),
specific_catalog: StringBuilder::new(),
specific_schema: StringBuilder::new(),
specific_name: StringBuilder::new(),
ordinal_position: UInt64Builder::new(),
parameter_mode: StringBuilder::new(),
parameter_name: StringBuilder::new(),
data_type: StringBuilder::new(),
parameter_default: StringBuilder::new(),
is_variadic: BooleanBuilder::new(),
inserted: HashSet::new(),
}
}
}

struct InformationSchemaParametersBuilder {
schema: SchemaRef,
specific_catalog: StringBuilder,
specific_schema: StringBuilder,
specific_name: StringBuilder,
ordinal_position: UInt64Builder,
parameter_mode: StringBuilder,
parameter_name: StringBuilder,
data_type: StringBuilder,
parameter_default: StringBuilder,
is_variadic: BooleanBuilder,
// use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type)
inserted: HashSet<(String, u64, String, String)>,
}

impl InformationSchemaParametersBuilder {
#[allow(clippy::too_many_arguments)]
fn add_parameter(
&mut self,
specific_catalog: impl AsRef<str>,
specific_schema: impl AsRef<str>,
specific_name: impl AsRef<str>,
ordinal_position: u64,
parameter_mode: impl AsRef<str>,
parameter_name: Option<impl AsRef<str>>,
data_type: impl AsRef<str>,
parameter_default: Option<impl AsRef<str>>,
is_variadic: bool,
) {
let key = (
specific_name.as_ref().to_string(),
ordinal_position,
parameter_mode.as_ref().to_string(),
data_type.as_ref().to_string(),
);
if self.inserted.insert(key) {
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
}
}

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(self.specific_catalog.finish()),
Arc::new(self.specific_schema.finish()),
Arc::new(self.specific_name.finish()),
Arc::new(self.ordinal_position.finish()),
Arc::new(self.parameter_mode.finish()),
Arc::new(self.parameter_name.finish()),
Arc::new(self.data_type.finish()),
Arc::new(self.parameter_default.finish()),
Arc::new(self.is_variadic.finish()),
],
)
.unwrap()
}
}

impl PartitionStream for InformationSchemaParameters {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let config = self.config.clone();
let mut builder = self.builder();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
futures::stream::once(async move {
config.make_parameters(
ctx.scalar_functions(),
ctx.aggregate_functions(),
ctx.window_functions(),
ctx.session_config().options(),
&mut builder,
)?;
Ok(builder.finish())
}),
))
}
}
57 changes: 57 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand Down Expand Up @@ -84,6 +85,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -99,6 +101,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -111,6 +114,7 @@ SELECT * from information_schema.tables WHERE tables.table_schema='information_s
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -121,6 +125,7 @@ SELECT * from information_schema.tables WHERE information_schema.tables.table_sc
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -131,6 +136,7 @@ SELECT * from information_schema.tables WHERE datafusion.information_schema.tabl
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand Down Expand Up @@ -454,6 +460,7 @@ SHOW TABLES
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand Down Expand Up @@ -636,3 +643,53 @@ query B
select is_deterministic from information_schema.routines where routine_name = 'now';
----
false

# test every function type are included in the result
query TTTITTTTB rowsort
select * from information_schema.parameters where specific_name = 'date_trunc' OR specific_name = 'string_agg' OR specific_name = 'rank';
----
datafusion public date_trunc 1 IN precision Utf8 NULL false
datafusion public date_trunc 1 IN precision Utf8View NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, Some("+TZ")) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, Some("+TZ")) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, Some("+TZ")) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Second, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Second, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Microsecond, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Microsecond, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Millisecond, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Millisecond, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Second, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Second, Some("+TZ")) NULL false
datafusion public string_agg 1 IN expression LargeUtf8 NULL false
datafusion public string_agg 1 OUT NULL LargeUtf8 NULL false
datafusion public string_agg 2 IN delimiter LargeUtf8 NULL false
datafusion public string_agg 2 IN delimiter Null NULL false
datafusion public string_agg 2 IN delimiter Utf8 NULL false

# test variable length arguments
query TTTB rowsort
select specific_name, data_type, parameter_mode, is_variadic from information_schema.parameters where specific_name = 'concat';
----
concat LargeUtf8 IN true
concat LargeUtf8 OUT false
concat Utf8 IN true
concat Utf8 OUT false
concat Utf8View IN true
concat Utf8View OUT false

# test ceorcion signature
query TTIT rowsort
select specific_name, data_type, ordinal_position, parameter_mode from information_schema.parameters where specific_name = 'repeat';
----
repeat Int64 2 IN
repeat LargeUtf8 1 IN
repeat LargeUtf8 1 OUT
repeat Utf8 1 IN
repeat Utf8 1 OUT
repeat Utf8View 1 IN
Loading

0 comments on commit 1d7f487

Please sign in to comment.