From 7d05a75075aa92077e1ec50eff1fb4a05f8b3dd3 Mon Sep 17 00:00:00 2001 From: JosepBove Date: Mon, 2 Dec 2024 18:07:18 +0100 Subject: [PATCH 1/4] :sparkles:feat: clickhouse in new --- cli/src/commands/new.rs | 25 ++++++++++++++++++++----- core/src/helpers/file.rs | 2 -- core/src/manifest/storage.rs | 15 +++++++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/cli/src/commands/new.rs b/cli/src/commands/new.rs index 5b32f8dd..211ea779 100644 --- a/cli/src/commands/new.rs +++ b/cli/src/commands/new.rs @@ -13,12 +13,11 @@ use rindexer::{ contract::{Contract, ContractDetails}, core::{Manifest, ProjectType}, network::Network, - storage::{CsvDetails, PostgresDetails, Storage}, + storage::{CsvDetails, PostgresDetails, ClickhouseDetails, Storage}, yaml::{write_manifest, YAML_CONFIG_NAME}, }, write_file, StringOrArray, WriteFileError, }; - use crate::console::{ print_error_message, print_success_message, prompt_for_input, prompt_for_input_list, prompt_for_optional_input, @@ -82,8 +81,8 @@ pub fn handle_new_command( let project_description = prompt_for_optional_input::<String>("Project Description", None); let repository = prompt_for_optional_input::<String>("Repository", None); let storage_choice = prompt_for_input_list( - "What Storages To Enable? (graphql can only be supported if postgres is enabled)", + "What Storages To Enable? (graphql can only be supported if postgres is enabled; 'both' means postgres and csv)", - &["postgres".to_string(), "csv".to_string(), "both".to_string(), "none".to_string()], + &["postgres".to_string(), "csv".to_string(), "both".to_string(), "clickhouse".to_string(), "none".to_string()], None, ); let mut postgres_docker_enable = false; @@ -98,7 +97,7 @@ pub fn handle_new_command( let postgres_enabled = storage_choice == "postgres" || storage_choice == "both"; let csv_enabled = storage_choice == "csv" || storage_choice == "both"; - + let clickhouse_enabled = storage_choice == "clickhouse"; let rindexer_yaml_path = project_path.join(YAML_CONFIG_NAME); let rindexer_abis_folder = project_path.join("abis"); @@ -187,6 +186,13 @@ pub fn handle_new_command( } else { None }, + clickhouse: if clickhouse_enabled { + Some(ClickhouseDetails { + enabled: true, + }) + } else { + None + } }, graphql: None, }; @@ -219,6 +225,15 @@ POSTGRES_PASSWORD=rindexer"#; } } + if clickhouse_enabled { + let env = "CLICKHOUSE_URL=\nCLICKHOUSE_USER=\nCLICKHOUSE_PASSWORD="; + + write_file(&project_path.join(".env"), env).map_err(|e| { + print_error_message(&format!("Failed to write .env file: {}", e)); + e + })?; + } + if is_rust_project { generate_rindexer_rust_project(&project_path); } diff --git a/core/src/helpers/file.rs b/core/src/helpers/file.rs index eccb0b83..4aeffa9f 100644 --- a/core/src/helpers/file.rs +++ b/core/src/helpers/file.rs @@ -34,10 +34,8 @@ pub fn write_file(path: &Path, contents: &str) -> Result<(), WriteFileError> { if let Some(dir) = path.parent() { fs::create_dir_all(dir).map_err(WriteFileError::CouldNotCreateDir)?; } - let cleaned_contents: String = contents.lines().map(|line| line.trim_end()).collect::<Vec<_>>().join("\n"); - let mut file = File::create(path).map_err(WriteFileError::CouldNotCreateFile)?; file.write_all(cleaned_contents.as_bytes()).map_err(WriteFileError::CouldNotConvertToBytes)?; Ok(()) diff --git a/core/src/manifest/storage.rs b/core/src/manifest/storage.rs index 7138188e..d69b8819 100644 --- a/core/src/manifest/storage.rs +++ b/core/src/manifest/storage.rs @@ -104,6 +104,11 @@ pub struct CsvDetails { pub disable_create_headers: Option<bool>, } +#[derive(Debug, 
Serialize, Deserialize, Clone)] +pub struct ClickhouseDetails { + pub enabled: bool +} + #[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct Storage { #[serde(default, skip_serializing_if = "Option::is_none")] @@ -111,6 +116,9 @@ pub struct Storage { #[serde(default, skip_serializing_if = "Option::is_none")] pub csv: Option<CsvDetails>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub clickhouse: Option<ClickhouseDetails> } #[derive(thiserror::Error, Debug)] @@ -174,6 +182,13 @@ impl Storage { .map_or(false, |details| details.disable_create_headers.unwrap_or_default()) } + pub fn clickhouse_enabled(&self) -> bool { + match &self.clickhouse { + Some(details) => details.enabled, + None => false, + } + } + pub async fn create_relationships_and_indexes( &self, project_path: &Path, From a735a4f82efbd8f5be3b8f7283debd6b86b1cb23 Mon Sep 17 00:00:00 2001 From: JosepBove Date: Mon, 2 Dec 2024 21:24:22 +0100 Subject: [PATCH 2/4] :sparkles:feat: start experimenting --- cli/src/commands/new.rs | 2 +- core/Cargo.toml | 2 + core/src/api/graphql.rs | 3 +- core/src/database/clickhouse/client.rs | 47 ++++++++++++++ core/src/database/clickhouse/generate.rs | 79 ++++++++++++++++++++++++ core/src/database/clickhouse/mod.rs | 3 + core/src/database/clickhouse/setup.rs | 31 ++++++++++ core/src/database/common_sql/generate.rs | 18 ++++++ core/src/database/common_sql/mod.rs | 1 + core/src/database/mod.rs | 2 + core/src/database/postgres/client.rs | 4 +- core/src/database/postgres/generate.rs | 19 +---- core/src/generator/events_bindings.rs | 3 +- core/src/indexer/no_code.rs | 13 +++- 14 files changed, 203 insertions(+), 24 deletions(-) create mode 100644 core/src/database/clickhouse/client.rs create mode 100644 core/src/database/clickhouse/generate.rs create mode 100644 core/src/database/clickhouse/mod.rs create mode 100644 core/src/database/clickhouse/setup.rs create mode 100644 core/src/database/common_sql/generate.rs create mode 100644 core/src/database/common_sql/mod.rs diff --git a/cli/src/commands/new.rs b/cli/src/commands/new.rs index 211ea779..71f8fe33 100644 --- a/cli/src/commands/new.rs +++ b/cli/src/commands/new.rs @@ -226,7 +226,7 @@ POSTGRES_PASSWORD=rindexer"#; } if clickhouse_enabled { - let env = "CLICKHOUSE_URL=\nCLICKHOUSE_USER=\nCLICKHOUSE_PASSWORD="; + let env = "CLICKHOUSE_URL=\nCLICKHOUSE_USER=\nCLICKHOUSE_PASSWORD=\nCLICKHOUSE_DATABASE="; write_file(&project_path.join(".env"), env).map_err(|e| { print_error_message(&format!("Failed to write .env file: {}", e)); e diff --git a/core/Cargo.toml b/core/Cargo.toml index a4616dd5..e45c5ac9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,6 +10,7 @@ include = ["src/**", "resources/**", "Cargo.toml", "build.rs"] [dev-dependencies] tempfile = "3.3" mockito = "0.30" +clickhouse = { version = "0.12.2", features = ["test-util"] } [dependencies] ethers = { version = "2.0", features = ["rustls", "openssl"] } @@ -58,6 +59,7 @@ deadpool-lapin = "0.12" teloxide = "0.12" serenity = { version = "0.12", features = ["client", "framework"] } once_cell = "1.19.0" +clickhouse = "0.12.2" # build jemallocator = { version = "0.5.0", optional = true } diff --git a/core/src/api/graphql.rs b/core/src/api/graphql.rs index 8bc02ace..3ae707fd 100644 --- a/core/src/api/graphql.rs +++ b/core/src/api/graphql.rs @@ -16,12 +16,13 @@ use tracing::{error, info}; use crate::{ database::postgres::{ - client::connection_string, generate::generate_indexer_contract_schema_name, + client::connection_string, }, helpers::{kill_process_on_port, set_thread_no_logging}, 
indexer::Indexer, manifest::graphql::GraphQLSettings, }; +use crate::database::common_sql::generate::generate_indexer_contract_schema_name; pub struct GraphqlOverrideSettings { pub enabled: bool, diff --git a/core/src/database/clickhouse/client.rs b/core/src/database/clickhouse/client.rs new file mode 100644 index 00000000..70b9e6ed --- /dev/null +++ b/core/src/database/clickhouse/client.rs @@ -0,0 +1,47 @@ +use std::env; +use clickhouse::Client; +use dotenv::dotenv; + +pub struct ClickhouseConnection { + url: String, + user: String, + password: String, + database: String, +} + +pub fn clickhouse_connection() -> Result<ClickhouseConnection, ClickhouseConnectionError> { + dotenv().ok(); + + let connection = ClickhouseConnection { + url: env::var("CLICKHOUSE_URL")?, + user: env::var("CLICKHOUSE_USER")?, + password: env::var("CLICKHOUSE_PASSWORD")?, + database: env::var("CLICKHOUSE_DATABASE")?, + }; + + Ok(connection) +} + +#[derive(thiserror::Error, Debug)] +pub enum ClickhouseConnectionError { + #[error("The clickhouse env vars are wrong, please check your environment: {0}")] + ClickhouseConnectionConfigWrong(#[from] env::VarError), +} + +pub struct ClickhouseClient { + conn: Client +} + +impl ClickhouseClient { + pub async fn new() -> Result<Self, ClickhouseConnectionError> { + let connection = clickhouse_connection()?; + + let client = Client::default() + .with_url(connection.url) + .with_user(connection.user) + .with_password(connection.password) + .with_database(connection.database); + + Ok(ClickhouseClient { conn: client }) + } +} \ No newline at end of file diff --git a/core/src/database/clickhouse/generate.rs b/core/src/database/clickhouse/generate.rs new file mode 100644 index 00000000..c054967a --- /dev/null +++ b/core/src/database/clickhouse/generate.rs @@ -0,0 +1,79 @@ +use std::path::Path; + +use tracing::{error, info}; + +use crate::{ + abi::{ABIInput, ABIItem, EventInfo, GenerateAbiPropertiesType, ParamTypeError, ReadAbiError}, + helpers::camel_to_snake, + indexer::Indexer, + manifest::contract::Contract, + types::code::Code, +}; + +#[derive(thiserror::Error, Debug)] +pub enum GenerateTablesForIndexerClickhouseError { + #[error("{0}")] + ReadAbiError(#[from] ReadAbiError), + + #[error("{0}")] + ParamTypeError(#[from] ParamTypeError), +} +pub fn generate_tables_for_indexer_clickhouse( + project_path: &Path, + indexer: &Indexer +) -> Result<Code, GenerateTablesForIndexerClickhouseError> { + /* + let mut sql = "CREATE SCHEMA IF NOT EXISTS rindexer_internal;".to_string(); + + for contract in &indexer.contracts { + let contract_name = contract.before_modify_name_if_filter_readonly(); + let abi_items = ABIItem::read_abi_items(project_path, contract)?; + let event_names = ABIItem::extract_event_names_and_signatures_from_abi(abi_items)?; + let schema_name = generate_indexer_contract_schema_name(&indexer.name, &contract_name); + let networks: Vec<&str> = contract.details.iter().map(|d| d.network.as_str()).collect(); + + if !disable_event_tables { + sql.push_str(format!("CREATE SCHEMA IF NOT EXISTS {};", schema_name).as_str()); + info!("Creating schema if not exists: {}", schema_name); + + let event_matching_name_on_other = find_clashing_event_names( + project_path, + contract, + &indexer.contracts, + &event_names, + )?; + + sql.push_str(&generate_event_table_sql_with_comments( + &event_names, + &contract.name, + &schema_name, + event_matching_name_on_other, + )); + } + // we still need to create the internal tables for the contract + sql.push_str(&generate_internal_event_table_sql(&event_names, &schema_name, networks)); + } + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS 
rindexer_internal.{indexer_name}_last_known_relationship_dropping_sql ( + key INT PRIMARY KEY, + value TEXT NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_indexes_dropping_sql ( + key INT PRIMARY KEY, + value TEXT NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + */ + Ok(Code::new(String::new())) + +} \ No newline at end of file diff --git a/core/src/database/clickhouse/mod.rs b/core/src/database/clickhouse/mod.rs new file mode 100644 index 00000000..f7018cac --- /dev/null +++ b/core/src/database/clickhouse/mod.rs @@ -0,0 +1,3 @@ +pub mod client; +pub mod setup; +pub mod generate; \ No newline at end of file diff --git a/core/src/database/clickhouse/setup.rs b/core/src/database/clickhouse/setup.rs new file mode 100644 index 00000000..44b7b360 --- /dev/null +++ b/core/src/database/clickhouse/setup.rs @@ -0,0 +1,31 @@ +use std::path::Path; +use futures::TryFutureExt; +use tracing::{debug, info}; +use crate::database::clickhouse::client::{ClickhouseClient, ClickhouseConnectionError}; +use crate::manifest::core::Manifest; + +#[derive(thiserror::Error, Debug)] +pub enum SetupClickhouseError { + #[error("{0}")] + ClickhouseConnection(#[from] ClickhouseConnectionError), +} + +pub async fn setup_clickhouse( + project_path: &Path, + manifest: &Manifest, +) -> Result<ClickhouseClient, SetupClickhouseError> { + info!("Setting up clickhouse"); + let client = ClickhouseClient::new().await?; + + /* let sql = generate_tables_for_indexer_clickhouse( + project_path, + &manifest.to_indexer() + )?; + + debug!("{}", sql); + client.batch_execute(sql.as_str()).await?; +*/ + info!("Created tables for {}", manifest.name); + + Ok(client) +} diff --git a/core/src/database/common_sql/generate.rs b/core/src/database/common_sql/generate.rs new file mode 100644 index 00000000..45788132 --- /dev/null +++ b/core/src/database/common_sql/generate.rs @@ -0,0 +1,18 @@ +use crate::helpers::camel_to_snake; + +pub fn generate_event_table_full_name( + indexer_name: &str, + contract_name: &str, + event_name: &str, +) -> String { + let schema_name = generate_indexer_contract_schema_name(indexer_name, contract_name); + format!("{}.{}", schema_name, camel_to_snake(event_name)) +} + +pub fn generate_event_table_columns_names_sql(column_names: &[String]) -> String { + column_names.iter().map(|name| format!("\"{}\"", name)).collect::<Vec<_>>().join(", ") +} + +pub fn generate_indexer_contract_schema_name(indexer_name: &str, contract_name: &str) -> String { + format!("{}_{}", camel_to_snake(indexer_name), camel_to_snake(contract_name)) +} diff --git a/core/src/database/common_sql/mod.rs b/core/src/database/common_sql/mod.rs new file mode 100644 index 00000000..118c66d7 --- /dev/null +++ b/core/src/database/common_sql/mod.rs @@ -0,0 +1 @@ +pub mod generate; \ No newline at end of file diff --git a/core/src/database/mod.rs b/core/src/database/mod.rs index 26e9103c..7a927ecb 100644 --- a/core/src/database/mod.rs +++ b/core/src/database/mod.rs @@ -1 +1,3 @@ +pub mod common_sql; pub mod postgres; +pub mod clickhouse; \ No newline at end of file diff --git a/core/src/database/postgres/client.rs b/core/src/database/postgres/client.rs index aaf4985d..78952335 100644 --- a/core/src/database/postgres/client.rs +++ b/core/src/database/postgres/client.rs @@ -14,9 +14,9 @@ use tokio_postgres::{ Statement, ToStatement, Transaction as PgTransaction, }; use tracing::{debug, error}; - +use 
crate::database::common_sql::generate::generate_event_table_columns_names_sql; use crate::database::postgres::{ - generate::generate_event_table_columns_names_sql, sql_type_wrapper::EthereumSqlTypeWrapper, + sql_type_wrapper::EthereumSqlTypeWrapper, }; pub fn connection_string() -> Result { diff --git a/core/src/database/postgres/generate.rs b/core/src/database/postgres/generate.rs index b8929fc5..9e5c8b0b 100644 --- a/core/src/database/postgres/generate.rs +++ b/core/src/database/postgres/generate.rs @@ -9,6 +9,7 @@ use crate::{ manifest::contract::Contract, types::code::Code, }; +use crate::database::common_sql::generate::generate_indexer_contract_schema_name; fn generate_columns(inputs: &[ABIInput], property_type: &GenerateAbiPropertiesType) -> Vec { ABIInput::generate_abi_name_properties(inputs, property_type, None) @@ -214,24 +215,6 @@ pub fn generate_tables_for_indexer_sql( Ok(Code::new(sql)) } - -pub fn generate_event_table_full_name( - indexer_name: &str, - contract_name: &str, - event_name: &str, -) -> String { - let schema_name = generate_indexer_contract_schema_name(indexer_name, contract_name); - format!("{}.{}", schema_name, camel_to_snake(event_name)) -} - -pub fn generate_event_table_columns_names_sql(column_names: &[String]) -> String { - column_names.iter().map(|name| format!("\"{}\"", name)).collect::>().join(", ") -} - -pub fn generate_indexer_contract_schema_name(indexer_name: &str, contract_name: &str) -> String { - format!("{}_{}", camel_to_snake(indexer_name), camel_to_snake(contract_name)) -} - pub fn drop_tables_for_indexer_sql(project_path: &Path, indexer: &Indexer) -> Code { let mut sql = format!( "DROP TABLE IF EXISTS rindexer_internal.{}_last_known_indexes_dropping_sql CASCADE;", diff --git a/core/src/generator/events_bindings.rs b/core/src/generator/events_bindings.rs index a0fb2073..6fb35c96 100644 --- a/core/src/generator/events_bindings.rs +++ b/core/src/generator/events_bindings.rs @@ -9,7 +9,7 @@ use crate::{ ParamTypeError, ReadAbiError, }, database::postgres::generate::{ - generate_column_names_only_with_base_properties, generate_event_table_full_name, + generate_column_names_only_with_base_properties, }, helpers::{camel_to_snake, camel_to_snake_advanced, to_pascal_case}, manifest::{ @@ -18,6 +18,7 @@ use crate::{ }, types::code::Code, }; +use crate::database::common_sql::generate::generate_event_table_full_name; pub fn abigen_contract_name(contract: &Contract) -> String { format!("Rindexer{}Gen", contract.name) diff --git a/core/src/indexer/no_code.rs b/core/src/indexer/no_code.rs index dae75a91..92606095 100644 --- a/core/src/indexer/no_code.rs +++ b/core/src/indexer/no_code.rs @@ -12,7 +12,7 @@ use crate::{ database::postgres::{ client::PostgresClient, generate::{ - generate_column_names_only_with_base_properties, generate_event_table_full_name, + generate_column_names_only_with_base_properties, }, setup::{setup_postgres, SetupPostgresError}, sql_type_wrapper::{ @@ -40,6 +40,9 @@ use crate::{ streams::StreamsClients, AsyncCsvAppender, FutureExt, IndexingDetails, StartDetails, StartNoCodeDetails, }; +use crate::database::clickhouse::client::ClickhouseClient; +use crate::database::clickhouse::setup::{setup_clickhouse, SetupClickhouseError}; +use crate::database::common_sql::generate::generate_event_table_full_name; #[derive(thiserror::Error, Debug)] pub enum SetupNoCodeError { @@ -52,6 +55,9 @@ pub enum SetupNoCodeError { #[error("Could not setup postgres: {0}")] SetupPostgresError(#[from] SetupPostgresError), + #[error("Could not setup clickhouse: 
{0}")] + SetupClickhouseError(#[from] SetupClickhouseError), + #[error("{0}")] RetryClientError(#[from] RetryClientError), @@ -81,6 +87,11 @@ pub async fn setup_no_code( postgres = Some(Arc::new(setup_postgres(project_path, &manifest).await?)); } + let mut clickhouse: Option> = None; + if manifest.storage.clickhouse_enabled() { + clickhouse = Some(Arc::new(setup_clickhouse(project_path, &manifest).await?)); + } + if !details.indexing_details.enabled { return Ok(StartDetails { manifest_path: details.manifest_path, From 3f8339543b73b3b647f8285fa0bd49a80a0f7e19 Mon Sep 17 00:00:00 2001 From: JosepBove Date: Mon, 2 Dec 2024 22:20:17 +0100 Subject: [PATCH 3/4] :sparkles:feat: tls support --- core/Cargo.toml | 2 +- core/src/database/clickhouse/client.rs | 8 +++----- core/src/database/clickhouse/setup.rs | 1 - 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index e45c5ac9..29393861 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -59,7 +59,7 @@ deadpool-lapin = "0.12" teloxide = "0.12" serenity = { version = "0.12", features = ["client", "framework"] } once_cell = "1.19.0" -clickhouse = "0.12.2" +clickhouse = { version = "0.12.2", features = ["native-tls"] } # build jemallocator = { version = "0.5.0", optional = true } diff --git a/core/src/database/clickhouse/client.rs b/core/src/database/clickhouse/client.rs index 70b9e6ed..e337ca16 100644 --- a/core/src/database/clickhouse/client.rs +++ b/core/src/database/clickhouse/client.rs @@ -6,7 +6,7 @@ pub struct ClickhouseConnection { url: String, user: String, password: String, - database: String, + //database: String, } pub fn clickhouse_connection() -> Result { @@ -15,8 +15,7 @@ pub fn clickhouse_connection() -> Result { let connection = ClickhouseConnection { url: env::var("CLICKHOUSE_URL")?, user: env::var("CLICKHOUSE_USER")?, - password: env::var("CLICKHOUSE_PASSWORD")?, - database: env::var("CLICKHOUSE_DATABASE=")?, + password: env::var("CLICKHOUSE_PASSWORD")? 
}; Ok(connection) @@ -39,8 +38,7 @@ impl ClickhouseClient { let client = Client::default() .with_url(connection.url) .with_user(connection.user) - .with_password(connection.password) - .with_database(connection.database); + .with_password(connection.password); Ok(ClickhouseClient { conn: client }) } diff --git a/core/src/database/clickhouse/setup.rs b/core/src/database/clickhouse/setup.rs index 44b7b360..7f84d51d 100644 --- a/core/src/database/clickhouse/setup.rs +++ b/core/src/database/clickhouse/setup.rs @@ -16,7 +16,6 @@ pub async fn setup_clickhouse( ) -> Result { info!("Setting up clickhouse"); let client = ClickhouseClient::new().await?; - /* let sql = generate_tables_for_indexer_clickhouse( project_path, &manifest.to_indexer() From ea3d95a9952a834c32e0fadfb780d9853cd22663 Mon Sep 17 00:00:00 2001 From: JosepBove Date: Wed, 4 Dec 2024 14:45:25 +0100 Subject: [PATCH 4/4] :sparkles:feat: clickhouse callback --- cli/src/commands/new.rs | 2 +- core/src/abi.rs | 16 ++ core/src/database/clickhouse/client.rs | 49 ++++- core/src/database/clickhouse/generate.rs | 196 ++++++++++++++++-- core/src/database/clickhouse/setup.rs | 28 ++- core/src/database/common_sql/generate.rs | 185 +++++++++++++++++ core/src/database/postgres/generate.rs | 175 ---------------- core/src/database/postgres/setup.rs | 2 +- .../src/database/postgres/sql_type_wrapper.rs | 168 +++++++++++++++ core/src/indexer/no_code.rs | 47 +++-- core/src/lib.rs | 2 + 11 files changed, 646 insertions(+), 224 deletions(-) diff --git a/cli/src/commands/new.rs b/cli/src/commands/new.rs index 71f8fe33..211ea779 100644 --- a/cli/src/commands/new.rs +++ b/cli/src/commands/new.rs @@ -226,7 +226,7 @@ POSTGRES_PASSWORD=rindexer"#; } if clickhouse_enabled { - let env = "CLICKHOUSE_URL=\nCLICKHOUSE_USER=\nCLICKHOUSE_PASSWORD=\nCLICKHOUSE_DATABASE="; + let env = "CLICKHOUSE_URL=\nCLICKHOUSE_USER=\nCLICKHOUSE_PASSWORD="; write_file(&project_path.join(".env"), env).map_err(|e| { print_error_message(&format!("Failed to write .env file: {}", e)); diff --git a/core/src/abi.rs b/core/src/abi.rs index fa4b5394..74038940 100644 --- a/core/src/abi.rs +++ b/core/src/abi.rs @@ -15,6 +15,7 @@ use crate::{ helpers::camel_to_snake, manifest::contract::{Contract, ParseAbiError}, }; +use crate::database::clickhouse::generate::solidity_type_to_clickhouse_type; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ABIInput { @@ -42,6 +43,7 @@ pub enum GenerateAbiPropertiesType { PostgresColumnsNamesOnly, CsvHeaderNames, Object, + ClickhouseWithDataTypes } #[derive(Debug)] @@ -136,6 +138,20 @@ impl ABIInput { camel_to_snake(&input.name), ); + vec![GenerateAbiNamePropertiesResult::new( + value, + &input.name, + &input.type_, + )] + } + GenerateAbiPropertiesType::ClickhouseWithDataTypes => { + let value = format!( + "\"{}{}\" {}", + prefix.map_or_else(|| "".to_string(), |p| format!("{}_", p)), + camel_to_snake(&input.name), + solidity_type_to_clickhouse_type(&input.type_) + ); + vec![GenerateAbiNamePropertiesResult::new( value, &input.name, diff --git a/core/src/database/clickhouse/client.rs b/core/src/database/clickhouse/client.rs index e337ca16..7da973db 100644 --- a/core/src/database/clickhouse/client.rs +++ b/core/src/database/clickhouse/client.rs @@ -1,12 +1,14 @@ use std::env; +use bb8::RunError; use clickhouse::Client; use dotenv::dotenv; +use crate::database::postgres::client::PostgresError; +use crate::EthereumSqlTypeWrapper; pub struct ClickhouseConnection { url: String, user: String, password: String, - //database: String, } pub fn 
clickhouse_connection() -> Result<ClickhouseConnection, ClickhouseConnectionError> { @@ -27,6 +29,12 @@ pub enum ClickhouseConnectionError { ClickhouseConnectionConfigWrong(#[from] env::VarError), } +#[derive(thiserror::Error, Debug)] +pub enum ClickhouseError { + #[error("ClickhouseError {0}")] + ClickhouseError(String), +} + pub struct ClickhouseClient { conn: Client } @@ -39,4 +47,43 @@ impl ClickhouseClient { Ok(ClickhouseClient { conn: client }) } + + pub async fn execute(&self, sql: &str) -> Result<(), ClickhouseError> { + self.conn.query(sql).execute().await.map_err(|e| ClickhouseError::ClickhouseError(e.to_string())) + } + pub async fn execute_batch(&self, sql: &str) -> Result<(), ClickhouseError> { + // Naive batching: splits on ';', so it assumes the generated SQL contains no semicolons inside string literals. + let statements: Vec<&str> = sql.split(';') + .map(str::trim) + .filter(|s| !s.is_empty()) // Remove empty statements + .collect(); + + for statement in statements { + self.execute(statement).await?; + } + + Ok(()) + } + pub async fn bulk_insert<'a>( + &self, + table_name: &str, + column_names: &[String], + bulk_data: &'a [Vec<EthereumSqlTypeWrapper>], + ) -> Result<u64, ClickhouseError> { + // Generate the base INSERT query + let column_names_str = column_names.join(", "); + let query = format!("INSERT INTO {} ({}) VALUES", table_name, column_names_str); + + // Serialize data for ClickHouse + let mut values = Vec::new(); + for row in bulk_data.iter() { + let row_values: Vec<String> = row.iter().map(|value| value.to_clickhouse_value()).collect(); + values.push(format!("({})", row_values.join(", "))); + } + + let full_query = format!("{} {}", query, values.join(", ")); + + self.execute(&full_query).await?; + + Ok(bulk_data.len() as u64) + } } \ No newline at end of file diff --git a/core/src/database/clickhouse/generate.rs b/core/src/database/clickhouse/generate.rs index c054967a..b7dba355 100644 --- a/core/src/database/clickhouse/generate.rs +++ b/core/src/database/clickhouse/generate.rs @@ -1,5 +1,5 @@ use std::path::Path; +use async_std::prelude::StreamExt; use tracing::{error, info}; use crate::{ @@ -9,6 +9,7 @@ use crate::{ manifest::contract::Contract, types::code::Code, }; +use crate::database::common_sql::generate::{generate_indexer_contract_schema_name, GenerateTablesForIndexerSqlError}; #[derive(thiserror::Error, Debug)] pub enum GenerateTablesForIndexerClickhouseError { @@ -20,10 +21,10 @@ pub enum GenerateTablesForIndexerClickhouseError { } pub fn generate_tables_for_indexer_clickhouse( project_path: &Path, indexer: &Indexer, disable_event_tables: bool, ) -> Result<Code, GenerateTablesForIndexerSqlError> { let mut sql = "CREATE DATABASE IF NOT EXISTS rindexer_internal;".to_string(); for contract in &indexer.contracts { let contract_name = contract.before_modify_name_if_filter_readonly(); let abi_items = ABIItem::read_abi_items(project_path, contract)?; let event_names = ABIItem::extract_event_names_and_signatures_from_abi(abi_items)?; let schema_name = generate_indexer_contract_schema_name(&indexer.name, &contract_name); let networks: Vec<&str> = contract.details.iter().map(|d| d.network.as_str()).collect(); if !disable_event_tables { sql.push_str(format!("CREATE DATABASE IF NOT EXISTS {};", schema_name).as_str()); info!("Creating schema if not exists: {}", schema_name); let event_matching_name_on_other = find_clashing_event_names( project_path, contract, &indexer.contracts, &event_names, )?; sql.push_str(&generate_event_table_clickhouse( &event_names, &contract.name, &schema_name, event_matching_name_on_other, )); }
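        // ClickHouse has no schemas, so each contract gets a dedicated DATABASE above and event tables are addressed as {schema_name}.{event_name}.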
// we still need to create the internal tables for the contract sql.push_str(&generate_internal_event_table_clickhouse(&event_names, &schema_name, networks)); } sql.push_str(&format!( r#" CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_relationship_dropping_sql ( key Int32, value String ) ENGINE = MergeTree ORDER BY key; "#, indexer_name = camel_to_snake(&indexer.name) )); sql.push_str(&format!( r#" CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_indexes_dropping_sql ( key Int32, value String ) ENGINE = MergeTree ORDER BY key; "#, indexer_name = camel_to_snake(&indexer.name) )); Ok(Code::new(sql)) } fn find_clashing_event_names( project_path: &Path, current_contract: &Contract, other_contracts: &[Contract], current_event_names: &[EventInfo], ) -> Result<Vec<String>, GenerateTablesForIndexerSqlError> { let mut clashing_events = Vec::new(); for other_contract in other_contracts { if other_contract.name == current_contract.name { continue; } let other_abi_items = ABIItem::read_abi_items(project_path, other_contract)?; let other_event_names = ABIItem::extract_event_names_and_signatures_from_abi(other_abi_items)?; for event_name in current_event_names { if other_event_names.iter().any(|e| e.name == event_name.name) && !clashing_events.contains(&event_name.name) { clashing_events.push(event_name.name.clone()); } } } Ok(clashing_events) } fn generate_event_table_clickhouse( abi_inputs: &[EventInfo], contract_name: &str, schema_name: &str, apply_full_name_comment_for_events: Vec<String>, ) -> String { abi_inputs .iter() .map(|event_info| { let table_name = format!("{}.{}", schema_name, camel_to_snake(&event_info.name)); info!("Creating table if not exists: {}", table_name); let event_columns = if event_info.inputs.is_empty() { "".to_string() } else { generate_columns_with_data_types(&event_info.inputs).join(", ") + "," }; let create_table_sql = format!( "CREATE TABLE IF NOT EXISTS {} (\ rindexer_id UInt64 NOT NULL, \ contract_address FixedString(66) NOT NULL, \ {} \ tx_hash FixedString(66) NOT NULL, \ block_number Float64 NOT NULL, \ block_hash FixedString(66) NOT NULL, \ network String NOT NULL, \ tx_index Float64 NOT NULL, \ log_index String NOT NULL\ )\ ENGINE = MergeTree ORDER BY rindexer_id;", table_name, event_columns ); create_table_sql }) .collect::<Vec<_>>() .join("\n") } fn generate_internal_event_table_clickhouse( abi_inputs: &[EventInfo], schema_name: &str, networks: Vec<&str>, ) -> String { abi_inputs.iter().map(|event_info| { let table_name = format!( "rindexer_internal.{}_{}", schema_name, camel_to_snake(&event_info.name) ); let create_table_query = format!( r#"CREATE TABLE IF NOT EXISTS {} ("network" String NOT NULL, "last_synced_block" Float64 NOT NULL) ENGINE = MergeTree ORDER BY network;"#, table_name ); // Built for parity with the postgres path but intentionally unused for now: MergeTree has no ON CONFLICT, so emitting these on every setup would insert duplicate rows. let _insert_queries = networks.iter().map(|network| { format!( r#"INSERT INTO {} ("network", 
"last_synced_block") VALUES ('{}', 0);"#, + table_name, + network + ) + }).collect::>().join("\n"); + + format!("{}", create_table_query) + }).collect::>().join("\n") +} + +fn generate_columns(inputs: &[ABIInput], property_type: &GenerateAbiPropertiesType) -> Vec { + ABIInput::generate_abi_name_properties(inputs, property_type, None) + .into_iter() + .map(|m| m.value) + .collect() +} + +pub fn generate_columns_with_data_types(inputs: &[ABIInput]) -> Vec { + generate_columns(inputs, &GenerateAbiPropertiesType::ClickhouseWithDataTypes) +} +#[allow(clippy::manual_strip)] +pub fn solidity_type_to_clickhouse_type(abi_type: &str) -> String { + let is_array = abi_type.ends_with("[]"); + let base_type = abi_type.trim_end_matches("[]"); + + let sql_type = match base_type { + "address" => "FixedString(42)", // Use FixedString for fixed-size strings + "bool" => "UInt8", // Use UInt8 to represent booleans (0 or 1) + "string" => "String", // Use String for variable-length text + t if t.starts_with("bytes") => "String", // Use String for binary data + t if t.starts_with("int") || t.starts_with("uint") => { + // Handling fixed-size integers (intN and uintN where N can be 8 to 256 in steps of 8) + let (prefix, size): (&str, usize) = if t.starts_with("int") { + ("int", t[3..].parse().expect("Invalid intN type")) + } else { + ("uint", t[4..].parse().expect("Invalid uintN type")) + }; + + match size { + 8 => "Int8", + 16 => "Int16", + 32 => "Int32", + 64 => "Int64", + 128 | 256 => "String", // Use String for very large integers as ClickHouse doesn't support 128/256-bit integers + _ => panic!("Unsupported {}N size: {}", prefix, size), + } + } + _ => panic!("Unsupported type: {}", base_type), + }; + + // Return the SQL type, appending array brackets if necessary + if is_array { + // ClickHouse does not have native array types with specific sizes like PostgreSQL + // Represent arrays as Array(T) where T is the base type + format!("Array({})", sql_type) + } else { + sql_type.to_string() + } +} diff --git a/core/src/database/clickhouse/setup.rs b/core/src/database/clickhouse/setup.rs index 7f84d51d..5ac2e354 100644 --- a/core/src/database/clickhouse/setup.rs +++ b/core/src/database/clickhouse/setup.rs @@ -1,13 +1,20 @@ use std::path::Path; use futures::TryFutureExt; +use regex::Regex; use tracing::{debug, info}; -use crate::database::clickhouse::client::{ClickhouseClient, ClickhouseConnectionError}; +use crate::database::clickhouse::client::{ClickhouseClient, ClickhouseConnectionError, ClickhouseError}; +use crate::database::clickhouse::generate::generate_tables_for_indexer_clickhouse; +use crate::database::common_sql::generate::GenerateTablesForIndexerSqlError; use crate::manifest::core::Manifest; #[derive(thiserror::Error, Debug)] pub enum SetupClickhouseError { - #[error("{0}")] - ClickhouseConnection(#[from] ClickhouseConnectionError), + #[error("Clickhouse connection error {0}")] + ClickhouseConnectionError(#[from] ClickhouseConnectionError), + #[error("Failed to generate tables for indexer: {0}")] + ClickhouseTableGenerationError(#[from] GenerateTablesForIndexerSqlError), + #[error("Clickhouse execution error {0}")] + ClickhouseExecutionError(#[from] ClickhouseError) } pub async fn setup_clickhouse( @@ -15,15 +22,16 @@ pub async fn setup_clickhouse( manifest: &Manifest, ) -> Result { info!("Setting up clickhouse"); - let client = ClickhouseClient::new().await?; - /* let sql = generate_tables_for_indexer_clickhouse( + + let client = 
ClickhouseClient::new().await.map_err(SetupClickhouseError::ClickhouseConnectionError)?; + + let sql = generate_tables_for_indexer_clickhouse( project_path, - &manifest.to_indexer() - )?; + &manifest.to_indexer(), + false + ).map_err(SetupClickhouseError::ClickhouseTableGenerationError)?; - debug!("{}", sql); - client.batch_execute(sql.as_str()).await?; -*/ + client.execute_batch(sql.as_str()).await?; info!("Created tables for {}", manifest.name); Ok(client) diff --git a/core/src/database/common_sql/generate.rs b/core/src/database/common_sql/generate.rs index 45788132..62023925 100644 --- a/core/src/database/common_sql/generate.rs +++ b/core/src/database/common_sql/generate.rs @@ -1,4 +1,12 @@ +use std::path::Path; +use tracing::info; +use crate::abi::{EventInfo, ParamTypeError, ReadAbiError}; +use crate::ABIItem; +use crate::database::postgres::generate::{generate_columns_with_data_types}; use crate::helpers::camel_to_snake; +use crate::indexer::Indexer; +use crate::manifest::contract::Contract; +use crate::types::code::Code; pub fn generate_event_table_full_name( indexer_name: &str, @@ -16,3 +24,180 @@ pub fn generate_event_table_columns_names_sql(column_names: &[String]) -> String pub fn generate_indexer_contract_schema_name(indexer_name: &str, contract_name: &str) -> String { format!("{}_{}", camel_to_snake(indexer_name), camel_to_snake(contract_name)) } + + +#[derive(thiserror::Error, Debug)] +pub enum GenerateTablesForIndexerSqlError { + #[error("{0}")] + ReadAbiError(#[from] ReadAbiError), + + #[error("{0}")] + ParamTypeError(#[from] ParamTypeError), +} + +pub fn generate_tables_for_indexer_sql( + project_path: &Path, + indexer: &Indexer, + disable_event_tables: bool, +) -> Result { + let mut sql = "CREATE SCHEMA IF NOT EXISTS rindexer_internal;".to_string(); + + for contract in &indexer.contracts { + let contract_name = contract.before_modify_name_if_filter_readonly(); + let abi_items = ABIItem::read_abi_items(project_path, contract)?; + let event_names = ABIItem::extract_event_names_and_signatures_from_abi(abi_items)?; + let schema_name = generate_indexer_contract_schema_name(&indexer.name, &contract_name); + let networks: Vec<&str> = contract.details.iter().map(|d| d.network.as_str()).collect(); + + if !disable_event_tables { + sql.push_str(format!("CREATE SCHEMA IF NOT EXISTS {};", schema_name).as_str()); + info!("Creating schema if not exists: {}", schema_name); + + let event_matching_name_on_other = find_clashing_event_names( + project_path, + contract, + &indexer.contracts, + &event_names, + )?; + + sql.push_str(&generate_event_table_sql_with_comments( + &event_names, + &contract.name, + &schema_name, + event_matching_name_on_other, + )); + } + // we still need to create the internal tables for the contract + sql.push_str(&generate_internal_event_table_sql(&event_names, &schema_name, networks)); + } + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_relationship_dropping_sql ( + key INT PRIMARY KEY, + value TEXT NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + + sql.push_str(&format!( + r#" + CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_indexes_dropping_sql ( + key INT PRIMARY KEY, + value TEXT NOT NULL + ); + "#, + indexer_name = camel_to_snake(&indexer.name) + )); + + Ok(Code::new(sql)) +} + +fn generate_event_table_sql_with_comments( + abi_inputs: &[EventInfo], + contract_name: &str, + schema_name: &str, + apply_full_name_comment_for_events: Vec, +) -> String { + 
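    // Moved from database::postgres::generate so storage backends can share it; the generated SQL is still postgres-flavoured (SERIAL, COMMENT ON TABLE).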
abi_inputs + .iter() + .map(|event_info| { + let table_name = format!("{}.{}", schema_name, camel_to_snake(&event_info.name)); + info!("Creating table if not exists: {}", table_name); + let event_columns = if event_info.inputs.is_empty() { + "".to_string() + } else { + generate_columns_with_data_types(&event_info.inputs).join(", ") + "," + }; + + let create_table_sql = format!( + "CREATE TABLE IF NOT EXISTS {} (\ + rindexer_id SERIAL PRIMARY KEY NOT NULL, \ + contract_address CHAR(66) NOT NULL, \ + {} \ + tx_hash CHAR(66) NOT NULL, \ + block_number NUMERIC NOT NULL, \ + block_hash CHAR(66) NOT NULL, \ + network VARCHAR(50) NOT NULL, \ + tx_index NUMERIC NOT NULL, \ + log_index VARCHAR(78) NOT NULL\ + );", + table_name, event_columns + ); + + if !apply_full_name_comment_for_events.contains(&event_info.name) { + return create_table_sql; + } + + // smart comments needed to avoid clashing of order by graphql names + let table_comment = format!( + "COMMENT ON TABLE {} IS E'@name {}{}';", + table_name, contract_name, event_info.name + ); + + format!("{}\n{}", create_table_sql, table_comment) + }) + .collect::>() + .join("\n") +} + +fn generate_internal_event_table_sql( + abi_inputs: &[EventInfo], + schema_name: &str, + networks: Vec<&str>, +) -> String { + abi_inputs.iter().map(|event_info| { + let table_name = format!( + "rindexer_internal.{}_{}", + schema_name, + camel_to_snake(&event_info.name) + ); + + let create_table_query = format!( + r#"CREATE TABLE IF NOT EXISTS {} ("network" TEXT PRIMARY KEY, "last_synced_block" NUMERIC);"#, + table_name + ); + + let insert_queries = networks.iter().map(|network| { + format!( + r#"INSERT INTO {} ("network", "last_synced_block") VALUES ('{}', 0) ON CONFLICT ("network") DO NOTHING;"#, + table_name, + network + ) + }).collect::>().join("\n"); + + format!("{}\n{}", create_table_query, insert_queries) + }).collect::>().join("\n") +} + +/// If any event names match the whole table name should be exposed differently on graphql +/// to avoid clashing of graphql namings +fn find_clashing_event_names( + project_path: &Path, + current_contract: &Contract, + other_contracts: &[Contract], + current_event_names: &[EventInfo], +) -> Result, GenerateTablesForIndexerSqlError> { + let mut clashing_events = Vec::new(); + + for other_contract in other_contracts { + if other_contract.name == current_contract.name { + continue; + } + + let other_abi_items = ABIItem::read_abi_items(project_path, other_contract)?; + let other_event_names = + ABIItem::extract_event_names_and_signatures_from_abi(other_abi_items)?; + + for event_name in current_event_names { + if other_event_names.iter().any(|e| e.name == event_name.name) && + !clashing_events.contains(&event_name.name) + { + clashing_events.push(event_name.name.clone()); + } + } + } + + Ok(clashing_events) +} \ No newline at end of file diff --git a/core/src/database/postgres/generate.rs b/core/src/database/postgres/generate.rs index 9e5c8b0b..a51cbc8b 100644 --- a/core/src/database/postgres/generate.rs +++ b/core/src/database/postgres/generate.rs @@ -40,181 +40,6 @@ pub fn generate_column_names_only_with_base_properties(inputs: &[ABIInput]) -> V column_names } -fn generate_event_table_sql_with_comments( - abi_inputs: &[EventInfo], - contract_name: &str, - schema_name: &str, - apply_full_name_comment_for_events: Vec, -) -> String { - abi_inputs - .iter() - .map(|event_info| { - let table_name = format!("{}.{}", schema_name, camel_to_snake(&event_info.name)); - info!("Creating table if not exists: {}", table_name); - let 
event_columns = if event_info.inputs.is_empty() { - "".to_string() - } else { - generate_columns_with_data_types(&event_info.inputs).join(", ") + "," - }; - - let create_table_sql = format!( - "CREATE TABLE IF NOT EXISTS {} (\ - rindexer_id SERIAL PRIMARY KEY NOT NULL, \ - contract_address CHAR(66) NOT NULL, \ - {} \ - tx_hash CHAR(66) NOT NULL, \ - block_number NUMERIC NOT NULL, \ - block_hash CHAR(66) NOT NULL, \ - network VARCHAR(50) NOT NULL, \ - tx_index NUMERIC NOT NULL, \ - log_index VARCHAR(78) NOT NULL\ - );", - table_name, event_columns - ); - - if !apply_full_name_comment_for_events.contains(&event_info.name) { - return create_table_sql; - } - - // smart comments needed to avoid clashing of order by graphql names - let table_comment = format!( - "COMMENT ON TABLE {} IS E'@name {}{}';", - table_name, contract_name, event_info.name - ); - - format!("{}\n{}", create_table_sql, table_comment) - }) - .collect::>() - .join("\n") -} - -fn generate_internal_event_table_sql( - abi_inputs: &[EventInfo], - schema_name: &str, - networks: Vec<&str>, -) -> String { - abi_inputs.iter().map(|event_info| { - let table_name = format!( - "rindexer_internal.{}_{}", - schema_name, - camel_to_snake(&event_info.name) - ); - - let create_table_query = format!( - r#"CREATE TABLE IF NOT EXISTS {} ("network" TEXT PRIMARY KEY, "last_synced_block" NUMERIC);"#, - table_name - ); - - let insert_queries = networks.iter().map(|network| { - format!( - r#"INSERT INTO {} ("network", "last_synced_block") VALUES ('{}', 0) ON CONFLICT ("network") DO NOTHING;"#, - table_name, - network - ) - }).collect::>().join("\n"); - - format!("{}\n{}", create_table_query, insert_queries) - }).collect::>().join("\n") -} - -#[derive(thiserror::Error, Debug)] -pub enum GenerateTablesForIndexerSqlError { - #[error("{0}")] - ReadAbiError(#[from] ReadAbiError), - - #[error("{0}")] - ParamTypeError(#[from] ParamTypeError), -} - -/// If any event names match the whole table name should be exposed differently on graphql -/// to avoid clashing of graphql namings -fn find_clashing_event_names( - project_path: &Path, - current_contract: &Contract, - other_contracts: &[Contract], - current_event_names: &[EventInfo], -) -> Result, GenerateTablesForIndexerSqlError> { - let mut clashing_events = Vec::new(); - - for other_contract in other_contracts { - if other_contract.name == current_contract.name { - continue; - } - - let other_abi_items = ABIItem::read_abi_items(project_path, other_contract)?; - let other_event_names = - ABIItem::extract_event_names_and_signatures_from_abi(other_abi_items)?; - - for event_name in current_event_names { - if other_event_names.iter().any(|e| e.name == event_name.name) && - !clashing_events.contains(&event_name.name) - { - clashing_events.push(event_name.name.clone()); - } - } - } - - Ok(clashing_events) -} - -pub fn generate_tables_for_indexer_sql( - project_path: &Path, - indexer: &Indexer, - disable_event_tables: bool, -) -> Result { - let mut sql = "CREATE SCHEMA IF NOT EXISTS rindexer_internal;".to_string(); - - for contract in &indexer.contracts { - let contract_name = contract.before_modify_name_if_filter_readonly(); - let abi_items = ABIItem::read_abi_items(project_path, contract)?; - let event_names = ABIItem::extract_event_names_and_signatures_from_abi(abi_items)?; - let schema_name = generate_indexer_contract_schema_name(&indexer.name, &contract_name); - let networks: Vec<&str> = contract.details.iter().map(|d| d.network.as_str()).collect(); - - if !disable_event_tables { - 
sql.push_str(format!("CREATE SCHEMA IF NOT EXISTS {};", schema_name).as_str()); - info!("Creating schema if not exists: {}", schema_name); - - let event_matching_name_on_other = find_clashing_event_names( - project_path, - contract, - &indexer.contracts, - &event_names, - )?; - - sql.push_str(&generate_event_table_sql_with_comments( - &event_names, - &contract.name, - &schema_name, - event_matching_name_on_other, - )); - } - // we still need to create the internal tables for the contract - sql.push_str(&generate_internal_event_table_sql(&event_names, &schema_name, networks)); - } - - sql.push_str(&format!( - r#" - CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_relationship_dropping_sql ( - key INT PRIMARY KEY, - value TEXT NOT NULL - ); - "#, - indexer_name = camel_to_snake(&indexer.name) - )); - - sql.push_str(&format!( - r#" - CREATE TABLE IF NOT EXISTS rindexer_internal.{indexer_name}_last_known_indexes_dropping_sql ( - key INT PRIMARY KEY, - value TEXT NOT NULL - ); - "#, - indexer_name = camel_to_snake(&indexer.name) - )); - - Ok(Code::new(sql)) -} pub fn drop_tables_for_indexer_sql(project_path: &Path, indexer: &Indexer) -> Code { let mut sql = format!( "DROP TABLE IF EXISTS rindexer_internal.{}_last_known_indexes_dropping_sql CASCADE;", diff --git a/core/src/database/postgres/setup.rs b/core/src/database/postgres/setup.rs index 5226d3a4..e66738b2 100644 --- a/core/src/database/postgres/setup.rs +++ b/core/src/database/postgres/setup.rs @@ -5,11 +5,11 @@ use tracing::{debug, info}; use crate::{ database::postgres::{ client::{PostgresClient, PostgresConnectionError, PostgresError}, - generate::{generate_tables_for_indexer_sql, GenerateTablesForIndexerSqlError}, }, drop_tables_for_indexer_sql, manifest::core::Manifest, }; +use crate::database::common_sql::generate::{generate_tables_for_indexer_sql, GenerateTablesForIndexerSqlError}; #[derive(thiserror::Error, Debug)] pub enum SetupPostgresError { diff --git a/core/src/database/postgres/sql_type_wrapper.rs b/core/src/database/postgres/sql_type_wrapper.rs index d6d49e32..d6cec4e5 100644 --- a/core/src/database/postgres/sql_type_wrapper.rs +++ b/core/src/database/postgres/sql_type_wrapper.rs @@ -282,6 +282,174 @@ impl EthereumSqlTypeWrapper { EthereumSqlTypeWrapper::DateTime(_) => PgType::TIMESTAMPTZ, } } + pub fn to_clickhouse_value(&self) -> String { + match self { + // Boolean + EthereumSqlTypeWrapper::Bool(value) => value.to_string(), + EthereumSqlTypeWrapper::VecBool(values) => format!( + "[{}]", + values + .iter() + .map(|v| v.to_string()) + .collect::>() + .join(", ") + ), + + // 8-bit integers + EthereumSqlTypeWrapper::U8(value) => value.to_string(), + EthereumSqlTypeWrapper::I8(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU8(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + EthereumSqlTypeWrapper::VecI8(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // 16-bit integers + EthereumSqlTypeWrapper::U16(value) => value.to_string(), + EthereumSqlTypeWrapper::I16(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU16(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + EthereumSqlTypeWrapper::VecI16(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // 32-bit integers + EthereumSqlTypeWrapper::U32(value) => value.to_string(), + EthereumSqlTypeWrapper::I32(value) => value.to_string(), + 
EthereumSqlTypeWrapper::VecU32(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + EthereumSqlTypeWrapper::VecI32(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // 64-bit integers + EthereumSqlTypeWrapper::U64(value) => value.to_string(), + EthereumSqlTypeWrapper::I64(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU64(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + EthereumSqlTypeWrapper::VecI64(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // 128-bit integers + EthereumSqlTypeWrapper::U128(value) => value.to_string(), + EthereumSqlTypeWrapper::I128(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU128(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + EthereumSqlTypeWrapper::VecI128(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // 256-bit integers + EthereumSqlTypeWrapper::U256(value) => value.to_string(), + EthereumSqlTypeWrapper::I256(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU256(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + EthereumSqlTypeWrapper::VecI256(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // 512-bit integers + EthereumSqlTypeWrapper::U512(value) => value.to_string(), + EthereumSqlTypeWrapper::VecU512(values) => format!( + "[{}]", + values.iter().map(|v| v.to_string()).collect::>().join(", ") + ), + + // Hashes + EthereumSqlTypeWrapper::H128(value) => format!("'{}'", format!("{:?}", value)), + EthereumSqlTypeWrapper::H160(value) => format!("'{}'", format!("{:?}", value)), + EthereumSqlTypeWrapper::H256(value) => format!("'{}'", format!("{:?}", value)), + EthereumSqlTypeWrapper::H512(value) => format!("'{}'", format!("{:?}", value)), + EthereumSqlTypeWrapper::VecH128(values) => format!( + "[{}]", + values + .iter() + .map(|v| format!("'{}'", format!("{:?}", v))) + .collect::>() + .join(", ") + ), + EthereumSqlTypeWrapper::VecH160(values) => format!( + "[{}]", + values + .iter() + .map(|v| format!("'{}'", format!("{:?}", v))) + .collect::>() + .join(", ") + ), + EthereumSqlTypeWrapper::VecH256(values) => format!( + "[{}]", + values + .iter() + .map(|v| format!("'{}'", format!("{:?}", v))) + .collect::>() + .join(", ") + ), + EthereumSqlTypeWrapper::VecH512(values) => format!( + "[{}]", + values + .iter() + .map(|v| format!("'{}'", format!("{:?}", v))) + .collect::>() + .join(", ") + ), + + // Address + EthereumSqlTypeWrapper::Address(address) => format!("'{}'", address), + EthereumSqlTypeWrapper::VecAddress(addresses) => format!( + "[{}]", + addresses + .iter() + .map(|addr| format!("'{}'", addr)) + .collect::>() + .join(", ") + ), + + // Strings and Bytes + EthereumSqlTypeWrapper::String(value) => format!("'{}'", value.replace("'", "\\'")), + EthereumSqlTypeWrapper::VecString(values) => format!( + "[{}]", + values + .iter() + .map(|v| format!("'{}'", v.replace("'", "\\'"))) + .collect::>() + .join(", ") + ), + EthereumSqlTypeWrapper::Bytes(value) => format!("0x{}", hex::encode(value)), + EthereumSqlTypeWrapper::VecBytes(values) => format!( + "[{}]", + values + .iter() + .map(|v| format!("0x{}", hex::encode(v))) + .collect::>() + .join(", ") + ), + + // DateTime + EthereumSqlTypeWrapper::DateTime(value) => format!("'{}'", 
value.to_rfc3339()), // Default case to catch unsupported variants _ => panic!("Unsupported EthereumSqlTypeWrapper variant for ClickHouse serialization"), } } } impl ToSql for EthereumSqlTypeWrapper { diff --git a/core/src/indexer/no_code.rs b/core/src/indexer/no_code.rs index 92606095..f6a5a890 100644 --- a/core/src/indexer/no_code.rs +++ b/core/src/indexer/no_code.rs @@ -111,7 +111,7 @@ pub async fn setup_no_code( ); let events = - process_events(project_path, &mut manifest, postgres, &network_providers).await?; + process_events(project_path, &mut manifest, postgres, clickhouse, &network_providers).await?; let registry = EventCallbackRegistry { events }; info!( @@ -143,8 +143,9 @@ struct NoCodeCallbackParams { index_event_in_order: bool, csv: Option<Arc<AsyncCsvAppender>>, postgres: Option<Arc<PostgresClient>>, - postgres_event_table_name: String, - postgres_column_names: Vec<String>, + sql_event_table_name: String, + sql_column_names: Vec<String>, + clickhouse: Option<Arc<ClickhouseClient>>, streams_clients: Arc<Option<StreamsClients>>, chat_clients: Arc<Option<ChatClients>>, } @@ -178,7 +179,7 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType { let network = results.first().unwrap().tx_information.network.clone(); let mut indexed_count = 0; - let mut postgres_bulk_data: Vec<Vec<EthereumSqlTypeWrapper>> = Vec::new(); + let mut sql_bulk_data: Vec<Vec<EthereumSqlTypeWrapper>> = Vec::new(); let mut postgres_bulk_column_types: Vec<PgType> = Vec::new(); let mut csv_bulk_data: Vec<Vec<String>> = Vec::new(); @@ -265,13 +266,16 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType { all_params.extend(event_parameters); all_params.extend(end_global_parameters); + // Set column types dynamically based on first result if postgres_bulk_column_types.is_empty() { postgres_bulk_column_types = all_params.iter().map(|param| param.to_type()).collect(); } - postgres_bulk_data.push(all_params); + if params.postgres.is_some() || params.clickhouse.is_some() { + sql_bulk_data.push(all_params); + } if params.csv.is_some() { let mut csv_data: Vec<String> = vec![format!("{:?}", address)]; @@ -294,16 +298,16 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType { } if let Some(postgres) = &params.postgres { - let bulk_data_length = postgres_bulk_data.len(); + let bulk_data_length = sql_bulk_data.len(); if bulk_data_length > 0 { // anything over 100 events is considered bulk and goes the COPY route if bulk_data_length > 100 { if let Err(e) = postgres .bulk_insert_via_copy( - &params.postgres_event_table_name, - &params.postgres_column_names, + &params.sql_event_table_name, + &params.sql_column_names, &postgres_bulk_column_types, - &postgres_bulk_data, + &sql_bulk_data, ) .await { @@ -315,9 +319,9 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType { } } else if let Err(e) = postgres .bulk_insert( - &params.postgres_event_table_name, - &params.postgres_column_names, - &postgres_bulk_data, + &params.sql_event_table_name, + &params.sql_column_names, + &sql_bulk_data, ) .await { @@ -330,6 +334,14 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType { } } + if let Some(clickhouse) = &params.clickhouse { + if let Err(e) = clickhouse.bulk_insert( + &params.sql_event_table_name, + &params.sql_column_names, + &sql_bulk_data, + ).await { + error!("Error performing clickhouse bulk insert: {}", e); + return Err(e.to_string()); + } + } + if let Some(csv) = &params.csv { if !csv_bulk_data.is_empty() { if let Err(e) = csv.append_bulk(csv_bulk_data).await { @@ -467,6 +479,7 @@ pub async fn process_events( project_path: &Path, manifest: &mut Manifest, postgres: Option<Arc<PostgresClient>>, + clickhouse: Option<Arc<ClickhouseClient>>, network_providers: &[CreateNetworkProvider], ) -> Result<Vec<EventCallbackRegistryInformation>, ProcessIndexersError> { let mut events: Vec<EventCallbackRegistryInformation> = vec![]; @@ -523,11 +536,12 @@ pub async fn process_events( csv = Some(Arc::new(csv_appender)); } - let postgres_column_names = + let 
sql_column_names = generate_column_names_only_with_base_properties(&event_info.inputs); - let postgres_event_table_name = + let sql_event_table_name = generate_event_table_full_name(&manifest.name, &contract.name, &event_info.name); + let streams_client = if let Some(streams) = &contract.streams { Some(StreamsClients::new(streams.clone()).await) } else { @@ -560,8 +574,9 @@ pub async fn process_events( index_event_in_order, csv, postgres: postgres.clone(), - postgres_event_table_name, - postgres_column_names, + sql_event_table_name, + sql_column_names, + clickhouse: clickhouse.clone(), streams_clients: Arc::new(streams_client), chat_clients: Arc::new(chat_clients), })), diff --git a/core/src/lib.rs b/core/src/lib.rs index 12a59452..d83af68a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(let_chains)] + // public pub mod generator; pub mod indexer;
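Usage sketch (reviewer illustration, not part of the patch series): how the pieces added above fit together for a no-code run. The module paths, the crate-root re-export of EthereumSqlTypeWrapper, and the error boxing are assumptions inferred from the diffs, not confirmed public API; the table and column names are hypothetical:

    use std::path::Path;
    use rindexer::database::clickhouse::setup::setup_clickhouse; // assumed path, mirrors core/src/database/clickhouse/setup.rs
    use rindexer::manifest::core::Manifest;
    use rindexer::EthereumSqlTypeWrapper;

    async fn index_with_clickhouse(project_path: &Path, manifest: &Manifest) -> Result<(), Box<dyn std::error::Error>> {
        // Reads CLICKHOUSE_URL / CLICKHOUSE_USER / CLICKHOUSE_PASSWORD from .env, then
        // creates rindexer_internal plus one ClickHouse DATABASE per indexer contract.
        let client = setup_clickhouse(project_path, manifest).await?;

        // Rows mirror what no_code_callback builds: one Vec<EthereumSqlTypeWrapper> per event,
        // rendered through to_clickhouse_value() into a single INSERT ... VALUES statement.
        let rows = vec![vec![
            EthereumSqlTypeWrapper::Bool(true),
            EthereumSqlTypeWrapper::String("example".to_string()),
        ]];
        let inserted = client.bulk_insert("my_indexer_my_contract.transfer", &["ok".to_string(), "note".to_string()], &rows).await?;
        assert_eq!(inserted, 1);
        Ok(())
    }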