Skip to content

Commit

Permalink
feat: add kafka backfill frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 16, 2024
1 parent 593923c commit 6c2027c
Show file tree
Hide file tree
Showing 49 changed files with 1,825 additions and 556 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ d70dba827c303373f3220c9733f7c7443e5c2d37

# chore: cargo +nightly fmt (#13162) (format let-chains)
c583e2c6c054764249acf484438c7bf7197765f4

# chore: replace all ProstXxx with PbXxx (#8621)
6fd8821f2e053957b183d648bea9c95b6703941f
13 changes: 10 additions & 3 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,16 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// Whether the stream source is a cdc source streaming job.
// We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
bool cdc_source_job = 13;
// Whether the stream source has a streaming job.
// This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
// Currently, the following sources have streaming jobs:
// - Direct CDC sources (mysql & postgresql)
// - MQ sources (Kafka, Pulsar, Kinesis, etc.)
bool has_streaming_job = 13;
// Only used when `has_streaming_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
bool is_distributed = 15;
reserved "cdc_source_job"; // deprecated
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;
}
Expand Down
19 changes: 19 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,23 @@ message StreamFsFetchNode {
StreamFsFetch node_inner = 1;
}

message SourceBackfillNode {
uint32 source_id = 1;
optional uint32 row_id_index = 3;
// XXX: is this all columns or only required columns?
repeated plan_common.ColumnCatalog columns = 4;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
map<string, string> with_properties = 6;
// Streaming rate limit
// optional uint32 rate_limit = 9;

// fields above are the same as StreamSource

// `| partition_id | backfill_progress |`
catalog.Table state_table = 2;
}

message SinkDesc {
reserved 4;
reserved "columns";
Expand Down Expand Up @@ -758,6 +775,7 @@ message StreamNode {
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 141;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down Expand Up @@ -852,6 +870,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 512;
}

// The streaming context associated with a stream plan
Expand Down
26 changes: 26 additions & 0 deletions src/common/src/util/iter_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,29 @@ where
{
a.into_iter().zip_eq_fast(b)
}

pub trait IntoIteratorExt
where
for<'a> &'a Self: IntoIterator,
{
/// Shorter version of `self.iter().map(f).collect()`.
fn map_collect<A, B, F, BCollection>(&self, f: F) -> BCollection
where
F: FnMut(&A) -> B,
for<'a> &'a Self: IntoIterator<Item = &'a A>,
BCollection: FromIterator<B>,
{
self.into_iter().map(f).collect()
}

/// Shorter version of `self.iter().map(f).collect_vec()`.
fn map_to_vec<A, B, F>(&self, f: F) -> Vec<B>
where
F: FnMut(&A) -> B,
for<'a> &'a Self: IntoIterator<Item = &'a A>,
{
self.map_collect(f)
}
}

impl<T> IntoIteratorExt for T where for<'a> &'a Self: IntoIterator {}
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner<F>(
always!(source.state_table, "FsFetch");
}
}
NodeBody::SourceBackfill(node) => {
always!(node.state_table, "SourceBackfill")
}

// Sink
NodeBody::Sink(node) => {
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl StreamService for StreamServiceImpl {
.inspect_err(
|err| tracing::error!(error = %err.as_report(), "failed to collect barrier"),
)?;
tracing::trace!(?create_mview_progress, ?kind, "barrier_complete");

let (synced_sstables, table_watermarks) = sync_result
.map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks))
Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
fn init_from_pb_source(&mut self, _source: &PbSource) {}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

Expand Down Expand Up @@ -447,10 +449,12 @@ impl ConnectorProperties {
matches!(self, ConnectorProperties::Kinesis(_))
}

/// Load additional info from `PbSource`. Currently only used by CDC.
pub fn init_from_pb_source(&mut self, source: &PbSource) {
dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_multi_table_shared = info.cdc_source_job;
self.is_multi_table_shared = info.has_streaming_job;
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl From<&SourceCatalog> for BoundSource {
}
}

impl BoundSource {
pub fn can_backfill(&self) -> bool {
self.catalog.info.has_streaming_job
}
}

impl Binder {
/// Binds table or source, or logical view according to what we get from the catalog.
pub fn bind_relation_by_name_inner(
Expand Down
17 changes: 9 additions & 8 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ fn bind_columns_from_source_for_cdc(
row_encode: row_encode_to_prost(&source_schema.row_encode) as i32,
format_encode_options,
use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
cdc_source_job: true,
has_streaming_job: true,
..Default::default()
};
if !format_encode_options_to_consume.is_empty() {
Expand Down Expand Up @@ -1130,18 +1130,22 @@ pub async fn handle_create_source(
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
let create_cdc_source_job = if is_cdc_connector(&with_properties) {
CdcTableType::from_properties(&with_properties).can_backfill()
} else {
false
};
let has_streaming_job = create_cdc_source_job || is_kafka_connector(&with_properties);

let (columns_from_resolve_source, source_info) = if create_cdc_source_job {
let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if has_streaming_job {
source_info.has_streaming_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let columns_from_sql = bind_sql_columns(&stmt.columns)?;

let mut columns = bind_all_columns(
Expand Down Expand Up @@ -1235,21 +1239,18 @@ pub async fn handle_create_source(

let catalog_writer = session.catalog_writer()?;

if create_cdc_source_job {
// create a streaming job for the cdc source, which will mark as *singleton* in the Fragmenter
if has_streaming_job {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
// cdc source is an append-only source in plain json format
let source_node = LogicalSource::new(
Some(Rc::new(SourceCatalog::from(&source))),
columns.clone(),
row_id_index,
false,
false, // Do not gen RowID. Gen RowID after backfill node instead.
false,
context.into(),
)?;

// generate stream graph for cdc source job
let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
let mut graph = build_graph(stream_plan)?;
graph.parallelism =
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ impl ToStream for LogicalSource {
{
plan_prefix = Some(self.rewrite_new_s3_plan()?);
}

// TODO: after SourceBackfill is added, we shouldn't put generated columns/row id here, and put them after backfill instead.
plan = if self.core.for_table {
dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), plan_prefix)
} else {
Expand Down
Loading

0 comments on commit 6c2027c

Please sign in to comment.