Skip to content

Commit

Permalink
fix: Clickhouse conversion of Null, Text and Decimal (#2466)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei authored Mar 23, 2024
1 parent 8d4a3c5 commit 529182a
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 184 deletions.
8 changes: 4 additions & 4 deletions dozer-sink-clickhouse/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,21 @@ impl ClickhouseClient {
&self,
table_name: &str,
fields: &[FieldDefinition],
values: &[Field],
values: Vec<Field>,
query_id: Option<String>,
) -> Result<(), QueryError> {
let client = self.pool.get_handle().await?;
insert_multi(client, table_name, fields, &[values.to_vec()], query_id).await
insert_multi(client, table_name, fields, vec![values], query_id).await
}

pub async fn insert_multi(
&self,
table_name: &str,
fields: &[FieldDefinition],
values: &[Vec<Field>],
rows: Vec<Vec<Field>>,
query_id: Option<String>,
) -> Result<(), QueryError> {
let client = self.pool.get_handle().await?;
insert_multi(client, table_name, fields, values, query_id).await
insert_multi(client, table_name, fields, rows, query_id).await
}
}
18 changes: 15 additions & 3 deletions dozer-sink-clickhouse/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use dozer_types::thiserror::{self, Error};
use dozer_types::{
thiserror::{self, Error},
types::FieldType,
};

#[derive(Error, Debug)]
pub enum ClickhouseSinkError {
Expand Down Expand Up @@ -32,8 +35,17 @@ pub enum QueryError {
#[error("Clickhouse error: {0:?}")]
DataFetchError(#[from] clickhouse_rs::errors::Error),

#[error("Unexpected field type for {0:?}, expected {0}")]
TypeMismatch(String, String),
#[error("Unexpected field type for {field_name:?}, expected {field_type:?}")]
TypeMismatch {
field_name: String,
field_type: FieldType,
},

#[error("Decimal overflow")]
DecimalOverflow,

#[error("Unsupported field type {0:?}")]
UnsupportedFieldType(FieldType),

#[error("{0:?}")]
CustomError(String),
Expand Down
2 changes: 1 addition & 1 deletion dozer-sink-clickhouse/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::client::ClickhouseClient;
use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist};
use crate::types::DECIMAL_SCALE;
use clickhouse_rs::types::Complex;
use clickhouse_rs::{Block, ClientHandle};
use dozer_types::log::warn;
Expand Down Expand Up @@ -145,6 +144,7 @@ impl ClickhouseSchema {
}

pub fn map_field_to_type(field: &FieldDefinition) -> String {
const DECIMAL_SCALE: u8 = 4;
let decimal = format!("Decimal(10, {})", DECIMAL_SCALE);
let typ: &str = match field.typ {
FieldType::UInt => "UInt64",
Expand Down
11 changes: 3 additions & 8 deletions dozer-sink-clickhouse/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl ClickhouseSink {
.insert(
REPLICA_METADATA_TABLE,
&self.metadata.schema.fields,
&[
vec![
Field::String(self.sink_table_name.clone()),
Field::UInt(txid),
],
Expand All @@ -196,22 +196,17 @@ impl ClickhouseSink {
}

fn commit_batch(&mut self) -> Result<(), BoxedError> {
let batch = std::mem::take(&mut self.batch);
self.runtime.block_on(async {
//Insert batch
self.client
.insert_multi(
&self.sink_table_name,
&self.schema.fields,
&self.batch,
None,
)
.insert_multi(&self.sink_table_name, &self.schema.fields, batch, None)
.await?;

self.insert_metadata().await?;
Ok::<(), BoxedError>(())
})?;

self.batch.clear();
Ok(())
}

Expand Down
Loading

0 comments on commit 529182a

Please sign in to comment.