Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
files="AbstractResponse.java"/>

<suppress checks="MethodLength"
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>

<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -25,6 +26,10 @@
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

/**
* A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky)
Expand All @@ -43,6 +48,15 @@
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {

// these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
private static final String GENERATION_KEY_NAME = "generation";
private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema(
new Field(GENERATION_KEY_NAME, Type.INT32));

private int generation = DEFAULT_GENERATION; // consumer group generation



@Override
public String name() {
return "cooperative-sticky";
Expand All @@ -53,9 +67,37 @@ public List<RebalanceProtocol> supportedProtocols() {
return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
this.generation = metadata.generationId();
}

@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);

struct.set(GENERATION_KEY_NAME, generation);
ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
buffer.flip();
return buffer;
}

@Override
protected MemberData memberData(Subscription subscription) {
return new MemberData(subscription.ownedPartitions(), Optional.empty());
ByteBuffer buffer = subscription.userData();
Optional<Integer> encodedGeneration;
if (buffer == null) {
encodedGeneration = Optional.empty();
} else {
try {
Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME));
} catch (Exception e) {
encodedGeneration = Optional.of(DEFAULT_GENERATION);
}
}
return new MemberData(subscription.ownedPartitions(), encodedGeneration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,39 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
Set<TopicPartition> partitionsWithMultiplePreviousOwners = new HashSet<>();
if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+ "optimized assignment algorithm");
partitionsTransferringOwnership = new HashMap<>();
return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners);
} else {
log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+ "general case assignment algorithm");
// we must set this to null for the general case so the cooperative assignor knows to compute it from scratch
partitionsTransferringOwnership = null;
return generalAssign(partitionsPerTopic, subscriptions);
}
}

/**
* Returns true iff all consumers have an identical subscription. Also fills out the passed in
* {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions
* {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions,
* and the {@code partitionsWithMultiplePreviousOwners} with any partitions claimed by multiple previous owners
*/
private boolean allSubscriptionsEqual(Set<String> allTopics,
Map<String, Subscription> subscriptions,
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
Set<String> membersWithOldGeneration = new HashSet<>();
Map<String, List<TopicPartition>> consumerToOwnedPartitions,
Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
int maxGeneration = DEFAULT_GENERATION;

Set<String> subscribedTopics = new HashSet<>();

// keep track of all previously owned partitions so we can invalidate them if invalid input is
// detected, eg two consumers somehow claiming the same partition in the same/current generation
Map<TopicPartition, String> allPreviousPartitionsToOwner = new HashMap<>();

for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
String consumer = subscriptionEntry.getKey();
Subscription subscription = subscriptionEntry.getValue();
Expand All @@ -123,7 +130,12 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,

// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
allPreviousPartitionsToOwner.clear();
partitionsWithMultiplePreviousOwners.clear();
for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
}

membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}
Expand All @@ -132,15 +144,22 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
if (allTopics.contains(tp.topic())) {
ownedPartitions.add(tp);
if (!allPreviousPartitionsToOwner.containsKey(tp)) {
allPreviousPartitionsToOwner.put(tp, consumer);
ownedPartitions.add(tp);
} else {
String otherConsumer = allPreviousPartitionsToOwner.get(tp);
log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+ "same generation {}, this will be invalidated and removed from their previous assignment.",
consumer, otherConsumer, tp, maxGeneration);
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
partitionsWithMultiplePreviousOwners.add(tp);
}
}
}
}
}

for (String consumer : membersWithOldGeneration) {
consumerToOwnedPartitions.get(consumer).clear();
}
return true;
}

Expand All @@ -156,13 +175,15 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
* 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
* should just distribute one partition each to all consumers at min capacity
*
* @param partitionsPerTopic The number of partitions for each subscribed topic
* @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
* @param partitionsPerTopic The number of partitions for each subscribed topic
* @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
* @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers
*
* @return Map from each member to the list of partitions assigned to them.
* @return Map from each member to the list of partitions assigned to them.
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
Map<String, List<TopicPartition>> consumerToOwnedPartitions,
Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);

Set<TopicPartition> allRevokedPartitions = new HashSet<>();
Expand All @@ -189,6 +210,16 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
List<TopicPartition> ownedPartitions = consumerEntry.getValue();

List<TopicPartition> consumerAssignment = assignment.get(consumer);

for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
if (ownedPartitions.contains(doublyClaimedPartition)) {
log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple "
+ "consumers already in the same generation. Removing it from the ownedPartitions",
doublyClaimedPartition, consumer);
ownedPartitions.remove(doublyClaimedPartition);
}
}

int i = 0;
// assign the first N partitions up to the max quota, and mark the remaining as being revoked
for (TopicPartition tp : ownedPartitions) {
Expand Down Expand Up @@ -228,7 +259,7 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
consumerAssignment.add(tp);
unassignedPartitionsIter.remove();
// We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
if (allRevokedPartitions.contains(tp))
if (allRevokedPartitions.contains(tp) || partitionsWithMultiplePreviousOwners.contains(tp))
partitionsTransferringOwnership.put(tp, consumer);
} else {
break;
Expand Down Expand Up @@ -271,10 +302,12 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
// We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
assignment.get(underCapacityConsumer).add(unassignedPartition);

if (allRevokedPartitions.contains(unassignedPartition))
if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
}

log.info("Final assignment of partitions to consumers: \n{}", assignment);

return assignment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@
*/
package org.apache.kafka.clients.consumer;

import static java.util.Collections.emptyList;
import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {

Expand All @@ -39,6 +46,74 @@ public Subscription buildSubscription(List<String> topics, List<TopicPartition>
return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions);
}

@Override
public Subscription buildSubscriptionWithGeneration(List<String> topics, List<TopicPartition> partitions, int generation) {
assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions);
}

@Test
public void testEncodeAndDecodeGeneration() {
Subscription subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic))));

Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
assertTrue(encodedGeneration.isPresent());
assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);

int generation = 10;
assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));

subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic))));
encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;

assertTrue(encodedGeneration.isPresent());
assertEquals(encodedGeneration.get(), generation);
}

@Test
public void testDecodeGeneration() {
Subscription subscription = new Subscription(topics(topic));
assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
}

@Test
public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, 3);

subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));

Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
// In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it
assertTrue(assignment.get(consumer3).isEmpty());

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
}

@Test
public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, 4);

subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));

Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2));
// In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it
assertTrue(assignment.get(consumer3).isEmpty());

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
}

/**
* The cooperative assignor must do some additional work and verification of some assignments relative to the eager
* assignor, since it may or may not need to trigger a second follow-up rebalance.
Expand Down
Loading