diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java index 96ae408563265..9555d729c56cb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java @@ -17,6 +17,8 @@ package org.apache.kafka.controller; +import org.apache.kafka.metadata.placement.PartitionAssignment; + import java.util.ArrayList; import java.util.Set; import java.util.List; @@ -27,7 +29,7 @@ class PartitionReassignmentReplicas { private final List removing; private final List adding; - private final List merged; + private final List replicas; private static Set calculateDifference(List a, List b) { Set result = new TreeSet<>(a); @@ -35,14 +37,16 @@ private static Set calculateDifference(List a, List b return result; } - PartitionReassignmentReplicas(List currentReplicas, - List targetReplicas) { - Set removing = calculateDifference(currentReplicas, targetReplicas); + PartitionReassignmentReplicas( + PartitionAssignment currentAssignment, + PartitionAssignment targetAssignment + ) { + Set removing = calculateDifference(currentAssignment.replicas(), targetAssignment.replicas()); this.removing = new ArrayList<>(removing); - Set adding = calculateDifference(targetReplicas, currentReplicas); + Set adding = calculateDifference(targetAssignment.replicas(), currentAssignment.replicas()); this.adding = new ArrayList<>(adding); - this.merged = new ArrayList<>(targetReplicas); - this.merged.addAll(removing); + this.replicas = new ArrayList<>(targetAssignment.replicas()); + this.replicas.addAll(removing); } List removing() { @@ -53,13 +57,13 @@ List adding() { return adding; } - List merged() { - return merged; + List replicas() { + return replicas; } @Override public int hashCode() { - return Objects.hash(removing, adding, merged); + return Objects.hash(removing, adding, replicas); } @Override @@ -68,7 +72,7 @@ public boolean equals(Object o) { PartitionReassignmentReplicas other = (PartitionReassignmentReplicas) o; return removing.equals(other.removing) && adding.equals(other.adding) && - merged.equals(other.merged); + replicas.equals(other.replicas); } @Override @@ -76,6 +80,6 @@ public String toString() { return "PartitionReassignmentReplicas(" + "removing=" + removing + ", " + "adding=" + adding + ", " + - "merged=" + merged + ")"; + "replicas=" + replicas + ")"; } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 2b21d5c0b227a..5ecc01406b294 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -632,7 +632,9 @@ private ApiError createTopic(ControllerRequestContext context, "Found multiple manual partition assignments for partition " + assignment.partitionIndex()); } - validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor); + validateManualPartitionAssignment( + new PartitionAssignment(assignment.brokerIds()), + replicationFactor); replicationFactor = OptionalInt.of(assignment.brokerIds().size()); List isr = assignment.brokerIds().stream(). filter(clusterControl::isActive).collect(Collectors.toList()); @@ -1559,7 +1561,7 @@ void createPartitions(ControllerRequestContext context, isrs = new ArrayList<>(); for (int i = 0; i < topic.assignments().size(); i++) { CreatePartitionsAssignment assignment = topic.assignments().get(i); - validateManualPartitionAssignment(assignment.brokerIds(), + validateManualPartitionAssignment(new PartitionAssignment(assignment.brokerIds()), OptionalInt.of(replicationFactor)); partitionAssignments.add(new PartitionAssignment(assignment.brokerIds())); List isr = assignment.brokerIds().stream(). @@ -1607,13 +1609,15 @@ void createPartitions(ControllerRequestContext context, } } - void validateManualPartitionAssignment(List assignment, - OptionalInt replicationFactor) { - if (assignment.isEmpty()) { + void validateManualPartitionAssignment( + PartitionAssignment assignment, + OptionalInt replicationFactor + ) { + if (assignment.replicas().isEmpty()) { throw new InvalidReplicaAssignmentException("The manual partition " + "assignment includes an empty replica list."); } - List sortedBrokerIds = new ArrayList<>(assignment); + List sortedBrokerIds = new ArrayList<>(assignment.replicas()); sortedBrokerIds.sort(Integer::compare); Integer prevBrokerId = null; for (Integer brokerId : sortedBrokerIds) { @@ -1841,18 +1845,21 @@ Optional changePartitionReassignment(TopicIdPartition tp, PartitionRegistration part, ReassignablePartition target) { // Check that the requested partition assignment is valid. - validateManualPartitionAssignment(target.replicas(), OptionalInt.empty()); + PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas)); + PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas()); + + validateManualPartitionAssignment(targetAssignment, OptionalInt.empty()); List currentReplicas = Replicas.toList(part.replicas); PartitionReassignmentReplicas reassignment = - new PartitionReassignmentReplicas(currentReplicas, target.replicas()); + new PartitionReassignmentReplicas(currentAssignment, targetAssignment); PartitionChangeBuilder builder = new PartitionChangeBuilder(part, tp.topicId(), tp.partitionId(), clusterControl::isActive, featureControl.metadataVersion().isLeaderRecoverySupported()); - if (!reassignment.merged().equals(currentReplicas)) { - builder.setTargetReplicas(reassignment.merged()); + if (!reassignment.replicas().equals(currentReplicas)) { + builder.setTargetReplicas(reassignment.replicas()); } if (!reassignment.removing().isEmpty()) { builder.setTargetRemoving(reassignment.removing()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index fedfa8c0a5023..e3063a74fc784 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.metadata.placement.PartitionAssignment; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -243,10 +244,10 @@ public void testRevertReassignment() { @Test public void testRemovingReplicaReassignment() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Replicas.toList(FOO.replicas), Arrays.asList(1, 2)); + new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2))); assertEquals(Collections.singletonList(3), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); - assertEquals(Arrays.asList(1, 2, 3), replicas.merged()); + assertEquals(Arrays.asList(1, 2, 3), replicas.replicas()); assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). setTopicId(FOO_ID). setPartitionId(0). @@ -255,7 +256,7 @@ public void testRemovingReplicaReassignment() { setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())), createFooBuilder(). - setTargetReplicas(replicas.merged()). + setTargetReplicas(replicas.replicas()). setTargetRemoving(replicas.removing()). build()); } @@ -263,10 +264,10 @@ public void testRemovingReplicaReassignment() { @Test public void testAddingReplicaReassignment() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Replicas.toList(FOO.replicas), Arrays.asList(1, 2, 3, 4)); + new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2, 3, 4))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Collections.singletonList(4), replicas.adding()); - assertEquals(Arrays.asList(1, 2, 3, 4), replicas.merged()); + assertEquals(Arrays.asList(1, 2, 3, 4), replicas.replicas()); assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). setTopicId(FOO_ID). setPartitionId(0). @@ -274,7 +275,7 @@ public void testAddingReplicaReassignment() { setAddingReplicas(Collections.singletonList(4)), PARTITION_CHANGE_RECORD.highestSupportedVersion())), createFooBuilder(). - setTargetReplicas(replicas.merged()). + setTargetReplicas(replicas.replicas()). setTargetAdding(replicas.adding()). build()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java index c74090e5c5dad..16ff1e3c112c5 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collections; +import org.apache.kafka.metadata.placement.PartitionAssignment; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -31,45 +32,45 @@ public class PartitionReassignmentReplicasTest { @Test public void testNoneAddedOrRemoved() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1)); + new PartitionAssignment(Arrays.asList(3, 2, 1)), new PartitionAssignment(Arrays.asList(3, 2, 1))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); - assertEquals(Arrays.asList(3, 2, 1), replicas.merged()); + assertEquals(Arrays.asList(3, 2, 1), replicas.replicas()); } @Test public void testAdditions() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Arrays.asList(3, 2, 1), Arrays.asList(3, 6, 2, 1, 5)); + new PartitionAssignment(Arrays.asList(3, 2, 1)), new PartitionAssignment(Arrays.asList(3, 6, 2, 1, 5))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Arrays.asList(5, 6), replicas.adding()); - assertEquals(Arrays.asList(3, 6, 2, 1, 5), replicas.merged()); + assertEquals(Arrays.asList(3, 6, 2, 1, 5), replicas.replicas()); } @Test public void testRemovals() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Arrays.asList(3, 2, 1, 0), Arrays.asList(3, 1)); + new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(3, 1))); assertEquals(Arrays.asList(0, 2), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); - assertEquals(Arrays.asList(3, 1, 0, 2), replicas.merged()); + assertEquals(Arrays.asList(3, 1, 0, 2), replicas.replicas()); } @Test public void testAdditionsAndRemovals() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Arrays.asList(3, 2, 1, 0), Arrays.asList(7, 3, 1, 9)); + new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(7, 3, 1, 9))); assertEquals(Arrays.asList(0, 2), replicas.removing()); assertEquals(Arrays.asList(7, 9), replicas.adding()); - assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), replicas.merged()); + assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), replicas.replicas()); } @Test public void testRearrangement() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - Arrays.asList(3, 2, 1, 0), Arrays.asList(0, 1, 3, 2)); + new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(0, 1, 3, 2))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); - assertEquals(Arrays.asList(0, 1, 3, 2), replicas.merged()); + assertEquals(Arrays.asList(0, 1, 3, 2), replicas.replicas()); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 975062fbde73f..a1ee29bfe2b08 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -78,6 +78,7 @@ import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.metadata.placement.PartitionAssignment; import org.apache.kafka.metadata.placement.StripedReplicaPlacer; import org.apache.kafka.metadata.placement.UsableBroker; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -1380,13 +1381,13 @@ public void testCreatePartitionsISRInvariants() throws Exception { public void testValidateGoodManualPartitionAssignments() throws Exception { ReplicationControlTestContext ctx = new ReplicationControlTestContext(); ctx.registerBrokers(1, 2, 3); - ctx.replicationControl.validateManualPartitionAssignment(asList(1), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)), OptionalInt.of(1)); - ctx.replicationControl.validateManualPartitionAssignment(asList(1), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)), OptionalInt.empty()); - ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)), OptionalInt.of(3)); - ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)), OptionalInt.empty()); } @@ -1396,20 +1397,20 @@ public void testValidateBadManualPartitionAssignments() throws Exception { ctx.registerBrokers(1, 2); assertEquals("The manual partition assignment includes an empty replica list.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(asList(), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList()), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes broker 3, but no such " + "broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes the broker 2 more than " + "once.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 2), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 2)), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes a partition with 2 " + "replica(s), but this is not consistent with previous partitions, which have " + "3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2), + ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2)), OptionalInt.of(3))).getMessage()); }