Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
Expand Down Expand Up @@ -942,7 +943,8 @@ private List<Record> consumerGroupFenceMember(
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord(
Map<String, TopicMetadata> newSubscriptionMetadata
) {
ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>();
// If the partition rack information map is empty, store an empty list in the record.
if (!topicMetadata.partitionRacks().isEmpty()) {
topicMetadata.partitionRacks().forEach((partition, racks) ->
partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(partition)
.setRacks(new ArrayList<>(racks))
)
);
}
value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
)
);
.setPartitionMetadata(partitionMetadata)
);
});

return new Record(
new ApiMessageAndVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;

import java.util.Map;
import java.util.Objects;

Expand All @@ -26,59 +24,39 @@
*/
public class AssignmentSpec {
/**
* The members keyed by member id.
* The member metadata keyed by member Id.
*/
private final Map<String, AssignmentMemberSpec> members;

/**
* The topics' metadata keyed by topic id.
*/
private final Map<Uuid, AssignmentTopicMetadata> topics;

public AssignmentSpec(
Map<String, AssignmentMemberSpec> members,
Map<Uuid, AssignmentTopicMetadata> topics
Map<String, AssignmentMemberSpec> members
) {
Objects.requireNonNull(members);
Objects.requireNonNull(topics);
this.members = members;
this.topics = topics;
}

/**
* @return Member metadata keyed by member Ids.
* @return Member metadata keyed by member Id.
*/
public Map<String, AssignmentMemberSpec> members() {
return members;
}

/**
* @return Topic metadata keyed by topic Ids.
*/
public Map<Uuid, AssignmentTopicMetadata> topics() {
return topics;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentSpec that = (AssignmentSpec) o;
if (!members.equals(that.members)) return false;
return topics.equals(that.topics);
return members.equals(that.members);
}

@Override
public int hashCode() {
int result = members.hashCode();
result = 31 * result + topics.hashCode();
return result;
return members.hashCode();
}

@Override
public String toString() {
return "AssignmentSpec(members=" + members +
", topics=" + topics +
')';
return "AssignmentSpec(members=" + members + ')';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@
*/
@InterfaceStability.Unstable
public interface PartitionAssignor {

/**
* Unique name for this assignor.
*/
String name();

/**
* Perform the group assignment given the current members and
* topic metadata.
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
* @param assignmentSpec The assignment spec.
* @param assignmentSpec The assignment spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException;
GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
* This Range Assignor inherits properties of both the range assignor and the sticky assignor.
* The properties are as follows:
* <ol>
* <li> Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
* the number of subscribed members is greater than the number of partitions for that topic. (Range) </li>
* <li> Each member must get at least one partition from every topic that it is subscribed to.
* The only exception is when the number of subscribed members is greater than the
* number of partitions for that topic. (Range) </li>
* <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
* This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
* Two streams are co-partitioned if the following conditions are met:
Expand Down Expand Up @@ -76,25 +77,28 @@ public MemberWithRemainingAssignments(String memberId, int remaining) {
}

/**
* @return Map of topic ids to a list of members subscribed to them.
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
* @param assignmentSpec The specification for member assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
* @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
*/
private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) {
Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();

membersData.forEach((memberId, memberMetadata) -> {
Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
for (Uuid topicId : topics) {
// Only topics that are present in both the subscribed topics list and the topic metadata should be
// considered for assignment.
if (assignmentSpec.topics().containsKey(topicId)) {
membersPerTopic
.computeIfAbsent(topicId, k -> new ArrayList<>())
.add(memberId);
} else {
throw new PartitionAssignorException("Member " + memberId + " subscribed to topic " +
topicId + " which doesn't exist in the topic metadata");
if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
}
membersPerTopic
.computeIfAbsent(topicId, k -> new ArrayList<>())
.add(memberId);
}
});

Expand All @@ -118,14 +122,14 @@ private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentS
* </ol>
*/
@Override
public GroupAssignment assign(final AssignmentSpec assignmentSpec) throws PartitionAssignorException {
public GroupAssignment assign(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
Map<String, MemberAssignment> newAssignment = new HashMap<>();

// Step 1
Map<Uuid, List<String>> membersPerTopic = membersPerTopic(assignmentSpec);
Map<Uuid, List<String>> membersPerTopic = membersPerTopic(assignmentSpec, subscribedTopicDescriber);

membersPerTopic.forEach((topicId, membersForTopic) -> {
int numPartitionsForTopic = assignmentSpec.topics().get(topicId).numPartitions();
int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId);
int minRequiredQuota = numPartitionsForTopic / membersForTopic.size();
// Each member can get only ONE extra partition per topic after receiving the minimum quota.
int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Set;

/**
* The subscribed topic describer is used by the {@link PartitionAssignor}
* to obtain topic and partition metadata of the subscribed topics.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
*/
@InterfaceStability.Unstable
public interface SubscribedTopicDescriber {
/**
* The number of partitions for the given topic Id.
*
* @param topicId Uuid corresponding to the topic.
* @return The number of partitions corresponding to the given topic Id,
* or -1 if the topic Id does not exist.
*/
int numPartitions(Uuid topicId);

/**
* Returns all the available racks associated with the replicas of the given partition.
*
* @param topicId Uuid corresponding to the partition's topic.
* @param partition Partition Id within topic.
* @return The set of racks corresponding to the replicas of the topic's partition.
* If the topic Id does not exist, an empty set is returned.
*/
Set<String> racksForPartition(Uuid topicId, int partition);
}
Loading