feat(meta): iterative streaming scheduler (part 2) #7659

Merged: 30 commits from bz/new-scheduler-part-2 into main, Feb 7, 2023
Commits (30, changes shown from all commits):
508cb0a  directly fill mapping (BugenZhao, Feb 1, 2023)
c0c9edb  singleton req (BugenZhao, Feb 1, 2023)
ad3e901  fix link id (BugenZhao, Feb 2, 2023)
c5cd76b  move rewrite to actor builder (BugenZhao, Feb 2, 2023)
303bffd  minor refactor to rewrite (BugenZhao, Feb 2, 2023)
9c758f6  pass locations (BugenZhao, Feb 2, 2023)
e551e9e  fix test (BugenZhao, Feb 2, 2023)
869fe03  use option for parallelism (BugenZhao, Feb 2, 2023)
29cd9eb  new edge (BugenZhao, Feb 2, 2023)
42c3a7f  use complete graph for building (BugenZhao, Feb 2, 2023)
848c41d  record location (BugenZhao, Feb 2, 2023)
7653b39  rewrite chain (BugenZhao, Feb 2, 2023)
90f996e  cleanup unncessary code (BugenZhao, Feb 2, 2023)
aac75c1  more cleanups (BugenZhao, Feb 2, 2023)
d8a5b91  distinguish external locations (BugenZhao, Feb 3, 2023)
12cdb91  put stuff into context (BugenZhao, Feb 3, 2023)
1e271e0  initialize actor status when create table fragments (BugenZhao, Feb 3, 2023)
3ff8792  add comments (BugenZhao, Feb 3, 2023)
d357b7b  add more doc and comments (BugenZhao, Feb 3, 2023)
bd29344  add more docs (BugenZhao, Feb 3, 2023)
b54f64c  remove same worker node (BugenZhao, Feb 3, 2023)
00a9193  fix rewrite other nodes (BugenZhao, Feb 3, 2023)
92acf26  Merge remote-tracking branch 'origin/main' into bz/new-scheduler-part-2 (BugenZhao, Feb 6, 2023)
43251cf  minor fixes (BugenZhao, Feb 6, 2023)
8251598  fix default parallelism (BugenZhao, Feb 6, 2023)
30585a1  fix unit tests (BugenZhao, Feb 6, 2023)
96697b5  fix unit test (BugenZhao, Feb 7, 2023)
3a640a3  Merge remote-tracking branch 'origin/main' into bz/new-scheduler-part-2 (BugenZhao, Feb 7, 2023)
9e1ff93  sort the parallel units for better locality (BugenZhao, Feb 7, 2023)
96df668  Merge branch 'main' into bz/new-scheduler-part-2 (mergify[bot], Feb 7, 2023)
55 changes: 1 addition & 54 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default.

20 changes: 9 additions & 11 deletions proto/stream_plan.proto
@@ -346,6 +346,9 @@ enum ChainType {
// 1. MergeNode (as a placeholder) for streaming read.
// 2. BatchPlanNode for snapshot read.
message ChainNode {
reserved 5;
reserved "same_worker_node";

uint32 table_id = 1;
// The schema of input stream, which will be used to build a MergeNode
repeated plan_common.Field upstream_fields = 2;
@@ -356,8 +359,6 @@ message ChainNode {
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
// ChainType is used to decide which implementation for the ChainNode.
ChainType chain_type = 4;
// Whether to place this chain on the same worker node as upstream actors.
bool same_worker_node = 5;
Comment from the PR author (BugenZhao): We remove this as it's actually always true, due to the NoShuffle exchange.

// Whether the upstream materialize is and this chain should be a singleton.
// FIXME: This is a workaround for fragmenter since the distribution info will be lost if there's only one
// fragment in the downstream mview. Remove this when we refactor the fragmenter.
@@ -562,13 +563,11 @@ message Dispatcher {
repeated uint32 downstream_actor_id = 5;
}

// Used to place an actor together with another actor in the same worker node.
message ColocatedActorId {
uint32 id = 1;
}

// A StreamActor is a running fragment of the overall stream graph,
message StreamActor {
reserved 7;
reserved "colocated_upstream_actor_id";

uint32 actor_id = 1;
uint32 fragment_id = 2;
StreamNode nodes = 3;
@@ -578,8 +577,6 @@ message StreamActor {
// It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode.
// We duplicate the information here to ease the parsing logic in stream manager.
repeated uint32 upstream_actor_id = 6;
// Placement rule for actor, need to stay on the same node as a specified upstream actor.
ColocatedActorId colocated_upstream_actor_id = 7;
Comment from the PR author (BugenZhao): This field existed because the StreamActor message was a protocol between the old graph builder and the old scheduler. As we remove the old scheduler and schedule fragments ahead of time, we can remove it.

// Vnodes that the executors in this actor own.
// If the fragment is a singleton, this field will not be set and leave a `None`.
common.Buffer vnode_bitmap = 8;
@@ -619,10 +616,11 @@ message StreamFragmentGraph {
}

message StreamFragmentEdge {
reserved 2;
reserved "same_worker_node";

// Dispatch strategy for the fragment.
DispatchStrategy dispatch_strategy = 1;
// Whether the two linked nodes should be placed on the same worker node
bool same_worker_node = 2;
Comment from the PR author (BugenZhao): This should be derived from the NoShuffle strategy (see the sketch after this file's diff).

// A unique identifier of this edge. Generally it should be exchange node's operator id. When
// rewriting fragments into delta joins or when inserting 1-to-1 exchange, there will be
// virtual links generated.
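The comments above both point at the same idea: a NoShuffle exchange links fragments 1-to-1 and forces them onto the same worker anyway, so an explicit same-worker flag carries no extra information. Below is a minimal, illustrative Rust sketch of that derivation; the type and method names are assumptions for this example and not the PR's actual code (the real DispatcherType is defined in stream_plan.proto).

// Illustrative only: with a NoShuffle exchange, the two fragments are linked
// 1-to-1 and must be colocated, so a separate `same_worker_node` flag would
// always be true and is therefore redundant.

#[derive(Clone, Copy, PartialEq, Eq)]
enum DispatcherType {
    Hash,
    Broadcast,
    Simple,
    NoShuffle,
}

struct DispatchStrategy {
    r#type: DispatcherType,
    // dist_key_indices, output_indices, ... omitted for brevity
}

struct StreamFragmentEdge {
    dispatch_strategy: DispatchStrategy,
    link_id: u64,
}

impl StreamFragmentEdge {
    /// Whether the upstream and downstream fragments must share a worker node,
    /// derived from the dispatch strategy instead of a stored flag.
    fn requires_same_worker_node(&self) -> bool {
        self.dispatch_strategy.r#type == DispatcherType::NoShuffle
    }
}

fn main() {
    let no_shuffle_edge = StreamFragmentEdge {
        dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle },
        link_id: 1,
    };
    let hash_edge = StreamFragmentEdge {
        dispatch_strategy: DispatchStrategy { r#type: DispatcherType::Hash },
        link_id: 2,
    };
    assert!(no_shuffle_edge.requires_same_worker_node());
    assert!(!hash_edge.requires_same_worker_node());
}

A call site would then ask the edge for this derived property wherever it previously read the removed same_worker_node field, which also removes any chance of the flag disagreeing with the dispatch strategy.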
14 changes: 14 additions & 0 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -117,6 +117,15 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
self[vnode]
}

/// Get the item matched by the virtual nodes indicated by high bits in the given `bitmap`.
/// Returns `None` if no virtual node is set in the bitmap.
pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
bitmap
.iter_ones()
.next() // only need to check the first one
.map(|i| self.get(VirtualNode::from_index(i)))
}

/// Iterate over all items in this mapping, in the order of vnodes.
pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
self.data
@@ -144,6 +153,11 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
self.data.iter().copied().sorted().dedup()
}

/// Returns the item if it's the only item in this mapping, otherwise returns `None`.
pub fn to_single(&self) -> Option<T::Item> {
self.data.iter().copied().dedup().exactly_one().ok()
}

/// Convert this vnode mapping to a mapping from items to bitmaps, where each bitmap represents
/// the vnodes mapped to the item.
pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
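To make the semantics of the two new helpers concrete, here is a hedged, self-contained sketch that mirrors their documented behavior on a plain Vec<u32> stand-in; the Mapping type, u32 items, and boolean-slice bitmap are simplifications for illustration and not the real VnodeMapping<T> API.

use std::collections::HashSet;

/// Simplified stand-in for `VnodeMapping<T>`: one item (e.g. a parallel unit id)
/// per virtual node.
struct Mapping {
    data: Vec<u32>, // data[vnode] = item
}

impl Mapping {
    /// Mirrors `get_matched`: return the item owning the first vnode set in `bitmap`,
    /// or `None` if no vnode is set. Checking only the first set vnode is enough
    /// because an actor's bitmap covers vnodes mapped to the same item.
    fn get_matched(&self, bitmap: &[bool]) -> Option<u32> {
        bitmap
            .iter()
            .position(|&set| set)          // first vnode set in the bitmap
            .map(|vnode| self.data[vnode]) // look up its item
    }

    /// Mirrors `to_single`: return the item if the whole mapping points to
    /// exactly one item (i.e. a singleton distribution), otherwise `None`.
    fn to_single(&self) -> Option<u32> {
        let distinct: HashSet<u32> = self.data.iter().copied().collect();
        if distinct.len() == 1 {
            self.data.first().copied()
        } else {
            None
        }
    }
}

fn main() {
    let mapping = Mapping { data: vec![7, 7, 8, 8] };
    let bitmap = vec![false, false, true, true]; // this actor owns vnodes 2 and 3
    assert_eq!(mapping.get_matched(&bitmap), Some(8));
    assert_eq!(mapping.to_single(), None); // two distinct items, so not a singleton
}

The real methods operate on the generic T::Item and use Bitmap::iter_ones and Itertools::exactly_one, as shown in the diff above.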
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
@@ -177,7 +177,6 @@ impl StreamIndexScan {
],
node_body: Some(ProstStreamNode::Chain(ChainNode {
table_id: self.logical.table_desc().table_id.table_id,
same_worker_node: true,
chain_type: self.chain_type as i32,
// The fields from upstream
upstream_fields: self
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -200,7 +200,6 @@ impl StreamTableScan {
],
node_body: Some(ProstStreamNode::Chain(ChainNode {
table_id: self.logical.table_desc().table_id.table_id,
same_worker_node: false,
chain_type: self.chain_type as i32,
// The fields from upstream
upstream_fields: self
4 changes: 0 additions & 4 deletions src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
@@ -53,9 +53,6 @@ pub struct StreamFragmentEdge {
/// Dispatch strategy for the fragment.
pub dispatch_strategy: DispatchStrategy,

/// Whether the two linked nodes should be placed on the same worker node
pub same_worker_node: bool,

/// A unique identifier of this edge. Generally it should be exchange node's operator id. When
/// rewriting fragments into delta joins or when inserting 1-to-1 exchange, there will be
/// virtual links generated.
@@ -139,7 +136,6 @@ impl StreamFragmentGraph {
upstream_id,
downstream_id,
dispatch_strategy: Some(edge.dispatch_strategy),
same_worker_node: edge.same_worker_node,
link_id: edge.link_id,
};

1 change: 0 additions & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
@@ -295,7 +295,6 @@ fn build_fragment(
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: exchange_node_strategy,
same_worker_node: false,
link_id: child_node.operator_id,
},
);
8 changes: 0 additions & 8 deletions src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
@@ -205,7 +205,6 @@ fn build_delta_join_inner(
lookup_0_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_no_shuffle(),
same_worker_node: true,
link_id: exchange_a0l0.operator_id,
},
);
@@ -223,8 +222,6 @@
.map(|x| *x as u32)
.collect_vec(),
),
// stream input doesn't need to be on the same worker node as lookup
same_worker_node: false,
link_id: exchange_a0l1.operator_id,
},
);
@@ -242,8 +239,6 @@
.map(|x| *x as u32)
.collect_vec(),
),
// stream input doesn't need to be on the same worker node as lookup
same_worker_node: false,
link_id: exchange_a1l0.operator_id,
},
);
@@ -255,7 +250,6 @@
lookup_1_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_no_shuffle(),
same_worker_node: true,
link_id: exchange_a1l1.operator_id,
},
);
@@ -282,7 +276,6 @@
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
same_worker_node: false,
link_id: exchange_l0m.operator_id,
},
);
@@ -292,7 +285,6 @@
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
same_worker_node: false,
link_id: exchange_l1m.operator_id,
},
);
7 changes: 2 additions & 5 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -734,7 +734,6 @@ mod tests {
use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::StreamEnvironment;

use crate::hummock::manager::compaction_group_manager::CompactionGroupManagerInner;
use crate::hummock::manager::versioning::Versioning;
@@ -864,7 +863,7 @@
#[tokio::test]
async fn test_manager() {
let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
let table_fragment_1 = TableFragments::new(
let table_fragment_1 = TableFragments::for_test(
TableId::new(10),
BTreeMap::from([(
1,
@@ -874,9 +873,8 @@
..Default::default()
},
)]),
StreamEnvironment::default(),
);
let table_fragment_2 = TableFragments::new(
let table_fragment_2 = TableFragments::for_test(
TableId::new(20),
BTreeMap::from([(
2,
@@ -886,7 +884,6 @@
..Default::default()
},
)]),
StreamEnvironment::default(),
);

// Test register_table_fragments