From 8c95702c98f543bf31415e92f9c36932eb2daf2a Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 17 Mar 2023 21:06:36 +0800 Subject: [PATCH] fix(sink): remove `unimplemented` (#8622) Signed-off-by: tabVersion Co-authored-by: tabVersion --- src/connector/src/sink/kafka.rs | 98 +----------------------------- src/connector/src/sink/mod.rs | 100 ++++++++++++++++++++++++++++++- src/connector/src/sink/remote.rs | 44 +------------- 3 files changed, 102 insertions(+), 140 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 4354b9c5c8698..32c38d8bed6f1 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -23,12 +23,9 @@ use rdkafka::message::ToBytes; use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer}; use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; -use risingwave_common::array::{ArrayError, ArrayResult, Op, RowRef, StreamChunk}; +use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row; -use risingwave_common::types::to_text::ToText; -use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl}; -use risingwave_common::util::iter_util::ZipEqFast; use serde_derive::Deserialize; use serde_json::{json, Map, Value}; use tracing::warn; @@ -37,7 +34,7 @@ use super::{ Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::common::KafkaCommon; -use crate::sink::Result; +use crate::sink::{datum_to_json_object, record_to_json, Result}; use crate::{deserialize_bool_from_string, deserialize_duration_from_string}; pub const KAFKA_SINK: &str = "kafka"; @@ -386,96 +383,6 @@ impl Debug for KafkaSink { } } -fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { - let scalar_ref = match datum { - None => return Ok(Value::Null), - Some(datum) => datum, - }; - - let data_type = field.data_type(); - - tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref); - - let value = match (data_type, scalar_ref) { - (DataType::Boolean, ScalarRefImpl::Bool(v)) => { - json!(v) - } - (DataType::Int16, ScalarRefImpl::Int16(v)) => { - json!(v) - } - (DataType::Int32, ScalarRefImpl::Int32(v)) => { - json!(v) - } - (DataType::Int64, ScalarRefImpl::Int64(v)) => { - json!(v) - } - (DataType::Float32, ScalarRefImpl::Float32(v)) => { - json!(f32::from(v)) - } - (DataType::Float64, ScalarRefImpl::Float64(v)) => { - json!(f64::from(v)) - } - (DataType::Varchar, ScalarRefImpl::Utf8(v)) => { - json!(v) - } - (DataType::Decimal, ScalarRefImpl::Decimal(v)) => { - // fixme - json!(v.to_text()) - } - ( - dt @ DataType::Date - | dt @ DataType::Time - | dt @ DataType::Timestamp - | dt @ DataType::Timestamptz - | dt @ DataType::Interval - | dt @ DataType::Bytea, - scalar, - ) => { - json!(scalar.to_text_with_type(&dt)) - } - (DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => { - let mut vec = Vec::with_capacity(list_ref.values_ref().len()); - let inner_field = Field::unnamed(Box::::into_inner(datatype)); - for sub_datum_ref in list_ref.values_ref() { - let value = datum_to_json_object(&inner_field, sub_datum_ref)?; - vec.push(value); - } - json!(vec) - } - (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => { - let mut map = Map::with_capacity(st.fields.len()); - for (sub_datum_ref, sub_field) in struct_ref.fields_ref().into_iter().zip_eq_fast( - st.fields - .iter() - .zip_eq_fast(st.field_names.iter()) - .map(|(dt, name)| Field::with_name(dt.clone(), name)), - ) { - let value = datum_to_json_object(&sub_field, sub_datum_ref)?; - map.insert(sub_field.name.clone(), value); - } - json!(map) - } - _ => { - return Err(ArrayError::internal( - "datum_to_json_object: unsupported data type".to_string(), - )); - } - }; - - Ok(value) -} - -fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result> { - let mut mappings = Map::with_capacity(schema.len()); - for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) { - let key = field.name.clone(); - let value = datum_to_json_object(field, datum_ref) - .map_err(|e| SinkError::JsonParse(e.to_string()))?; - mappings.insert(key, value); - } - Ok(mappings) -} - fn pk_to_json( row: RowRef<'_>, schema: &[Field], @@ -611,6 +518,7 @@ impl KafkaTransactionConductor { mod test { use maplit::hashmap; use risingwave_common::test_prelude::StreamChunkTestExt; + use risingwave_common::types::DataType; use super::*; diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index cb2803c60a8de..11ecb5b67ac71 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -23,11 +23,16 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; use enum_as_inner::EnumAsInner; -use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::Schema; +use risingwave_common::array::{ArrayError, ArrayResult, RowRef, StreamChunk}; +use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, RwError}; +use risingwave_common::row::Row; +use risingwave_common::types::to_text::ToText; +use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl}; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_rpc_client::error::RpcError; use serde::{Deserialize, Serialize}; +use serde_json::{json, Map, Value}; use thiserror::Error; pub use tracing; @@ -37,7 +42,6 @@ use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK}; use crate::sink::redis::{RedisConfig, RedisSink}; use crate::sink::remote::{RemoteConfig, RemoteSink}; use crate::ConnectorParams; - pub const SINK_TYPE_OPTION: &str = "type"; pub const SINK_TYPE_APPEND_ONLY: &str = "append-only"; pub const SINK_TYPE_DEBEZIUM: &str = "debezium"; @@ -257,3 +261,93 @@ impl From for RwError { ErrorCode::SinkError(Box::new(e)).into() } } + +pub fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result> { + let mut mappings = Map::with_capacity(schema.len()); + for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) { + let key = field.name.clone(); + let value = datum_to_json_object(field, datum_ref) + .map_err(|e| SinkError::JsonParse(e.to_string()))?; + mappings.insert(key, value); + } + Ok(mappings) +} + +fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { + let scalar_ref = match datum { + None => return Ok(Value::Null), + Some(datum) => datum, + }; + + let data_type = field.data_type(); + + tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref); + + let value = match (data_type, scalar_ref) { + (DataType::Boolean, ScalarRefImpl::Bool(v)) => { + json!(v) + } + (DataType::Int16, ScalarRefImpl::Int16(v)) => { + json!(v) + } + (DataType::Int32, ScalarRefImpl::Int32(v)) => { + json!(v) + } + (DataType::Int64, ScalarRefImpl::Int64(v)) => { + json!(v) + } + (DataType::Float32, ScalarRefImpl::Float32(v)) => { + json!(f32::from(v)) + } + (DataType::Float64, ScalarRefImpl::Float64(v)) => { + json!(f64::from(v)) + } + (DataType::Varchar, ScalarRefImpl::Utf8(v)) => { + json!(v) + } + (DataType::Decimal, ScalarRefImpl::Decimal(v)) => { + // fixme + json!(v.to_text()) + } + ( + dt @ DataType::Date + | dt @ DataType::Time + | dt @ DataType::Timestamp + | dt @ DataType::Timestamptz + | dt @ DataType::Interval + | dt @ DataType::Bytea, + scalar, + ) => { + json!(scalar.to_text_with_type(&dt)) + } + (DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => { + let mut vec = Vec::with_capacity(list_ref.values_ref().len()); + let inner_field = Field::unnamed(Box::::into_inner(datatype)); + for sub_datum_ref in list_ref.values_ref() { + let value = datum_to_json_object(&inner_field, sub_datum_ref)?; + vec.push(value); + } + json!(vec) + } + (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => { + let mut map = Map::with_capacity(st.fields.len()); + for (sub_datum_ref, sub_field) in struct_ref.fields_ref().into_iter().zip_eq_fast( + st.fields + .iter() + .zip_eq_fast(st.field_names.iter()) + .map(|(dt, name)| Field::with_name(dt.clone(), name)), + ) { + let value = datum_to_json_object(&sub_field, sub_datum_ref)?; + map.insert(sub_field.name.clone(), value); + } + json!(map) + } + _ => { + return Err(ArrayError::internal( + "datum_to_json_object: unsupported data type".to_string(), + )); + } + }; + + Ok(value) +} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 7c2adb1ae0afe..71a05229bd72c 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -21,12 +21,9 @@ use risingwave_common::array::StreamChunk; #[cfg(test)] use risingwave_common::catalog::Field; use risingwave_common::catalog::Schema; -use risingwave_common::row::Row; #[cfg(test)] use risingwave_common::types::DataType; -use risingwave_common::types::{DatumRef, ScalarRefImpl}; use risingwave_common::util::addr::HostAddr; -use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp; use risingwave_pb::connector_service::sink_stream_request::write_batch::{JsonPayload, Payload}; use risingwave_pb::connector_service::sink_stream_request::{ @@ -35,14 +32,12 @@ use risingwave_pb::connector_service::sink_stream_request::{ use risingwave_pb::connector_service::table_schema::Column; use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest, TableSchema}; use risingwave_rpc_client::ConnectorClient; -use serde_json::Value; -use serde_json::Value::Number; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_stream::StreamExt; use tonic::{Status, Streaming}; use super::catalog::SinkCatalog; -use crate::sink::{Result, Sink, SinkError}; +use crate::sink::{record_to_json, Result, Sink, SinkError}; use crate::ConnectorParams; pub const VALID_REMOTE_SINKS: [&str; 3] = ["jdbc", "file", "iceberg"]; @@ -254,13 +249,7 @@ impl Sink for RemoteSink { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { let mut row_ops = vec![]; for (op, row_ref) in chunk.rows() { - let mut map = serde_json::Map::new(); - row_ref - .iter() - .zip_eq_fast(self.schema.fields.iter()) - .for_each(|(v, f)| { - map.insert(f.name.clone(), parse_datum(v)); - }); + let map = record_to_json(row_ref, &self.schema.fields)?; let row_op = RowOp { op_type: op.to_protobuf() as i32, line: serde_json::to_string(&map) @@ -319,35 +308,6 @@ impl Sink for RemoteSink { } } -fn parse_datum(datum: DatumRef<'_>) -> Value { - match datum { - None => Value::Null, - Some(ScalarRefImpl::Int32(v)) => Value::from(v), - Some(ScalarRefImpl::Int64(v)) => Value::from(v), - Some(ScalarRefImpl::Float32(v)) => Value::from(v.into_inner()), - Some(ScalarRefImpl::Float64(v)) => Value::from(v.into_inner()), - Some(ScalarRefImpl::Decimal(v)) => Number(v.to_string().parse().unwrap()), - Some(ScalarRefImpl::Utf8(v)) => Value::from(v), - Some(ScalarRefImpl::Bool(v)) => Value::from(v), - Some(ScalarRefImpl::NaiveDate(v)) => Value::from(v.to_string()), - Some(ScalarRefImpl::NaiveTime(v)) => Value::from(v.to_string()), - Some(ScalarRefImpl::Interval(v)) => Value::from(v.to_string()), - Some(ScalarRefImpl::Struct(v)) => Value::from( - v.fields_ref() - .iter() - .map(|v| parse_datum(*v)) - .collect::>(), - ), - Some(ScalarRefImpl::List(v)) => Value::from( - v.values_ref() - .iter() - .map(|v| parse_datum(*v)) - .collect::>(), - ), - _ => unimplemented!(), - } -} - #[cfg(test)] mod test { use std::sync::Arc;