Skip to content

Commit

Permalink
refactor(barrier): add control request to explicitly create partial g…
Browse files Browse the repository at this point in the history
…raph (#19383)
  • Loading branch information
wenym1 authored Nov 14, 2024
1 parent daed1f2 commit c920694
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 68 deletions.
12 changes: 11 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,19 @@ message WaitEpochCommitResponse {
}

message StreamingControlStreamRequest {
message InitRequest {
message InitialPartialGraph {
uint64 partial_graph_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
}

message InitRequest {
repeated InitialPartialGraph graphs = 1;
}

message CreatePartialGraphRequest {
uint64 partial_graph_id = 1;
}

message RemovePartialGraphRequest {
repeated uint64 partial_graph_ids = 1;
}
Expand All @@ -75,6 +84,7 @@ message StreamingControlStreamRequest {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
CreatePartialGraphRequest create_partial_graph = 4;
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{debug, warn};

Expand All @@ -40,7 +39,7 @@ use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::collect_creating_job_commit_epoch_info;
use crate::barrier::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
SnapshotBackfillInfo, TracedEpoch,
InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
Expand Down Expand Up @@ -147,6 +146,7 @@ impl CheckpointControl {
} else {
new_database.state.in_flight_prev_epoch().clone()
};
control_stream_manager.add_partial_graph(database_id, None)?;
(entry.insert(new_database), max_prev_epoch)
}
Command::Flush
Expand Down Expand Up @@ -276,10 +276,12 @@ impl CheckpointControl {
.for_each(|database| database.create_mview_tracker.abort_all());
}

pub(crate) fn subscriptions(&self) -> impl Iterator<Item = PbSubscriptionUpstreamInfo> + '_ {
self.databases
.values()
.flat_map(|database| &database.state.inflight_subscription_info)
pub(crate) fn subscriptions(
&self,
) -> impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)> + '_ {
self.databases.iter().map(|(database_id, database)| {
(*database_id, &database.state.inflight_subscription_info)
})
}
}

Expand Down Expand Up @@ -828,8 +830,10 @@ impl DatabaseCheckpointControl {
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
let job_id = info.table_fragments.table_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
info.table_fragments.table_id(),
job_id,
CreatingStreamingJobControl::new(
info.clone(),
snapshot_backfill_info.clone(),
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::future::try_join_all;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use risingwave_rpc_client::StreamingControlHandle;

Expand Down Expand Up @@ -70,9 +70,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
async fn new_control_stream(
&self,
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: &PbInitRequest,
) -> MetaResult<StreamingControlHandle> {
self.new_control_stream_impl(node, subscriptions).await
self.new_control_stream_impl(node, init_request).await
}

async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::command::CommandContext;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
async fn new_control_stream(
&self,
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: &PbInitRequest,
) -> MetaResult<StreamingControlHandle>;

async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
Expand Down
61 changes: 46 additions & 15 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest;
use risingwave_pb::stream_service::streaming_control_stream_request::{
CreatePartialGraphRequest, PbInitRequest, PbInitialPartialGraph, RemovePartialGraphRequest,
};
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
InjectBarrierRequest, StreamingControlStreamRequest,
Expand Down Expand Up @@ -94,7 +96,7 @@ impl ControlStreamManager {
pub(super) async fn add_worker(
&mut self,
node: WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
context: &impl GlobalBarrierWorkerContext,
) {
let node_id = node.id as WorkerId;
Expand All @@ -106,13 +108,10 @@ impl ControlStreamManager {
let mut backoff = ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(3))
.factor(5);
let subscriptions = subscriptions.collect_vec();
let init_request = Self::collect_init_request(initial_subscriptions);
const MAX_RETRY: usize = 5;
for i in 1..=MAX_RETRY {
match context
.new_control_stream(&node, subscriptions.iter().cloned())
.await
{
match context.new_control_stream(&node, &init_request).await {
Ok(handle) => {
assert!(self
.nodes
Expand Down Expand Up @@ -141,16 +140,14 @@ impl ControlStreamManager {

pub(super) async fn reset(
&mut self,
subscriptions: impl Iterator<Item = &InflightSubscriptionInfo>,
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
nodes: &HashMap<WorkerId, WorkerNode>,
context: &impl GlobalBarrierWorkerContext,
) -> MetaResult<()> {
let subscriptions = subscriptions.cloned().collect_vec();
let subscriptions = &subscriptions;
let init_request = Self::collect_init_request(initial_subscriptions);
let init_request = &init_request;
let nodes = try_join_all(nodes.iter().map(|(worker_id, node)| async move {
let handle = context
.new_control_stream(node, subscriptions.iter().flatten())
.await?;
let handle = context.new_control_stream(node, init_request).await?;
Result::<_, MetaError>::Ok((
*worker_id,
ControlStreamNode {
Expand Down Expand Up @@ -270,6 +267,19 @@ impl ControlStreamManager {
tracing::debug!(?errors, "collected stream errors");
errors
}

fn collect_init_request(
initial_subscriptions: impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)>,
) -> PbInitRequest {
PbInitRequest {
graphs: initial_subscriptions
.map(|(database_id, info)| PbInitialPartialGraph {
partial_graph_id: to_partial_graph_id(database_id, None),
subscriptions: info.into_iter().collect_vec(),
})
.collect(),
}
}
}

impl ControlStreamManager {
Expand Down Expand Up @@ -436,6 +446,27 @@ impl ControlStreamManager {
Ok(node_need_collect)
}

pub(super) fn add_partial_graph(
&mut self,
database_id: DatabaseId,
creating_job_id: Option<TableId>,
) -> MetaResult<()> {
let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
self.nodes.iter().try_for_each(|(_, node)| {
node.handle
.request_sender
.send(StreamingControlStreamRequest {
request: Some(
streaming_control_stream_request::Request::CreatePartialGraph(
CreatePartialGraphRequest { partial_graph_id },
),
),
})
.map_err(|_| anyhow!("failed to add partial graph"))
})?;
Ok(())
}

pub(super) fn remove_partial_graph(
&mut self,
database_id: DatabaseId,
Expand Down Expand Up @@ -472,14 +503,14 @@ impl GlobalBarrierWorkerContextImpl {
pub(super) async fn new_control_stream_impl(
&self,
node: &WorkerNode,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: &PbInitRequest,
) -> MetaResult<StreamingControlHandle> {
let handle = self
.env
.stream_client_pool()
.get(node)
.await?
.start_streaming_control(subscriptions)
.start_streaming_control(init_request.clone())
.await?;
Ok(handle)
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;
use std::mem::replace;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use arc_swap::ArcSwap;
Expand Down Expand Up @@ -46,7 +46,7 @@ use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
use crate::barrier::schedule::PeriodicBarriers;
use crate::barrier::{
schedule, BarrierKind, BarrierManagerRequest, BarrierManagerStatus,
BarrierWorkerRuntimeInfoSnapshot, RecoveryReason, TracedEpoch,
BarrierWorkerRuntimeInfoSnapshot, InflightSubscriptionInfo, RecoveryReason, TracedEpoch,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
Expand Down Expand Up @@ -558,9 +558,10 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {

let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
let reset_start_time = Instant::now();
let empty_subscriptions = LazyLock::new(InflightSubscriptionInfo::default);
control_stream_manager
.reset(
subscription_infos.values(),
database_fragment_infos.keys().map(|database_id| (*database_id, subscription_infos.get(database_id).unwrap_or_else(|| &*empty_subscriptions))),
active_streaming_nodes.current(),
&*self.context,
)
Expand Down
9 changes: 3 additions & 6 deletions src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ use futures::TryStreamExt;
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
use risingwave_pb::stream_service::*;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -86,13 +85,11 @@ pub type StreamingControlHandle =
impl StreamClient {
pub async fn start_streaming_control(
&self,
subscriptions: impl Iterator<Item = SubscriptionUpstreamInfo>,
init_request: PbInitRequest,
) -> Result<StreamingControlHandle> {
let first_request = StreamingControlStreamRequest {
request: Some(streaming_control_stream_request::Request::Init(
InitRequest {
subscriptions: subscriptions.collect(),
},
init_request,
)),
};
let mut client = self.0.to_owned();
Expand Down
Loading

0 comments on commit c920694

Please sign in to comment.