Skip to content

Commit

Permalink
feat(source): store source split state as jsonb (risingwavelabs#8602)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 authored Mar 16, 2023
1 parent cfc0349 commit 08fc246
Show file tree
Hide file tree
Showing 19 changed files with 174 additions and 104 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions src/common/src/array/jsonb_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ impl JsonbVal {
let v = Value::from_sql(&Type::JSONB, buf).ok()?;
Some(Self(v.into()))
}

pub fn take(mut self) -> Value {
self.0.take()
}
}

impl From<Value> for JsonbVal {
fn from(v: Value) -> Self {
Self(v.into())
}
}

impl JsonbRef<'_> {
Expand Down
29 changes: 25 additions & 4 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,18 @@ macro_rules! impl_split {
}
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(ConnectorSplit::from(self).encode_to_vec())
fn encode_to_json(&self) -> JsonbVal {
use serde_json::json;
let inner = self.encode_to_json_inner().take();
json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
}

fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
SplitImpl::try_from(&ConnectorSplit::decode(bytes)?)
fn restore_from_json(value: JsonbVal) -> Result<Self> {
let mut value = value.take();
let json_obj = value.as_object_mut().unwrap();
let split_type = json_obj.remove(SPLIT_TYPE_FIELD).unwrap().as_str().unwrap().to_string();
let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
Self::restore_from_json_inner(&split_type, inner_value.into())
}
}

Expand All @@ -98,6 +104,21 @@ macro_rules! impl_split {
$( Self::$variant_name(inner) => Self::$variant_name(inner.copy_with_offset(start_offset)), )*
}
}

pub fn encode_to_json_inner(&self) -> JsonbVal {
match self {
$( Self::$variant_name(inner) => inner.encode_to_json(), )*
}
}

fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
match split_type.to_lowercase().as_str() {
$( $connector_name => <$split>::restore_from_json(value).map(SplitImpl::$variant_name), )*
other => {
Err(anyhow!("connector '{}' is not supported", other))
}
}
}
}
}
}
Expand Down
37 changes: 33 additions & 4 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use enum_as_inner::EnumAsInner;
use futures::stream::BoxStream;
use itertools::Itertools;
use parking_lot::Mutex;
use prost::Message;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::{JsonbVal, StreamChunk};
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
use risingwave_common::types::Scalar;
use risingwave_pb::connector_service::TableSchema;
use risingwave_pb::source::ConnectorSplit;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -66,6 +66,9 @@ use crate::source::pulsar::{
};
use crate::{impl_connector_properties, impl_split, impl_split_enumerator, impl_split_reader};

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
Expand Down Expand Up @@ -403,8 +406,18 @@ impl Eq for SourceMessage {}
/// The metadata of a split.
pub trait SplitMetaData: Sized {
fn id(&self) -> SplitId;
fn encode_to_bytes(&self) -> Bytes;
fn restore_from_bytes(bytes: &[u8]) -> Result<Self>;
fn encode_to_bytes(&self) -> Bytes {
self.encode_to_json()
.as_scalar_ref()
.value_serialize()
.into()
}
fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
}

fn encode_to_json(&self) -> JsonbVal;
fn restore_from_json(value: JsonbVal) -> Result<Self>;
}

/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
Expand All @@ -427,6 +440,7 @@ mod tests {
let get_value = split_impl.into_kafka().unwrap();
println!("{:?}", get_value);
assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
assert_eq!(split.encode_to_json(), get_value.encode_to_json());

Ok(())
}
Expand All @@ -441,6 +455,21 @@ mod tests {
split_impl.encode_to_bytes(),
restored_split_impl.encode_to_bytes()
);
assert_eq!(
split_impl.encode_to_json(),
restored_split_impl.encode_to_json()
);

let encoded_split = split_impl.encode_to_json();
let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
assert_eq!(
split_impl.encode_to_bytes(),
restored_split_impl.encode_to_bytes()
);
assert_eq!(
split_impl.encode_to_json(),
restored_split_impl.encode_to_json()
);
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/cdc/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand All @@ -31,12 +31,12 @@ impl SplitMetaData for CdcSplit {
format!("{}", self.source_id).into()
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/datagen/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::base::SplitMetaData;
Expand All @@ -32,12 +32,12 @@ impl SplitMetaData for DatagenSplit {
format!("{}-{}", self.split_num, self.split_index).into()
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::anyhow;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand All @@ -30,12 +31,12 @@ impl SplitMetaData for FsSplit {
self.name.as_str().into()
}

fn encode_to_bytes(&self) -> bytes::Bytes {
bytes::Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/google_pubsub/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand Down Expand Up @@ -47,12 +47,12 @@ impl PubsubSplit {
}

impl SplitMetaData for PubsubSplit {
fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}

fn id(&self) -> SplitId {
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/kafka/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand All @@ -32,12 +32,12 @@ impl SplitMetaData for KafkaSplit {
format!("{}", self.partition).into()
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/kinesis/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand All @@ -39,12 +39,12 @@ impl SplitMetaData for KinesisSplit {
self.shard_id.clone()
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/nexmark/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand All @@ -31,12 +31,12 @@ impl SplitMetaData for NexmarkSplit {
format!("{}-{}", self.split_num, self.split_index).into()
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/pulsar/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use risingwave_common::array::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::source::pulsar::topic::Topic;
Expand Down Expand Up @@ -46,11 +46,11 @@ impl SplitMetaData for PulsarSplit {
self.topic.to_string().into()
}

fn encode_to_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
fn encode_to_json(&self) -> JsonbVal {
serde_json::to_value(self.clone()).unwrap().into()
}
}
Loading

0 comments on commit 08fc246

Please sign in to comment.