Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kafka.controller;

import org.apache.kafka.metadata.placement.PartitionAssignment;

import java.util.ArrayList;
import java.util.Set;
import java.util.List;
Expand All @@ -27,22 +29,24 @@
class PartitionReassignmentReplicas {
private final List<Integer> removing;
private final List<Integer> adding;
private final List<Integer> merged;
private final List<Integer> replicas;

private static Set<Integer> calculateDifference(List<Integer> a, List<Integer> b) {
Set<Integer> result = new TreeSet<>(a);
result.removeAll(b);
return result;
}

PartitionReassignmentReplicas(List<Integer> currentReplicas,
List<Integer> targetReplicas) {
Set<Integer> removing = calculateDifference(currentReplicas, targetReplicas);
PartitionReassignmentReplicas(
PartitionAssignment currentAssignment,
PartitionAssignment targetAssignment
) {
Set<Integer> removing = calculateDifference(currentAssignment.replicas(), targetAssignment.replicas());
this.removing = new ArrayList<>(removing);
Set<Integer> adding = calculateDifference(targetReplicas, currentReplicas);
Set<Integer> adding = calculateDifference(targetAssignment.replicas(), currentAssignment.replicas());
this.adding = new ArrayList<>(adding);
this.merged = new ArrayList<>(targetReplicas);
this.merged.addAll(removing);
this.replicas = new ArrayList<>(targetAssignment.replicas());
this.replicas.addAll(removing);
}

List<Integer> removing() {
Expand All @@ -53,13 +57,13 @@ List<Integer> adding() {
return adding;
}

List<Integer> merged() {
return merged;
List<Integer> replicas() {
return replicas;
}

@Override
public int hashCode() {
return Objects.hash(removing, adding, merged);
return Objects.hash(removing, adding, replicas);
}

@Override
Expand All @@ -68,14 +72,14 @@ public boolean equals(Object o) {
PartitionReassignmentReplicas other = (PartitionReassignmentReplicas) o;
return removing.equals(other.removing) &&
adding.equals(other.adding) &&
merged.equals(other.merged);
replicas.equals(other.replicas);
}

@Override
public String toString() {
return "PartitionReassignmentReplicas(" +
"removing=" + removing + ", " +
"adding=" + adding + ", " +
"merged=" + merged + ")";
"replicas=" + replicas + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,9 @@ private ApiError createTopic(ControllerRequestContext context,
"Found multiple manual partition assignments for partition " +
assignment.partitionIndex());
}
validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
validateManualPartitionAssignment(
new PartitionAssignment(assignment.brokerIds()),
replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List<Integer> isr = assignment.brokerIds().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
Expand Down Expand Up @@ -1559,7 +1561,7 @@ void createPartitions(ControllerRequestContext context,
isrs = new ArrayList<>();
for (int i = 0; i < topic.assignments().size(); i++) {
CreatePartitionsAssignment assignment = topic.assignments().get(i);
validateManualPartitionAssignment(assignment.brokerIds(),
validateManualPartitionAssignment(new PartitionAssignment(assignment.brokerIds()),
OptionalInt.of(replicationFactor));
partitionAssignments.add(new PartitionAssignment(assignment.brokerIds()));
List<Integer> isr = assignment.brokerIds().stream().
Expand Down Expand Up @@ -1607,13 +1609,15 @@ void createPartitions(ControllerRequestContext context,
}
}

void validateManualPartitionAssignment(List<Integer> assignment,
OptionalInt replicationFactor) {
if (assignment.isEmpty()) {
void validateManualPartitionAssignment(
PartitionAssignment assignment,
OptionalInt replicationFactor
) {
if (assignment.replicas().isEmpty()) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes an empty replica list.");
}
List<Integer> sortedBrokerIds = new ArrayList<>(assignment);
List<Integer> sortedBrokerIds = new ArrayList<>(assignment.replicas());
sortedBrokerIds.sort(Integer::compare);
Integer prevBrokerId = null;
for (Integer brokerId : sortedBrokerIds) {
Expand Down Expand Up @@ -1841,18 +1845,21 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
PartitionRegistration part,
ReassignablePartition target) {
// Check that the requested partition assignment is valid.
validateManualPartitionAssignment(target.replicas(), OptionalInt.empty());
PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas));
PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas());

validateManualPartitionAssignment(targetAssignment, OptionalInt.empty());

List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
new PartitionReassignmentReplicas(currentReplicas, target.replicas());
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if (!reassignment.merged().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.merged());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
if (!reassignment.removing().isEmpty()) {
builder.setTargetRemoving(reassignment.removing());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -243,10 +244,10 @@ public void testRevertReassignment() {
@Test
public void testRemovingReplicaReassignment() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Replicas.toList(FOO.replicas), Arrays.asList(1, 2));
new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2)));
assertEquals(Collections.singletonList(3), replicas.removing());
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(1, 2, 3), replicas.merged());
assertEquals(Arrays.asList(1, 2, 3), replicas.replicas());
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
Expand All @@ -255,26 +256,26 @@ public void testRemovingReplicaReassignment() {
setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().
setTargetReplicas(replicas.merged()).
setTargetReplicas(replicas.replicas()).
setTargetRemoving(replicas.removing()).
build());
}

@Test
public void testAddingReplicaReassignment() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Replicas.toList(FOO.replicas), Arrays.asList(1, 2, 3, 4));
new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2, 3, 4)));
assertEquals(Collections.emptyList(), replicas.removing());
assertEquals(Collections.singletonList(4), replicas.adding());
assertEquals(Arrays.asList(1, 2, 3, 4), replicas.merged());
assertEquals(Arrays.asList(1, 2, 3, 4), replicas.replicas());
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setAddingReplicas(Collections.singletonList(4)),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().
setTargetReplicas(replicas.merged()).
setTargetReplicas(replicas.replicas()).
setTargetAdding(replicas.adding()).
build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.Collections;

import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -31,45 +32,45 @@ public class PartitionReassignmentReplicasTest {
@Test
public void testNoneAddedOrRemoved() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1));
new PartitionAssignment(Arrays.asList(3, 2, 1)), new PartitionAssignment(Arrays.asList(3, 2, 1)));
assertEquals(Collections.emptyList(), replicas.removing());
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(3, 2, 1), replicas.merged());
assertEquals(Arrays.asList(3, 2, 1), replicas.replicas());
}

@Test
public void testAdditions() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Arrays.asList(3, 2, 1), Arrays.asList(3, 6, 2, 1, 5));
new PartitionAssignment(Arrays.asList(3, 2, 1)), new PartitionAssignment(Arrays.asList(3, 6, 2, 1, 5)));
assertEquals(Collections.emptyList(), replicas.removing());
assertEquals(Arrays.asList(5, 6), replicas.adding());
assertEquals(Arrays.asList(3, 6, 2, 1, 5), replicas.merged());
assertEquals(Arrays.asList(3, 6, 2, 1, 5), replicas.replicas());
}

@Test
public void testRemovals() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Arrays.asList(3, 2, 1, 0), Arrays.asList(3, 1));
new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(3, 1)));
assertEquals(Arrays.asList(0, 2), replicas.removing());
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(3, 1, 0, 2), replicas.merged());
assertEquals(Arrays.asList(3, 1, 0, 2), replicas.replicas());
}

@Test
public void testAdditionsAndRemovals() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Arrays.asList(3, 2, 1, 0), Arrays.asList(7, 3, 1, 9));
new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(7, 3, 1, 9)));
assertEquals(Arrays.asList(0, 2), replicas.removing());
assertEquals(Arrays.asList(7, 9), replicas.adding());
assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), replicas.merged());
assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), replicas.replicas());
}

@Test
public void testRearrangement() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Arrays.asList(3, 2, 1, 0), Arrays.asList(0, 1, 3, 2));
new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(0, 1, 3, 2)));
assertEquals(Collections.emptyList(), replicas.removing());
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(0, 1, 3, 2), replicas.merged());
assertEquals(Arrays.asList(0, 1, 3, 2), replicas.replicas());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
Expand Down Expand Up @@ -1380,13 +1381,13 @@ public void testCreatePartitionsISRInvariants() throws Exception {
public void testValidateGoodManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ctx.registerBrokers(1, 2, 3);
ctx.replicationControl.validateManualPartitionAssignment(asList(1),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)),
OptionalInt.of(1));
ctx.replicationControl.validateManualPartitionAssignment(asList(1),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)),
OptionalInt.empty());
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)),
OptionalInt.of(3));
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)),
OptionalInt.empty());
}

Expand All @@ -1396,20 +1397,20 @@ public void testValidateBadManualPartitionAssignments() throws Exception {
ctx.registerBrokers(1, 2);
assertEquals("The manual partition assignment includes an empty replica list.",
assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(asList(),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList()),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes broker 3, but no such " +
"broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes the broker 2 more than " +
"once.", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 2),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 2)),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes a partition with 2 " +
"replica(s), but this is not consistent with previous partitions, which have " +
"3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2),
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2)),
OptionalInt.of(3))).getMessage());
}

Expand Down