Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709 (streamnative#1981)

(cherry picked from commit 71b77b2)

### Motivation

Update the Kafka wire protocol to 3.4.0 and implement KIP-699 (resolve multiple coordinators with a single FindCoordinator request) and KIP-709 (fetch offsets for multiple consumer groups with a single OffsetFetch request).
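
Most of the diff below follows two mechanical migration patterns: from the old request/response wrapper classes to the generated wire-protocol message classes. A minimal sketch of both patterns follows; the helper class and method names are hypothetical, while the Kafka types, accessors, and constants are the ones used in the diff.

```java
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;

// Hypothetical helper class, only for illustration; the Kafka types and calls below
// are the ones this commit migrates to.
public class FetchMigrationSketch {

    // Request side: the FetchRequest.PartitionData public fields (fetchOffset, maxBytes)
    // are replaced by FetchRequestData.FetchPartition accessor methods.
    static String describeFetch(FetchRequestData.FetchPartition partition, long limitBytes) {
        long offset = partition.fetchOffset();                                       // was partitionData.fetchOffset
        long adjustedMaxBytes = Math.min(partition.partitionMaxBytes(), limitBytes); // was partitionData.maxBytes
        return "offset=" + offset + ", maxBytes=" + adjustedMaxBytes;
    }

    // Response side: the FetchResponse.PartitionData<Records> constructor is replaced by
    // the generated FetchResponseData.PartitionData with chained setters.
    static FetchResponseData.PartitionData errorPartition(Errors errors) {
        return new FetchResponseData.PartitionData()
                .setErrorCode(errors.code())
                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
                .setRecords(MemoryRecords.EMPTY);
    }
}
```

The same setter style is applied to FetchResponseData.AbortedTransaction in ProducerStateManager.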

---------

Co-authored-by: Enrico Olivelli <enrico.olivelli@datastax.com>

(cherry picked from commit 19801c1)
gaoran10 authored and Demogorgon314 committed Aug 15, 2023
1 parent 8c1da0a commit c846ccc
Showing 26 changed files with 626 additions and 248 deletions.
@@ -24,7 +24,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.message.FetchRequestData;

@Slf4j
public class DelayedFetch extends DelayedOperation {
@@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation {
private final long bytesReadable;
private final int fetchMaxBytes;
private final boolean readCommitted;
private final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo;
private final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo;
private final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult;
private final MessageFetchContext context;
protected volatile Boolean hasError;
@@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs,
final boolean readCommitted,
final MessageFetchContext context,
final ReplicaManager replicaManager,
final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo,
final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo,
final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult,
final CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback) {
super(delayMs, Optional.empty());

@@ -468,7 +468,14 @@ protected void writeAndFlushResponseToClient(Channel channel) {
request, response);
}

final ByteBuf result = responseToByteBuf(response, request, true);
final ByteBuf result;
try {
result = responseToByteBuf(response, request, true);
} catch (Throwable error) {
log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error);
sendErrorResponse(request, channel, error, true);
return;
}
final int resultSize = result.readableBytes();
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {

Large diffs are not rendered by default.

@@ -0,0 +1,30 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.exceptions;

import java.io.Serial;

/**
* KoP topic initialization exception.
*/
public class KoPTopicInitializeException extends KoPBaseException {

@Serial
private static final long serialVersionUID = 0L;

public KoPTopicInitializeException(Throwable throwable) {
super(throwable);
}

}

@@ -31,6 +31,7 @@
import io.streamnative.pulsar.handlers.kop.MessagePublishContext;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicInitializeException;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
@@ -82,13 +83,13 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.Topic;
@@ -173,15 +174,15 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig,
public CompletableFuture<PartitionLog> initialise() {
loadTopicProperties().whenComplete((___, errorLoadTopic) -> {
if (errorLoadTopic != null) {
initFuture.completeExceptionally(errorLoadTopic);
initFuture.completeExceptionally(new KoPTopicInitializeException(errorLoadTopic));
return;
}
if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
producerStateManager
.recover(this, recoveryExecutor)
.thenRun(() -> initFuture.complete(this))
.exceptionally(error -> {
initFuture.completeExceptionally(error);
initFuture.completeExceptionally(new KoPTopicInitializeException(error));
return null;
});
} else {
@@ -300,7 +301,7 @@ protected ReadRecordsResult newObject(Handle<ReadRecordsResult> handle) {
private final Recycler.Handle<ReadRecordsResult> recyclerHandle;

private DecodeResult decodeResult;
private List<FetchResponse.AbortedTransaction> abortedTransactions;
private List<FetchResponseData.AbortedTransaction> abortedTransactions;
private long highWatermark;
private long lastStableOffset;
private Position lastPosition;
@@ -317,7 +318,7 @@ public Errors errors() {
}

public static ReadRecordsResult get(DecodeResult decodeResult,
List<FetchResponse.AbortedTransaction> abortedTransactions,
List<FetchResponseData.AbortedTransaction> abortedTransactions,
long highWatermark,
long lastStableOffset,
Position lastPosition,
@@ -333,7 +334,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
}

public static ReadRecordsResult get(DecodeResult decodeResult,
List<FetchResponse.AbortedTransaction> abortedTransactions,
List<FetchResponseData.AbortedTransaction> abortedTransactions,
long highWatermark,
long lastStableOffset,
Position lastPosition,
@@ -377,7 +378,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio
partitionLog);
}

public FetchResponse.PartitionData<Records> toPartitionData() {
public FetchResponseData.PartitionData toPartitionData() {

// There are three cases:
//
@@ -388,21 +389,20 @@ public FetchResponse.PartitionData<Records> toPartitionData() {
// 3. errors == Others error :
// Get errors.
if (errors != null) {
return new FetchResponse.PartitionData<>(
errors,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY);
return new FetchResponseData.PartitionData()
.setErrorCode(errors.code())
.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
.setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
.setRecords(MemoryRecords.EMPTY);
}
return new FetchResponse.PartitionData<>(
Errors.NONE,
highWatermark,
lastStableOffset,
highWatermark, // TODO: should it be changed to the logStartOffset?
abortedTransactions,
decodeResult.getRecords());
return new FetchResponseData.PartitionData()
.setErrorCode(Errors.NONE.code())
.setHighWatermark(highWatermark)
.setLastStableOffset(lastStableOffset)
.setLogStartOffset(highWatermark) // TODO: should it be changed to the logStartOffset?
.setAbortedTransactions(abortedTransactions)
.setRecords(decodeResult.getRecords());
}

public void recycle() {
@@ -469,7 +469,7 @@ public Optional<Long> firstUndecidedOffset() {
return producerStateManager.firstUndecidedOffset();
}

public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
return producerStateManager.getAbortedIndexList(fetchOffset);
}

@@ -552,14 +552,14 @@ public Position getLastPosition() {
return persistentTopic.getLastPosition();
}

public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.PartitionData partitionData,
public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequestData.FetchPartition partitionData,
final boolean readCommitted,
final AtomicLong limitBytes,
final int maxReadEntriesNum,
final MessageFetchContext context) {
final long startPrepareMetadataNanos = MathUtils.nowInNano();
final CompletableFuture<ReadRecordsResult> future = new CompletableFuture<>();
final long offset = partitionData.fetchOffset;
final long offset = partitionData.fetchOffset();
KafkaTopicManager topicManager = context.getTopicManager();
// The future that is returned by getTopicConsumerManager is always completed normally
topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> {
@@ -607,7 +607,7 @@ public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.Parti

requestStats.getPrepareMetadataStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get());
long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get());
if (readCommitted) {
long firstUndecidedOffset = producerStateManager.firstUndecidedOffset().orElse(-1L);
if (firstUndecidedOffset >= 0 && firstUndecidedOffset <= offset) {
@@ -690,7 +690,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos)

private void handleEntries(final CompletableFuture<ReadRecordsResult> future,
final List<Entry> entries,
final FetchRequest.PartitionData partitionData,
final FetchRequestData.FetchPartition partitionData,
final KafkaTopicConsumerManager tcm,
final ManagedCursor cursor,
final boolean readCommitted,
@@ -732,9 +732,9 @@ private void handleEntries(final CompletableFuture<ReadRecordsResult> future,

// collect consumer metrics
decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats);
List<FetchResponse.AbortedTransaction> abortedTransactions = null;
List<FetchResponseData.AbortedTransaction> abortedTransactions = null;
if (readCommitted) {
abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset);
abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset());
}
if (log.isDebugEnabled()) {
log.debug("Partition {} read entry completed in {} ns",
@@ -1217,6 +1217,7 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition);

return future;

}

private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture<Long> future,
Expand Down Expand Up @@ -1281,7 +1282,7 @@ public CompletableFuture<Long> recoverTxEntries(
Executor executor) {
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
// no need to scan the topic, because transactions are disabled
return CompletableFuture.completedFuture(Long.valueOf(0));
return CompletableFuture.completedFuture(0L);
}
if (!isBrokerIndexMetadataInterceptorConfigured(persistentTopic.getBrokerService())) {
// The `UpgradeTest` will set the interceptor to null,

@@ -27,8 +27,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;

/**
* Producer state manager.
@@ -339,13 +339,15 @@ public long purgeAbortedTxns(long offset) {
return count.get();
}

public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
synchronized (abortedIndexList) {
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
for (AbortedTxn abortedTxn : abortedIndexList) {
if (abortedTxn.lastOffset() >= fetchOffset) {
abortedTransactions.add(
new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset()));
new FetchResponseData.AbortedTransaction()
.setProducerId(abortedTxn.producerId())
.setFirstOffset(abortedTxn.firstOffset()));
}
}
return abortedTransactions;
@@ -361,8 +363,7 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset)
if (snapshotOffset < minOffset) {
log.info("{} handleMissingDataBeforeRecovery mapEndOffset {} snapshotOffset "
+ "{} minOffset {} RESETTING STATE",
topicPartition,
mapEndOffset, minOffset);
topicPartition, mapEndOffset, snapshotOffset, minOffset);
// topic was not empty (mapEndOffset has some value)
// but there is no more data on the topic (trimmed?)
ongoingTxns.clear();