refactor: actor wait barrier manager inject barrier #17613
Conversation
LGTM. The idea is straightforward.
Can you please elaborate more in the PR body?
```rust
// Restore the barrier mutation before yielding the message downstream: lazily
// subscribe to this actor's mutation channel on the first barrier received.
yield process_msg(msg, |barrier| {
    mutation_subscriber.get_or_insert_with(|| {
        local_barrier_manager.subscribe_barrier_mutation(self_actor_id, barrier)
    })
})
.await?;
```
There used to be no need for local exchanges to acquire the mutation from the channel, because we didn't prune it on the sender side. So this seems to be a regression. 🤔
There shouldn't be much regression once we have #17613. When a local input receives a barrier, in most cases the mutation has already been sent by the `LocalBarrierWorker` to the mutation subscription channel.
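To illustrate why there is little blocking in practice, here is a minimal sketch of the subscription flow described above. This is not RisingWave's actual API: `Epoch`, the string stand-in for `Mutation`, and the channel shape are all hypothetical, and a tokio dependency is assumed.

```rust
use tokio::sync::mpsc;

type Epoch = u64;
type Mutation = String; // hypothetical stand-in for the real mutation type

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<(Epoch, Mutation)>();

    // Barrier-worker side: push the mutation into the per-actor channel as
    // soon as the barrier is injected on this compute node.
    tx.send((1, "add-dispatcher".to_string())).unwrap();

    // Actor side: by the time a local input sees the barrier, the mutation is
    // usually already buffered, so this recv resolves without blocking.
    let (epoch, mutation) = rx.recv().await.unwrap();
    println!("epoch {epoch}: apply mutation {mutation}");
}
```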
Updated. In brief, the motivation of this PR is to provide the flexibility for different actors to have different mutations. This will be helpful in the partial checkpoint implementation, and for #15490. For #15490, a rough idea in my mind is: when we do a scale, we don't need to inject the mutation from the source via the newly injected barrier. We can first locate the most upstream fragment that needs to receive the scale mutation, figure out the smallest epoch that has not yet started being processed by any actor in this fragment, and then inject the mutation to the actors of this fragment by sending a different mutation from the `LocalBarrierWorker`.
Sounds interesting but delicate under asynchronous checkpointing. 🤣
Do you mean our current global explicit …?
It sounds like the correctness can be guaranteed.
I agree that extending this mechanism to local input/output can benefit partial checkpoint a lot. Anyway, the impl LGTM.
@yezizp2012 @BugenZhao Is it ok to merge this PR?
LGTM
Rest LGTM
From the recovery test failure, it is highly likely that there was a problem with the recovery process. Retrying the slt run 5 or 10 more times should be enough. 🤔
Indeed. I repeatedly saw …
The recovery test has passed. The previous cause was that, on recovery, the catalog manager only notifies failure at the beginning of recovery. If a create-mv DDL attaches a finish notifier to the catalog manager after the failure has already been notified, the notifier will never be notified. The create-mv DDL then keeps holding the reschedule read lock, so recovery can never acquire the reschedule write lock; recovery keeps waiting without ever notifying the failure, which causes a deadlock.
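For illustration, here is a minimal sketch of that lock-ordering deadlock. The names (`reschedule_lock`, `finish_tx`) are hypothetical, a tokio dependency is assumed, and the timeout exists only to make the hang observable.

```rust
use std::time::Duration;
use tokio::sync::{oneshot, RwLock};

#[tokio::main]
async fn main() {
    let reschedule_lock = std::sync::Arc::new(RwLock::new(()));
    let (finish_tx, finish_rx) = oneshot::channel::<()>();

    // Create-mv DDL: holds the read lock while waiting for a finish
    // notification that was already missed, so it never arrives.
    let lock = reschedule_lock.clone();
    let ddl = tokio::spawn(async move {
        let _read_guard = lock.read().await;
        let _ = finish_rx.await; // waits forever
    });

    // Recovery: must take the write lock before it could ever notify.
    let recovery = tokio::spawn(async move {
        let _write_guard = reschedule_lock.write().await; // blocks forever
        let _ = finish_tx.send(());
    });

    let both = async {
        let _ = tokio::join!(ddl, recovery);
    };
    if tokio::time::timeout(Duration::from_secs(1), both).await.is_err() {
        println!("deadlock: DDL holds the read lock, recovery waits for write");
    }
}
```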
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Currently, we erase the mutation of a barrier in the remote output, and in the remote input we restore the barrier mutation by getting it from the `LocalBarrierWorker`. This ensures that no actor on a CN starts processing a barrier before the `LocalBarrierWorker` on that CN receives the barrier injected from the meta node.

In this PR, we extend this mechanism to local input/output, to ensure that the mutation of each actor is always obtained from the `LocalBarrierWorker`, or further, from the meta node. This feature is not leveraged yet, but it provides the flexibility for different actors to receive different mutations for the same barrier.

Since both remote and local exchange dispatchers now erase the mutation from the barrier, we can make the mutation type of the barrier generic. For a barrier in the exchange dispatcher, the mutation is `()`, with type alias `type DispatcherBarrier = BarrierInner<()>`; for a barrier in an actor, the mutation is the original one. In this way, we statically ensure that the mutation is erased in the dispatcher, and that we always fetch the mutation from the `LocalBarrierWorker` before emitting the barrier to the real processing logic of actors. The `Message` enum also gains a generic mutation type, to indicate whether the message is flowing within actors or in the dispatcher. A sketch of this generic barrier type is shown below.
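As a rough illustration only (the actual types in `risingwave_stream` differ; `Mutation` here is a hypothetical stand-in), a minimal sketch of the generic-mutation idea:

```rust
use std::sync::Arc;

struct Mutation; // hypothetical stand-in for the real mutation payload

// Generic over its mutation payload, so erasure is enforced by the type system.
struct BarrierInner<M> {
    epoch: u64,
    mutation: M,
}

// In the exchange dispatcher the mutation is statically erased to `()`.
type DispatcherBarrier = BarrierInner<()>;
// Within an actor the barrier carries the real (optional, shared) mutation.
type Barrier = BarrierInner<Option<Arc<Mutation>>>;

impl<M> BarrierInner<M> {
    // Dropping the mutation yields a dispatcher barrier; the compiler then
    // rejects any attempt to read a mutation on the dispatcher side.
    fn into_dispatcher(self) -> DispatcherBarrier {
        BarrierInner { epoch: self.epoch, mutation: () }
    }
}

fn main() {
    let b: Barrier = BarrierInner { epoch: 1, mutation: Some(Arc::new(Mutation)) };
    let d: DispatcherBarrier = b.into_dispatcher();
    println!("dispatcher barrier at epoch {}", d.epoch);
}
```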
Checklist
`./risedev check` (or alias, `./risedev c`)
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.