Skip to content

Commit

Permalink
Fix/clickhouse sink issue with collapsing merge tree (#2477)
Browse files Browse the repository at this point in the history
* fix/clickhouse with Collapsing Merge Tree

* revert back to default MergeTree

* fix: clippy fmt
  • Loading branch information
duonganhthu43 authored Apr 16, 2024
1 parent 6829bd6 commit b626a82
Show file tree
Hide file tree
Showing 32 changed files with 1,021 additions and 14 deletions.
5 changes: 5 additions & 0 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,11 @@ pub(crate) fn map_value_to_field(
AerospikeConnectorError::ParsingIntFailed
})?))
}
FieldType::Int8 => {
check_type("int8")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Ok(Field::Int8(string.parse()?))
}
FieldType::U128 => {
check_type("str")?;
let string = value.as_str().ok_or_else(unsupported_type)?;
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/mysql/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl<'a> IntoField<'a> for Value {
FieldType::UInt => Field::UInt(from_value_opt::<u64>(value)?),
FieldType::U128 => Field::U128(from_value_opt::<u128>(value)?),
FieldType::Int => Field::Int(from_value_opt::<i64>(value)?),
FieldType::Int8 => Field::Int8(from_value_opt::<i8>(value)?),
FieldType::I128 => Field::I128(from_value_opt::<i128>(value)?),
FieldType::Float => Field::Float(from_value_opt::<f64>(value)?.into()),
FieldType::Boolean => Field::Boolean(from_value_opt::<bool>(value)?),
Expand Down
3 changes: 3 additions & 0 deletions dozer-ingestion/tests/test_suite/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ fn assert_record_matches_schema(record: &Record, schema: &Schema, only_match_pk:
FieldType::Int => {
assert!(value.as_int().is_some())
}
FieldType::Int8 => {
assert!(value.as_int().is_some())
}
FieldType::I128 => {
assert!(value.as_i128().is_some())
}
Expand Down
13 changes: 13 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/object_store/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ fn field_type_to_arrow(field_type: FieldType) -> Option<arrow::datatypes::DataTy
FieldType::UInt => Some(arrow::datatypes::DataType::UInt64),
FieldType::U128 => None,
FieldType::Int => Some(arrow::datatypes::DataType::Int64),
FieldType::Int8 => Some(arrow::datatypes::DataType::Int64),
FieldType::I128 => None,
FieldType::Float => Some(arrow::datatypes::DataType::Float64),
FieldType::Boolean => Some(arrow::datatypes::DataType::Boolean),
Expand Down Expand Up @@ -349,6 +350,18 @@ fn fields_to_arrow<'a, F: IntoIterator<Item = &'a Field>>(
}
Arc::new(builder.finish())
}
FieldType::Int8 => {
let mut builder = arrow::array::Int64Array::builder(count);
for field in fields {
match field {
Field::Int(value) => builder.append_value(*value),
Field::Int8(value) => builder.append_value(*value as i64),
Field::Null => builder.append_null(),
_ => panic!("Unexpected field type"),
}
}
Arc::new(builder.finish())
}
FieldType::I128 => panic!("Unexpected field type"),
FieldType::Float => {
let mut builder = arrow::array::Float64Array::builder(count);
Expand Down
2 changes: 2 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ fn field_type_to_sql(field_type: FieldType) -> Option<String> {
FieldType::UInt => None,
FieldType::U128 => None,
FieldType::Int => Some("INT8".to_string()),
FieldType::Int8 => Some("INT8".to_string()),
FieldType::I128 => None,
FieldType::Float => Some("FLOAT8".to_string()),
FieldType::Boolean => Some("BOOLEAN".to_string()),
Expand Down Expand Up @@ -229,6 +230,7 @@ fn field_to_sql(field: &Field) -> String {
Field::UInt(i) => i.to_string(),
Field::U128(i) => i.to_string(),
Field::Int(i) => i.to_string(),
Field::Int8(i) => i.to_string(),
Field::I128(i) => i.to_string(),
Field::Float(f) => f.to_string(),
Field::Boolean(b) => b.to_string(),
Expand Down
5 changes: 5 additions & 0 deletions dozer-ingestion/webhook/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub fn map_record(
let field = Field::Int(i64_value);
values.push(field);
}
FieldType::Int8 => {
let i8_value: i8 = serde_json::from_value(value.clone())?;
let field = Field::Int8(i8_value);
values.push(field);
}
FieldType::Float => {
let float_value: f64 = serde_json::from_value(value.clone())?;
let field = Field::Float(OrderedFloat(float_value));
Expand Down
18 changes: 18 additions & 0 deletions dozer-sink-aerospike/src/aerospike.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ unsafe fn init_key_single(
Field::Int(v) => {
as_key_init_int64(key, namespace.as_ptr(), set.as_ptr(), *v);
}
Field::Int8(v) => {
as_key_init_int64(key, namespace.as_ptr(), set.as_ptr(), (*v).into());
}
Field::U128(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
Field::I128(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
Field::Decimal(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
Expand Down Expand Up @@ -554,6 +557,13 @@ pub(crate) unsafe fn new_record_map(
Field::Int(v) => {
as_orderedmap_set(map, key, check_alloc(as_integer_new(*v)) as *const as_val);
}
Field::Int8(v) => {
as_orderedmap_set(
map,
key,
check_alloc(as_integer_new((*v).into())) as *const as_val,
);
}
Field::I128(v) => {
map_set_str(map, key, v, allocated_strings);
}
Expand Down Expand Up @@ -662,6 +672,9 @@ pub(crate) unsafe fn init_batch_write_operations(
Field::Int(v) => {
as_operations_add_write_int64(ops, name, *v);
}
Field::Int8(v) => {
as_operations_add_write_int64(ops, name, (*v).into());
}
Field::I128(v) => {
set_operation_str(ops, name, v.to_string(), allocated_strings);
}
Expand Down Expand Up @@ -806,6 +819,11 @@ fn parse_val(
Some(Field::Int(v.value))
})
}
dozer_types::types::FieldType::Int8 => {
map(val, as_val_type_e_AS_INTEGER, |v: &as_integer| {
Some(Field::Int8(v.value as i8))
})
}
dozer_types::types::FieldType::I128 => {
map(val, as_val_type_e_AS_STRING, |v: &as_string| {
Some(Field::I128(unsafe {
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl SinkFactory for AerospikeSinkFactory {
dozer_types::types::FieldType::UInt
| dozer_types::types::FieldType::U128
| dozer_types::types::FieldType::Int
| dozer_types::types::FieldType::Int8
| dozer_types::types::FieldType::I128
| dozer_types::types::FieldType::String
| dozer_types::types::FieldType::Text
Expand Down
19 changes: 13 additions & 6 deletions dozer-sink-clickhouse/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@ pub fn get_create_table_query(
fields: &[FieldDefinition],
table_options: Option<ClickhouseTableOptions>,
) -> String {
let engine = table_options
.as_ref()
.and_then(|c| c.engine.clone())
.unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
let engine_name = if engine == "CollapsingMergeTree" {
"CollapsingMergeTree(sign)".to_string()
} else {
engine.to_owned()
};
let mut parts = fields
.iter()
.map(|field| {
let typ = map_field_to_type(field);
format!("{} {}", field.name, typ)
})
.collect::<Vec<_>>();

let engine = table_options
.as_ref()
.and_then(|c| c.engine.clone())
.unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
if engine == "CollapsingMergeTree" {
parts.push("sign Int8".to_string());
}

parts.push(
table_options
Expand Down Expand Up @@ -63,7 +70,7 @@ pub fn get_create_table_query(
"CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
{query}
)
ENGINE = {engine}
ENGINE = {engine_name}
{order_by}
{partition_by}
{sample_by}
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-clickhouse/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub fn map_field_to_type(field: &FieldDefinition) -> String {
FieldType::UInt => "UInt64",
FieldType::U128 => "UInt128",
FieldType::Int => "Int64",
FieldType::Int8 => "Int8",
FieldType::I128 => "Int128",
FieldType::Float => "Float64",
FieldType::Boolean => "Boolean",
Expand Down
27 changes: 20 additions & 7 deletions dozer-sink-clickhouse/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::metadata::{
};
use crate::schema::{ClickhouseSchema, ClickhouseTable};
use dozer_types::tonic::async_trait;
use dozer_types::types::{Field, Operation, Schema, TableOperation};
use dozer_types::types::{Field, FieldDefinition, Operation, Schema, TableOperation};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -113,7 +113,6 @@ impl SinkFactory for ClickhouseSinkFactory {
let table = ClickhouseSchema::get_clickhouse_table(client.clone(), &self.config).await?;

ClickhouseSchema::compare_with_dozer_schema(client.clone(), &schema, &table).await?;

let sink = ClickhouseSink::new(
client,
self.config.clone(),
Expand Down Expand Up @@ -155,6 +154,19 @@ impl ClickhouseSink {
runtime: Arc<Runtime>,
table: ClickhouseTable,
) -> Self {
let mut schema = schema.clone();

if table.engine == "CollapsingMergeTree" && !schema.fields.is_empty() {
// get source from any field in schema
let source = schema.fields[0].source.clone();
schema.fields.push(FieldDefinition {
name: "sign".to_string(),
typ: dozer_types::types::FieldType::Int8,
nullable: false,
description: None,
source,
});
}
Self {
client,
runtime,
Expand Down Expand Up @@ -249,8 +261,7 @@ impl Sink for ClickhouseSink {
Operation::Insert { new } => {
if self.table.engine == "CollapsingMergeTree" {
let mut values = new.values;
values.push(Field::Int(1));

values.push(Field::Int8(1));
self.insert_values(&values)?;
} else {
self.insert_values(&new.values)?;
Expand All @@ -273,17 +284,19 @@ impl Sink for ClickhouseSink {
return Err(BoxedError::from(ClickhouseSinkError::UnsupportedOperation));
}
let mut values = old.values;
values.push(Field::Int(-1));
values.push(Field::Int8(-1));
self.insert_values(&values)?;

let mut values = new.values;
values.push(Field::Int(1));
values.push(Field::Int8(1));
self.insert_values(&values)?;
}
Operation::BatchInsert { new } => {
for record in new {
let mut values = record.values;
values.push(Field::Int(1));
if self.table.engine == "CollapsingMergeTree" {
values.push(Field::Int8(1));
}
self.insert_values(&values)?;
}
self.commit_batch()?;
Expand Down
2 changes: 1 addition & 1 deletion dozer-sink-clickhouse/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ fn add_last_column_to_block(
FieldType::UInt => add_last_column.call(trivial_mapper!(Field::UInt)),
FieldType::U128 => add_last_column.call(trivial_mapper!(Field::U128)),
FieldType::Int => add_last_column.call(trivial_mapper!(Field::Int)),
FieldType::Int8 => add_last_column.call(trivial_mapper!(Field::Int8)),
FieldType::I128 => add_last_column.call(trivial_mapper!(Field::I128)),
FieldType::Boolean => add_last_column.call(trivial_mapper!(Field::Boolean)),
FieldType::Float => add_last_column.call(|field| match field {
Expand Down Expand Up @@ -276,7 +277,6 @@ pub async fn insert_multi(
query_id: Option<String>,
) -> Result<(), QueryError> {
let mut block = Block::<clickhouse_rs::Simple>::new();

for field in fields.iter().rev() {
block = add_last_column_to_block(block, &field.name, &mut rows, field.typ, field.nullable)?;
}
Expand Down
1 change: 1 addition & 0 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ impl OracleSinkFactory {
FieldType::UInt => "NUMBER(20)",
FieldType::U128 => unimplemented!(),
FieldType::Int => "NUMBER(20)",
FieldType::Int8 => unimplemented!(),
FieldType::I128 => unimplemented!(),
// Should this be BINARY_DOUBLE?
FieldType::Float => "NUMBER",
Expand Down
24 changes: 24 additions & 0 deletions dozer-sql/expression/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl Display for CastOperatorType {
FieldType::UInt => f.write_str("CAST AS UINT"),
FieldType::U128 => f.write_str("CAST AS U128"),
FieldType::Int => f.write_str("CAST AS INT"),
FieldType::Int8 => f.write_str("CAST AS INT8"),
FieldType::I128 => f.write_str("CAST AS I128"),
FieldType::Float => f.write_str("CAST AS FLOAT"),
FieldType::Boolean => f.write_str("CAST AS BOOLEAN"),
Expand Down Expand Up @@ -79,6 +80,19 @@ impl CastOperatorType {
FieldType::Int => (
vec![
FieldType::Int,
FieldType::Int8,
FieldType::String,
FieldType::UInt,
FieldType::I128,
FieldType::U128,
FieldType::Json,
],
FieldType::Int,
),
FieldType::Int8 => (
vec![
FieldType::Int,
FieldType::Int8,
FieldType::String,
FieldType::UInt,
FieldType::I128,
Expand Down Expand Up @@ -252,6 +266,16 @@ pub fn cast_field(input: &Field, output_type: FieldType) -> Result<Field, Error>
})
}
}
FieldType::Int8 => {
if let Some(value) = input.to_int8() {
Ok(Field::Int8(value))
} else {
Err(Error::InvalidCast {
from: input.clone(),
to: FieldType::Int,
})
}
}
FieldType::I128 => {
if let Some(value) = input.to_i128() {
Ok(Field::I128(value))
Expand Down
Loading

0 comments on commit b626a82

Please sign in to comment.