diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 577a0bef2536..927374da3158 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -524,14 +524,8 @@ impl Command { } Command::SourceSplitAssignment(change) => { - let mut checked_assignment = change.clone(); - checked_assignment - .iter_mut() - .for_each(|(_, assignment)| validate_assignment(assignment)); - let mut diff = HashMap::new(); - - for actor_splits in checked_assignment.values() { + for actor_splits in change.values() { diff.extend(actor_splits.clone()); } @@ -583,7 +577,11 @@ impl Command { let mut checked_split_assignment = split_assignment.clone(); checked_split_assignment .iter_mut() - .for_each(|(_, assignment)| validate_assignment(assignment)); + .for_each(|(_, assignment)| { + // No related actor running before, we don't need to check the mutation + // should be wrapped with pause. + validate_assignment(assignment); + }); let actor_splits = checked_split_assignment .values() .flat_map(build_actor_connector_splits) @@ -791,7 +789,8 @@ impl Command { for reschedule in reschedules.values() { let mut checked_assignment = reschedule.actor_splits.clone(); - validate_assignment(&mut checked_assignment); + // Update mutation always wrapped by Pause and Resume mutation. no further action needed. + _ = validate_assignment(&mut checked_assignment); for (actor_id, splits) in &checked_assignment { actor_splits.insert( diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index ae5ca2a610b9..68d74902c1c7 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -588,21 +588,24 @@ where ) } -pub fn validate_assignment(assignment: &mut HashMap>) { +pub fn validate_assignment(assignment: &mut HashMap>) -> bool { + let mut dup_assignment_found_flag = false; + // check if one split is assign to multiple actors let mut split_to_actor = HashMap::new(); for (actor_id, splits) in &mut *assignment { - let _ = splits.iter().map(|split| { + for split in splits { split_to_actor .entry(split.id()) .or_insert_with(Vec::new) - .push(*actor_id) - }); + .push(*actor_id); + } } for (split_id, actor_ids) in &mut split_to_actor { if actor_ids.len() > 1 { tracing::warn!(split_id = ?split_id, actor_ids = ?actor_ids, "split is assigned to multiple actors"); + dup_assignment_found_flag = true; } // keep the first actor and remove the rest from the assignment for actor_id in actor_ids.iter().skip(1) { @@ -612,6 +615,8 @@ pub fn validate_assignment(assignment: &mut HashMap>) { .retain(|split| split.id() != *split_id); } } + + dup_assignment_found_flag } fn align_backfill_splits( @@ -1136,15 +1141,28 @@ impl SourceManager { /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change` /// to update states in `SourceManager`. async fn tick(&self) -> MetaResult<()> { - let split_assignment = { + let mut split_assignment = { let core_guard = self.core.lock().await; core_guard.reassign_splits().await? }; + let dup_assignment_flag = split_assignment + .iter_mut() + .map(|(_, assignment)| validate_assignment(assignment)) + .reduce(|a, b| a || b) + .unwrap_or(false); + if !split_assignment.is_empty() { let command = Command::SourceSplitAssignment(split_assignment); tracing::info!(command = ?command, "pushing down split assignment command"); - self.barrier_scheduler.run_command(command).await?; + if dup_assignment_flag { + tracing::warn!("duplicate split assignment found, wrap with pause and resume"); + self.barrier_scheduler + .run_config_change_command_with_pause(command) + .await?; + } else { + self.barrier_scheduler.run_command(command).await?; + } } Ok(()) @@ -1353,10 +1371,12 @@ mod tests { 1 => test_assignment, }; - fragment_assignment.iter_mut().for_each(|(_, assignment)| { - validate_assignment(assignment); - }); - + let dup_assignment_flag = fragment_assignment + .iter_mut() + .map(|(_, assignment)| validate_assignment(assignment)) + .reduce(|a, b| a || b) + .unwrap_or(false); + assert!(dup_assignment_flag); { let mut split_to_actor = HashMap::new(); for actor_to_splits in fragment_assignment.values() {