Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/clickhouse #123

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions cli/src/commands/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use rindexer::{
contract::{Contract, ContractDetails},
core::{Manifest, ProjectType},
network::Network,
storage::{CsvDetails, PostgresDetails, Storage},
storage::{CsvDetails, PostgresDetails, ClickhouseDetails, Storage},
yaml::{write_manifest, YAML_CONFIG_NAME},
},
write_file, StringOrArray, WriteFileError,
};

use crate::console::{
print_error_message, print_success_message, prompt_for_input, prompt_for_input_list,
prompt_for_optional_input,
Expand Down Expand Up @@ -82,8 +81,8 @@ pub fn handle_new_command(
let project_description = prompt_for_optional_input::<String>("Project Description", None);
let repository = prompt_for_optional_input::<String>("Repository", None);
let storage_choice = prompt_for_input_list(
"What Storages To Enable? (graphql can only be supported if postgres is enabled)",
&["postgres".to_string(), "csv".to_string(), "both".to_string(), "none".to_string()],
"What Storages To Enable? (graphql can only be supported if postgres is enabled) both means postgres and csv",
&["postgres".to_string(), "csv".to_string(), "both".to_string(), "clickhouse".to_string(), "none".to_string()],
None,
);
let mut postgres_docker_enable = false;
Expand All @@ -98,7 +97,7 @@ pub fn handle_new_command(

let postgres_enabled = storage_choice == "postgres" || storage_choice == "both";
let csv_enabled = storage_choice == "csv" || storage_choice == "both";

let clickhouse_enabled = storage_choice == "clickhouse";
let rindexer_yaml_path = project_path.join(YAML_CONFIG_NAME);
let rindexer_abis_folder = project_path.join("abis");

Expand Down Expand Up @@ -187,6 +186,13 @@ pub fn handle_new_command(
} else {
None
},
clickhouse: if clickhouse_enabled {
Some(ClickhouseDetails{
enabled: true,
})
} else {
None
}
},
graphql: None,
};
Expand Down Expand Up @@ -219,6 +225,15 @@ POSTGRES_PASSWORD=rindexer"#;
}
}

if clickhouse_enabled {
let env = "CLICKHOUSE_URL=\nCLICKHOUSE_USER=\nCLICKHOUSE_PASSWORD=";

write_file(&project_path.join(".env"), env).map_err(|e| {
print_error_message(&format!("Failed to write .env file: {}", e));
e
})?;
}

if is_rust_project {
generate_rindexer_rust_project(&project_path);
}
Expand Down
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ include = ["src/**", "resources/**", "Cargo.toml", "build.rs"]
[dev-dependencies]
tempfile = "3.3"
mockito = "0.30"
clickhouse = { version = "0.12.2", features = ["test-util"] }

[dependencies]
ethers = { version = "2.0", features = ["rustls", "openssl"] }
Expand Down Expand Up @@ -58,6 +59,7 @@ deadpool-lapin = "0.12"
teloxide = "0.12"
serenity = { version = "0.12", features = ["client", "framework"] }
once_cell = "1.19.0"
clickhouse = { version = "0.12.2", features = ["native-tls"] }

# build
jemallocator = { version = "0.5.0", optional = true }
Expand Down
16 changes: 16 additions & 0 deletions core/src/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
helpers::camel_to_snake,
manifest::contract::{Contract, ParseAbiError},
};
use crate::database::clickhouse::generate::solidity_type_to_clickhouse_type;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ABIInput {
Expand Down Expand Up @@ -42,6 +43,7 @@ pub enum GenerateAbiPropertiesType {
PostgresColumnsNamesOnly,
CsvHeaderNames,
Object,
ClickhouseWithDataTypes
}

#[derive(Debug)]
Expand Down Expand Up @@ -136,6 +138,20 @@ impl ABIInput {
camel_to_snake(&input.name),
);

vec![GenerateAbiNamePropertiesResult::new(
value,
&input.name,
&input.type_,
)]
}
GenerateAbiPropertiesType::ClickhouseWithDataTypes => {
let value = format!(
"\"{}{}\" {}",
prefix.map_or_else(|| "".to_string(), |p| format!("{}_", p)),
camel_to_snake(&input.name),
solidity_type_to_clickhouse_type(&input.type_)
);

vec![GenerateAbiNamePropertiesResult::new(
value,
&input.name,
Expand Down
3 changes: 2 additions & 1 deletion core/src/api/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ use tracing::{error, info};

use crate::{
database::postgres::{
client::connection_string, generate::generate_indexer_contract_schema_name,
client::connection_string,
},
helpers::{kill_process_on_port, set_thread_no_logging},
indexer::Indexer,
manifest::graphql::GraphQLSettings,
};
use crate::database::common_sql::generate::generate_indexer_contract_schema_name;

pub struct GraphqlOverrideSettings {
pub enabled: bool,
Expand Down
92 changes: 92 additions & 0 deletions core/src/database/clickhouse/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::env;
use bb8::RunError;
use clickhouse::Client;
use dotenv::dotenv;
use crate::database::postgres::client::PostgresError;
use crate::EthereumSqlTypeWrapper;

pub struct ClickhouseConnection {
url: String,
user: String,
password: String,
}

pub fn clickhouse_connection() -> Result<ClickhouseConnection, env::VarError> {
dotenv().ok();

let connection = ClickhouseConnection {
url: env::var("CLICKHOUSE_URL")?,
user: env::var("CLICKHOUSE_USER")?,
password: env::var("CLICKHOUSE_PASSWORD")?
};

Ok(connection)
}

#[derive(thiserror::Error, Debug)]
pub enum ClickhouseConnectionError {
#[error("The clickhouse env vars are wrong please check your environment: {0}")]
ClickhouseConnectionConfigWrong(#[from] env::VarError),
}

#[derive(thiserror::Error, Debug)]
pub enum ClickhouseError{
#[error("ClickhouseError {0}")]
ClickhouseError(String),
}

pub struct ClickhouseClient {
conn: Client
}

impl ClickhouseClient {
pub async fn new() -> Result<Self, ClickhouseConnectionError> {
let connection = clickhouse_connection()?;

let client = Client::default()
.with_url(connection.url)
.with_user(connection.user)
.with_password(connection.password);

Ok(ClickhouseClient { conn: client })
}

pub async fn execute(&self, sql: &str) -> Result<(), ClickhouseError> {
self.conn.query(sql).execute().await.map_err(|e| ClickhouseError::ClickhouseError(e.to_string()))
}
pub async fn execute_batch(&self, sql: &str) -> Result<(), ClickhouseError> {
let statements: Vec<&str> = sql.split(';')
.map(str::trim)
.filter(|s| !s.is_empty()) // Remove empty statements
.collect();

for statement in statements {
self.execute(statement).await?;
}

Ok(())
}
pub async fn bulk_insert<'a>(
&self,
table_name: &str,
column_names: &[String],
bulk_data: &'a [Vec<EthereumSqlTypeWrapper>],
) -> Result<u64, ClickhouseError> {
// Generate the base INSERT query
let column_names_str = column_names.join(", ");
let query = format!("INSERT INTO {} ({}) VALUES", table_name, column_names_str);

// Serialize data for ClickHouse
let mut values = Vec::new();
for row in bulk_data.iter() {
let row_values: Vec<String> = row.iter().map(|value| value.to_clickhouse_value()).collect();
values.push(format!("({})", row_values.join(", ")));
}

let full_query = format!("{} {}", query, values.join(", "));

self.execute(&full_query).await?;

Ok(bulk_data.len() as u64)
}
}
Loading