Revert "support scaling source backfill" (For a new PR)
This reverts commit 231b93c.
xxchan committed Mar 28, 2024
1 parent 231b93c commit 0339b38
Showing 3 changed files with 17 additions and 86 deletions.
3 changes: 1 addition & 2 deletions src/meta/src/barrier/command.rs
@@ -70,8 +70,7 @@ pub struct Reschedule {
/// The downstream fragments of this fragment.
pub downstream_fragment_ids: Vec<FragmentId>,

/// Reassigned splits for source actors.
/// It becomes the `actor_splits` in `UpdateMutation`.
/// Reassigned splits for source actors
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

/// Whether this fragment is injectable. The injectable means whether the fragment contains
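For readers unfamiliar with the field touched here: `actor_splits` is the per-actor split assignment that ends up in the barrier's `UpdateMutation` after a reschedule. Below is a minimal, illustrative sketch of that data shape only — `ActorId` and `SplitImpl` are plain placeholders rather than the real meta/connector types, and this is not code from the commit:

```rust
use std::collections::HashMap;

// Placeholder types; the real ones live in the meta / connector crates.
type ActorId = u32;

#[derive(Clone, Debug)]
struct SplitImpl {
    id: String,
}

/// Simplified view of the reschedule payload: each surviving source actor
/// gets the full list of splits it should read after the barrier.
struct Reschedule {
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

fn main() {
    // Actor 1 keeps split "s0"; actor 2 is newly added and picks up "s1".
    let mut actor_splits = HashMap::new();
    actor_splits.insert(1, vec![SplitImpl { id: "s0".into() }]);
    actor_splits.insert(2, vec![SplitImpl { id: "s1".into() }]);

    let reschedule = Reschedule { actor_splits };
    for (actor, splits) in &reschedule.actor_splits {
        let ids: Vec<&str> = splits.iter().map(|s| s.id.as_str()).collect();
        println!("actor {actor} -> {ids:?}");
    }
}
```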
52 changes: 7 additions & 45 deletions src/meta/src/stream/scale.rs
@@ -194,10 +194,8 @@ pub struct RescheduleContext {
worker_nodes: HashMap<WorkerId, WorkerNode>,
/// Index of all `Actor` upstreams, specific to `Dispatcher`
upstream_dispatchers: HashMap<ActorId, Vec<(FragmentId, DispatcherId, DispatcherType)>>,
/// Fragments with `StreamSource`
/// Fragments with stream source
stream_source_fragment_ids: HashSet<FragmentId>,
/// Fragments with `StreamSourceBackfill`
stream_source_backfill_fragment_ids: HashSet<FragmentId>,
/// Target fragments in `NoShuffle` relation
no_shuffle_target_fragment_ids: HashSet<FragmentId>,
/// Source fragments in `NoShuffle` relation
@@ -665,7 +663,6 @@ impl ScaleController {
}

let mut stream_source_fragment_ids = HashSet::new();
let mut stream_source_backfill_fragment_ids = HashSet::new();
let mut no_shuffle_reschedule = HashMap::new();
for (
fragment_id,
@@ -739,12 +736,6 @@
stream_source_fragment_ids.insert(*fragment_id);
}
}
if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
if stream_node.find_source_backfill().is_some() {
stream_source_backfill_fragment_ids.insert(*fragment_id);
}
}

// Check if the reschedule plan is valid.
let current_parallel_units = fragment
@@ -820,7 +811,6 @@
worker_nodes,
upstream_dispatchers,
stream_source_fragment_ids,
stream_source_backfill_fragment_ids,
no_shuffle_target_fragment_ids,
no_shuffle_source_fragment_ids,
fragment_dispatcher_map,
@@ -1218,9 +1208,9 @@ impl ScaleController {
.await?;
}

// For stream source & source backfill fragments, we need to reallocate the splits.
// For stream source fragments, we need to reallocate the splits.
// Because we are in the Pause state, so it's no problem to reallocate
let mut fragment_actor_splits = HashMap::new();
let mut fragment_stream_source_actor_splits = HashMap::new();
for fragment_id in reschedules.keys() {
let actors_after_reschedule =
fragment_actors_after_reschedule.get(fragment_id).unwrap();
@@ -1238,39 +1228,11 @@

let actor_splits = self
.source_manager
.migrate_splits_for_source_actors(
*fragment_id,
&prev_actor_ids,
&curr_actor_ids,
)
.migrate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids)
.await?;

fragment_actor_splits.insert(*fragment_id, actor_splits);
}
}
// Loop another round to make sure source actors are migrated first, and then align backfill actors
if !ctx.stream_source_backfill_fragment_ids.is_empty() {
for fragment_id in reschedules.keys() {
let actors_after_reschedule =
fragment_actors_after_reschedule.get(fragment_id).unwrap();

if ctx
.stream_source_backfill_fragment_ids
.contains(fragment_id)
{
let fragment = ctx.fragment_map.get(fragment_id).unwrap();

let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
*fragment_id,
&fragment.upstream_fragment_ids,
&curr_actor_ids,
&fragment_actor_splits,
&no_shuffle_upstream_actor_map,
)?;
fragment_actor_splits.insert(*fragment_id, actor_splits);
}
fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits);
todo!("migrate_splits_backfill");
}
}

@@ -1424,7 +1386,7 @@ impl ScaleController {
let upstream_fragment_dispatcher_ids =
upstream_fragment_dispatcher_set.into_iter().collect_vec();

let actor_splits = fragment_actor_splits
let actor_splits = fragment_stream_source_actor_splits
.get(&fragment_id)
.cloned()
.unwrap_or_default();
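The code removed above implemented a two-pass ordering: first reassign splits for plain source fragments, then align each `SourceScan` (backfill) fragment with its no-shuffle upstream source fragment's new assignment; the revert restores a `todo!("migrate_splits_backfill")` in its place. A rough, self-contained sketch of that ordering follows — the types and the round-robin reassignment are simplified placeholders, not the actual `ScaleController` logic:

```rust
use std::collections::HashMap;

type ActorId = u32;
type Split = String;

/// First pass: reassign splits for plain source fragments.
/// (Simplified to round-robin here; the real code keeps existing assignments stable.)
fn migrate_source_splits(
    prev: &HashMap<ActorId, Vec<Split>>,
    curr_actors: &[ActorId],
) -> HashMap<ActorId, Vec<Split>> {
    let mut out: HashMap<ActorId, Vec<Split>> =
        curr_actors.iter().map(|a| (*a, Vec::new())).collect();
    for (i, split) in prev.values().flatten().enumerate() {
        let actor = curr_actors[i % curr_actors.len()];
        out.get_mut(&actor).unwrap().push(split.clone());
    }
    out
}

/// Second pass: each backfill actor copies the splits of its single
/// no-shuffle upstream source actor, so the two stay aligned.
fn align_backfill_with_upstream(
    backfill_to_upstream_actor: &HashMap<ActorId, ActorId>,
    upstream_assignment: &HashMap<ActorId, Vec<Split>>,
) -> HashMap<ActorId, Vec<Split>> {
    backfill_to_upstream_actor
        .iter()
        .map(|(backfill, upstream)| {
            (
                *backfill,
                upstream_assignment.get(upstream).cloned().unwrap_or_default(),
            )
        })
        .collect()
}

fn main() {
    // Source fragment: actor 1 previously owned both splits; actor 2 is newly added.
    let prev = HashMap::from([(1, vec!["s0".to_string(), "s1".to_string()])]);
    let source_assignment = migrate_source_splits(&prev, &[1, 2]);

    // Backfill actors 10 and 20 sit behind source actors 1 and 2 via NoShuffle.
    let mapping = HashMap::from([(10, 1), (20, 2)]);
    let backfill_assignment = align_backfill_with_upstream(&mapping, &source_assignment);

    println!("source:   {source_assignment:?}");
    println!("backfill: {backfill_assignment:?}");
}
```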
48 changes: 9 additions & 39 deletions src/meta/src/stream/source_manager.rs
@@ -471,13 +471,13 @@ impl Default for SplitDiffOptions {
}

/// Reassigns splits if there are new splits or dropped splits,
/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
/// i.e., `actor_splits` and `discovered_splits` differ.
///
/// The existing splits will remain unmoved in their currently assigned actor.
///
/// If an actor has an upstream actor, it should be a backfill executor,
/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
/// Use `align_backfill_splits` instead.**
/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
/// Use `align_backfill_splits` instead.
///
/// - `fragment_id`: just for logging
///
@@ -774,10 +774,11 @@ impl SourceManager {

/// Migrates splits from previous actors to the new actors for a rescheduled fragment.
///
/// Very occasionally split removal may happen during scaling, in which case we need to
/// use the old splits for reallocation instead of the latest splits (which may be missing),
/// so that we can resolve the split removal in the next command.
pub async fn migrate_splits_for_source_actors(
/// Very occasionally split removal may happen
/// during scaling, in which case we need to use the old splits for reallocation instead of the
/// latest splits (which may be missing), so that we can resolve the split removal in the next
/// command.
pub async fn migrate_splits(
&self,
fragment_id: FragmentId,
prev_actor_ids: &[ActorId],
@@ -800,45 +801,14 @@
fragment_id,
empty_actor_splits,
&prev_splits,
// pre-allocate splits is the first time getting splits and it does not have scale-in scene
// pre-allocate splits is the first time getting splits and it does not have scale in scene
SplitDiffOptions::default(),
)
.unwrap_or_default();

Ok(diff)
}

/// Migrates splits from previous actors to the new actors for a rescheduled fragment.
pub fn migrate_splits_for_backfill_actors(
&self,
fragment_id: FragmentId,
upstream_fragment_ids: &Vec<FragmentId>,
curr_actor_ids: &[ActorId],
fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
// align splits for backfill fragments with its upstream source fragment
debug_assert!(upstream_fragment_ids.len() == 1);
let upstream_fragment_id = upstream_fragment_ids[0];
let actors = no_shuffle_upstream_actor_map
.iter()
.filter(|(id, _)| curr_actor_ids.contains(id))
.map(|(id, upstream_fragment_actors)| {
debug_assert!(upstream_fragment_actors.len() == 1);
(
*id,
vec![*upstream_fragment_actors.get(&upstream_fragment_id).unwrap()],
)
});
let upstream_assignment = fragment_actor_splits.get(&upstream_fragment_id).unwrap();
Ok(align_backfill_splits(
actors,
upstream_assignment,
fragment_id,
upstream_fragment_id,
)?)
}

/// Allocates splits to actors for a newly created source executor.
pub async fn allocate_splits(&self, table_id: &TableId) -> MetaResult<SplitAssignment> {
let core = self.core.lock().await;
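As the `reassign_splits` doc comment in this file states, reassignment is "stable": splits that are still discovered stay on the actor that already owns them, splits that disappeared upstream are dropped, and only genuinely new splits are handed out. A minimal stand-alone sketch of that behavior, using simplified placeholder types rather than the real `SourceManager` implementation:

```rust
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type SplitId = String;

/// Keep every still-discovered split on the actor that already owns it,
/// drop splits that disappeared upstream, and hand each new split to the
/// currently least-loaded actor.
fn reassign_splits_stable(
    actor_splits: &HashMap<ActorId, Vec<SplitId>>,
    discovered_splits: &HashSet<SplitId>,
) -> HashMap<ActorId, Vec<SplitId>> {
    // Start from the current assignment, minus splits that no longer exist.
    let mut assignment: HashMap<ActorId, Vec<SplitId>> = actor_splits
        .iter()
        .map(|(actor, splits)| {
            let kept: Vec<SplitId> = splits
                .iter()
                .filter(|s| discovered_splits.contains(*s))
                .cloned()
                .collect();
            (*actor, kept)
        })
        .collect();

    // Splits we have never seen before go to the actor with the fewest splits.
    let already_assigned: HashSet<SplitId> = assignment.values().flatten().cloned().collect();
    let new_splits: Vec<SplitId> = discovered_splits
        .iter()
        .filter(|s| !already_assigned.contains(*s))
        .cloned()
        .collect();
    for split in new_splits {
        let target = *assignment
            .iter()
            .min_by_key(|(actor, splits)| (splits.len(), **actor))
            .map(|(actor, _)| actor)
            .expect("at least one actor");
        assignment.get_mut(&target).unwrap().push(split);
    }
    assignment
}

fn main() {
    let current = HashMap::from([(1, vec!["s0".to_string()]), (2, vec!["s1".to_string()])]);
    // "s2" is newly discovered; "s0" and "s1" must stay where they are.
    let discovered: HashSet<SplitId> = ["s0", "s1", "s2"].iter().map(|s| s.to_string()).collect();
    println!("{:?}", reassign_splits_stable(&current, &discovered));
}
```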
