diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index db760cb29f8ec..8163f78f85fe3 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -42,7 +42,7 @@ files="AbstractResponse.java"/> + files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java index c7c0679575a9b..b2af7a008ebce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -25,6 +26,10 @@ import java.util.Set; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; /** * A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky) @@ -43,6 +48,15 @@ */ public class CooperativeStickyAssignor extends AbstractStickyAssignor { + // these schemas are used for preserving useful metadata for the assignment, such as the last stable generation + private static final String GENERATION_KEY_NAME = "generation"; + private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema( + new Field(GENERATION_KEY_NAME, Type.INT32)); + + private int generation = DEFAULT_GENERATION; // consumer group generation + + + @Override public String name() { return "cooperative-sticky"; @@ -53,9 +67,37 @@ public List supportedProtocols() { return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER); } + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + this.generation = metadata.generationId(); + } + + @Override + public ByteBuffer subscriptionUserData(Set topics) { + Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0); + + struct.set(GENERATION_KEY_NAME, generation); + ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct)); + COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct); + buffer.flip(); + return buffer; + } + @Override protected MemberData memberData(Subscription subscription) { - return new MemberData(subscription.ownedPartitions(), Optional.empty()); + ByteBuffer buffer = subscription.userData(); + Optional encodedGeneration; + if (buffer == null) { + encodedGeneration = Optional.empty(); + } else { + try { + Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer); + encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME)); + } catch (Exception e) { + encodedGeneration = Optional.of(DEFAULT_GENERATION); + } + } + return new MemberData(subscription.ownedPartitions(), encodedGeneration); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 7e42e44a7d4c8..3c5f60905051e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -73,14 +73,16 @@ public MemberData(List partitions, Optional generation) public Map> assign(Map partitionsPerTopic, Map subscriptions) { Map> consumerToOwnedPartitions = new HashMap<>(); - if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) { + Set partitionsWithMultiplePreviousOwners = new HashSet<>(); + if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) { log.debug("Detected that all consumers were subscribed to same set of topics, invoking the " + "optimized assignment algorithm"); partitionsTransferringOwnership = new HashMap<>(); - return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions); + return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners); } else { log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the " + "general case assignment algorithm"); + // we must set this to null for the general case so the cooperative assignor knows to compute it from scratch partitionsTransferringOwnership = null; return generalAssign(partitionsPerTopic, subscriptions); } @@ -88,17 +90,22 @@ public Map> assign(Map partitionsP /** * Returns true iff all consumers have an identical subscription. Also fills out the passed in - * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions + * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions, + * and the {@code partitionsWithMultiplePreviousOwners} with any partitions claimed by multiple previous owners */ private boolean allSubscriptionsEqual(Set allTopics, Map subscriptions, - Map> consumerToOwnedPartitions) { - Set membersWithOldGeneration = new HashSet<>(); + Map> consumerToOwnedPartitions, + Set partitionsWithMultiplePreviousOwners) { Set membersOfCurrentHighestGeneration = new HashSet<>(); int maxGeneration = DEFAULT_GENERATION; Set subscribedTopics = new HashSet<>(); + // keep track of all previously owned partitions so we can invalidate them if invalid input is + // detected, eg two consumers somehow claiming the same partition in the same/current generation + Map allPreviousPartitionsToOwner = new HashMap<>(); + for (Map.Entry subscriptionEntry : subscriptions.entrySet()) { String consumer = subscriptionEntry.getKey(); Subscription subscription = subscriptionEntry.getValue(); @@ -123,7 +130,12 @@ private boolean allSubscriptionsEqual(Set allTopics, // If the current member's generation is higher, all the previously owned partitions are invalid if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { - membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); + allPreviousPartitionsToOwner.clear(); + partitionsWithMultiplePreviousOwners.clear(); + for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { + consumerToOwnedPartitions.get(droppedOutConsumer).clear(); + } + membersOfCurrentHighestGeneration.clear(); maxGeneration = memberData.generation.get(); } @@ -132,15 +144,22 @@ private boolean allSubscriptionsEqual(Set allTopics, for (final TopicPartition tp : memberData.partitions) { // filter out any topics that no longer exist or aren't part of the current subscription if (allTopics.contains(tp.topic())) { - ownedPartitions.add(tp); + if (!allPreviousPartitionsToOwner.containsKey(tp)) { + allPreviousPartitionsToOwner.put(tp, consumer); + ownedPartitions.add(tp); + } else { + String otherConsumer = allPreviousPartitionsToOwner.get(tp); + log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " + + "same generation {}, this will be invalidated and removed from their previous assignment.", + consumer, otherConsumer, tp, maxGeneration); + consumerToOwnedPartitions.get(otherConsumer).remove(tp); + partitionsWithMultiplePreviousOwners.add(tp); + } } } } } - for (String consumer : membersWithOldGeneration) { - consumerToOwnedPartitions.get(consumer).clear(); - } return true; } @@ -156,13 +175,15 @@ private boolean allSubscriptionsEqual(Set allTopics, * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we * should just distribute one partition each to all consumers at min capacity * - * @param partitionsPerTopic The number of partitions for each subscribed topic - * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions + * @param partitionsPerTopic The number of partitions for each subscribed topic + * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions + * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers * - * @return Map from each member to the list of partitions assigned to them. + * @return Map from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, - Map> consumerToOwnedPartitions) { + Map> consumerToOwnedPartitions, + Set partitionsWithMultiplePreviousOwners) { SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); @@ -189,6 +210,16 @@ private Map> constrainedAssign(Map List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); + + for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { + if (ownedPartitions.contains(doublyClaimedPartition)) { + log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " + + "consumers already in the same generation. Removing it from the ownedPartitions", + doublyClaimedPartition, consumer); + ownedPartitions.remove(doublyClaimedPartition); + } + } + int i = 0; // assign the first N partitions up to the max quota, and mark the remaining as being revoked for (TopicPartition tp : ownedPartitions) { @@ -228,7 +259,7 @@ private Map> constrainedAssign(Map consumerAssignment.add(tp); unassignedPartitionsIter.remove(); // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer - if (allRevokedPartitions.contains(tp)) + if (allRevokedPartitions.contains(tp) || partitionsWithMultiplePreviousOwners.contains(tp)) partitionsTransferringOwnership.put(tp, consumer); } else { break; @@ -271,10 +302,12 @@ private Map> constrainedAssign(Map // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end assignment.get(underCapacityConsumer).add(unassignedPartition); - if (allRevokedPartitions.contains(unassignedPartition)) + if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition)) partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer); } + log.info("Final assignment of partitions to consumers: \n{}", assignment); + return assignment; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index d7d671e1a2abe..29ee27a7b7e28 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -16,16 +16,23 @@ */ package org.apache.kafka.clients.consumer; +import static java.util.Collections.emptyList; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest; import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { @@ -39,6 +46,74 @@ public Subscription buildSubscription(List topics, List return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); } + @Override + public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { + assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); + return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); + } + + @Test + public void testEncodeAndDecodeGeneration() { + Subscription subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); + + Optional encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation; + assertTrue(encodedGeneration.isPresent()); + assertEquals(encodedGeneration.get(), DEFAULT_GENERATION); + + int generation = 10; + assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); + + subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); + encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation; + + assertTrue(encodedGeneration.isPresent()); + assertEquals(encodedGeneration.get(), generation); + } + + @Test + public void testDecodeGeneration() { + Subscription subscription = new Subscription(topics(topic)); + assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it + assertTrue(assignment.get(consumer3).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it + assertTrue(assignment.get(consumer3).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + /** * The cooperative assignor must do some additional work and verification of some assignments relative to the eager * assignor, since it may or may not need to trigger a second follow-up rebalance. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index edc522bb082ec..678f62d5e8220 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import static java.util.Collections.emptyList; import static org.apache.kafka.clients.consumer.StickyAssignor.serializeTopicPartitionAssignment; import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -51,6 +52,48 @@ public Subscription buildSubscription(List topics, List serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(DEFAULT_GENERATION)))); } + @Override + public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { + return new Subscription(topics, + serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation)))); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + @Test public void testAssignmentWithMultipleGenerations1() { String consumer1 = "consumer1"; @@ -215,11 +258,6 @@ public void testSchemaBackwardCompatibility() { assertTrue(isFullyBalanced(assignment)); } - private Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { - return new Subscription(topics, - serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation)))); - } - private static Subscription buildSubscriptionWithOldSchema(List topics, List partitions) { Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0); List topicAssignments = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index 35785401770a4..c7b359df740d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -43,13 +44,21 @@ public abstract class AbstractStickyAssignorTest { protected AbstractStickyAssignor assignor; protected String consumerId = "consumer"; + protected String consumer1 = "consumer1"; + protected String consumer2 = "consumer2"; + protected String consumer3 = "consumer3"; protected Map subscriptions; protected String topic = "topic"; + protected String topic1 = "topic1"; + protected String topic2 = "topic2"; + protected String topic3 = "topic3"; protected abstract AbstractStickyAssignor createAssignor(); protected abstract Subscription buildSubscription(List topics, List partitions); + protected abstract Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation); + @BeforeEach public void setUp() { assignor = createAssignor(); @@ -651,6 +660,62 @@ public void testReassignmentWithRandomSubscriptionsAndChanges() { } } + @Test + public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), Collections.emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(topic2, 3); + + int currentGeneration = 10; + + subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration)); + subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration - 1)); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1))); + assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2))); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(topic2, 3); + + int currentGeneration = 10; + + subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration)); + subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION)); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1))); + assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2))); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + private String getTopicName(int i, int maxNum) { return getCanonicalName("t", i, maxNum); }