feat(meta): iterative streaming scheduler (part 1) (#7490)
(As explained in #7420.) This PR introduces a brand-new streaming scheduler that works by automatic iteration over rules written in a Datalog syntax. In short, by pre-defining a few rules, the scheduler can directly derive the distribution (i.e., the vnode mapping) for a given fragment graph and fill each exchange with the correct dispatchers, with no need to reason about topological order or handle edge cases by hand.

https://github.com/risingwavelabs/risingwave/blob/b5b70668cf275a83b169b0088e3403686173697d/src/meta/src/stream/stream_graph/schedule.rs#L83-L98
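
For a flavor of what such rules look like, below is a minimal, self-contained sketch using the `crepe` crate (the Datalog engine this PR adds to `src/meta/Cargo.toml`). The relation names and rules are illustrative only; the real rules live in `schedule.rs` at the permalink above.

```rust
use crepe::crepe;

crepe! {
    // Illustrative facts (not the actual relations in `schedule.rs`):
    // an edge that forces two fragments to share the same distribution,
    // and a fragment that is externally required to be a singleton.
    @input
    struct NoShuffle(u32, u32);
    @input
    struct RequiredSingleton(u32);

    // Derived: every fragment that must be scheduled as a singleton.
    @output
    struct Singleton(u32);

    Singleton(f) <- RequiredSingleton(f);
    // Singleton-ness propagates across no-shuffle edges in both directions;
    // the engine iterates to a fixpoint, so no explicit topological
    // traversal of the fragment graph is needed.
    Singleton(y) <- NoShuffle(x, y), Singleton(x);
    Singleton(x) <- NoShuffle(x, y), Singleton(y);
}

fn main() {
    let mut runtime = Crepe::new();
    runtime.extend(vec![NoShuffle(1, 2), NoShuffle(2, 3)]);
    runtime.extend(vec![RequiredSingleton(1)]);
    let (singletons,) = runtime.run();
    // Fragments 2 and 3 are derived to be singletons as well.
    assert_eq!(singletons.len(), 3);
}
```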

In this PR, the new scheduler is used only for parallelism assignment, while the derived distribution itself is ignored for now. This will be addressed by removing the old scheduler in follow-up PRs.

By moving the scheduling phase ahead of time, we can also simplify logic such as actor ID assignment, since actors are now built only after their total count is known.
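
As a sketch of that simplification (a hypothetical helper, not code from this PR): once scheduling has fixed the total actor count, IDs can be reserved as one contiguous range instead of being generated one by one during graph building.

```rust
/// Hypothetical sketch, not from this PR: reserve a contiguous range of
/// actor IDs in one shot once scheduling has fixed the total actor count.
fn reserve_actor_ids(next_actor_id: &mut u32, total_actor_count: u32) -> std::ops::Range<u32> {
    let start = *next_actor_id;
    *next_actor_id += total_actor_count;
    start..*next_actor_id
}

fn main() {
    let mut next = 100;
    // Scheduling has already determined that the graph needs 12 actors.
    let ids = reserve_actor_ids(&mut next, 12);
    assert_eq!(ids, 100..112);
}
```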

Approved-By: chenzl25
Approved-By: yezizp2012
BugenZhao authored Feb 1, 2023
1 parent 2eea462 commit 17cdbd7
Showing 9 changed files with 664 additions and 370 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion src/common/src/hash/consistent_hash/mapping.rs
@@ -47,7 +47,7 @@ pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;
 /// The representation is compressed as described in [`compress_data`], which is optimized for the
 /// mapping with a small number of items and good locality.
 #[derive(Derivative)]
-#[derivative(Debug, Clone, PartialEq, Eq)]
+#[derivative(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct VnodeMapping<T: VnodeMappingItem> {
     original_indices: Vec<u32>,
     data: Vec<T::Item>,
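
The diff does not say why `Hash` is newly derived here; presumably it lets a `VnodeMapping` be used as a hash-map key during scheduling, e.g. to group fragments that share a distribution. A generic, stand-alone illustration of that pattern (not code from this PR):

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Illustration only: any mapping type that is `Hash + Eq` (as `VnodeMapping`
// now is) can key a `HashMap`, e.g. to group fragments by their distribution.
fn group_by_mapping<M: Hash + Eq>(mappings: Vec<(u32, M)>) -> HashMap<M, Vec<u32>> {
    let mut groups: HashMap<M, Vec<u32>> = HashMap::new();
    for (fragment_id, mapping) in mappings {
        groups.entry(mapping).or_default().push(fragment_id);
    }
    groups
}

fn main() {
    // Stand-in string mappings; the real type would be a `VnodeMapping<T>`.
    let groups = group_by_mapping(vec![(1, "dist-a"), (2, "dist-a"), (3, "dist-b")]);
    assert_eq!(groups["dist-a"], vec![1, 2]);
}
```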
2 changes: 2 additions & 0 deletions src/meta/Cargo.toml
@@ -18,8 +18,10 @@ bytes = { version = "1", features = ["serde"] }
 chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
 clap = { version = "3", features = ["derive", "env"] }
 crc32fast = "1"
+crepe = "0.1"
 derivative = "2"
+either = "1"
 enum-as-inner = "0.5"
 etcd-client = { version = "0.2", package = "madsim-etcd-client" }
 fail = "0.5"
 function_name = "0.3.0"
23 changes: 22 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
@@ -25,7 +25,7 @@ use risingwave_connector::source::SplitImpl;
 use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
-use risingwave_pb::meta::table_fragments::{ActorStatus, State};
+use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
 use risingwave_pb::meta::FragmentParallelUnitMapping;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
@@ -841,6 +841,7 @@ where
     }

     // we will read three things at once, avoiding locking too much.
+    // TODO: remove this after scheduler refactoring
     pub async fn get_build_graph_info(
         &self,
         table_ids: &HashSet<TableId>,
@@ -859,6 +860,26 @@
         Ok(info)
     }

+    /// Get the upstream `Materialize` fragments of the specified tables.
+    pub async fn get_upstream_mview_fragments(
+        &self,
+        table_ids: &HashSet<TableId>,
+    ) -> MetaResult<HashMap<TableId, Fragment>> {
+        let map = &self.core.read().await.table_fragments;
+        let mut fragments = HashMap::new();
+
+        for &table_id in table_ids {
+            let table_fragments = map
+                .get(&table_id)
+                .context(format!("table_fragment not exist: id={}", table_id))?;
+            if let Some(fragment) = table_fragments.mview_fragment() {
+                fragments.insert(table_id, fragment);
+            }
+        }
+
+        Ok(fragments)
+    }
+
     pub async fn get_mview_vnode_bitmap_info(
         &self,
         table_ids: &HashSet<TableId>,
10 changes: 10 additions & 0 deletions src/meta/src/model/stream.rs
@@ -245,6 +245,16 @@ impl TableFragments {
         })
     }

+    /// Returns the fragment with the `Mview` type flag.
+    pub fn mview_fragment(&self) -> Option<Fragment> {
+        self.fragments
+            .values()
+            .find(|fragment| {
+                (fragment.get_fragment_type_mask() & FragmentTypeFlag::Mview as u32) != 0
+            })
+            .cloned()
+    }
+
     /// Returns actors that contains Chain node.
     pub fn chain_actor_ids(&self) -> HashSet<ActorId> {
         Self::filter_actor_ids(self, |fragment_type_mask| {
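
Note that `fragment_type_mask` is a bitset, so a single fragment can carry several `FragmentTypeFlag`s at once; the `& ... != 0` test above matches any fragment whose mask includes the `Mview` bit. A toy illustration (the flag values below are hypothetical; the real ones come from the `FragmentTypeFlag` protobuf enum):

```rust
// Hypothetical flag values for illustration only.
const SOURCE: u32 = 1 << 0;
const MVIEW: u32 = 1 << 1;

fn main() {
    // A fragment can be both a source and a materialized view.
    let fragment_type_mask = SOURCE | MVIEW;
    assert!(fragment_type_mask & MVIEW != 0);
    assert!(fragment_type_mask & SOURCE != 0);
}
```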
13 changes: 10 additions & 3 deletions src/meta/src/rpc/service/ddl_service.rs
@@ -31,8 +31,8 @@ use crate::manager::{
 use crate::model::TableFragments;
 use crate::storage::MetaStore;
 use crate::stream::{
-    visit_fragment, ActorGraphBuilder, CreateStreamingJobContext, GlobalStreamManagerRef,
-    SourceManagerRef, StreamFragmentGraph,
+    visit_fragment, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext,
+    GlobalStreamManagerRef, SourceManagerRef, StreamFragmentGraph,
 };
 use crate::{MetaError, MetaResult};

@@ -619,8 +619,15 @@ where
             ..Default::default()
         };

+        let upstream_mview_fragments = self
+            .fragment_manager
+            .get_upstream_mview_fragments(dependent_relations)
+            .await?;
+        let complete_graph =
+            CompleteStreamFragmentGraph::new(fragment_graph, upstream_mview_fragments)?;
+
         // TODO(bugen): we should merge this step with the `Scheduler`.
-        let actor_graph_builder = ActorGraphBuilder::new(fragment_graph, default_parallelism);
+        let actor_graph_builder = ActorGraphBuilder::new(complete_graph, default_parallelism)?;

         let graph = actor_graph_builder
             .generate_graph(self.env.id_gen_manager_ref(), &mut ctx)