Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
// generation amongst
for (final TopicPartition tp : memberData.partitions) {
if (allTopics.contains(tp.topic())) {
String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
String otherConsumer = allPreviousPartitionsToOwner.get(tp);
if (otherConsumer == null) {
// this partition is not owned by other consumer in the same generation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd Have to add a put here too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, plz take a look

ownedPartitions.add(tp);
allPreviousPartitionsToOwner.put(tp, consumer);
} else {
final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);

Expand Down Expand Up @@ -1172,7 +1173,7 @@ private boolean isBalanced() {
if (!currentAssignment.get(consumer).contains(topicPartition)) {
String otherConsumer = allPartitions.get(topicPartition);
int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
if (consumerPartitionCount < otherConsumerPartitionCount) {
if (consumerPartitionCount + 1 < otherConsumerPartitionCount) {
log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
topicPartition, otherConsumer, consumer);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.StickyAssignor;
Expand Down Expand Up @@ -724,6 +725,91 @@ public void testLargeAssignmentAndGroupWithNonEqualSubscription(boolean hasConsu
assignor.assignPartitions(partitionsPerTopic, subscriptions);
}

@Timeout(90)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an opinion here: Other tests also use timeout annotation, but it might be better to use TestUtils.waitForCondition(() from other tests because it directly tests the timeout of the assignPartitions invocation. To me, it is a bit more precise about where the tests are failing. Also, any specific reason we use 90? I assume the unit is second? I see there's a variation of timeout we are using, from 30->90 😅.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this 90 should be sec, I set 90 sec because in my m1 air this test may run about 25 sec or more time.

I tried TestUtils.waitForCondition but seems it is not effective and couldn't throw Exception when execution use time more than expected, I'm not familiar with this and couldn't find the reason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah no problem.

25s is surprisingly long!

@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
@ValueSource(booleans = {false, true})
public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
int topicCount = hasConsumerRack ? 50 : 100;
int partitionCount = 2_00;
int consumerCount = 5_00;

List<String> topics = new ArrayList<>();
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
for (int i = 0; i < topicCount; i++) {
String topicName = getTopicName(i, topicCount);
topics.add(topicName);
partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
}
for (int i = 0; i < consumerCount; i++) {
if (i % 4 == 0) {
subscriptions.put(getConsumerName(i, consumerCount),
subscription(topics.subList(0, topicCount / 2), i));
} else {
subscriptions.put(getConsumerName(i, consumerCount),
subscription(topics.subList(topicCount / 2, topicCount), i));
}
}

Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);

for (int i = 1; i < consumerCount; i++) {
String consumer = getConsumerName(i, consumerCount);
if (i % 4 == 0) {
subscriptions.put(
consumer,
buildSubscriptionV2Above(topics.subList(0, topicCount / 2),
assignment.get(consumer), generationId, i)
);
} else {
subscriptions.put(
consumer,
buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount),
assignment.get(consumer), generationId, i)
);
}
}

assignor.assignPartitions(partitionsPerTopic, subscriptions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we directly test the GeneralAssignmentBuilder.build() ? It seems like that's where isBalanced is triggered. We could test it at the top level, but as the bug occurs in isBalanced() which is only triggered in the GeneralAssignmentBuilder, it would be more direct, when failing, for people to debug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now GeneralAssignmentBuilder is the inner private class of AbstractStickyAssignor, test it directly need modify it to protected access level.

}

@Test
public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the test! I wrote a similar test, testEnsurePartitionsAssignedToHighestGeneration but just the generation order was 0, 1, 2 instead of, per your suggestions, 0, 2, 1. Do you think we could parametrize that test or reuse the same test style, to make things more consistent, but change the assignment order? Also, is the title accurate? This happens in allSubscriptionsEqual, so it doesn't matter if the subscriptions are equal or not right? The goal of this is to ensure consumer with the highest generation gets the right assignment, regardless of the generation order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is to test allSubscriptionsEqual so its name is a mistake, thx!

but this bug in allSubscriptionsEqual not lead to the specified partition that previously assigned to at least 3 generation consumers cannot be owned by the highest generation, but would lead this partition assign to more than one consumers so add to assignedPartitions more than once and also the function getUnassignedPartitions will get more partitions than actuality, as a result, many partitions already assigned to consumers will be added to unassignedPartitions, all of these partitions would be assigned twice.

I couldn't understand how to reuse such test style, could you provide any example?

Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, partitionInfos(topic, 6));
partitionsPerTopic.put(topic1, partitionInfos(topic1, 1));
int[][] sequence = new int[][]{{1, 2, 3}, {1, 3, 2}, {2, 1, 3}, {2, 3, 1}, {3, 1, 2}, {3, 2, 1}};
for (int[] ints : sequence) {
subscriptions.put(
consumer1,
buildSubscriptionV2Above(topics(topic),
partitions(tp(topic, 0), tp(topic, 2)), ints[0], 0)
);
subscriptions.put(
consumer2,
buildSubscriptionV2Above(topics(topic),
partitions(tp(topic, 1), tp(topic, 2), tp(topic, 3)), ints[1], 1)
);
subscriptions.put(
consumer3,
buildSubscriptionV2Above(topics(topic),
partitions(tp(topic, 2), tp(topic, 4), tp(topic, 5)), ints[2], 2)
);
subscriptions.put(
consumer4,
buildSubscriptionV2Above(topics(topic1),
partitions(tp(topic1, 0)), 2, 3)
);

Map<String, List<TopicPartition>> assign = assignor.assignPartitions(partitionsPerTopic, subscriptions);
assertEquals(assign.values().stream().mapToInt(List::size).sum(),
assign.values().stream().flatMap(List::stream).collect(Collectors.toSet()).size());
for (List<TopicPartition> list: assign.values()) {
assertTrue(list.size() >= 1 && list.size() <= 2);
}
}
}

@Timeout(60)
@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
@ValueSource(booleans = {false, true})
Expand Down