From 34e880665724cf069a2bb5d8ac23437f5b07cdd8 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Fri, 7 May 2021 11:40:24 +0800 Subject: [PATCH] KAFKA-12464: refactor codes and add logs --- .../internals/AbstractStickyAssignor.java | 53 ++++++++++++------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 24c8107f25e3b..5798909927461 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -178,17 +178,17 @@ private Map> constrainedAssign(Map int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); - // the expected number of members with maxQuota assignment - int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; - // the number of members with exactly maxQuota partitions assigned - int numMembersHavingMorePartitions = 0; + // the expected number of members with over minQuota assignment + int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers; + // the number of members with over minQuota partitions assigned + int numMembersAssignedOverMinQuota = 0; // initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota)))); List assignedPartitions = new ArrayList<>(); - // Reassign as many previously owned partitions as possible + // Reassign previously owned partitions to the expected number for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); @@ -203,10 +203,10 @@ private Map> constrainedAssign(Map assignedPartitions.addAll(ownedPartitions); } unfilledMembers.add(consumer); - } else if (ownedPartitions.size() >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) { - // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members - // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions - numMembersHavingMorePartitions++; + } else if (ownedPartitions.size() >= maxQuota && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { + // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members + // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions + numMembersAssignedOverMinQuota++; List maxQuotaPartitions = ownedPartitions.subList(0, maxQuota); consumerAssignment.addAll(maxQuotaPartitions); assignedPartitions.addAll(maxQuotaPartitions); @@ -218,8 +218,10 @@ private Map> constrainedAssign(Map consumerAssignment.addAll(minQuotaPartitions); assignedPartitions.addAll(minQuotaPartitions); allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); - // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members - if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) { + // this consumer is potential maxQuota candidate since we're still under the number of expected members + // with more than the minQuota partitions. Note, if the number of expected members with more than + // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers + if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { unfilledMembers.add(consumer); } } @@ -242,6 +244,9 @@ private Map> constrainedAssign(Map if (unfilledMembers.isEmpty()) { // Should not enter here since we have calculated the exact number to assign to each consumer // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners. + int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition); + log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", + unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); throw new IllegalStateException("No more unfilled consumers to be assigned."); } unfilledConsumerIter = unfilledMembers.iterator(); @@ -255,27 +260,35 @@ private Map> constrainedAssign(Map partitionsTransferringOwnership.put(unassignedPartition, consumer); int currentAssignedCount = consumerAssignment.size(); - int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota; + int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota; if (currentAssignedCount == expectedAssignedCount) { if (currentAssignedCount == maxQuota) { - numMembersHavingMorePartitions++; + numMembersAssignedOverMinQuota++; } unfilledConsumerIter.remove(); } } if (!unfilledMembers.isEmpty()) { - // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number - // of max capacity members. Otherwise, there must be error here. - if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) { - throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " + - "but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers)); + // we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number + // of members with more than the minQuota partitions. Otherwise, there must be error here. + if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) { + log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " + + "of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}", + numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers); + throw new IllegalStateException("We haven't reached the expected number of members with " + + "more than the minQuota partitions, but no more partitions to be assigned"); } else { for (String unfilledMember : unfilledMembers) { int assignedPartitionsCount = assignment.get(unfilledMember).size(); if (assignedPartitionsCount != minQuota) { - throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " + - "and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount)); + log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " + + "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers); + throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " + + "and no more partitions to be assigned", unfilledMember)); + } else { + log.trace("skip over this unfilled member: [{}] because we've reached the expected number of " + + "members with more than the minQuota partitions, and this member already have minQuota partitions", unfilledMember); } } }