From 5d9bfce570301cf7e2f7c638b976ed1ce52dce5c Mon Sep 17 00:00:00 2001 From: Brandon Martin Date: Wed, 11 Dec 2024 11:21:59 -0800 Subject: [PATCH] Use execute_query for introspection (#155) ### What The configuration introspection was using `select_first_row` which was not properly streaming all results and had a limits which caused introspections to fail on larger sets of tables. ### How This removes `select_first_row` and uses `execute_query` to fetch all introspection data. --------- Co-authored-by: Daniel Harvey --- Cargo.lock | 2 + crates/configuration/Cargo.toml | 3 +- crates/configuration/src/error.rs | 9 +++ crates/configuration/src/version1.rs | 92 +++++++++++++--------- crates/ndc-sqlserver/src/connector.rs | 12 +++ crates/query-engine/execution/src/query.rs | 2 +- 6 files changed, 82 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 645f1be3..8c628216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1305,8 +1305,10 @@ dependencies = [ "bb8-tiberius", "ndc-models", "prometheus", + "query-engine-execution", "query-engine-metadata", "query-engine-metrics", + "query-engine-sql", "schemars", "serde", "serde_json", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 7c722d33..7b405d71 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -10,7 +10,8 @@ workspace = true ndc-models = { workspace = true } query-engine-metadata = { path = "../query-engine/metadata" } query-engine-metrics = { path = "../query-engine/metrics" } - +query-engine-execution = { path = "../query-engine/execution" } +query-engine-sql = { path = "../query-engine/sql" } schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] } serde = "1.0.198" diff --git a/crates/configuration/src/error.rs b/crates/configuration/src/error.rs index 1458a645..c3e31c77 100644 --- a/crates/configuration/src/error.rs +++ b/crates/configuration/src/error.rs @@ -34,6 +34,15 @@ pub enum Error { #[error("Error creating connection pool while introspecting the database: {0}")] ConnectionPoolError(#[from] bb8_tiberius::Error), + #[error("Failed to get connection from pool: {0}")] + GetConnectionFromPool(#[from] bb8::RunError), + + #[error("JSON deserialization error: {0}")] + JsonDeserializationError(String), + + #[error("Failed to execute introspection query: {0}")] + IntrospectionQueryExecutionError(String), + // error while parsing stored procedure introspection #[error("Error parsing stored procedure introspection: {0}")] StoredProcedureIntrospectionError(serde_json::Error), diff --git a/crates/configuration/src/version1.rs b/crates/configuration/src/version1.rs index 43f90e0b..9d0f720c 100644 --- a/crates/configuration/src/version1.rs +++ b/crates/configuration/src/version1.rs @@ -7,20 +7,20 @@ use crate::secret::Secret; use crate::{uri, ConnectionUri}; use ndc_models::{AggregateFunctionName, CollectionName, ComparisonOperatorName, FieldName}; +use query_engine_execution::query::execute_query; use query_engine_metadata::metadata; use query_engine_metadata::metadata::stored_procedures::{ StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures, }; use query_engine_metadata::metadata::{database, Nullable}; - use query_engine_metrics::metrics; +use query_engine_sql::sql::{ast::RawSql, string::SQL}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::collections::BTreeSet; use thiserror::Error; -use tiberius::Query; // TODO(KC): Move the `table_configuration.sql` to the `static` folder present // in the root of this repo. @@ -156,22 +156,6 @@ async fn create_mssql_pool( bb8::Pool::builder().max_size(2).build(mgr).await } -async fn select_first_row( - mssql_pool: &bb8::Pool, - query: &str, -) -> tiberius::Row { - let mut connection = mssql_pool.get().await.unwrap(); - - // let's do a query to check everything is ok - let select = Query::new(query); - - // go! - let stream = select.query(&mut connection).await.unwrap(); - - // Nothing is fetched, the first result set starts. - stream.into_row().await.unwrap().unwrap() -} - // get_stored_procedures fetches the stored procedures from the database and returns them as a // vector of introspection::IntrospectStoredProcedure. async fn configure_stored_procedures( @@ -181,11 +165,26 @@ async fn configure_stored_procedures( ) -> Result { match config_options { Some(config_options) => { - let stored_procedures_row = - select_first_row(mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await; + let mut connection = mssql_pool + .get() + .await + .map_err(Error::GetConnectionFromPool)?; + // Let's do some stored procedures introspection + let mut stored_procs_query = SQL::new(); + RawSql::RawText(STORED_PROCS_CONFIGURATION_QUERY.to_string()) + .to_sql(&mut stored_procs_query); + let mut stored_procs_rows = Vec::new(); + execute_query( + &mut connection, + &stored_procs_query, + &BTreeMap::new(), + &mut stored_procs_rows, + ) + .await + .map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?; let introspected_stored_procedures: Vec = - serde_json::from_str(stored_procedures_row.get(0).unwrap()) - .map_err(Error::StoredProcedureIntrospectionError)?; + serde_json::from_slice(&stored_procs_rows) + .map_err(|e| Error::JsonDeserializationError(e.to_string()))?; let new_stored_procedures = get_stored_procedures(introspected_stored_procedures); // traverse the new stored procedures and add them to the existing stored procedures @@ -233,26 +232,47 @@ pub async fn configure( .await .map_err(Error::ConnectionPoolError)?; - let mut metadata = query_engine_metadata::metadata::Metadata::default(); + let mut connection = mssql_pool + .get() + .await + .map_err(Error::GetConnectionFromPool)?; + + // Let's do some table introspection + let mut table_query = SQL::new(); + RawSql::RawText(TABLE_CONFIGURATION_QUERY.to_string()).to_sql(&mut table_query); + let mut table_rows = Vec::new(); + execute_query( + &mut connection, + &table_query, + &BTreeMap::new(), + &mut table_rows, + ) + .await + .map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?; + let tables: Vec = serde_json::from_slice(&table_rows) + .map_err(|e| Error::JsonDeserializationError(e.to_string()))?; + + // Let's do some types introspection + let mut types_query = SQL::new(); + RawSql::RawText(TYPES_QUERY.to_string()).to_sql(&mut types_query); + let mut types_rows = Vec::new(); + execute_query( + &mut connection, + &types_query, + &BTreeMap::new(), + &mut types_rows, + ) + .await + .map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?; + let type_names: Vec = serde_json::from_slice(&types_rows) + .map_err(|e| Error::JsonDeserializationError(e.to_string()))?; + let mut metadata = query_engine_metadata::metadata::Metadata::default(); metadata.native_queries = configuration.metadata.native_queries.clone(); metadata.native_mutations = configuration.metadata.native_mutations.clone(); - - let tables_row = select_first_row(&mssql_pool, TABLE_CONFIGURATION_QUERY).await; - - let tables: Vec = - serde_json::from_str(tables_row.get(0).unwrap()).unwrap(); - metadata.tables = get_tables_info(tables); - - let types_row = select_first_row(&mssql_pool, TYPES_QUERY).await; - - let type_names: Vec = serde_json::from_str(types_row.get(0).unwrap()).unwrap(); - metadata.comparison_operators = get_comparison_operators(&type_names); - metadata.aggregate_functions = get_aggregate_functions(&type_names); - metadata.stored_procedures = configure_stored_procedures( &mssql_pool, configuration.metadata.stored_procedures.clone(), diff --git a/crates/ndc-sqlserver/src/connector.rs b/crates/ndc-sqlserver/src/connector.rs index f1e23f0e..1f9e1377 100644 --- a/crates/ndc-sqlserver/src/connector.rs +++ b/crates/ndc-sqlserver/src/connector.rs @@ -108,6 +108,18 @@ impl connector::ConnectorSetup for SQLServerSetu configuration::Error::ConnectionPoolError(inner) => { std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into() } + configuration::Error::GetConnectionFromPool(inner) => { + std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into() + } + configuration::Error::JsonDeserializationError(inner) => connector::ParseError::from( + std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()), + ), + configuration::Error::IntrospectionQueryExecutionError(inner) => { + connector::ParseError::from(std::io::Error::new( + std::io::ErrorKind::Other, + inner.to_string(), + )) + } configuration::Error::StoredProcedureIntrospectionError(inner) => { connector::ParseError::from(std::io::Error::new( std::io::ErrorKind::Other, diff --git a/crates/query-engine/execution/src/query.rs b/crates/query-engine/execution/src/query.rs index 6f349f2e..2544d953 100644 --- a/crates/query-engine/execution/src/query.rs +++ b/crates/query-engine/execution/src/query.rs @@ -71,7 +71,7 @@ async fn execute_queries( } /// Execute the query on one set of variables. -pub(crate) async fn execute_query( +pub async fn execute_query( connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>, query: &sql::string::SQL, variables: &BTreeMap,