Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>

<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/>
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopicState;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
Expand All @@ -43,12 +45,15 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {

private final List<LeaderAndIsrPartitionState> partitionStates;
private final Map<String, Uuid> topicIds;
private final Collection<Node> liveLeaders;

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Collection<Node> liveLeaders) {
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch);
this.partitionStates = partitionStates;
this.topicIds = topicIds;
this.liveLeaders = liveLeaders;
}

Expand All @@ -67,7 +72,7 @@ public LeaderAndIsrRequest build(short version) {
.setLiveLeaders(leaders);

if (version >= 2) {
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates);
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
} else {
data.setUngroupedPartitionStates(partitionStates);
Expand All @@ -76,13 +81,14 @@ public LeaderAndIsrRequest build(short version) {
return new LeaderAndIsrRequest(data, version);
}

private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates) {
private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds) {
Map<String, LeaderAndIsrTopicState> topicStates = new HashMap<>();
// We don't null out the topic name in LeaderAndIsrRequestPartition since it's ignored by
// the generated code if version >= 2
for (LeaderAndIsrPartitionState partition : partitionStates) {
LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(),
t -> new LeaderAndIsrTopicState().setTopicName(partition.topicName()));
LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(), t -> new LeaderAndIsrTopicState()
.setTopicName(partition.topicName())
.setTopicId(topicIds.getOrDefault(partition.topicName(), Uuid.ZERO_UUID)));
topicState.partitionStates().add(partition);
}
return topicStates;
Expand All @@ -96,6 +102,7 @@ public String toString() {
.append(", controllerEpoch=").append(controllerEpoch)
.append(", brokerEpoch=").append(brokerEpoch)
.append(", partitionStates=").append(partitionStates)
.append(", topicIds=").append(topicIds)
.append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")")
.append(")");
return bld.toString();
Expand Down Expand Up @@ -129,15 +136,34 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
responseData.setErrorCode(error.code());

List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
for (LeaderAndIsrPartitionState partition : partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setTopicName(partition.topicName())
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
if (version() < 5) {
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
for (LeaderAndIsrPartitionState partition : partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setTopicName(partition.topicName())
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
}
responseData.setPartitionErrors(partitions);
return new LeaderAndIsrResponse(responseData, version());
}

List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size());
Map<String, Uuid> topicIds = topicIds();
for (LeaderAndIsrTopicState topicState : data.topicStates()) {
LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
topicError.setTopicId(topicIds.get(topicState.topicName()));
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
}
topicError.setPartitionErrors(partitions);
topics.add(topicError);
}
responseData.setPartitionErrors(partitions);
return new LeaderAndIsrResponse(responseData);
responseData.setTopics(topics);
return new LeaderAndIsrResponse(responseData, version());
}

@Override
Expand All @@ -162,6 +188,11 @@ public Iterable<LeaderAndIsrPartitionState> partitionStates() {
return data.ungroupedPartitionStates();
}

public Map<String, Uuid> topicIds() {
return data.topicStates().stream()
.collect(Collectors.toMap(LeaderAndIsrTopicState::topicName, LeaderAndIsrTopicState::topicId));
}

public List<LeaderAndIsrLiveLeader> liveLeaders() {
return Collections.unmodifiableList(data.liveLeaders());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.FlattenedIterator;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;

public class LeaderAndIsrResponse extends AbstractResponse {
Expand All @@ -36,14 +41,24 @@ public class LeaderAndIsrResponse extends AbstractResponse {
* STALE_BROKER_EPOCH (77)
*/
private final LeaderAndIsrResponseData data;
private short version;

public LeaderAndIsrResponse(LeaderAndIsrResponseData data) {
public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) {
super(ApiKeys.LEADER_AND_ISR);
this.data = data;
this.version = version;
}

public List<LeaderAndIsrPartitionError> partitions() {
return data.partitionErrors();
public List<LeaderAndIsrTopicError> topics() {
return this.data.topics();
}

public Iterable<LeaderAndIsrPartitionError> partitions() {
if (version < 5) {
return data.partitionErrors();
}
return () -> new FlattenedIterator<>(data.topics().iterator(),
topic -> topic.partitionErrors().iterator());
}

public Errors error() {
Expand All @@ -53,22 +68,49 @@ public Errors error() {
@Override
public Map<Errors, Integer> errorCounts() {
Errors error = error();
if (error != Errors.NONE)
if (error != Errors.NONE) {
// Minor optimization since the top-level error applies to all partitions
return Collections.singletonMap(error, data.partitionErrors().size() + 1);
Map<Errors, Integer> errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
// Top level error
if (version < 5)
return Collections.singletonMap(error, data.partitionErrors().size() + 1);
return Collections.singletonMap(error,
data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum() + 1);
}
Map<Errors, Integer> errors;
if (version < 5)
errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
else
errors = errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l ->
Errors.forCode(l.errorCode())));
updateErrorCounts(errors, Errors.NONE);
return errors;
}

public Map<TopicPartition, Errors> partitionErrors(Map<Uuid, String> topicNames) {
Map<TopicPartition, Errors> errors = new HashMap<>();
if (version < 5) {
data.partitionErrors().forEach(partition ->
errors.put(new TopicPartition(partition.topicName(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())));
} else {
for (LeaderAndIsrTopicError topic : data.topics()) {
String topicName = topicNames.get(topic.topicId());
if (topicName != null) {
topic.partitionErrors().forEach(partition ->
errors.put(new TopicPartition(topicName, partition.partitionIndex()),
Errors.forCode(partition.errorCode())));
}
}
}
return errors;
}

@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}

public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version));
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version), version);
}

@Override
Expand Down
12 changes: 10 additions & 2 deletions clients/src/main/resources/common/message/LeaderAndIsrRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
//
// Version 2 adds broker epoch and reorganizes the partitions by topic.
//
// Version 3 adds AddingReplicas and RemovingReplicas
"validVersions": "0-4",
// Version 3 adds AddingReplicas and RemovingReplicas.
//
// Version 4 is the first flexible version.
//
// Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516.
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
Expand All @@ -31,6 +35,8 @@
"about": "The current controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
"about": "The current broker epoch." },
{ "name": "Type", "type": "int8", "versions": "5+",
"about": "The type that indicates whether all topics are included in the request"},
{ "name": "UngroupedPartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "0-1",
"about": "The state of each partition, in a v0 or v1 message." },
// In v0 or v1 requests, each partition is listed alongside its topic name.
Expand All @@ -40,6 +46,8 @@
"about": "Each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
"about": "The unique topic ID." },
{ "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "2+",
"about": "The state of each partition" }
]},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,29 @@
// Version 2 is the same as version 1.
//
// Version 3 is the same as version 2.
"validVersions": "0-4",
//
// Version 4 is the first flexible version.
//
// Version 5 removes TopicName and replaces it with TopicId and reorganizes
// the partitions by topic, as described by KIP-516.
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
"about": "Each partition.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
"about": "Each partition in v0 to v4 message."},
{ "name": "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+",
"about": "Each topic", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "5+",
"about": "Each partition."}
]}
],
"commonStructs": [
{ "name": "LeaderAndIsrPartitionError", "versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0-4", "entityType": "topicName", "ignorable": true,
"about": "The topic name."},
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
Expand Down
Loading