feat(storage): pass epoch and table id before barrier #17635
Conversation
{
    let node_to_collect = match self.control_stream_manager.inject_barrier(
        command_ctx.clone(),
        self.state.inflight_actor_infos.existing_table_ids(),
Just to confirm, `self.state.inflight_actor_infos.existing_table_ids()` contains both created MV's and creating MV's state table ids, right?
Yes. This is more like a refactoring PR, and the logic of partial checkpoint based backfill is not included yet.
@@ -727,7 +758,7 @@ struct UnsyncData {
     instance_table_id: HashMap<LocalInstanceId, TableId>,
     // TODO: this is only used in spill to get existing epochs and can be removed
     // when we support spill not based on epoch
-    epochs: BTreeMap<HummockEpoch, ()>,
+    epochs: BTreeMap<HummockEpoch, HashSet<TableId>>,
Seems to be conflicting with #17539
@@ -454,6 +454,15 @@ impl HummockStorage {
     )
 }

+    /// Declare the start of an epoch. This information is provided for spill so that the spill task won't
+    /// include data of two or more syncs.
+    // TODO: remote this method when we support spill task that can include data of more two or more syncs
typo: remove
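As context for the quoted doc comment, here is a minimal standalone sketch of the idea (the type and method names are simplified stand-ins, not the actual `HummockStorage`/`HummockUploader` API): declaring the start of an epoch records which tables write on it, so a later spill can be scoped per table.

```rust
use std::collections::{BTreeMap, HashSet};

// Simplified stand-ins for the real HummockEpoch / TableId types.
type Epoch = u64;
type TableId = u32;

/// Toy model of the unsync state: for each epoch, remember which tables
/// declared (via a `start_epoch`-like call) that they will write on it.
#[derive(Default)]
struct UnsyncEpochs {
    epochs: BTreeMap<Epoch, HashSet<TableId>>,
}

impl UnsyncEpochs {
    /// Declare that `table_ids` start writing on `epoch`.
    fn start_epoch(&mut self, epoch: Epoch, table_ids: HashSet<TableId>) {
        self.epochs.entry(epoch).or_default().extend(table_ids);
    }

    /// Epochs (oldest first) that a spill task could cover for `table_id`:
    /// only epochs the table actually started writing on.
    fn spillable_epochs(&self, table_id: TableId) -> Vec<Epoch> {
        let mut result = Vec::new();
        for (epoch, tables) in &self.epochs {
            if tables.contains(&table_id) {
                result.push(*epoch);
            }
        }
        result
    }
}

fn main() {
    let mut unsync = UnsyncEpochs::default();
    unsync.start_epoch(2, HashSet::from([1, 2]));
    unsync.start_epoch(3, HashSet::from([1])); // table 2 has stopped writing
    assert_eq!(unsync.spillable_epochs(2), vec![2]);
    assert_eq!(unsync.spillable_epochs(1), vec![2, 3]);
}
```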
// When drop/cancel a streaming job, for the barrier to stop actor, the
// local instance will call `local_seal_epoch`, but the `next_epoch` won't be
// called `start_epoch` because we have stopped writing on it.
Does it mean that the actors for the dropping/cancelling streaming job will be included in `actors_to_send`, but the table_ids for the streaming job will not be included in the barrier?
Yes. This is the key difference between `table_ids_to_sync` and `actors_to_collect` in the `InjectBarrierRequest`. `actors_to_collect` means the actors that this barrier will flow through, and the dropping streaming job will stop after the barrier flows through the actor. On the other hand, `table_ids_to_sync` means the table ids that will start writing data on the `epoch.curr` after the barrier, so it won't include the table_ids of the dropping streaming job.

For this reason, in `LocalBarrierWorker` we store the `table_ids_to_sync` of the previous barrier (let's say epoch{curr=2, prev=1}), and when a barrier (epoch{curr=3, prev=2}) flows through the streaming graph, it means the table ids in the `table_ids_to_sync` of the previous barrier (epoch{curr=2, prev=1}) have finished writing the data of epoch 2, and then we call `sync` with the `table_ids_to_sync` of the previous barrier.
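A hedged sketch of the bookkeeping described above (names simplified, not the real `LocalBarrierWorker`): the worker keeps the `table_ids_to_sync` of the previous barrier and only hands it to `sync` once the next barrier has been collected.

```rust
use std::collections::HashSet;

// Simplified stand-ins for the real epoch / table id types.
type Epoch = u64;
type TableId = u32;

/// Toy stand-in for the bookkeeping described above: remember the
/// `table_ids_to_sync` of the previously injected barrier and sync them
/// once the next barrier has been collected.
struct BarrierWorker {
    /// `(curr_epoch, table_ids_to_sync)` of the previous barrier, if any.
    prev_barrier: Option<(Epoch, HashSet<TableId>)>,
}

impl BarrierWorker {
    fn new() -> Self {
        Self { prev_barrier: None }
    }

    /// Called when the barrier with `curr_epoch` has flowed through the graph.
    /// Returns the sync request for the *previous* barrier, if any: its tables
    /// have now finished writing that barrier's epoch.
    fn on_barrier_collected(
        &mut self,
        curr_epoch: Epoch,
        table_ids_to_sync: HashSet<TableId>,
    ) -> Option<(Epoch, HashSet<TableId>)> {
        self.prev_barrier.replace((curr_epoch, table_ids_to_sync))
    }
}

fn main() {
    let mut worker = BarrierWorker::new();
    // Barrier epoch{curr=2, prev=1}: nothing to sync yet.
    assert!(worker.on_barrier_collected(2, HashSet::from([1, 2])).is_none());
    // Barrier epoch{curr=3, prev=2}: tables {1, 2} have finished writing epoch 2,
    // so `sync(2, {1, 2})` can be issued now.
    let (epoch, tables) = worker.on_barrier_collected(3, HashSet::from([1])).unwrap();
    assert_eq!(epoch, 2);
    assert_eq!(tables, HashSet::from([1, 2]));
}
```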
        assert_gt!(next_epoch, max_epoch);
    }
    debug!(?table_id, epoch, next_epoch, "table data has stopped");
    table_data.stopped_next_epoch = Some(next_epoch);
So `stopped_next_epoch` is introduced only for assertion check?
Yes.
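For illustration only, a tiny hypothetical sketch of a field kept purely to back an assertion; the method names and the exact condition checked in the PR differ, this just shows the pattern.

```rust
type Epoch = u64;

/// Toy per-table state: `stopped_next_epoch` carries no behavior of its own;
/// it is stored only so that a later call can assert against it.
#[derive(Default)]
struct TableData {
    stopped_next_epoch: Option<Epoch>,
}

impl TableData {
    /// Record that the table's data has stopped at `next_epoch`.
    fn mark_stopped(&mut self, next_epoch: Epoch) {
        self.stopped_next_epoch = Some(next_epoch);
    }

    /// A later operation uses the stored value purely as a sanity check
    /// (hypothetical condition; the real check differs).
    fn on_later_epoch(&self, epoch: Epoch) {
        if let Some(stopped) = self.stopped_next_epoch {
            assert!(epoch < stopped, "no write expected at or after the stop point");
        }
    }
}

fn main() {
    let mut table = TableData::default();
    table.mark_stopped(3);
    table.on_later_epoch(2); // fine: before the stop point
    // table.on_later_epoch(3); // would trip the assertion and nothing else
}
```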
// When drop/cancel a streaming job, for the barrier to stop actor, the
// local instance will call `local_seal_epoch`, but the `next_epoch` won't be
// called `start_epoch` because we have stopped writing on it.
if !table_data.unsync_epochs.contains_key(&next_epoch) {
Is it possible that the `table_data` for the dropping/cancelling streaming job's tables is absent, causing L823 to panic?
It's unlikely to happen.

Let's say the barrier that stops the actor has epoch{curr=3, prev=2}. Before calling `sync` on `epoch.prev`, we must have called `start_epoch(epoch.prev)`. The barrier won't be collected before this `local_seal_epoch(epoch.curr)` is called, so we cannot have called `sync(epoch.prev)` yet. Before `sync(epoch.prev)`, the `unsync_epochs` must contain `epoch.prev`, and therefore the `table_data` is non-empty and won't be dropped, so at L823 the table won't be absent.
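A rough sketch of the invariant argued above (simplified types, not the actual `UnsyncData` code): a table's entry is dropped only after its last unsync epoch is synced, so the entry is still present when `local_seal_epoch` runs for the stop barrier.

```rust
use std::collections::{BTreeSet, HashMap};

// Simplified stand-ins for the real epoch / table id types.
type Epoch = u64;
type TableId = u32;

/// Toy unsync state: a table's entry is removed only when its last unsync
/// epoch has been synced, which is why `local_seal_epoch` for the stop
/// barrier can still find the entry.
#[derive(Default)]
struct UnsyncData {
    table_data: HashMap<TableId, BTreeSet<Epoch>>,
}

impl UnsyncData {
    fn start_epoch(&mut self, table_id: TableId, epoch: Epoch) {
        self.table_data.entry(table_id).or_default().insert(epoch);
    }

    /// Seal `epoch`; the entry must still exist because `epoch` is not yet synced.
    fn local_seal_epoch(&self, table_id: TableId, epoch: Epoch) {
        let epochs = self
            .table_data
            .get(&table_id)
            .expect("entry present: it still has unsynced epochs");
        assert!(epochs.contains(&epoch));
    }

    /// Sync `epoch` and drop the table entry only once nothing unsynced remains.
    fn sync(&mut self, table_id: TableId, epoch: Epoch) {
        let emptied = match self.table_data.get_mut(&table_id) {
            Some(epochs) => {
                epochs.remove(&epoch);
                epochs.is_empty()
            }
            None => false,
        };
        if emptied {
            self.table_data.remove(&table_id);
        }
    }
}

fn main() {
    let mut data = UnsyncData::default();
    // Stop barrier epoch{curr=3, prev=2}: start_epoch(2) must precede sync(2).
    data.start_epoch(1, 2);
    data.local_seal_epoch(1, 2); // entry still present: epoch 2 not yet synced
    data.sync(1, 2); // only now can the table entry be dropped
}
```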
Got it. Thanks for the explanation.
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Pass the table_ids that will start writing on an epoch at the beginning of `inject_barrier`. This helps the HummockUploader spill data correctly so that data of two separate syncs won't be mixed in a spill task.
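For readers less familiar with the uploader, a hedged, standalone sketch of the property this PR is after (the types here are toy stand-ins; in the real code the sync membership is derived from the table ids passed before the barrier): a spill task picks unsynced epochs but never crosses a sync boundary.

```rust
use std::collections::BTreeMap;

// Toy stand-ins: each unsynced epoch is tagged with the sync it will belong to.
type Epoch = u64;
type SyncId = u32;

/// Pick epochs (oldest first) for one spill task, stopping at the first epoch
/// that belongs to a different sync, so a spill task never mixes two syncs.
fn pick_spill_epochs(unsync: &BTreeMap<Epoch, SyncId>) -> Vec<Epoch> {
    let mut picked = Vec::new();
    let mut current_sync = None;
    for (&epoch, &sync_id) in unsync {
        match current_sync {
            None => current_sync = Some(sync_id),
            Some(s) if s != sync_id => break, // never cross a sync boundary
            Some(_) => {}
        }
        picked.push(epoch);
    }
    picked
}

fn main() {
    let mut unsync = BTreeMap::new();
    unsync.insert(1, 0);
    unsync.insert(2, 0);
    unsync.insert(3, 1); // belongs to the next sync
    assert_eq!(pick_spill_epochs(&unsync), vec![1, 2]);
}
```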
Checklist

- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.