Skip to content

Commit

Permalink
feat: Support virtual listing for SQL server. (#2311)
Browse files Browse the repository at this point in the history
```sql
-- List schemas
select * from list_schemas(external_db);

-- List tables in schema
select * from list_tables(external_db, dbo);

-- List columns for a given table
select * from list_columns(external_db, dbo, abc);
```

---------

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal authored Dec 28, 2023
1 parent e7d2126 commit ccb6fcf
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
69 changes: 67 additions & 2 deletions crates/datasources/src/sqlserver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use client::{Client, QueryStream};
use async_trait::async_trait;
use chrono::naive::NaiveDateTime;
use datafusion::arrow::datatypes::{
DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
DataType, Field, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::TableProvider;
Expand All @@ -22,6 +22,8 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_ext::errors::ExtensionError;
use datafusion_ext::functions::VirtualLister;
use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
use errors::{Result, SqlServerError};
use futures::{future::BoxFuture, ready, stream::BoxStream, FutureExt, Stream, StreamExt};
Expand Down Expand Up @@ -67,10 +69,15 @@ impl SqlServerAccess {
let _schema = state.get_table_schema(schema, table).await?;
Ok(())
}

/// Establish a connection to the server using this accessor's configuration
/// and hand back the resulting [`SqlServerAccessState`].
pub async fn connect(&self) -> Result<SqlServerAccessState> {
    let config = self.config.clone();
    SqlServerAccessState::connect(config).await
}
}

#[derive(Debug)]
struct SqlServerAccessState {
pub struct SqlServerAccessState {
client: Client,
/// Handle for underlying sql server connection.
///
Expand Down Expand Up @@ -174,6 +181,64 @@ impl SqlServerAccessState {
}
}

#[async_trait]
impl VirtualLister for SqlServerAccessState {
    /// List the schema names reported by `information_schema.schemata`.
    ///
    /// Rows whose first column is NULL are skipped.
    ///
    /// # Errors
    ///
    /// Any failure while issuing the query, reading a row, or decoding the
    /// first column as a string is converted with `ExtensionError::access`.
    async fn list_schemas(&self) -> Result<Vec<String>, ExtensionError> {
        let mut query = self
            .client
            .query("SELECT schema_name FROM information_schema.schemata")
            .await
            .map_err(ExtensionError::access)?;

        let mut schema_names = Vec::new();
        while let Some(row) = query.next().await {
            let row = row.map_err(ExtensionError::access)?;
            if let Some(name) = row
                .try_get::<&str, usize>(0)
                .map_err(ExtensionError::access)?
            {
                schema_names.push(name.to_owned());
            }
        }

        Ok(schema_names)
    }

    /// List the table names that `information_schema.tables` reports for
    /// `schema`.
    ///
    /// Rows whose first column is NULL are skipped.
    ///
    /// # Errors
    ///
    /// Any failure while issuing the query, reading a row, or decoding the
    /// first column as a string is converted with `ExtensionError::access`.
    async fn list_tables(&self, schema: &str) -> Result<Vec<String>, ExtensionError> {
        // The schema name is caller-supplied and ends up inside a single-quoted
        // string literal. Double every embedded single quote so the value can
        // never terminate the literal early (SQL injection / broken query).
        let schema = schema.replace('\'', "''");

        let mut query = self
            .client
            .query(format!(
                "SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema}'"
            ))
            .await
            .map_err(ExtensionError::access)?;

        let mut table_names = Vec::new();
        while let Some(row) = query.next().await {
            let row = row.map_err(ExtensionError::access)?;
            if let Some(name) = row
                .try_get::<&str, usize>(0)
                .map_err(ExtensionError::access)?
            {
                table_names.push(name.to_owned());
            }
        }

        Ok(table_names)
    }

    /// Return the fields of `schema`.`table`.
    ///
    /// Delegates to `get_table_schema` and returns the `fields` of the schema
    /// it produces.
    ///
    /// # Errors
    ///
    /// A failure from `get_table_schema` is boxed into
    /// `ExtensionError::ListingErrBoxed`.
    async fn list_columns(&self, schema: &str, table: &str) -> Result<Fields, ExtensionError> {
        use ExtensionError::ListingErrBoxed;

        let schema = self
            .get_table_schema(schema, table)
            .await
            .map_err(|e| ListingErrBoxed(Box::new(e)))?;

        Ok(schema.fields)
    }
}

pub struct SqlServerTableProviderConfig {
pub access: SqlServerAccess,
pub schema: String,
Expand Down
12 changes: 7 additions & 5 deletions crates/sqlbuiltins/src/functions/table/virtual_listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use datasources::mongodb::MongoAccessor;
use datasources::mysql::MysqlAccessor;
use datasources::postgres::PostgresAccess;
use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection};
use datasources::sqlserver::SqlServerAccess;
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
use protogen::metastore::types::options::{
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongo, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer,
};

use super::TableFunc;
Expand Down Expand Up @@ -334,10 +335,11 @@ pub(crate) async fn get_virtual_lister_for_external_db(
.map_err(|e| ExtensionError::Access(Box::new(e)))?;
Box::new(accessor)
}
DatabaseOptions::SqlServer(_) => {
return Err(ExtensionError::Unimplemented(
"SQL Server information listing",
))
DatabaseOptions::SqlServer(DatabaseOptionsSqlServer { connection_string }) => {
let access = SqlServerAccess::try_new_from_ado_string(connection_string)
.map_err(ExtensionError::access)?;
let state = access.connect().await.map_err(ExtensionError::access)?;
Box::new(state)
}
DatabaseOptions::Delta(_) => {
return Err(ExtensionError::Unimplemented(
Expand Down

0 comments on commit ccb6fcf

Please sign in to comment.