This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709 #1981

Merged
@@ -24,7 +24,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.message.FetchRequestData;

@Slf4j
public class DelayedFetch extends DelayedOperation {
@@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation {
private final long bytesReadable;
private final int fetchMaxBytes;
private final boolean readCommitted;
private final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo;
private final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo;
private final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult;
private final MessageFetchContext context;
protected volatile Boolean hasError;
@@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs,
final boolean readCommitted,
final MessageFetchContext context,
final ReplicaManager replicaManager,
final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo,
final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo,
final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult,
final CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback) {
super(delayMs, Optional.empty());
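For readers tracking the API change in this file: Kafka 3.x replaces the hand-written FetchRequest.PartitionData holder (public fields such as fetchOffset and maxBytes) with the generated message class FetchRequestData.FetchPartition, whose values are read through accessors such as fetchOffset() and partitionMaxBytes(), as later hunks in this PR show. A minimal, hedged sketch of building one; only the accessors actually appear in this diff, the setter names follow the usual convention for Kafka's generated messages and are assumptions:

import org.apache.kafka.common.message.FetchRequestData;

final class FetchPartitionSketch {
    // Builds the per-partition fetch metadata that DelayedFetch now carries.
    static FetchRequestData.FetchPartition fetchPartition(int partitionIndex,
                                                          long fetchOffset,
                                                          int partitionMaxBytes) {
        return new FetchRequestData.FetchPartition()
                .setPartition(partitionIndex)             // partition to read from
                .setFetchOffset(fetchOffset)              // read back via fetchOffset()
                .setPartitionMaxBytes(partitionMaxBytes); // read back via partitionMaxBytes()
    }
}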
@@ -465,7 +465,14 @@ protected void writeAndFlushResponseToClient(Channel channel) {
request, response);
}

final ByteBuf result = responseToByteBuf(response, request, true);
final ByteBuf result;
try {
result = responseToByteBuf(response, request, true);
} catch (Throwable error) {
log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error);
sendErrorResponse(request, channel, error, true);
return;
}
final int resultSize = result.readableBytes();
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {

Large diffs are not rendered by default.

@@ -0,0 +1,30 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.exceptions;

import java.io.Serial;

/**
* KoP topic load exception.
*/
public class KoPTopicInitializeException extends KoPBaseException {

@Serial
private static final long serialVersionUID = 0L;

public KoPTopicInitializeException(Throwable throwable) {
super(throwable);
}

}
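A brief, hedged sketch of how a caller might use the new exception type to tell topic-initialization failures apart from other errors; the partitionLog variable and the unwrapping logic are illustrative assumptions, only the exception class comes from this PR:

partitionLog.initialise().exceptionally(error -> {
    // Dependent stages may deliver the failure wrapped in a CompletionException.
    Throwable cause = (error instanceof java.util.concurrent.CompletionException && error.getCause() != null)
            ? error.getCause() : error;
    if (cause instanceof KoPTopicInitializeException) {
        // Loading topic properties or recovering producer state failed;
        // surface this as "partition not available" rather than a generic server error.
    }
    return null;
});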
@@ -13,6 +13,8 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import static io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils.isBrokerIndexMetadataInterceptorConfigured;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
@@ -26,6 +28,7 @@
import io.streamnative.pulsar.handlers.kop.MessagePublishContext;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicInitializeException;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
@@ -76,12 +79,12 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.Topic;
@@ -165,15 +168,15 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig,
public CompletableFuture<PartitionLog> initialise() {
loadTopicProperties().whenComplete((___, errorLoadTopic) -> {
if (errorLoadTopic != null) {
initFuture.completeExceptionally(errorLoadTopic);
initFuture.completeExceptionally(new KoPTopicInitializeException(errorLoadTopic));
return;
}
if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
producerStateManager
.recover(this, recoveryExecutor)
.thenRun(() -> initFuture.complete(this))
.exceptionally(error -> {
initFuture.completeExceptionally(error);
initFuture.completeExceptionally(new KoPTopicInitializeException(error));
return null;
});
} else {
@@ -291,7 +294,7 @@ protected ReadRecordsResult newObject(Handle<ReadRecordsResult> handle) {
private final Recycler.Handle<ReadRecordsResult> recyclerHandle;

private DecodeResult decodeResult;
private List<FetchResponse.AbortedTransaction> abortedTransactions;
private List<FetchResponseData.AbortedTransaction> abortedTransactions;
private long highWatermark;
private long lastStableOffset;
private Position lastPosition;
@@ -308,7 +311,7 @@ public Errors errors() {
}

public static ReadRecordsResult get(DecodeResult decodeResult,
List<FetchResponse.AbortedTransaction> abortedTransactions,
List<FetchResponseData.AbortedTransaction> abortedTransactions,
long highWatermark,
long lastStableOffset,
Position lastPosition,
@@ -324,7 +327,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
}

public static ReadRecordsResult get(DecodeResult decodeResult,
List<FetchResponse.AbortedTransaction> abortedTransactions,
List<FetchResponseData.AbortedTransaction> abortedTransactions,
long highWatermark,
long lastStableOffset,
Position lastPosition,
@@ -368,7 +371,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio
partitionLog);
}

public FetchResponse.PartitionData<Records> toPartitionData() {
public FetchResponseData.PartitionData toPartitionData() {

// There are three cases:
//
@@ -379,21 +382,20 @@ public FetchResponse.PartitionData<Records> toPartitionData() {
// 3. errors == Others error :
// Get errors.
if (errors != null) {
return new FetchResponse.PartitionData<>(
errors,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY);
return new FetchResponseData.PartitionData()
.setErrorCode(errors.code())
.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
.setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
.setRecords(MemoryRecords.EMPTY);
}
return new FetchResponse.PartitionData<>(
Errors.NONE,
highWatermark,
lastStableOffset,
highWatermark, // TODO: should it be changed to the logStartOffset?
abortedTransactions,
decodeResult.getRecords());
return new FetchResponseData.PartitionData()
.setErrorCode(Errors.NONE.code())
.setHighWatermark(highWatermark)
.setLastStableOffset(lastStableOffset)
.setLogStartOffset(highWatermark) // TODO: should it be changed to the logStartOffset?
.setAbortedTransactions(abortedTransactions)
.setRecords(decodeResult.getRecords());
}
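Context for the method above: the versioned FetchResponse.PartitionData<T> wrapper and its positional constructor are gone in Kafka 3.x; the response is now assembled on the generated FetchResponseData.PartitionData through fluent setters. A hedged sketch of factoring the error branch into a reusable helper (the helper and the partitionIndex field are illustrative, not part of the PR):

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;

final class FetchPartitionResponses {
    // Mirrors the error branch of toPartitionData(): sentinel offsets, no records.
    static FetchResponseData.PartitionData errorPartition(Errors errors, int partitionIndex) {
        return new FetchResponseData.PartitionData()
                .setPartitionIndex(partitionIndex)
                .setErrorCode(errors.code())
                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
                .setRecords(MemoryRecords.EMPTY);
    }
}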

public void recycle() {
@@ -460,7 +462,7 @@ public Optional<Long> firstUndecidedOffset() {
return producerStateManager.firstUndecidedOffset();
}

public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
return producerStateManager.getAbortedIndexList(fetchOffset);
}

@@ -537,14 +539,14 @@ public Position getLastPosition() {
return persistentTopic.getLastPosition();
}

public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.PartitionData partitionData,
public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequestData.FetchPartition partitionData,
final boolean readCommitted,
final AtomicLong limitBytes,
final int maxReadEntriesNum,
final MessageFetchContext context) {
final long startPrepareMetadataNanos = MathUtils.nowInNano();
final CompletableFuture<ReadRecordsResult> future = new CompletableFuture<>();
final long offset = partitionData.fetchOffset;
final long offset = partitionData.fetchOffset();
KafkaTopicManager topicManager = context.getTopicManager();
// The future that is returned by getTopicConsumerManager is always completed normally
topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> {
@@ -592,7 +594,7 @@ public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.Parti

requestStats.getPrepareMetadataStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get());
long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get());
if (readCommitted) {
long firstUndecidedOffset = producerStateManager.firstUndecidedOffset().orElse(-1L);
if (firstUndecidedOffset >= 0 && firstUndecidedOffset <= offset) {
@@ -674,7 +676,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos)

private void handleEntries(final CompletableFuture<ReadRecordsResult> future,
final List<Entry> entries,
final FetchRequest.PartitionData partitionData,
final FetchRequestData.FetchPartition partitionData,
final KafkaTopicConsumerManager tcm,
final ManagedCursor cursor,
final boolean readCommitted,
@@ -716,9 +718,9 @@ private void handleEntries(final CompletableFuture<ReadRecordsResult> future,

// collect consumer metrics
decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats);
List<FetchResponse.AbortedTransaction> abortedTransactions = null;
List<FetchResponseData.AbortedTransaction> abortedTransactions = null;
if (readCommitted) {
abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset);
abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset());
}
if (log.isDebugEnabled()) {
log.debug("Partition {} read entry completed in {} ns",
@@ -1133,14 +1135,29 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
// look for the first entry with data
PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition);

managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() {
fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition);

return future;

}

private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture<Long> future,
ManagedLedgerImpl managedLedger, PositionImpl position) {
managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
long startOffset = MessageMetadataUtils.peekBaseOffsetFromEntry(entry);
log.info("First offset for topic {} is {} - position {}", fullPartitionName,
startOffset, entry.getPosition());
future.complete(startOffset);
} catch (MetadataCorruptedException.NoBrokerEntryMetadata noBrokerEntryMetadata) {
long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
log.info("Legacy entry for topic {} - position {} - returning current offset {}",
fullPartitionName,
entry.getPosition(),
currentOffset);
future.complete(currentOffset);
} catch (Exception err) {
future.completeExceptionally(err);
} finally {
@@ -1153,9 +1170,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);

return future;

}
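Note on the fallback above: only entries appended while Pulsar's AppendIndexMetadataInterceptor is active carry a broker-level index, so peeking the base offset can fail with NoBrokerEntryMetadata for older "legacy" entries; in that case the code falls back to the managed ledger's current offset instead of failing the whole recovery.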

@VisibleForTesting
@@ -1189,7 +1203,18 @@ public CompletableFuture<Long> recoverTxEntries(
Executor executor) {
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
// no need to scan the topic, because transactions are disabled
return CompletableFuture.completedFuture(Long.valueOf(0));
return CompletableFuture.completedFuture(0L);
}
if (!isBrokerIndexMetadataInterceptorConfigured(persistentTopic.getBrokerService())) {

Review comment from @gaoran10 (Contributor, Author), Aug 4, 2023:

If we don't have this check, the UpgradeTest will hit an NPE while getting the current offset in fetchOldestAvailableIndexFromTopic, but we can't disable the Kafka transaction coordinator in that test right now, because the newer Kafka client requires a producer id.

Actually, I think this check is necessary because Kafka transactions can't work without continuous offsets; maybe we should add this check while starting the broker. @Demogorgon314

// The `UpgradeTest` sets the interceptor to null,
// which causes an NPE in `fetchOldestAvailableIndexFromTopic`,
// but we can't disable Kafka transactions here:
// currently the transaction coordinator must be enabled (the newer Kafka client requires it).
// TODO: if the AppendIndexMetadataInterceptor is not set, Kafka transactions can't work;
// we should throw an exception, and maybe add a new configuration for ProducerId.
log.error("The broker index metadata interceptor is not configured for topic {}, skip recover txn entries.",
fullPartitionName);
return CompletableFuture.completedFuture(0L);
}
return fetchOldestAvailableIndexFromTopic().thenCompose((minOffset -> {
log.info("start recoverTxEntries for {} at offset {} minOffset {}",
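The new guard in recoverTxEntries relies on Pulsar's AppendIndexMetadataInterceptor being configured on the broker (typically brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor in broker.conf), because KoP derives its continuous Kafka offsets from that index. A hedged sketch of what the isBrokerIndexMetadataInterceptorConfigured helper could look like; the BrokerService getter name and the body are assumptions, not taken from this PR:

import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;

final class MessageMetadataUtilsSketch {
    // True only if the broker appends an index to every entry, which is what
    // gives KoP monotonically increasing offsets for transaction recovery.
    static boolean isBrokerIndexMetadataInterceptorConfigured(BrokerService brokerService) {
        // getBrokerEntryMetadataInterceptors() is assumed to expose the configured interceptors.
        return brokerService.getBrokerEntryMetadataInterceptors().stream()
                .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
    }
}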
@@ -27,8 +27,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;

/**
* Producer state manager.
@@ -339,13 +339,15 @@ public long purgeAbortedTxns(long offset) {
return count.get();
}

public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
synchronized (abortedIndexList) {
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
for (AbortedTxn abortedTxn : abortedIndexList) {
if (abortedTxn.lastOffset() >= fetchOffset) {
abortedTransactions.add(
new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset()));
new FetchResponseData.AbortedTransaction()
.setProducerId(abortedTxn.producerId())
.setFirstOffset(abortedTxn.firstOffset()));
}
}
return abortedTransactions;
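Context for the hunk above: aborted-transaction markers are now reported with the generated FetchResponseData.AbortedTransaction message. A hedged illustration of what ends up in the fetch response; the values are illustrative, the setters are the ones used in the diff:

long producerId = 1234L;   // illustrative values
long firstOffset = 42L;
FetchResponseData.AbortedTransaction aborted = new FetchResponseData.AbortedTransaction()
        .setProducerId(producerId)
        .setFirstOffset(firstOffset);
// A read_committed consumer discards records from this producer id starting at
// firstOffset() until it reaches the matching abort marker, which is why
// getAbortedIndexList only keeps transactions whose lastOffset() is at or
// beyond the requested fetch offset.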
@@ -361,8 +363,7 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset)
if (snapshotOffset < minOffset) {
log.info("{} handleMissingDataBeforeRecovery mapEndOffset {} snapshotOffset "
+ "{} minOffset {} RESETTING STATE",
topicPartition,
mapEndOffset, minOffset);
topicPartition, mapEndOffset, snapshotOffset, minOffset);
// topic was not empty (mapEndOffset has some value)
// but there is no more data on the topic (trimmed?)
ongoingTxns.clear();
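Side note on the hunk above: the previous log call declared four "{}" placeholders but passed only three arguments (snapshotOffset was missing), so minOffset was printed under the snapshotOffset label and the last placeholder was left unfilled; the replacement line supplies topicPartition, mapEndOffset, snapshotOffset and minOffset in order.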