update sql to take database name from the user
pranshi06 committed Sep 9, 2024
1 parent 05daa4f commit ba2ab82
Showing 2 changed files with 52 additions and 145 deletions.
39 changes: 9 additions & 30 deletions crates/configuration/src/config2.sql
@@ -9,8 +9,8 @@ WITH column_data AS (
JSON_OBJECT('ScalarType', c.data_type) AS type,
CASE WHEN c.is_nullable = 'YES' THEN 'nullable' ELSE 'nonNullable' END AS nullable
)) AS column_info
FROM chinook_sample.INFORMATION_SCHEMA.TABLES AS t
JOIN chinook_sample.INFORMATION_SCHEMA.COLUMNS AS c
FROM hasura_database_name.INFORMATION_SCHEMA.TABLES AS t
JOIN hasura_database_name.INFORMATION_SCHEMA.COLUMNS AS c
ON c.table_catalog = t.table_catalog
AND c.table_schema = t.table_schema
AND c.table_name = t.table_name
@@ -40,16 +40,16 @@ relationship_data AS (
rc.table_name AS foreign_table,
json_object(fc.column_name, rc.column_name) as column_mapping
)) AS relationship_info
FROM chinook_sample.INFORMATION_SCHEMA.TABLES AS t
JOIN chinook_sample.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
FROM hasura_database_name.INFORMATION_SCHEMA.TABLES AS t
JOIN hasura_database_name.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
ON c.table_catalog = t.table_catalog
AND c.table_schema = t.table_schema
AND c.table_name = t.table_name
JOIN chinook_sample.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as rc
JOIN hasura_database_name.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as rc
ON c.constraint_catalog = rc.constraint_catalog
AND c.constraint_schema = rc.constraint_schema
AND c.constraint_name = rc.constraint_name
JOIN chinook_sample.INFORMATION_SCHEMA.KEY_COLUMN_USAGE as fc ON c.constraint_name = fc.constraint_name
JOIN hasura_database_name.INFORMATION_SCHEMA.KEY_COLUMN_USAGE as fc ON c.constraint_name = fc.constraint_name
WHERE t.table_schema = 'chinook_sample' AND c.constraint_type = 'FOREIGN KEY'
GROUP BY t.table_name, table_catalog, table_schema, constraint_name, rc.table_name, fc.column_name, rc.column_name
),
Expand All @@ -74,12 +74,12 @@ unique_constraint_data AS (
t.table_schema,
c.constraint_name,
TO_JSON_STRING(JSON_ARRAY(cc.column_name)) AS unique_constraint_info
FROM chinook_sample.INFORMATION_SCHEMA.TABLES AS t
JOIN chinook_sample.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
FROM hasura_database_name.INFORMATION_SCHEMA.TABLES AS t
JOIN hasura_database_name.INFORMATION_SCHEMA.TABLE_CONSTRAINTS as c
ON c.table_catalog = t.table_catalog
AND c.table_schema = t.table_schema
AND c.table_name = t.table_name
JOIN chinook_sample.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as cc
JOIN hasura_database_name.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as cc
ON c.constraint_name = cc.constraint_name
WHERE t.table_schema = 'chinook_sample'
AND c.constraint_type in ('PRIMARY KEY', 'UNIQUE')
@@ -120,24 +120,3 @@ SELECT
FROM columns_struct
LEFT JOIN relationship_struct ON columns_struct.table_name = relationship_struct.table_name
LEFT JOIN unique_constraint_struct ON columns_struct.table_name = unique_constraint_struct.table_name
-- SELECT
-- CONCAT(
-- '{',
-- '"', columns_struct.table_name, '": {',
-- '"schemaName": ',
-- '"', CONCAT(columns_struct.table_catalog , '.', columns_struct.table_schema), '", ',
-- '"tableName": ' , '"', columns_struct.table_name, '", '
-- '"columns": {',
-- columns_struct.columns.columns_json,
-- '},',
-- '"uniquenessConstraints": {',
-- coalesce(unique_constraint_struct.unique_constraint.unique_constraint_json, ""),
-- '},',
-- '"foreignRelations": {',
-- coalesce(relationship_struct.relationships.relationships_json, ""),
-- '}'
-- '}',
-- '}'
-- ) AS result
-- FROM columns_struct LEFT JOIN relationship_struct ON columns_struct.table_name = relationship_struct.table_name
-- LEFT JOIN unique_constraint_struct ON columns_struct.table_name = unique_constraint_struct.table_name
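
Every dataset qualifier in this file changes from the hard-coded chinook_sample to a hasura_database_name placeholder. BigQuery query parameters can bind only values, not identifiers such as project, dataset, or table names, so the placeholder has to be swapped in by plain string replacement on the Rust side (see version1.rs below). A minimal sketch of that substitution, with render_configuration_query as a hypothetical helper name:

// Hypothetical helper: substitute the dataset placeholder before running the query.
// `project_id` and `dataset_id` stand in for values supplied by the user.
fn render_configuration_query(sql_template: &str, project_id: &str, dataset_id: &str) -> String {
    let database_name = format!("{}.{}", project_id, dataset_id);
    sql_template.replace("hasura_database_name", &database_name)
}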
158 changes: 43 additions & 115 deletions crates/configuration/src/version1.rs
@@ -6,7 +6,11 @@ use crate::values::{self, ConnectionUri, PoolSettings, Secret};

use super::error::ParseConfigurationError;
use gcp_bigquery_client::model::job_configuration_query::JobConfigurationQuery;
use gcp_bigquery_client::model::query_parameter::QueryParameter;
use gcp_bigquery_client::model::query_parameter_type::QueryParameterType;
use gcp_bigquery_client::model::query_parameter_value::QueryParameterValue;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::project;
use ndc_models::{AggregateFunctionName, ComparisonOperatorName, ScalarTypeName, TypeName};
use schemars::{schema, JsonSchema};
use serde::{Deserialize, Serialize};
@@ -51,8 +55,15 @@ const APPROX_NUMERICS: [&str; 2] = ["float", "real"];
const NOT_COUNTABLE: [&str; 3] = ["image", "ntext", "text"];
const NOT_APPROX_COUNTABLE: [&str; 4] = ["image", "sql_variant", "ntext", "text"];

const TYPES_QUERY: &str =
"select data_type from chinook_sample.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS";
// const TYPES_QUERY: &str =
// format!("select data_type from {}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS");

// const DATA: &str = "chinook_sample";

const TYPES_QUERY1: &str =
"select data_type from @dataset_id.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS";
const TYPES_QUERY2: &str =
"select data_type from @dataset.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS";

/// Initial configuration, just enough to connect to a database and elaborate a full
/// 'Configuration'.
@@ -355,49 +366,42 @@ impl ParsedConfiguration {
pub async fn configure(
args: &ParsedConfiguration,
environment: impl Environment,
// configuration_query: &str,
) -> anyhow::Result<ParsedConfiguration> {
// dbg!(args);
let uri = match &args.service_key {
ConnectionUri(Secret::Plain(value)) => Cow::Borrowed(value),
ConnectionUri(Secret::FromEnvironment { variable }) => {
Cow::Owned(environment.read(variable)?)
}
};

// .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;
let service_account_key = yup_oauth2::parse_service_account_key(uri.as_str()).unwrap();

// let service_account_key_json = std::env::var("HASURA_BIGQUERY_SERVICE_KEY").unwrap();
// dbg!(uri.as_ref().as_str());
let project_id = "hasura-development";
let dataset_id = "chinook_sample";

let service_account_key = yup_oauth2::parse_service_account_key(uri.as_str()).unwrap();
let schema_name = format!("{}.{}", project_id, dataset_id);
let database_name = schema_name.clone();
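// `database_name` holds the fully-qualified "project.dataset" prefix that is
// substituted for the hasura_database_name placeholder in the SQL template below.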

// Init BigQuery client
let bigquery_client =
gcp_bigquery_client::Client::from_service_account_key(service_account_key, false)
.await
.unwrap();
let query_request = QueryRequest::new(CONFIGURATION_QUERY);
let project_id = "hasura-development";

// get scalar_types

let datasets = bigquery_client
.dataset()
.list(project_id, Default::default())
.await
.unwrap();
// dbg!(datasets);
let types_query = format!("select data_type from {}.{}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS", project_id, dataset_id);

let types_row = bigquery_client
.job()
.query(project_id, QueryRequest::new(TYPES_QUERY))
.query(project_id, QueryRequest::new(types_query))
.await
.unwrap();

// dbg!(types_row.query_response());
let a = types_row.query_response().clone();
let types_query_response = types_row.query_response().clone();

//TODO(PY): to many unwraps!
let types = a
//TODO(PY): too many unwraps!
let types = types_query_response
.rows
.as_ref()
.unwrap()
@@ -419,69 +423,42 @@ pub async fn configure(
})
.collect::<Vec<_>>();

let comparison_operators = get_comparison_operators(&types);
// dbg!(comparison_operators);
let scalar_types = get_scalar_types(&types, schema_name);

let aggregate_functions = get_aggregate_functions(&types);
// dbg !(aggregate_functions);
// get tables_info

let dataset_id = "chinook_sample";
let config_query_string = CONFIGURATION_QUERY.to_string();

let schema_name = format!("{}.{}", project_id, dataset_id);
let config_query_string_with_database_name: String = config_query_string.replace("hasura_database_name", database_name.as_str()); //TODO(PY): what is a safe name to provide as a variable name?
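// One (hypothetical) answer to the TODO above: use a token that can never be
// a legal SQL identifier, e.g. "%%HASURA_DATASET%%", so a collision with real
// schema text is impossible.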

let scalar_types = get_scalar_types(&types, schema_name);
let tables_query_request = QueryRequest::new(config_query_string_with_database_name);

let rs = bigquery_client
let tables_result = bigquery_client
.job()
// .query_all_with_location
// (project_id,
// "EU",
// JobConfigurationQuery {
// query: CONFIGURATION_QUERY,
// query_parameters: None,
// use_legacy_sql: Some(false),
// ..Default::default()
// },
// Some(2)
// );
// .collect::<Result<Vec<_>, _>>()
.query(project_id, query_request)
.query(project_id, tables_query_request)
.await
// .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
.unwrap();
// ;

// dbg!(&rs.query_response());

let table_rows = rs.query_response().clone();

// let foo : Vec<TablesInfo> = table_rows.rows.unwrap().into_iter().map(|row| {
// let abc = row.columns.unwrap().into_iter().next().unwrap();
// let baz = abc.value.unwrap();
// let foobar : TablesInfo = serde_json::from_value(baz.to_owned()).unwrap();
// foobar
// }).collect();
let table_rows = tables_result.query_response().clone();

let mut tables_info = TablesInfo::empty();

// let table_row = table_rows.rows.unwrap_or_default().into_iter().next().unwrap();

for row in table_rows.rows.unwrap_or_default() {
let foo = if let Some(columns) = row.columns {
if let Some(abc) = columns.into_iter().next() {
if let Some(baz) = abc.value {
if let serde_json::Value::String(str) = baz {
if let Ok(foobar) = serde_json::from_str::<TablesInfo>(&str) {
// tables_info.merge(foobar);
Ok(foobar)
let configuration_table_info = if let Some(columns) = row.columns {
if let Some(column) = columns.into_iter().next() {
if let Some(value) = column.value {
if let serde_json::Value::String(str) = value {
if let Ok(table_info_map) = serde_json::from_str::<TablesInfo>(&str) {
// tables_info.merge(table_info_map);
Ok(table_info_map)
} else {
Err(format!(
"Failed to deserialize TablesInfo from JSON: {}",
str
))
}
} else {
Err(format!("Expected a string value, found: {:?}", baz))
Err(format!("Expected a string value, found: {:?}", value))
}
} else {
Err("Missing value in columns".to_string())
Expand All @@ -492,60 +469,11 @@ pub async fn configure(
} else {
Err("Empty rows".to_string())
};
if let Ok(foobar) = foo {
tables_info.merge(foobar);
if let Ok(table_info_map) = configuration_table_info {
tables_info.merge(table_info_map);
}
}

// dbg!(&tables_info);

// let table_info: TablesInfo = table_rows.rows.as_ref().unwrap().into_iter().map(|row| {
// serde_json::from_str(row.columns.as_ref().unwrap().into_iter().next().unwrap().value.as_ref().unwrap().to_owned().as_str().unwrap()).unwrap()
// }).collect::<Vec<_>>();

// let r = rs.query_response().rows.unwrap().get(0).unwrap();
// dbg!(r);
// dbg!("query done");
// let mut connection = PgConnection::connect(uri.as_str())
// .await?;

// dbg!("pg connection done");

// let row = connection // TODO(PY): why is this PG connection
// .fetch_one(CONFIGURATION_QUERY)
// .await?;
// .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;

// let (scalar_types, composite_types) = transitively_occurring_types(
// occurring_scalar_types(
// &tables,
// &args.metadata.native_queries,
// &args.metadata.aggregate_functions,
// ),
// &occurring_composite_types(&tables, &args.metadata.native_queries),
// composite_types,
// );
// let (scalar_types, composite_types) = async {
// let scalar_types: metadata::ScalarTypes = serde_json::from_value(row.get(1))?;
// let composite_types: metadata::CompositeTypes = serde_json::from_value(row.get(2))?;

// // We need to specify the concrete return type explicitly so that rustc knows that it can
// // be sent across an async boundary.
// // (last verified with rustc 1.72.1)
// Ok::<_, anyhow::Error>((scalar_types, composite_types))
// }
// .instrument(info_span!("Decode introspection result"))
// .await?;

// let tables: metadata::TablesInfo = serde_json::from_value(row.get(0))?;
// .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;

// let aggregate_functions: metadata::AggregateFunctions = serde_json::from_value(row.get(1))?;
// .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;

// let comparison_operators: metadata::ComparisonOperators = serde_json::from_value(row.get(2))
// .map_err(|e| connector::UpdateConfigurationError::Other(e.into()))?;

Ok(ParsedConfiguration {
version: 1,
service_key: args.service_key.clone(),
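
The row-parsing loop above nests four levels of if let before it can deserialize a TablesInfo value. A flattened sketch using Option combinators, assuming the TableRow/TableCell shapes from gcp_bigquery_client that the loop already relies on (columns: Option<Vec<TableCell>>, value: Option<serde_json::Value>):

use gcp_bigquery_client::model::table_row::TableRow;

fn parse_table_info(row: TableRow) -> Result<TablesInfo, String> {
    // Take the first column's value, mirroring the checks in the loop above.
    let value = row
        .columns
        .and_then(|cols| cols.into_iter().next())
        .ok_or_else(|| "Empty rows".to_string())?
        .value
        .ok_or_else(|| "Missing value in columns".to_string())?;
    match value {
        serde_json::Value::String(s) => serde_json::from_str::<TablesInfo>(&s)
            .map_err(|e| format!("Failed to deserialize TablesInfo from JSON: {e}")),
        other => Err(format!("Expected a string value, found: {other:?}")),
    }
}

With such a helper, the loop body would reduce to: if let Ok(info) = parse_table_info(row) { tables_info.merge(info); }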