From ba2ab829621340176870c13638f54223d467822c Mon Sep 17 00:00:00 2001
From: Pranshi
Date: Mon, 9 Sep 2024 23:31:21 +0530
Subject: [PATCH] update sql to take database name from the user

---
 crates/configuration/src/config2.sql |  39 ++-----
 crates/configuration/src/version1.rs | 158 ++++++++-------------------
 2 files changed, 52 insertions(+), 145 deletions(-)

diff --git a/crates/configuration/src/config2.sql b/crates/configuration/src/config2.sql
index fda2a0e6..f2d37ada 100644
--- a/crates/configuration/src/config2.sql
+++ b/crates/configuration/src/config2.sql
@@ -9,8 +9,8 @@ WITH column_data AS (
       JSON_OBJECT('ScalarType', c.data_type) AS type,
       CASE WHEN c.is_nullable = 'YES' THEN 'nullable' ELSE 'nonNullable' END AS nullable
     )) AS column_info
-  FROM chinook_sample.INFORMATION_SCHEMA.TABLES AS t
-  JOIN chinook_sample.INFORMATION_SCHEMA.COLUMNS AS c
+  FROM hasura_database_name.INFORMATION_SCHEMA.TABLES AS t
+  JOIN hasura_database_name.INFORMATION_SCHEMA.COLUMNS AS c
     ON c.table_catalog = t.table_catalog
     AND c.table_schema = t.table_schema
     AND c.table_name = t.table_name
@@ -40,16 +40,16 @@ relationship_data AS (
       rc.table_name AS foreign_table,
       json_object(fc.column_name, rc.column_name) as column_mapping
     )) AS relationship_info
-  FROM chinook_sample.INFORMATION_SCHEMA.TABLES AS t
-  JOIN chinook_sample.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
+  FROM hasura_database_name.INFORMATION_SCHEMA.TABLES AS t
+  JOIN hasura_database_name.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
     ON c.table_catalog = t.table_catalog
     AND c.table_schema = t.table_schema
     AND c.table_name = t.table_name
-  JOIN chinook_sample.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as rc
+  JOIN hasura_database_name.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as rc
     ON c.constraint_catalog = rc.constraint_catalog
     AND c.constraint_schema = rc.constraint_schema
     AND c.constraint_name = rc.constraint_name
-  JOIN chinook_sample.INFORMATION_SCHEMA.KEY_COLUMN_USAGE as fc ON c.constraint_name = fc.constraint_name
+  JOIN hasura_database_name.INFORMATION_SCHEMA.KEY_COLUMN_USAGE as fc ON c.constraint_name = fc.constraint_name
   WHERE t.table_schema = 'chinook_sample'
   AND c.constraint_type = 'FOREIGN KEY'
   GROUP BY t.table_name, table_catalog, table_schema, constraint_name, rc.table_name, fc.column_name, rc.column_name
 ),
@@ -74,12 +74,12 @@ unique_constraint_data AS (
     t.table_schema,
     c.constraint_name,
     TO_JSON_STRING(JSON_ARRAY(cc.column_name)) AS unique_constraint_info
-  FROM chinook_sample.INFORMATION_SCHEMA.TABLES AS t
-  JOIN chinook_sample.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
+  FROM hasura_database_name.INFORMATION_SCHEMA.TABLES AS t
+  JOIN hasura_database_name.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
     ON c.table_catalog = t.table_catalog
     AND c.table_schema = t.table_schema
     AND c.table_name = t.table_name
-  JOIN chinook_sample.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as cc
+  JOIN hasura_database_name.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as cc
     ON c.constraint_name = cc.constraint_name
   WHERE t.table_schema = 'chinook_sample'
   AND c.constraint_type in ('PRIMARY KEY', 'UNIQUE')
@@ -120,24 +120,3 @@ SELECT
 FROM columns_struct LEFT JOIN relationship_struct ON columns_struct.table_name = relationship_struct.table_name
 LEFT JOIN unique_constraint_struct ON columns_struct.table_name = unique_constraint_struct.table_name
--- SELECT
---     CONCAT(
---         '{',
---         '"', columns_struct.table_name, '": {',
---             '"schemaName": ',
---             '"', CONCAT(columns_struct.table_catalog , '.', columns_struct.table_schema), '", ',
---             '"tableName": ' , '"', columns_struct.table_name, '", '
---             '"columns": {',
---                 columns_struct.columns.columns_json,
---             '},',
---             '"uniquenessConstraints": {',
---                 coalesce(unique_constraint_struct.unique_constraint.unique_constraint_json, ""),
---             '},',
---             '"foreignRelations": {',
---                 coalesce(relationship_struct.relationships.relationships_json, ""),
---             '}'
---         '}',
---     '}'
--- ) AS result
--- FROM columns_struct LEFT JOIN relationship_struct ON columns_struct.table_name = relationship_struct.table_name
--- LEFT JOIN unique_constraint_struct ON columns_struct.table_name = unique_constraint_struct.table_name
\ No newline at end of file
diff --git a/crates/configuration/src/version1.rs b/crates/configuration/src/version1.rs
index c0dbe819..41afffcf 100644
--- a/crates/configuration/src/version1.rs
+++ b/crates/configuration/src/version1.rs
@@ -6,7 +6,11 @@ use crate::values::{self, ConnectionUri, PoolSettings, Secret};
 use super::error::ParseConfigurationError;
 
 use gcp_bigquery_client::model::job_configuration_query::JobConfigurationQuery;
+use gcp_bigquery_client::model::query_parameter::QueryParameter;
+use gcp_bigquery_client::model::query_parameter_type::QueryParameterType;
+use gcp_bigquery_client::model::query_parameter_value::QueryParameterValue;
 use gcp_bigquery_client::model::query_request::QueryRequest;
+use gcp_bigquery_client::project;
 use ndc_models::{AggregateFunctionName, ComparisonOperatorName, ScalarTypeName, TypeName};
 use schemars::{schema, JsonSchema};
 use serde::{Deserialize, Serialize};
@@ -51,8 +55,15 @@ const APPROX_NUMERICS: [&str; 2] = ["float", "real"];
 const NOT_COUNTABLE: [&str; 3] = ["image", "ntext", "text"];
 const NOT_APPROX_COUNTABLE: [&str; 4] = ["image", "sql_variant", "ntext", "text"];
 
-const TYPES_QUERY: &str =
-    "select data_type from chinook_sample.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS";
+// const TYPES_QUERY: &str =
+//     format!("select data_type from {}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS");
+
+// const DATA: &str = "chinook_sample";
+
+const TYPES_QUERY1: &str =
+    "select data_type from @dataset_id.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS";
+const TYPES_QUERY2: &str =
+    "select data_type from @dataset.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS";
 
 /// Initial configuration, just enough to connect to a database and elaborate a full
 /// 'Configuration'.
@@ -355,9 +366,7 @@ impl ParsedConfiguration {
 pub async fn configure(
     args: &ParsedConfiguration,
     environment: impl Environment,
-    // configuration_query: &str,
 ) -> anyhow::Result<ParsedConfiguration> {
-    // dbg!(args);
     let uri = match &args.service_key {
         ConnectionUri(Secret::Plain(value)) => Cow::Borrowed(value),
         ConnectionUri(Secret::FromEnvironment { variable }) => {
@@ -365,39 +374,34 @@ pub async fn configure(
         }
     };
 
-    // .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;
+    let service_account_key = yup_oauth2::parse_service_account_key(uri.as_str()).unwrap();
 
-    // let service_account_key_json = std::env::var("HASURA_BIGQUERY_SERVICE_KEY").unwrap();
-    // dbg!(uri.as_ref().as_str());
+    let project_id = "hasura-development";
+    let dataset_id = "chinook_sample";
 
-    let service_account_key = yup_oauth2::parse_service_account_key(uri.as_str()).unwrap();
+    let schema_name = format!("{}.{}", project_id, dataset_id);
+    let database_name = schema_name.clone();
 
     // Init BigQuery client
     let bigquery_client =
         gcp_bigquery_client::Client::from_service_account_key(service_account_key, false)
            .await
            .unwrap();
 
-    let query_request = QueryRequest::new(CONFIGURATION_QUERY);
-    let project_id = "hasura-development";
+
+    // get scalar_types
 
-    let datasets = bigquery_client
-        .dataset()
-        .list(project_id, Default::default())
-        .await
-        .unwrap();
-    // dbg!(datasets);
+    let types_query = format!("select data_type from {}.{}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS", project_id, dataset_id);
 
     let types_row = bigquery_client
         .job()
-        .query(project_id, QueryRequest::new(TYPES_QUERY))
+        .query(project_id, QueryRequest::new(types_query))
         .await
         .unwrap();
 
-    // dbg!(types_row.query_response());
-    let a = types_row.query_response().clone();
+    let types_query_response = types_row.query_response().clone();
 
-    //TODO(PY): to many unwraps!
-    let types = a
+    //TODO(PY): too many unwraps!
+    let types = types_query_response
         .rows
         .as_ref()
         .unwrap()
@@ -419,61 +423,34 @@ pub async fn configure(
         })
         .collect::<Vec<_>>();
 
-    let comparison_operators = get_comparison_operators(&types);
-    // dbg!(comparison_operators);
+    let scalar_types = get_scalar_types(&types, schema_name);
 
-    let aggregate_functions = get_aggregate_functions(&types);
-    // dbg!(aggregate_functions);
+    // get tables_info
 
-    let dataset_id = "chinook_sample";
+    let config_query_string = CONFIGURATION_QUERY.to_string();
 
-    let schema_name = format!("{}.{}", project_id, dataset_id);
+    let config_query_string_with_database_name: String = config_query_string.replace("hasura_database_name", database_name.as_str()); //TODO(PY): what is a safe name to provide as a variable name?
-    let scalar_types = get_scalar_types(&types, schema_name);
+    let tables_query_request = QueryRequest::new(config_query_string_with_database_name);
 
-    let rs = bigquery_client
+    let tables_result = bigquery_client
         .job()
-        // .query_all_with_location
-        // (project_id,
-        //     "EU",
-        //     JobConfigurationQuery {
-        //         query: CONFIGURATION_QUERY,
-        //         query_parameters: None,
-        //         use_legacy_sql: Some(false),
-        //         ..Default::default()
-        //     },
-        //     Some(2)
-        // );
-        // .collect::<Result<Vec<_>, _>>()
-        .query(project_id, query_request)
+        .query(project_id, tables_query_request)
         .await
-        // .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
         .unwrap();
-    // ;
-    // dbg!(&rs.query_response());
-
-    let table_rows = rs.query_response().clone();
-
-    // let foo : Vec<TablesInfo> = table_rows.rows.unwrap().into_iter().map(|row| {
-    //     let abc = row.columns.unwrap().into_iter().next().unwrap();
-    //     let baz = abc.value.unwrap();
-    //     let foobar : TablesInfo = serde_json::from_value(baz.to_owned()).unwrap();
-    //     foobar
-    // }).collect();
+    let table_rows = tables_result.query_response().clone();
 
     let mut tables_info = TablesInfo::empty();
 
-    // let table_row = table_rows.rows.unwrap_or_default().into_iter().next().unwrap();
-
     for row in table_rows.rows.unwrap_or_default() {
-        let foo = if let Some(columns) = row.columns {
-            if let Some(abc) = columns.into_iter().next() {
-                if let Some(baz) = abc.value {
-                    if let serde_json::Value::String(str) = baz {
-                        if let Ok(foobar) = serde_json::from_str::<TablesInfo>(&str) {
-                            // tables_info.merge(foobar);
-                            Ok(foobar)
+        let configuration_table_info = if let Some(columns) = row.columns {
+            if let Some(column) = columns.into_iter().next() {
+                if let Some(value) = column.value {
+                    if let serde_json::Value::String(str) = value {
+                        if let Ok(table_info_map) = serde_json::from_str::<TablesInfo>(&str) {
+                            // tables_info.merge(table_info_map);
+                            Ok(table_info_map)
                         } else {
                             Err(format!(
                                 "Failed to deserialize TablesInfo from JSON: {}",
@@ -481,7 +458,7 @@ pub async fn configure(
                             ))
                         }
                     } else {
-                        Err(format!("Expected a string value, found: {:?}", baz))
+                        Err(format!("Expected a string value, found: {:?}", value))
                     }
                 } else {
                     Err("Missing value in columns".to_string())
@@ -492,60 +469,11 @@ pub async fn configure(
         } else {
             Err("Empty rows".to_string())
         };
-        if let Ok(foobar) = foo {
-            tables_info.merge(foobar);
+        if let Ok(table_info_map) = configuration_table_info {
+            tables_info.merge(table_info_map);
         }
     }
 
-    // dbg!(&tables_info);
-
-    // let table_info: TablesInfo = table_rows.rows.as_ref().unwrap().into_iter().map(|row| {
-    //     serde_json::from_str(row.columns.as_ref().unwrap().into_iter().next().unwrap().value.as_ref().unwrap().to_owned().as_str().unwrap()).unwrap()
-    // }).collect::<Vec<_>>();
-
-    // let r = rs.query_response().rows.unwrap().get(0).unwrap();
-    // dbg!(r);
-    // dbg!("query done");
-    // let mut connection = PgConnection::connect(uri.as_str())
-    //     .await?;
-
-    // dbg!("pg connection done");
-
-    // let row = connection // TODO(PY): why is this PG connection
-    //     .fetch_one(CONFIGURATION_QUERY)
-    //     .await?;
-    // .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;
-
-    // let (scalar_types, composite_types) = transitively_occurring_types(
-    //     occurring_scalar_types(
-    //         &tables,
-    //         &args.metadata.native_queries,
-    //         &args.metadata.aggregate_functions,
-    //     ),
-    //     &occurring_composite_types(&tables, &args.metadata.native_queries),
-    //     composite_types,
-    // );
-    // let (scalar_types, composite_types) = async {
-    //     let scalar_types: metadata::ScalarTypes = serde_json::from_value(row.get(1))?;
-    //     let composite_types: metadata::CompositeTypes = serde_json::from_value(row.get(2))?;
-
-    //     // We need to specify the concrete return type explicitly so that rustc knows that it can
-    //     // be sent across an async boundary.
-    //     // (last verified with rustc 1.72.1)
-    //     Ok::<_, anyhow::Error>((scalar_types, composite_types))
-    // }
-    // .instrument(info_span!("Decode introspection result"))
-    // .await?;
-
-    // let tables: metadata::TablesInfo = serde_json::from_value(row.get(0))?;
-    // .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;
-
-    // let aggregate_functions: metadata::AggregateFunctions = serde_json::from_value(row.get(1))?;
-    // .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;
-
-    // let comparison_operators: metadata::ComparisonOperators = serde_json::from_value(row.get(2))
-    //     .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;
-
     Ok(ParsedConfiguration {
         version: 1,
         service_key: args.service_key.clone(),
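
Note on the hasura_database_name placeholder: BigQuery query parameters cannot stand in for dataset or table names, which is why the @dataset_id-style TYPES_QUERY1/TYPES_QUERY2 constants added above cannot work and plain string replacement is used instead. A safer variant would read the project and dataset from the environment rather than hardcoding "hasura-development"/"chinook_sample", and validate the dataset name before splicing it into the SQL. A minimal sketch, assuming hypothetical HASURA_BIGQUERY_PROJECT_ID and HASURA_BIGQUERY_DATASET_ID variables (neither is defined by this patch):

    use anyhow::Context;

    // Hypothetical helper (not part of this patch): resolve and validate the
    // dataset to introspect, instead of hardcoding project and dataset ids.
    fn resolve_dataset() -> anyhow::Result<(String, String)> {
        let project_id = std::env::var("HASURA_BIGQUERY_PROJECT_ID")
            .context("HASURA_BIGQUERY_PROJECT_ID is not set")?;
        let dataset_id = std::env::var("HASURA_BIGQUERY_DATASET_ID")
            .context("HASURA_BIGQUERY_DATASET_ID is not set")?;

        // BigQuery dataset names may only contain letters, digits, and
        // underscores; rejecting anything else keeps the string replacement
        // into CONFIGURATION_QUERY safe from SQL injection.
        if dataset_id.is_empty()
            || !dataset_id
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '_')
        {
            anyhow::bail!("invalid BigQuery dataset name: {dataset_id}");
        }
        Ok((project_id, dataset_id))
    }

The same anyhow::Context pattern would also let the .unwrap() calls flagged by the "//TODO(PY): too many unwraps!" comment become ? with a descriptive error message.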