From ccb6fcf24869de13be69195abc4c17d755ba3672 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Thu, 28 Dec 2023 12:55:43 +0530
Subject: [PATCH] feat: Support virtual listing for SQL server. (#2311)

```sql
-- List schemas
select * from list_schemas(external_db);

-- List tables in schema
select * from list_tables(external_db, dbo);

-- List columns for a given table
select * from list_columns(external_db, dbo, abc);
```

---------

Signed-off-by: Vaibhav
---
Review note: compared with the upstream commit, list_tables() now doubles
single quotes in `schema` before interpolating it into the query text, and
the new VirtualLister methods carry doc comments. The diffstat and the last
mod.rs hunk header are adjusted to match; the blob ids on the `index` lines
still refer to the upstream commit.

 crates/datasources/src/sqlserver/mod.rs       | 77 ++++++++++++++++++-
 .../src/functions/table/virtual_listing.rs    | 12 ++--
 2 files changed, 82 insertions(+), 7 deletions(-)

diff --git a/crates/datasources/src/sqlserver/mod.rs b/crates/datasources/src/sqlserver/mod.rs
index 5c8dad08c..3b3e2017e 100644
--- a/crates/datasources/src/sqlserver/mod.rs
+++ b/crates/datasources/src/sqlserver/mod.rs
@@ -7,7 +7,7 @@ use client::{Client, QueryStream};
 use async_trait::async_trait;
 use chrono::naive::NaiveDateTime;
 use datafusion::arrow::datatypes::{
-    DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
+    DataType, Field, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
 };
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::TableProvider;
@@ -22,6 +22,8 @@ use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use datafusion_ext::errors::ExtensionError;
+use datafusion_ext::functions::VirtualLister;
 use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use errors::{Result, SqlServerError};
 use futures::{future::BoxFuture, ready, stream::BoxStream, FutureExt, Stream, StreamExt};
@@ -67,10 +69,15 @@ impl SqlServerAccess {
         let _schema = state.get_table_schema(schema, table).await?;
         Ok(())
     }
+
+    /// Connect to the server and return the access state.
+    pub async fn connect(&self) -> Result<SqlServerAccessState> {
+        SqlServerAccessState::connect(self.config.clone()).await
+    }
 }
 
 #[derive(Debug)]
-struct SqlServerAccessState {
+pub struct SqlServerAccessState {
     client: Client,
     /// Handle for underlying sql server connection.
     ///
@@ -174,6 +181,72 @@ impl SqlServerAccessState {
     }
 }
 
+#[async_trait]
+impl VirtualLister for SqlServerAccessState {
+    /// Lists every schema name reported by `information_schema.schemata`.
+    async fn list_schemas(&self) -> Result<Vec<String>, ExtensionError> {
+        let mut query = self
+            .client
+            .query("SELECT schema_name FROM information_schema.schemata")
+            .await
+            .map_err(ExtensionError::access)?;
+
+        let mut schema_names = Vec::new();
+        while let Some(row) = query.next().await {
+            let row = row.map_err(ExtensionError::access)?;
+            if let Some(s) = row
+                .try_get::<&str, usize>(0)
+                .map_err(ExtensionError::access)?
+            {
+                schema_names.push(s.to_owned());
+            }
+        }
+
+        Ok(schema_names)
+    }
+
+    /// Lists the tables that `information_schema.tables` reports for `schema`.
+    async fn list_tables(&self, schema: &str) -> Result<Vec<String>, ExtensionError> {
+        // `schema` arrives from the caller's `list_tables(..)` arguments and is
+        // spliced into a T-SQL string literal; double any single quote so it
+        // cannot terminate the literal early.
+        let schema = schema.replace('\'', "''");
+
+        let mut query = self
+            .client
+            .query(format!(
+                "SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema}'"
+            ))
+            .await
+            .map_err(ExtensionError::access)?;
+
+        let mut table_names = Vec::new();
+        while let Some(row) = query.next().await {
+            let row = row.map_err(ExtensionError::access)?;
+            if let Some(s) = row
+                .try_get::<&str, usize>(0)
+                .map_err(ExtensionError::access)?
+            {
+                table_names.push(s.to_owned());
+            }
+        }
+
+        Ok(table_names)
+    }
+
+    /// Returns the Arrow fields of `schema`.`table` as resolved by `get_table_schema`.
+    async fn list_columns(&self, schema: &str, table: &str) -> Result<Fields, ExtensionError> {
+        use ExtensionError::ListingErrBoxed;
+
+        let schema = self
+            .get_table_schema(schema, table)
+            .await
+            .map_err(|e| ListingErrBoxed(Box::new(e)))?;
+
+        Ok(schema.fields)
+    }
+}
+
 pub struct SqlServerTableProviderConfig {
     pub access: SqlServerAccess,
     pub schema: String,
diff --git a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs
index 2557a4e09..f3a158962 100644
--- a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs
+++ b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs
@@ -17,10 +17,11 @@ use datasources::mongodb::MongoAccessor;
 use datasources::mysql::MysqlAccessor;
 use datasources::postgres::PostgresAccess;
 use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection};
+use datasources::sqlserver::SqlServerAccess;
 use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
 use protogen::metastore::types::options::{
     DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongo, DatabaseOptionsMysql,
-    DatabaseOptionsPostgres, DatabaseOptionsSnowflake,
+    DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer,
 };
 
 use super::TableFunc;
@@ -334,10 +335,11 @@ pub(crate) async fn get_virtual_lister_for_external_db(
                 .map_err(|e| ExtensionError::Access(Box::new(e)))?;
             Box::new(accessor)
         }
-        DatabaseOptions::SqlServer(_) => {
-            return Err(ExtensionError::Unimplemented(
-                "SQL Server information listing",
-            ))
+        DatabaseOptions::SqlServer(DatabaseOptionsSqlServer { connection_string }) => {
+            let access = SqlServerAccess::try_new_from_ado_string(connection_string)
+                .map_err(ExtensionError::access)?;
+            let state = access.connect().await.map_err(ExtensionError::access)?;
+            Box::new(state)
         }
         DatabaseOptions::Delta(_) => {
             return Err(ExtensionError::Unimplemented(