Skip to content

Commit

Permalink
feat(barrier): maintain per database scheduled command queue (#19592)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Nov 27, 2024
1 parent c121fa7 commit ac5cb40
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 68 deletions.
13 changes: 9 additions & 4 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use futures::future::try_join_all;
use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
Expand Down Expand Up @@ -42,16 +43,20 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
self.scheduled_barriers.next_scheduled().await
}

fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason) {
fn abort_and_mark_blocked(
&self,
database_id: Option<DatabaseId>,
recovery_reason: RecoveryReason,
) {
self.set_status(BarrierManagerStatus::Recovering(recovery_reason));

// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering");
.abort_and_mark_blocked(database_id, "cluster is under recovering");
}

fn mark_ready(&self) {
self.scheduled_barriers.mark_ready();
fn mark_ready(&self, database_id: Option<DatabaseId>) {
self.scheduled_barriers.mark_ready(database_id);
self.set_status(BarrierManagerStatus::Running);
}

Expand Down
9 changes: 7 additions & 2 deletions src/meta/src/barrier/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::future::Future;
use std::sync::Arc;

use arc_swap::ArcSwap;
use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
Expand All @@ -42,8 +43,12 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;

async fn next_scheduled(&self) -> Scheduled;
fn abort_and_mark_blocked(&self, recovery_reason: RecoveryReason);
fn mark_ready(&self);
fn abort_and_mark_blocked(
&self,
database_id: Option<DatabaseId>,
recovery_reason: RecoveryReason,
);
fn mark_ready(&self, database_id: Option<DatabaseId>);

fn post_collect_command<'a>(
&'a self,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/context/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl GlobalBarrierWorkerContextImpl {
tracing::info!("recovered mview progress");

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
let _ = self.scheduled_barriers.pre_apply_drop_cancel();
let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

let mut active_streaming_nodes =
ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
Expand Down Expand Up @@ -178,7 +178,7 @@ impl GlobalBarrierWorkerContextImpl {
})?
};

if self.scheduled_barriers.pre_apply_drop_cancel() {
if self.scheduled_barriers.pre_apply_drop_cancel(None) {
info = self.resolve_graph_info().await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
Expand Down
176 changes: 119 additions & 57 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::iter::once;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context};
use assert_matches::assert_matches;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
Expand Down Expand Up @@ -64,22 +65,27 @@ enum QueueStatus {
}

struct ScheduledQueueItem {
database_id: DatabaseId,
command: Command,
notifiers: Vec<Notifier>,
send_latency_timer: HistogramTimer,
span: tracing::Span,
}

pub(super) struct ScheduledQueue {
queue: VecDeque<ScheduledQueueItem>,
struct StatusQueue<T> {
queue: T,
status: QueueStatus,
}

impl ScheduledQueue {
fn new() -> Self {
type DatabaseScheduledQueue = StatusQueue<VecDeque<ScheduledQueueItem>>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

impl<T> StatusQueue<T> {
fn new() -> Self
where
T: Default,
{
Self {
queue: VecDeque::new(),
queue: T::default(),
status: QueueStatus::Ready,
}
}
Expand All @@ -92,11 +98,7 @@ impl ScheduledQueue {
self.status = QueueStatus::Ready;
}

fn len(&self) -> usize {
self.queue.len()
}

fn push_back(&mut self, scheduled: ScheduledQueueItem) -> MetaResult<()> {
fn validate_item(&mut self, scheduled: &ScheduledQueueItem) -> MetaResult<()> {
// We don't allow any command to be scheduled when the queue is blocked, except for dropping streaming jobs.
// Because we allow dropping streaming jobs when the cluster is under recovery, so we have to buffer the drop
// command and execute it when the cluster is ready to clean up it.
Expand All @@ -110,7 +112,6 @@ impl ScheduledQueue {
{
return Err(MetaError::unavailable(reason));
}
self.queue.push_back(scheduled);
Ok(())
}
}
Expand All @@ -131,15 +132,13 @@ impl Inner {
/// Create a new scheduled barrier with the given `checkpoint`, `command` and `notifiers`.
fn new_scheduled(
&self,
database_id: DatabaseId,
command: Command,
notifiers: impl IntoIterator<Item = Notifier>,
) -> ScheduledQueueItem {
// Create a span only if we're being traced, instead of for every periodic barrier.
let span = tracing_span();

ScheduledQueueItem {
database_id,
command,
notifiers: notifiers.into_iter().collect(),
send_latency_timer: self.metrics.barrier_send_latency.start_timer(),
Expand Down Expand Up @@ -181,20 +180,38 @@ impl BarrierScheduler {
}

/// Push a scheduled barrier into the queue.
fn push(&self, scheduleds: impl IntoIterator<Item = ScheduledQueueItem>) -> MetaResult<()> {
fn push(
&self,
database_id: DatabaseId,
scheduleds: impl IntoIterator<Item = ScheduledQueueItem>,
) -> MetaResult<()> {
let mut queue = self.inner.queue.lock();
let scheduleds = scheduleds.into_iter().collect_vec();
scheduleds
.iter()
.try_for_each(|scheduled| queue.validate_item(scheduled))?;
let queue = queue
.queue
.entry(database_id)
.or_insert_with(DatabaseScheduledQueue::new);
scheduleds
.iter()
.try_for_each(|scheduled| queue.validate_item(scheduled))?;
for scheduled in scheduleds {
queue.push_back(scheduled)?;
if queue.len() == 1 {
queue.queue.push_back(scheduled);
if queue.queue.len() == 1 {
self.inner.changed_tx.send(()).ok();
}
}
Ok(())
}

/// Try to cancel scheduled cmd for create streaming job, return true if cancelled.
pub fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
/// Try to cancel scheduled cmd for create streaming job, return true if the command exists previously and get cancelled.
pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
let queue = &mut self.inner.queue.lock();
let Some(queue) = queue.queue.get_mut(&database_id) else {
return false;
};

if let Some(idx) = queue.queue.iter().position(|scheduled| {
if let Command::CreateStreamingJob { info, .. } = &scheduled.command
Expand Down Expand Up @@ -232,7 +249,6 @@ impl BarrierScheduler {

contexts.push((started_rx, collect_rx));
scheduleds.push(self.inner.new_scheduled(
database_id,
command,
once(Notifier {
started: Some(started_tx),
Expand All @@ -241,7 +257,7 @@ impl BarrierScheduler {
));
}

self.push(scheduleds)?;
self.push(database_id, scheduleds)?;

for (injected_rx, collect_rx) in contexts {
// Wait for this command to be injected, and record the result.
Expand Down Expand Up @@ -375,18 +391,20 @@ impl PeriodicBarriers {

impl ScheduledBarriers {
pub(super) async fn next_scheduled(&self) -> Scheduled {
loop {
'outer: loop {
let mut rx = self.inner.changed_tx.subscribe();
{
let mut queue = self.inner.queue.lock();
if let Some(item) = queue.queue.pop_front() {
item.send_latency_timer.observe_duration();
break Scheduled {
database_id: item.database_id,
command: item.command,
notifiers: item.notifiers,
span: item.span,
};
for (database_id, queue) in &mut queue.queue {
if let Some(item) = queue.queue.pop_front() {
item.send_latency_timer.observe_duration();
break 'outer Scheduled {
database_id: *database_id,
command: item.command,
notifiers: item.notifiers,
span: item.span,
};
}
}
}
rx.changed().await.unwrap();
Expand All @@ -396,52 +414,96 @@ impl ScheduledBarriers {

impl ScheduledBarriers {
/// Pre buffered drop and cancel command, return true if any.
pub(super) fn pre_apply_drop_cancel(&self) -> bool {
self.pre_apply_drop_cancel_scheduled()
pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
self.pre_apply_drop_cancel_scheduled(database_id)
}

/// Mark command scheduler as blocked and abort all queued scheduled command and notify with
/// specific reason.
pub(super) fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
pub(super) fn abort_and_mark_blocked(
&self,
database_id: Option<DatabaseId>,
reason: impl Into<String> + Copy,
) {
let mut queue = self.inner.queue.lock();
queue.mark_blocked(reason.into());
while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.pop_front() {
notifiers
.into_iter()
.for_each(|notify| notify.notify_collection_failed(anyhow!(reason.into()).into()))
let mark_blocked_and_notify_failed = |queue: &mut DatabaseScheduledQueue| {
queue.mark_blocked(reason.into());
while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.pop_front() {
notifiers.into_iter().for_each(|notify| {
notify.notify_collection_failed(anyhow!(reason.into()).into())
})
}
};
if let Some(database_id) = database_id {
let queue = queue
.queue
.entry(database_id)
.or_insert_with(DatabaseScheduledQueue::new);
mark_blocked_and_notify_failed(queue);
} else {
queue.mark_blocked(reason.into());
for queue in queue.queue.values_mut() {
mark_blocked_and_notify_failed(queue);
}
}
}

/// Mark command scheduler as ready to accept new command.
pub(super) fn mark_ready(&self) {
pub(super) fn mark_ready(&self, database_id: Option<DatabaseId>) {
let mut queue = self.inner.queue.lock();
queue.mark_ready();
if let Some(database_id) = database_id {
queue
.queue
.entry(database_id)
.or_insert_with(DatabaseScheduledQueue::new)
.mark_ready();
} else {
queue.mark_ready();
for queue in queue.queue.values_mut() {
queue.mark_ready();
}
}
}

/// Try to pre apply drop and cancel scheduled command and return them if any.
/// It should only be called in recovery.
pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> bool {
pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
let mut queue = self.inner.queue.lock();
assert_matches!(queue.status, QueueStatus::Blocked(_));
let mut applied = false;

while let Some(ScheduledQueueItem {
notifiers, command, ..
}) = queue.queue.pop_front()
{
match command {
Command::DropStreamingJobs { .. } => {
applied = true;
}
Command::DropSubscription { .. } => {}
_ => {
unreachable!("only drop and cancel streaming jobs should be buffered");
let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
while let Some(ScheduledQueueItem {
notifiers, command, ..
}) = queue.queue.pop_front()
{
match command {
Command::DropStreamingJobs { .. } => {
applied = true;
}
Command::DropSubscription { .. } => {}
_ => {
unreachable!("only drop and cancel streaming jobs should be buffered");
}
}
notifiers.into_iter().for_each(|notify| {
notify.notify_collected();
});
}
};

if let Some(database_id) = database_id {
assert_matches!(queue.status, QueueStatus::Ready);
if let Some(queue) = queue.queue.get_mut(&database_id) {
assert_matches!(queue.status, QueueStatus::Blocked(_));
pre_apply_drop_cancel(queue);
}
} else {
assert_matches!(queue.status, QueueStatus::Blocked(_));
for queue in queue.queue.values_mut() {
pre_apply_drop_cancel(queue);
}
notifiers.into_iter().for_each(|notify| {
notify.notify_collected();
});
}

applied
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,12 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
err: Option<MetaError>,
recovery_reason: RecoveryReason,
) {
self.context.abort_and_mark_blocked(recovery_reason);
self.context.abort_and_mark_blocked(None, recovery_reason);
// Clear all control streams to release resources (connections to compute nodes) first.
self.control_stream_manager.clear();

self.recovery_inner(paused_reason, err).await;
self.context.mark_ready();
self.context.mark_ready(None);
}

async fn recovery_inner(
Expand Down
Loading

0 comments on commit ac5cb40

Please sign in to comment.