feat(meta): iterative streaming scheduler (part 2) #7659
Conversation
@@ -356,8 +359,6 @@ message ChainNode {
  // large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
  // ChainType is used to decide which implementation for the ChainNode.
  ChainType chain_type = 4;
  // Whether to place this chain on the same worker node as upstream actors.
  bool same_worker_node = 5;
We remove this as it's actually always true, due to the NoShuffle exchange.
@@ -577,8 +576,6 @@ message StreamActor {
  // It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode.
  // We duplicate the information here to ease the parsing logic in stream manager.
  repeated uint32 upstream_actor_id = 6;
  // Placement rule for actor, need to stay on the same node as a specified upstream actor.
  ColocatedActorId colocated_upstream_actor_id = 7;
The field is here because the StreamActor message is a protocol between the old graph builder and the old scheduler. As we remove the old scheduler and schedule fragments ahead of time, we can remove this.
  // Dispatch strategy for the fragment.
  DispatchStrategy dispatch_strategy = 1;
  // Whether the two linked nodes should be placed on the same worker node
  bool same_worker_node = 2;
This should be derived from the NoShuffle strategy.
) -> Self {
    let actor_status = actor_locations
We directly fill the Inactive state on initialization.
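Conceptually, the initialization looks like the following hedged sketch (simplified types, not the actual `TableFragments` code): every actor already has a location, and its state starts as `Inactive` until the creating barrier is collected.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real proto structs.
type ActorId = u32;
type ParallelUnitId = u32;

enum ActorState {
    Inactive,
    Running,
}

struct ActorStatus {
    parallel_unit: ParallelUnitId,
    state: ActorState,
}

/// On creation of the table fragments, every actor already has a location,
/// and its state starts as `Inactive` until the creating barrier is collected.
fn build_actor_status(
    actor_locations: &BTreeMap<ActorId, ParallelUnitId>,
) -> BTreeMap<ActorId, ActorStatus> {
    actor_locations
        .iter()
        .map(|(&actor_id, &parallel_unit)| {
            let status = ActorStatus {
                parallel_unit,
                state: ActorState::Inactive, // later transitions to `Running`
            };
            (actor_id, status)
        })
        .collect()
}
```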
@@ -264,25 +284,6 @@ impl TableFragments {
        .collect()
}

/// Returns fragments that contains Chain node.
pub fn chain_fragment_ids(&self) -> HashSet<FragmentId> {
A lot of code in this file can be removed, as there's no more need for resolve_chain.
let (distribution, actor_ids) = match current_fragment {
    // For building fragments, we need to generate the actor builders.
    EitherFragment::Building(current_fragment) => {
It's possible we are visiting the upstream Materialize (existing) fragment now, as the topological order is on the complete graph. So we need to distinguish between them.
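A simplified sketch of that distinction (illustrative names only, not the exact implementation): the traversal matches on whether the visited fragment is being built or already exists.

```rust
// Illustrative names only: the topological traversal runs over the *complete*
// graph, so a visited node may be either a fragment being built for the new job,
// or an existing upstream `Materialize` fragment whose distribution is already decided.
struct BuildingFragment { fragment_id: u32 }
struct ExistingFragment { fragment_id: u32, actor_count: usize }

enum EitherFragment {
    /// A fragment of the job being created: actor builders must be generated.
    Building(BuildingFragment),
    /// An existing fragment (e.g. upstream `Materialize`): only new dispatchers are added.
    Existing(ExistingFragment),
}

fn visit(fragment: &EitherFragment) {
    match fragment {
        EitherFragment::Building(f) => {
            // Generate actor builders according to the scheduled distribution.
            println!("build actors for fragment {}", f.fragment_id);
        }
        EitherFragment::Existing(f) => {
            // Reuse the known actors; only wire up new downstream edges.
            println!("reuse {} actors of fragment {}", f.actor_count, f.fragment_id);
        }
    }
}

fn main() {
    visit(&EitherFragment::Building(BuildingFragment { fragment_id: 1 }));
    visit(&EitherFragment::Existing(ExistingFragment { fragment_id: 2, actor_count: 3 }));
}
```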
// TODO: remove this after scheduler refactoring.
pub fn into_inner(self) -> StreamFragmentGraph {
    self.graph

/// Generate topological order of **all** fragments in this graph, including the ones that are
The interfaces below used to be the FragmentGraph's. Now they are modified to operate on the CompleteGraph, where the existing fragments will be considered.
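For illustration, a generic sketch of generating the topological order over all fragments of the complete graph; this is a plain Kahn's algorithm with made-up signatures, not the actual code, but it shows why existing upstream fragments naturally come before the building fragments that depend on them.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// `edges` are (upstream, downstream) pairs; all endpoints are assumed to be in `fragments`.
fn topo_order(fragments: &HashSet<u32>, edges: &[(u32, u32)]) -> Vec<u32> {
    let mut in_degree: HashMap<u32, usize> = fragments.iter().map(|&f| (f, 0)).collect();
    let mut downstreams: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(up, down) in edges {
        *in_degree.get_mut(&down).unwrap() += 1;
        downstreams.entry(up).or_default().push(down);
    }

    // Start from fragments with no upstream dependency (e.g. existing Materialize).
    let mut queue: VecDeque<u32> = in_degree
        .iter()
        .filter(|&(_, &d)| d == 0)
        .map(|(&f, _)| f)
        .collect();
    let mut order = Vec::new();

    while let Some(f) = queue.pop_front() {
        order.push(f);
        for &down in downstreams.get(&f).into_iter().flatten() {
            let d = in_degree.get_mut(&down).unwrap();
            *d -= 1;
            if *d == 0 {
                queue.push_back(down);
            }
        }
    }
    order
}

fn main() {
    let fragments: HashSet<u32> = [1, 2, 3].into_iter().collect();
    // Existing Materialize (1) feeds the building Chain (2), which feeds the new Materialize (3).
    let order = topo_order(&fragments, &[(1, 2), (2, 3)]);
    assert_eq!(order, vec![1, 2, 3]);
}
```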
/// This fragment is singleton, and should be scheduled to the default parallel unit.
DefaultSingleton,
/// This fragment is hash-distributed, and should be scheduled by the default hash mapping.
DefaultHash,
Instead of providing the default distribution to the datalog, we make the result more explicit and let the caller apply it.
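A minimal sketch of that contract (names are illustrative, not the actual types): the datalog only reports what kind of requirement a fragment has, and the caller maps the `Default*` variants to concrete distributions.

```rust
// Illustrative types: the scheduler emits an explicit per-fragment result, and the
// caller decides how to materialize the defaults.
type ParallelUnitId = u32;

enum ScheduleResult {
    /// Required to be a singleton on this specific parallel unit (e.g. by NoShuffle).
    RequiredSingleton(ParallelUnitId),
    /// A singleton with no placement requirement.
    DefaultSingleton,
    /// Hash-distributed with no placement requirement.
    DefaultHash,
}

enum Distribution {
    Singleton(ParallelUnitId),
    Hash(Vec<ParallelUnitId>),
}

fn apply_defaults(
    result: ScheduleResult,
    default_singleton: ParallelUnitId,
    default_mapping: &[ParallelUnitId],
) -> Distribution {
    match result {
        ScheduleResult::RequiredSingleton(p) => Distribution::Singleton(p),
        ScheduleResult::DefaultSingleton => Distribution::Singleton(default_singleton),
        ScheduleResult::DefaultHash => Distribution::Hash(default_mapping.to_vec()),
    }
}
```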
pub building_locations: Locations,

/// The locations of the existing actors, essentially the upstream mview actors to update.
pub existing_locations: Locations,
This is derived from the ActorGraphBuilder, and can be directly used for RPCs like building hanging channels or broadcasting actor info.
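For illustration, a simplified sketch (hypothetical types) of how such locations can be grouped by worker node when issuing per-node RPCs:

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical, simplified types: `Locations` maps each actor to the worker node it
// is placed on, so per-node RPCs (e.g. broadcasting actor info) can just group the
// actors by worker and send one request per node.
type ActorId = u32;
type WorkerId = u32;

struct Locations {
    actor_locations: BTreeMap<ActorId, WorkerId>,
}

impl Locations {
    /// Group actors by the worker node they live on.
    fn actors_by_worker(&self) -> HashMap<WorkerId, Vec<ActorId>> {
        let mut map: HashMap<WorkerId, Vec<ActorId>> = HashMap::new();
        for (&actor, &worker) in &self.actor_locations {
            map.entry(worker).or_default().push(actor);
        }
        map
    }
}
```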
@@ -14,32 +14,37 @@
It'll be better to review this file from ~L970 (stream graph), then ~L600 (actor graph), then from the beginning (actor builder). 🥺
I'm going to split it into multiple files in the next PRs.
I'm still fixing the unit tests and some minor issues, while the e2e tests run pretty well. Feel free to review the main part of this huge PR. 🥵
Codecov Report
@@ Coverage Diff @@
## main #7659 +/- ##
==========================================
+ Coverage 71.62% 71.69% +0.07%
==========================================
Files 1098 1098
Lines 175119 174524 -595
==========================================
- Hits 125421 125126 -295
+ Misses 49698 49398 -300
Result::DefaultSingleton => {
    Distribution::Singleton(self.default_singleton_parallel_unit)
}
Should we use different parallel units for a streaming job with multiple singleton fragments, instead of a fixed one? For example, `create materialized view v as select count(*) from t1 union all select count(*) from t2 union all select count(*) from t3;` has 3 singleton fragments.
Good catch. I've also thought of this, but I'm still trying to find a way to represent it in crepe, as the current rules only propagate the requirement through no-shuffle, but don't go back and check whether the final result is still correct. If we assign different parallel units to them, it's possible that some no-shuffles are violated... Maybe we need a SameDist relation?
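For illustration, a toy `crepe` program showing what a `SameDist`-like relation could look like; the relations and rules here are hypothetical, not the scheduler's actual datalog:

```rust
use crepe::crepe;

crepe! {
    // Hypothetical relations, only to illustrate the idea of a `SameDist` rule set.
    @input
    struct Fragment(u32);
    @input
    struct NoShuffle(u32, u32); // (upstream fragment, downstream fragment)

    @output
    struct SameDist(u32, u32); // fragments that must share the same distribution

    SameDist(x, x) <- Fragment(x);
    SameDist(x, y) <- NoShuffle(x, y);
    SameDist(x, y) <- SameDist(y, x);
    SameDist(x, z) <- SameDist(x, y), SameDist(y, z);
}

fn main() {
    let mut runtime = Crepe::new();
    runtime.extend(vec![Fragment(1), Fragment(2), Fragment(3), Fragment(4)]);
    runtime.extend(vec![NoShuffle(1, 2), NoShuffle(2, 3)]);

    let (same_dist,) = runtime.run();
    // Fragments 1, 2, 3 end up in one `SameDist` group; fragment 4 stays alone,
    // so a singleton there could safely get a different parallel unit.
    for SameDist(a, b) in same_dist {
        println!("{a} ~ {b}");
    }
}
```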
It seems like a tough task to assign different parallel units to different singleton fragments under the NoShuffle restrictions: when we assign a parallel unit to a singleton fragment, we need to propagate it to the other fragments connected by NoShuffle edges, and only after that can we assign a different parallel unit to another singleton fragment. However, crepe might not guarantee this assignment ordering: it could assign two different parallel units before propagation and then violate the NoShuffle restrictions.
Or we can use a crepe schedule loop (instead of one crepe schedule) to assign a parallel unit to Result::DefaultSingleton and propagate it through the NoShuffle edge. Once all fragments get their specific parallel unit, we are done, but it would be more complicated.
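As an aside, the same goal can also be sketched without a crepe loop at all; the following union-find snippet (plain Rust, made-up signatures, not the loop proposed above) groups fragments by NoShuffle connectivity first and then assigns one parallel unit per group, so no NoShuffle edge is violated:

```rust
use std::collections::HashMap;

// Rough sketch with made-up signatures: merge fragments connected by NoShuffle
// edges into groups, then give each group of singleton fragments its own parallel
// unit, so fragments linked by NoShuffle still land on the same parallel unit.
fn assign_singletons(
    singleton_fragments: &[u32],
    no_shuffle_edges: &[(u32, u32)],
    parallel_units: &[u32], // assumed non-empty
) -> HashMap<u32, u32> {
    // Union-find over the singleton fragments.
    let mut parent: HashMap<u32, u32> =
        singleton_fragments.iter().map(|&f| (f, f)).collect();
    fn find(parent: &mut HashMap<u32, u32>, x: u32) -> u32 {
        let p = parent[&x];
        if p == x {
            x
        } else {
            let root = find(parent, p);
            parent.insert(x, root); // path compression
            root
        }
    }
    for &(a, b) in no_shuffle_edges {
        if parent.contains_key(&a) && parent.contains_key(&b) {
            let (ra, rb) = (find(&mut parent, a), find(&mut parent, b));
            parent.insert(ra, rb);
        }
    }

    // Assign parallel units round-robin, one per NoShuffle-connected group.
    let mut unit_of_root: HashMap<u32, u32> = HashMap::new();
    let mut next = 0;
    let mut assignment = HashMap::new();
    for &f in singleton_fragments {
        let root = find(&mut parent, f);
        let unit = *unit_of_root.entry(root).or_insert_with(|| {
            let u = parallel_units[next % parallel_units.len()];
            next += 1;
            u
        });
        assignment.insert(f, unit);
    }
    assignment
}
```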
Agree. For now, assuming that the singleton executors won't process too much data and there won't be too many singletons in a graph, the current implementation just works.
As long as we extract the scheduling of the whole graph as a separate step like this, we can benefit from this global knowledge of scheduling and make the following steps easier to implement. The algorithm can be optimized or replaced if necessary in the future.
Looks so great to me!!! The current scheduler is much clearer and easier to maintain than the previous one. Your work is excellent, thank you.
Looks great to me!!! What excellent work!!!
This PR extends the new actor graph builder introduced in #7659 for replacing the table plan, and adds the procedure of preparing the `TableFragments`. Note that the RPC with the compute nodes is not implemented, so this is not utilized by the frontend yet. Building an actor graph for schema change can be similar and symmetric to MV on MV.
- For MV on MV, we have existing upstream `Materialize` fragments that give the requirements for the distribution of the current `Chain`. We'll generate new dispatchers for the upstream `Materialize` in the result.
- For replacing the table plan for schema change, we (may) have existing downstream `Chain` fragments that give the requirements for the distribution of the current `Materialize`. We'll generate updates for the merger of the downstream `Chain` in the result.

Related:
- #7908

Approved-By: chenzl25
Approved-By: yezizp2012
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
This PR is big but it should be very readable. Please follow the comments to get it reviewed. Thanks! 🥰🥵
What's changed and what's your intention?
This PR fully utilizes the new scheduler in the process of creating streaming jobs, simplifying and generalizing a lot of Chain-related logic.
Extends the scheduler to also emit which parallel unit a singleton fragment should lie on. Previously we only indicated that a fragment should be a singleton, but did not decide whether it needs to be scheduled to a specific parallel unit. This is insufficient, as we need to distinguish the No-Shuffle singletons (like an MV on a singleton MV) from normal singletons that can be randomly scheduled.
Use the `CompleteFragmentGraph` for building actors. The complete graph contains both the fragment graph of the current job and the upstream fragments that are connected to the current job (that is, the upstream `Materialize`). So when we're visiting the edge between the upstream `Materialize` and the current `Chain`, we can naturally fill in the distribution info of the `Chain` and build the new dispatchers for `Materialize` as usual. By doing this, we can totally remove the patches like `resolve_chain_node` and make the whole process much clearer.

Remove the old `Scheduler` and related fields like `same_worker_node` or `colocated_actor`. As we have enough knowledge about the distribution of each fragment (including `Chain`!) when building the actor graph, the `ActorGraph` can now be complete and immutable once built. So at the same time, we can make the `Context` immutable as well, which improves the readability a lot.

Checklist
- `./risedev check` (or alias, `./risedev c`)

Refer to a related PR or issue link (optional)