This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit 71b77b2

Update Kafka wireprotocol to 3.4.0 and implement KIP-699 and KIP-709
eolivelli committed Mar 7, 2023
1 parent e4931e1 commit 71b77b2
Showing 28 changed files with 909 additions and 340 deletions.
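
The hunks below replace the old org.apache.kafka.common.requests wrapper types (FetchRequest.PartitionData, FetchResponse.PartitionData, FetchResponse.AbortedTransaction) with the generated message classes from org.apache.kafka.common.message (FetchRequestData.FetchPartition, FetchResponseData.PartitionData, FetchResponseData.AbortedTransaction), which are populated through chained setters. For orientation, here is a minimal sketch of the new request-side type, not taken from this commit; it assumes the Kafka 3.4.0 clients library on the classpath, and the topic name, partition, offset and byte limit are placeholders.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchRequestData;

public class FetchPartitionSketch {
    public static void main(String[] args) {
        // Placeholder topic partition used only for this sketch.
        TopicPartition tp = new TopicPartition("example-topic", 0);
        // The generated message class replaces the old FetchRequest.PartitionData wrapper
        // and is filled in through chained setters instead of constructor arguments.
        FetchRequestData.FetchPartition partition = new FetchRequestData.FetchPartition()
                .setPartition(tp.partition())
                .setFetchOffset(42L)                // placeholder fetch offset
                .setPartitionMaxBytes(1024 * 1024); // placeholder per-partition byte limit
        System.out.println(partition);
    }
}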
@@ -24,7 +24,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.message.FetchRequestData;
 
 @Slf4j
 public class DelayedFetch extends DelayedOperation {
@@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation {
     private final long bytesReadable;
     private final int fetchMaxBytes;
     private final boolean readCommitted;
-    private final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo;
+    private final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo;
     private final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult;
     private final MessageFetchContext context;
     protected volatile Boolean hasError;
@@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs,
                         final boolean readCommitted,
                         final MessageFetchContext context,
                         final ReplicaManager replicaManager,
-                        final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo,
+                        final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo,
                         final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult,
                         final CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback) {
         super(delayMs, Optional.empty());
@@ -453,7 +453,14 @@ protected void writeAndFlushResponseToClient(Channel channel) {
                     this, request, response);
             }
 
-            final ByteBuf result = responseToByteBuf(response, request, true);
+            final ByteBuf result;
+            try {
+                result = responseToByteBuf(response, request, true);
+            } catch (Throwable error) {
+                log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error);
+                sendErrorResponse(request, channel, error, true);
+                return;
+            }
             final int resultSize = result.readableBytes();
             channel.writeAndFlush(result).addListener(future -> {
                 if (response instanceof ResponseCallbackWrapper) {

Large diffs are not rendered by default.

@@ -78,12 +78,12 @@
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.pulsar.broker.service.Topic;
@@ -304,7 +304,7 @@ protected ReadRecordsResult newObject(Handle<ReadRecordsResult> handle) {
         private final Recycler.Handle<ReadRecordsResult> recyclerHandle;
 
         private DecodeResult decodeResult;
-        private List<FetchResponse.AbortedTransaction> abortedTransactions;
+        private List<FetchResponseData.AbortedTransaction> abortedTransactions;
         private long highWatermark;
         private long lastStableOffset;
         private Position lastPosition;
@@ -321,7 +321,7 @@ public Errors errors() {
         }
 
         public static ReadRecordsResult get(DecodeResult decodeResult,
-                                            List<FetchResponse.AbortedTransaction> abortedTransactions,
+                                            List<FetchResponseData.AbortedTransaction> abortedTransactions,
                                             long highWatermark,
                                             long lastStableOffset,
                                             Position lastPosition,
@@ -337,7 +337,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
         }
 
         public static ReadRecordsResult get(DecodeResult decodeResult,
-                                            List<FetchResponse.AbortedTransaction> abortedTransactions,
+                                            List<FetchResponseData.AbortedTransaction> abortedTransactions,
                                             long highWatermark,
                                             long lastStableOffset,
                                             Position lastPosition,
@@ -368,7 +368,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio
                     partitionLog);
         }
 
-        public FetchResponse.PartitionData<Records> toPartitionData() {
+        public FetchResponseData.PartitionData toPartitionData() {
 
             // There are three cases:
             //
@@ -379,21 +379,20 @@ public FetchResponse.PartitionData<Records> toPartitionData() {
             // 3. errors == Others error :
             //      Get errors.
             if (errors != null) {
-                return new FetchResponse.PartitionData<>(
-                        errors,
-                        FetchResponse.INVALID_HIGHWATERMARK,
-                        FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                        FetchResponse.INVALID_LOG_START_OFFSET,
-                        null,
-                        MemoryRecords.EMPTY);
+                return new FetchResponseData.PartitionData()
+                        .setErrorCode(errors.code())
+                        .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                        .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                        .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                        .setRecords(MemoryRecords.EMPTY);
             }
-            return new FetchResponse.PartitionData<>(
-                    Errors.NONE,
-                    highWatermark,
-                    lastStableOffset,
-                    highWatermark, // TODO: should it be changed to the logStartOffset?
-                    abortedTransactions,
-                    decodeResult.getRecords());
+            return new FetchResponseData.PartitionData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHighWatermark(highWatermark)
+                    .setLastStableOffset(lastStableOffset)
+                    .setLogStartOffset(highWatermark) // TODO: should it be changed to the logStartOffset?
+                    .setAbortedTransactions(abortedTransactions)
+                    .setRecords(decodeResult.getRecords());
         }
 
         public void recycle() {
@@ -460,7 +459,7 @@ public Optional<Long> firstUndecidedOffset() {
         return producerStateManager.firstUndecidedOffset();
     }
 
-    public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
+    public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
        return producerStateManager.getAbortedIndexList(fetchOffset);
     }
 
@@ -538,14 +537,14 @@ public Position getLastPosition() {
         return persistentTopic.getLastPosition();
     }
 
-    public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.PartitionData partitionData,
+    public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequestData.FetchPartition partitionData,
                                                             final boolean readCommitted,
                                                             final AtomicLong limitBytes,
                                                             final int maxReadEntriesNum,
                                                             final MessageFetchContext context) {
         final long startPrepareMetadataNanos = MathUtils.nowInNano();
         final CompletableFuture<ReadRecordsResult> future = new CompletableFuture<>();
-        final long offset = partitionData.fetchOffset;
+        final long offset = partitionData.fetchOffset();
         KafkaTopicManager topicManager = context.getTopicManager();
         // The future that is returned by getTopicConsumerManager is always completed normally
         topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> {
@@ -593,7 +592,7 @@ public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.Parti
 
             requestStats.getPrepareMetadataStats().registerSuccessfulEvent(
                     MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
-            long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get());
+            long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get());
             readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes,
                 fullPartitionName -> {
                     topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName);
@@ -661,7 +660,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos)
 
     private void handleEntries(final CompletableFuture<ReadRecordsResult> future,
                                final List<Entry> entries,
-                               final FetchRequest.PartitionData partitionData,
+                               final FetchRequestData.FetchPartition partitionData,
                                final KafkaTopicConsumerManager tcm,
                                final ManagedCursor cursor,
                                final boolean readCommitted,
@@ -707,9 +706,9 @@ private void handleEntries(final CompletableFuture<ReadRecordsResult> future,
 
             // collect consumer metrics
             decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats);
-            List<FetchResponse.AbortedTransaction> abortedTransactions = null;
+            List<FetchResponseData.AbortedTransaction> abortedTransactions = null;
             if (readCommitted) {
-                abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset);
+                abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset());
             }
             if (log.isDebugEnabled()) {
                 log.debug("Partition {} read entry completed in {} ns",
@@ -1125,14 +1124,29 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
         // look for the first entry with data
         PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition);
 
-        managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() {
+        fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition);
+
+        return future;
+
+    }
+
+    private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture<Long> future,
+                                                            ManagedLedgerImpl managedLedger, PositionImpl position) {
+        managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 try {
                     long startOffset = MessageMetadataUtils.peekBaseOffsetFromEntry(entry);
                     log.info("First offset for topic {} is {} - position {}", fullPartitionName,
                             startOffset, entry.getPosition());
                     future.complete(startOffset);
+                } catch (MetadataCorruptedException.NoBrokerEntryMetadata noBrokerEntryMetadata) {
+                    long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
+                    log.info("Legacy entry for topic {} - position {} - returning current offset {}",
+                            fullPartitionName,
+                            entry.getPosition(),
+                            currentOffset);
+                    future.complete(currentOffset);
                 } catch (Exception err) {
                     future.completeExceptionally(err);
                 } finally {
@@ -1145,9 +1159,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                 future.completeExceptionally(exception);
             }
         }, null);
-
-        return future;
-
     }
 
     @VisibleForTesting
@@ -1180,7 +1191,8 @@ public CompletableFuture<Long> forcePurgeAbortTx() {
     public CompletableFuture<Long> recoverTxEntries(
             long offset,
             Executor executor) {
-        if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
+        if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()
+                || !MessageMetadataUtils.isInterceptorConfigured(persistentTopic.getManagedLedger())) {
             // no need to scan the topic, because transactions are disabled
             return CompletableFuture.completedFuture(Long.valueOf(0));
         }
@@ -27,8 +27,8 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.FetchResponse;
 
 /**
  * Producer state manager.
@@ -355,13 +355,15 @@ public long purgeAbortedTxns(long offset) {
         return count.get();
     }
 
-    public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
+    public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
         synchronized (abortedIndexList) {
-            List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+            List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
             for (AbortedTxn abortedTxn : abortedIndexList) {
                 if (abortedTxn.lastOffset() >= fetchOffset) {
                     abortedTransactions.add(
-                            new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset()));
+                            new FetchResponseData.AbortedTransaction()
+                                    .setProducerId(abortedTxn.producerId())
+                                    .setFirstOffset(abortedTxn.firstOffset()));
                 }
             }
             return abortedTransactions;
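
For context, here is a hedged sketch, not taken from this commit, of how the per-partition FetchResponseData.PartitionData and FetchResponseData.AbortedTransaction objects built above end up inside a complete FetchResponseData. The topic name and offsets are placeholders, and the setter names assume the standard generated accessors of the Kafka 3.4.0 clients.

import java.util.Collections;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.MemoryRecords;

public class FetchResponseSketch {
    public static void main(String[] args) {
        // A successful, empty partition response; the error code defaults to 0 (NONE).
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setPartitionIndex(0)      // placeholder partition
                .setHighWatermark(100L)    // placeholder offsets
                .setLastStableOffset(100L)
                .setRecords(MemoryRecords.EMPTY);

        // Per-partition entries are grouped by topic inside the top-level response object.
        FetchResponseData response = new FetchResponseData()
                .setResponses(Collections.singletonList(
                        new FetchResponseData.FetchableTopicResponse()
                                .setTopic("example-topic") // placeholder topic
                                .setPartitions(Collections.singletonList(partitionData))));
        System.out.println(response);
    }
}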
@@ -43,9 +43,9 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -236,7 +236,7 @@ public CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> fe
             final long timeout,
             final int fetchMinBytes,
             final int fetchMaxBytes,
-            final ConcurrentHashMap<TopicPartition, FetchRequest.PartitionData> fetchInfos,
+            final ConcurrentHashMap<TopicPartition, FetchRequestData.FetchPartition> fetchInfos,
             final IsolationLevel isolationLevel,
             final MessageFetchContext context) {
         CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> future =
@@ -290,7 +290,7 @@ public CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> re
             final boolean readCommitted,
             final int fetchMaxBytes,
             final int maxReadEntriesNum,
-            final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo,
+            final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo,
             final MessageFetchContext context) {
         AtomicLong limitBytes = new AtomicLong(fetchMaxBytes);
         CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> resultFuture = new CompletableFuture<>();
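
A hedged usage sketch of how a caller would build the fetchInfos map consumed by the fetchMessage signature above, building on the FetchPartition type shown earlier; the topic, offset and byte limit are placeholders and the surrounding fetch plumbing is omitted.

import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchRequestData;

public class FetchInfosSketch {
    public static void main(String[] args) {
        // Per-partition fetch parameters keyed by TopicPartition, matching the new
        // ConcurrentHashMap<TopicPartition, FetchRequestData.FetchPartition> parameter.
        ConcurrentHashMap<TopicPartition, FetchRequestData.FetchPartition> fetchInfos =
                new ConcurrentHashMap<>();
        fetchInfos.put(new TopicPartition("example-topic", 0), // placeholder topic/partition
                new FetchRequestData.FetchPartition()
                        .setPartition(0)
                        .setFetchOffset(0L)                    // placeholder start offset
                        .setPartitionMaxBytes(1024 * 1024));   // placeholder byte limit
        System.out.println(fetchInfos);
    }
}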
