diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index ad0331a98e13b..36476aabd4dad 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -70,8 +70,7 @@ pub struct Reschedule {
 
     /// The downstream fragments of this fragment.
     pub downstream_fragment_ids: Vec<FragmentId>,
-    /// Reassigned splits for source actors.
-    /// It becomes the `actor_splits` in `UpdateMutation`.
+    /// Reassigned splits for source actors
     pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
 
     /// Whether this fragment is injectable. The injectable means whether the fragment contains
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 9566ff4659543..f5d4a2ec5da08 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -194,10 +194,8 @@ pub struct RescheduleContext {
     worker_nodes: HashMap<WorkerId, WorkerNode>,
     /// Index of all `Actor` upstreams, specific to `Dispatcher`
     upstream_dispatchers: HashMap<ActorId, Vec<(FragmentId, DispatcherId, DispatcherType)>>,
-    /// Fragments with `StreamSource`
+    /// Fragments with stream source
     stream_source_fragment_ids: HashSet<FragmentId>,
-    /// Fragments with `StreamSourceBackfill`
-    stream_source_backfill_fragment_ids: HashSet<FragmentId>,
     /// Target fragments in `NoShuffle` relation
     no_shuffle_target_fragment_ids: HashSet<FragmentId>,
     /// Source fragments in `NoShuffle` relation
@@ -665,7 +663,6 @@ impl ScaleController {
         }
 
         let mut stream_source_fragment_ids = HashSet::new();
-        let mut stream_source_backfill_fragment_ids = HashSet::new();
         let mut no_shuffle_reschedule = HashMap::new();
         for (
             fragment_id,
@@ -739,12 +736,6 @@ impl ScaleController {
                     stream_source_fragment_ids.insert(*fragment_id);
                 }
             }
-            if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
-                let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
-                if stream_node.find_source_backfill().is_some() {
-                    stream_source_backfill_fragment_ids.insert(*fragment_id);
-                }
-            }
 
             // Check if the reschedule plan is valid.
             let current_parallel_units = fragment
@@ -820,7 +811,6 @@ impl ScaleController {
             worker_nodes,
             upstream_dispatchers,
             stream_source_fragment_ids,
-            stream_source_backfill_fragment_ids,
             no_shuffle_target_fragment_ids,
             no_shuffle_source_fragment_ids,
             fragment_dispatcher_map,
@@ -1218,9 +1208,9 @@ impl ScaleController {
                 .await?;
         }
 
-        // For stream source & source backfill fragments, we need to reallocate the splits.
+        // For stream source fragments, we need to reallocate the splits.
         // Because we are in the Pause state, so it's no problem to reallocate
-        let mut fragment_actor_splits = HashMap::new();
+        let mut fragment_stream_source_actor_splits = HashMap::new();
         for fragment_id in reschedules.keys() {
             let actors_after_reschedule =
                 fragment_actors_after_reschedule.get(fragment_id).unwrap();
@@ -1238,39 +1228,11 @@ impl ScaleController {
 
                 let actor_splits = self
                     .source_manager
-                    .migrate_splits_for_source_actors(
-                        *fragment_id,
-                        &prev_actor_ids,
-                        &curr_actor_ids,
-                    )
+                    .migrate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids)
                     .await?;
 
-                fragment_actor_splits.insert(*fragment_id, actor_splits);
-            }
-        }
-        // Loop another round to make sure source actors are migrated first, and then align backfill actors
-        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
-            for fragment_id in reschedules.keys() {
-                let actors_after_reschedule =
-                    fragment_actors_after_reschedule.get(fragment_id).unwrap();
-
-                if ctx
-                    .stream_source_backfill_fragment_ids
-                    .contains(fragment_id)
-                {
-                    let fragment = ctx.fragment_map.get(fragment_id).unwrap();
-
-                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
-
-                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
-                        *fragment_id,
-                        &fragment.upstream_fragment_ids,
-                        &curr_actor_ids,
-                        &fragment_actor_splits,
-                        &no_shuffle_upstream_actor_map,
-                    )?;
-                    fragment_actor_splits.insert(*fragment_id, actor_splits);
-                }
+                fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits);
+                todo!("migrate_splits_backfill");
             }
         }
 
@@ -1424,7 +1386,7 @@ impl ScaleController {
             let upstream_fragment_dispatcher_ids =
                 upstream_fragment_dispatcher_set.into_iter().collect_vec();
 
-            let actor_splits = fragment_actor_splits
+            let actor_splits = fragment_stream_source_actor_splits
                 .get(&fragment_id)
                 .cloned()
                 .unwrap_or_default();
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index c40d8679b4aba..5bd931b180506 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -471,13 +471,13 @@ impl Default for SplitDiffOptions {
 }
 
 /// Reassigns splits if there are new splits or dropped splits,
-/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
+/// i.e., `actor_splits` and `discovered_splits` differ.
 ///
 /// The existing splits will remain unmoved in their currently assigned actor.
 ///
 /// If an actor has an upstream actor, it should be a backfill executor,
-/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
-/// Use `align_backfill_splits` instead.**
+/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
+/// Use `align_backfill_splits` instead.
 ///
 /// - `fragment_id`: just for logging
 ///
@@ -774,10 +774,11 @@ impl SourceManager {
 
     /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
     ///
-    /// Very occasionally split removal may happen during scaling, in which case we need to
-    /// use the old splits for reallocation instead of the latest splits (which may be missing),
-    /// so that we can resolve the split removal in the next command.
-    pub async fn migrate_splits_for_source_actors(
+    /// Very occasionally split removal may happen
+    /// during scaling, in which case we need to use the old splits for reallocation instead of the
+    /// latest splits (which may be missing), so that we can resolve the split removal in the next
+    /// command.
+    pub async fn migrate_splits(
         &self,
         fragment_id: FragmentId,
         prev_actor_ids: &[ActorId],
@@ -800,7 +801,7 @@
             fragment_id,
             empty_actor_splits,
             &prev_splits,
-            // pre-allocate splits is the first time getting splits and it does not have scale-in scene
+            // pre-allocate splits is the first time getting splits and it does not have scale in scene
             SplitDiffOptions::default(),
         )
         .unwrap_or_default();
@@ -808,37 +809,6 @@
         Ok(diff)
     }
 
-    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
-    pub fn migrate_splits_for_backfill_actors(
-        &self,
-        fragment_id: FragmentId,
-        upstream_fragment_ids: &Vec<FragmentId>,
-        curr_actor_ids: &[ActorId],
-        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
-        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
-    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
-        // align splits for backfill fragments with its upstream source fragment
-        debug_assert!(upstream_fragment_ids.len() == 1);
-        let upstream_fragment_id = upstream_fragment_ids[0];
-        let actors = no_shuffle_upstream_actor_map
-            .iter()
-            .filter(|(id, _)| curr_actor_ids.contains(id))
-            .map(|(id, upstream_fragment_actors)| {
-                debug_assert!(upstream_fragment_actors.len() == 1);
-                (
-                    *id,
-                    vec![*upstream_fragment_actors.get(&upstream_fragment_id).unwrap()],
-                )
-            });
-        let upstream_assignment = fragment_actor_splits.get(&upstream_fragment_id).unwrap();
-        Ok(align_backfill_splits(
-            actors,
-            upstream_assignment,
-            fragment_id,
-            upstream_fragment_id,
-        )?)
-    }
-
     /// Allocates splits to actors for a newly created source executor.
     pub async fn allocate_splits(&self, table_id: &TableId) -> MetaResult<SplitAssignment> {
         let core = self.core.lock().await;