feat(meta): support scaling delta join (risingwavelabs#8694)
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao authored Mar 23, 2023
1 parent 837d43a commit 7bac239
Showing 13 changed files with 209 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/barrier/command.rs
@@ -443,7 +443,7 @@ where
dropped_actors,
actor_splits,
});
tracing::trace!("update mutation: {mutation:#?}");
tracing::debug!("update mutation: {mutation:#?}");
Some(mutation)
}
};
59 changes: 25 additions & 34 deletions src/meta/src/stream/scale.rs
@@ -432,10 +432,6 @@ where
// treatment because the upstream and downstream of NoShuffle are always 1-1
// correspondence, so we need to clone the reschedule plan to the downstream of all
// cascading relations.
- //
- // Delta join will introduce a `NoShuffle` edge between index chain node and lookup node
- // (index_mv --NoShuffle--> index_chain --NoShuffle--> lookup) which will break current
- // `NoShuffle` scaling assumption. Currently we detect this case and forbid it to scale.
if no_shuffle_source_fragment_ids.contains(fragment_id) {
let mut queue: VecDeque<_> = fragment_dispatcher_map
.get(fragment_id)
@@ -451,21 +447,12 @@

if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
{
- // If `NoShuffle` used by other fragment type rather than `ChainNode`, bail.
- for downstream_fragment_id in downstream_fragments.keys() {
- let downstream_fragment = fragment_map
- .get(downstream_fragment_id)
- .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
- if (downstream_fragment.get_fragment_type_mask()
- & (FragmentTypeFlag::ChainNode as u32
- | FragmentTypeFlag::Mview as u32))
- == 0
- {
- bail!("Rescheduling NoShuffle edge only supports ChainNode and Mview. Other usage for e.g. delta join is forbidden currently.");
- }
- }
+ let no_shuffle_downstreams = downstream_fragments
+ .iter()
+ .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
+ .map(|(fragment_id, _)| fragment_id);

- queue.extend(downstream_fragments.keys().cloned());
+ queue.extend(no_shuffle_downstreams.copied());
}

no_shuffle_reschedule.insert(
@@ -743,7 +730,12 @@ where
.unwrap();

if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
- for downstream_fragment_id in downstream_fragments.keys() {
+ let no_shuffle_downstreams = downstream_fragments
+ .iter()
+ .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
+ .map(|(fragment_id, _)| fragment_id);
+
+ for downstream_fragment_id in no_shuffle_downstreams {
arrange_no_shuffle_relation(
ctx,
downstream_fragment_id,
@@ -1014,20 +1006,19 @@
}
}

- let downstream_fragment_ids =
- if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(&fragment_id) {
- // Skip NoShuffle fragments' downstream
- if ctx
- .no_shuffle_source_fragment_ids
- .contains(&fragment.fragment_id)
- {
- vec![]
- } else {
- downstream_fragments.keys().copied().collect_vec()
- }
- } else {
- vec![]
- };
+ let downstream_fragment_ids = if let Some(downstream_fragments) =
+ ctx.fragment_dispatcher_map.get(&fragment_id)
+ {
+ // Skip fragments' no-shuffle downstream, as there's no need to update the merger
+ // (receiver) of a no-shuffle downstream
+ downstream_fragments
+ .iter()
+ .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
+ .map(|(fragment_id, _)| *fragment_id)
+ .collect_vec()
+ } else {
+ vec![]
+ };

let vnode_bitmap_updates = match fragment.distribution_type() {
FragmentDistributionType::Hash => {
@@ -1123,7 +1114,7 @@

let _source_pause_guard = self.source_manager.paused.lock().await;

tracing::trace!("reschedule plan: {:#?}", reschedule_fragment);
tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);

self.barrier_scheduler
.run_command_with_paused(Command::RescheduleFragment(reschedule_fragment))
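Both scale.rs hunks apply the same rule in two places — once when cascading the reschedule plan and once when arranging no-shuffle relations: walk only `NoShuffle` edges and let hash-shuffled downstreams rebalance on their own. A minimal, self-contained sketch of that traversal, using hypothetical simplified types rather than the meta service's real `fragment_dispatcher_map`:

    use std::collections::{HashMap, VecDeque};

    #[derive(Clone, Copy, PartialEq, Eq)]
    enum DispatcherType {
        Hash,
        NoShuffle,
    }

    /// Collect every fragment reachable from `root` through `NoShuffle` edges only.
    fn no_shuffle_cascade(
        root: u32,
        dispatchers: &HashMap<u32, HashMap<u32, DispatcherType>>,
    ) -> Vec<u32> {
        let mut cascade = Vec::new();
        let mut queue = VecDeque::from([root]);
        while let Some(id) = queue.pop_front() {
            if let Some(downstreams) = dispatchers.get(&id) {
                for (&downstream, &ty) in downstreams {
                    // Only `NoShuffle` downstreams must mirror the upstream's
                    // reschedule; hash-shuffled ones are rebalanced independently.
                    if ty == DispatcherType::NoShuffle {
                        cascade.push(downstream);
                        queue.push_back(downstream);
                    }
                }
            }
        }
        cascade
    }

    fn main() {
        // The delta-join shape: index_mv --NoShuffle--> index_chain --NoShuffle--> lookup,
        // plus an unrelated hash-shuffled downstream that must not be dragged along.
        let dispatchers = HashMap::from([
            (1, HashMap::from([(2, DispatcherType::NoShuffle)])),
            (2, HashMap::from([(3, DispatcherType::NoShuffle), (4, DispatcherType::Hash)])),
        ]);
        assert_eq!(no_shuffle_cascade(1, &dispatchers), vec![2, 3]);
    }

With delta join, this walk is what carries a reschedule from `index_mv` through `index_chain` into `lookup` — exactly the topology the removed comment used to forbid.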
7 changes: 7 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -379,6 +379,13 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
Ok(None)
}
}
+
+ /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap.
+ #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
+ pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
+ assert_eq!(self.vnodes.len(), new_vnodes.len());
+ std::mem::replace(&mut self.vnodes, new_vnodes)
+ }
}

pub trait PkAndRowStream = Stream<Item = StorageResult<(Vec<u8>, OwnedRow)>> + Send;
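The new method is a small swap-and-return: `std::mem::replace` installs the new bitmap and hands the previous one back, and `#[must_use]` forces the caller to inspect it. A standalone sketch of the same pattern, with `Vec<bool>` as a stand-in for the real `Bitmap` type:

    struct StorageTable {
        vnodes: Vec<bool>,
    }

    impl StorageTable {
        /// Install `new_vnodes` and return the previous bitmap so the caller
        /// can compare the two and decide whether its caches are still valid.
        #[must_use = "compare with the new bitmap to decide whether caches are stale"]
        fn update_vnode_bitmap(&mut self, new_vnodes: Vec<bool>) -> Vec<bool> {
            // The total vnode count is fixed; only ownership of vnodes moves around.
            assert_eq!(self.vnodes.len(), new_vnodes.len());
            std::mem::replace(&mut self.vnodes, new_vnodes)
        }
    }

    fn main() {
        let mut table = StorageTable { vnodes: vec![true, false, false] };
        let prev = table.update_vnode_bitmap(vec![true, true, false]);
        assert_eq!(prev, vec![true, false, false]);
    }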
6 changes: 4 additions & 2 deletions src/stream/src/executor/chain.rs
@@ -76,7 +76,9 @@ impl ChainExecutor {
// Otherwise, it means we've recovered and the snapshot is already consumed.
let to_consume_snapshot = barrier.is_add_dispatcher(self.actor_id) && !self.upstream_only;

- if self.upstream_only {
+ // If the barrier is a conf change of creating this mview, and the snapshot is not to be
+ // consumed, we can finish the progress immediately.
+ if barrier.is_add_dispatcher(self.actor_id) && self.upstream_only {
self.progress.finish(barrier.epoch.curr);
}

@@ -100,7 +102,7 @@ impl ChainExecutor {
#[for_await]
for msg in upstream {
let msg = msg?;
- if let Message::Barrier(barrier) = &msg {
+ if to_consume_snapshot && let Message::Barrier(barrier) = &msg {
self.progress.finish(barrier.epoch.curr);
}
yield msg;
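The chain.rs change untangles two conditions that the old `if self.upstream_only` conflated: whether this barrier is the one that creates the mview, and whether the chain is upstream-only. A sketch of the resulting decision, simplified from `ChainExecutor` (the real flags come from `barrier.is_add_dispatcher(actor_id)` and `self.upstream_only`):

    /// Returns (consume_snapshot, finish_progress_immediately).
    fn chain_first_barrier_plan(adds_dispatcher: bool, upstream_only: bool) -> (bool, bool) {
        // Consume the snapshot only when this barrier creates the mview and
        // the executor actually has a snapshot to backfill.
        let consume_snapshot = adds_dispatcher && !upstream_only;
        // An upstream-only chain created by this barrier has nothing to backfill,
        // so its progress can be finished right away. A recovered executor
        // (adds_dispatcher == false) must not report progress again.
        let finish_immediately = adds_dispatcher && upstream_only;
        (consume_snapshot, finish_immediately)
    }

    fn main() {
        assert_eq!(chain_first_barrier_plan(true, false), (true, false)); // fresh, backfill
        assert_eq!(chain_first_barrier_plan(true, true), (false, true)); // fresh, upstream-only
        assert_eq!(chain_first_barrier_plan(false, true), (false, false)); // recovered
    }

The guard added in the second hunk (`to_consume_snapshot && let Message::Barrier(...)`) applies the same distinction to the upstream phase: only a chain that actually consumed the snapshot reports finish there.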
4 changes: 4 additions & 0 deletions src/stream/src/executor/lookup.rs
@@ -28,6 +28,8 @@ mod impl_;

pub use impl_::LookupExecutorParams;

+ use super::ActorContextRef;
+
#[cfg(test)]
mod tests;

@@ -38,6 +40,8 @@ mod tests;
/// The output schema is `| stream columns | arrangement columns |`.
/// The input is required to be first stream and then arrangement.
pub struct LookupExecutor<S: StateStore> {
+ ctx: ActorContextRef,
+
/// the data types of the produced data chunk inside lookup (before reordering)
chunk_data_types: Vec<DataType>,

5 changes: 5 additions & 0 deletions src/stream/src/executor/lookup/cache.rs
@@ -64,6 +64,11 @@ impl LookupCache {
self.data.update_epoch(epoch);
}

+ /// Clear the cache.
+ pub fn clear(&mut self) {
+ self.data.clear();
+ }
+
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
let cache = ExecutorCache::new(new_unbounded(watermark_epoch));
Self { data: cache }
33 changes: 24 additions & 9 deletions src/stream/src/executor/lookup/impl_.rs
@@ -28,16 +28,19 @@ use risingwave_storage::table::TableIter;
use risingwave_storage::StateStore;

use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
+ use crate::cache::cache_may_stale;
use crate::common::StreamChunkBuilder;
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};
use crate::executor::lookup::cache::LookupCache;
use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
use crate::executor::lookup::LookupExecutor;
- use crate::executor::{Barrier, Executor, Message, PkIndices};
+ use crate::executor::{ActorContextRef, Barrier, Executor, Message, PkIndices};
use crate::task::AtomicU64Ref;

/// Parameters for [`LookupExecutor`].
pub struct LookupExecutorParams<S: StateStore> {
+ pub ctx: ActorContextRef,
+
/// The side for arrangement. Currently, it should be a
/// `MaterializeExecutor`.
pub arrangement: Box<dyn Executor>,
@@ -116,6 +119,7 @@ pub struct LookupExecutorParams<S: StateStore> {
impl<S: StateStore> LookupExecutor<S> {
pub fn new(params: LookupExecutorParams<S>) -> Self {
let LookupExecutorParams {
+ ctx,
arrangement,
stream,
arrangement_col_descs,
@@ -202,6 +206,7 @@ impl<S: StateStore> LookupExecutor<S> {
);

Self {
+ ctx,
chunk_data_types,
schema: output_schema,
pk_indices,
@@ -273,10 +278,8 @@ impl<S: StateStore> LookupExecutor<S> {
self.lookup_cache.flush();
}

- // Use the new stream barrier epoch as new cache epoch
- self.lookup_cache.update_epoch(barrier.epoch.curr);
+ self.process_barrier(&barrier);

- self.process_barrier(barrier.clone()).await?;
if self.arrangement.use_current_epoch {
// When lookup this epoch, stream side barrier always come after arrangement
// ready, so we can forward barrier now.
@@ -336,11 +339,23 @@ impl<S: StateStore> LookupExecutor<S> {
}
}

- /// Store the barrier.
- #[expect(clippy::unused_async)]
- async fn process_barrier(&mut self, barrier: Barrier) -> StreamExecutorResult<()> {
- self.last_barrier = Some(barrier);
- Ok(())
+ /// Process the barrier and apply changes if necessary.
+ fn process_barrier(&mut self, barrier: &Barrier) {
+ if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
+ let previous_vnode_bitmap = self
+ .arrangement
+ .storage_table
+ .update_vnode_bitmap(vnode_bitmap.clone());
+
+ // Manipulate the cache if necessary.
+ if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
+ self.lookup_cache.clear();
+ }
+ }
+
+ // Use the new stream barrier epoch as new cache epoch
+ self.lookup_cache.update_epoch(barrier.epoch.curr);
+ self.last_barrier = Some(barrier.clone());
}

/// Lookup all rows corresponding to a join key in shared buffer.
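The interesting branch is the new `as_update_vnode_bitmap` handling: on a scaling barrier, the executor swaps the storage table's bitmap and, when `cache_may_stale` says so, drops the lookup cache. The safe rule is that a cache built for the previous vnode set survives only if the actor gained no vnodes — a newly gained vnode carries rows the cache has never observed, so a cached "miss" could be wrong. A sketch of that rule, under the assumption that this matches the semantics of `cache_may_stale` (the real check lives in `crate::cache`), again with `&[bool]` standing in for `Bitmap`:

    /// `true` if a cache built under `prev` may be stale under `curr`:
    /// i.e. the actor now owns at least one vnode it did not own before.
    fn cache_may_stale(prev: &[bool], curr: &[bool]) -> bool {
        prev.iter()
            .zip(curr.iter())
            .any(|(&owned_before, &owned_now)| owned_now && !owned_before)
    }

    fn main() {
        // Losing vnodes keeps the cache valid (entries are merely unused)...
        assert!(!cache_may_stale(&[true, true], &[true, false]));
        // ...while gaining one invalidates it.
        assert!(cache_may_stale(&[true, false], &[true, true]));
    }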
6 changes: 3 additions & 3 deletions src/stream/src/executor/lookup/tests.rs
@@ -30,7 +30,7 @@ use crate::executor::lookup::impl_::LookupExecutorParams;
use crate::executor::lookup::LookupExecutor;
use crate::executor::test_utils::*;
use crate::executor::{
- Barrier, BoxedMessageStream, Executor, MaterializeExecutor, Message, PkIndices,
+ ActorContext, Barrier, BoxedMessageStream, Executor, MaterializeExecutor, Message, PkIndices,
};

fn arrangement_col_descs() -> Vec<ColumnDesc> {
@@ -218,6 +218,7 @@ async fn test_lookup_this_epoch() {
let arrangement = create_arrangement(table_id, store.clone()).await;
let stream = create_source();
let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
+ ctx: ActorContext::create(0),
arrangement,
stream,
arrangement_col_descs: arrangement_col_descs(),
@@ -281,14 +282,13 @@
}

#[tokio::test]
- #[ignore]
- // Deprecated because the ability to read from prev epoch has been deprecated.
async fn test_lookup_last_epoch() {
let store = MemoryStateStore::new();
let table_id = TableId::new(1);
let arrangement = create_arrangement(table_id, store.clone()).await;
let stream = create_source();
let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
+ ctx: ActorContext::create(0),
arrangement,
stream,
arrangement_col_descs: arrangement_col_descs(),
1 change: 1 addition & 0 deletions src/stream/src/from_proto/lookup.rs
@@ -125,6 +125,7 @@ impl ExecutorBuilder for LookupExecutorBuilder {
);

Ok(Box::new(LookupExecutor::new(LookupExecutorParams {
+ ctx: params.actor_context,
schema: params.schema,
arrangement,
stream,
8 changes: 5 additions & 3 deletions src/tests/simulation/src/ctl_ext.rs
@@ -96,9 +96,11 @@ pub mod predicate {

/// The fragment is able to be rescheduled. Used for locating random fragment.
pub fn can_reschedule() -> BoxedPredicate {
- // The rescheduling of `Chain` must be derived from the upstream `Materialize`, not
- // specified by the user.
- no_identity_contains("StreamTableScan")
+ // The rescheduling of no-shuffle downstreams must be derived from the upstream
+ // `Materialize`, not specified by the user.
+ let p =
+ |f: &PbFragment| no_identity_contains("Chain")(f) && no_identity_contains("Lookup")(f);
+ Box::new(p)
}

/// The fragment with the given id.
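`can_reschedule` now has to exclude two fragment kinds instead of one, so it conjoins two `no_identity_contains` predicates. A generic sketch of composing boxed predicates this way, with plain `String` identities standing in for `PbFragment`:

    type Predicate<T> = Box<dyn Fn(&T) -> bool>;

    fn no_identity_contains(needle: &'static str) -> Predicate<String> {
        Box::new(move |identity| !identity.contains(needle))
    }

    fn main() {
        // A fragment is reschedulable only if its identity mentions neither
        // "Chain" nor "Lookup" -- both must follow their upstream `Materialize`.
        let p = |f: &String| no_identity_contains("Chain")(f) && no_identity_contains("Lookup")(f);
        let can_reschedule: Predicate<String> = Box::new(p);

        assert!(can_reschedule(&"HashJoin".to_string()));
        assert!(!can_reschedule(&"Lookup (delta join)".to_string()));
    }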
5 changes: 4 additions & 1 deletion src/tests/simulation/src/risingwave.toml
@@ -4,4 +4,7 @@

[system]
barrier_interval_ms = 250
- checkpoint_frequency = 4
+ checkpoint_frequency = 4
+
+ [server]
+ telemetry_enabled = false
