
Commit 9c88929
add produce/fetch timeout
dockerzhang committed May 13, 2021
1 parent a50af46 commit 9c88929
Showing 4 changed files with 176 additions and 44 deletions.
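At a glance, the change works like this: a produce or fetch response is completed either when every pending topic-partition has reported a result, or when a timeout fires, whichever comes first. Below is a minimal, self-contained, JDK-only sketch of that pattern; the class and variable names are illustrative, and the real code uses KoP's DelayedOperationPurgatory and SystemTimer rather than a JDK scheduler.

// Illustrative sketch only: respond when all partitions finish or when the timer fires first.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedResponseSketch {
    public static void main(String[] args) throws InterruptedException {
        final int partitions = 3;
        final long timeoutMs = 100;
        final AtomicInteger pending = new AtomicInteger(partitions);
        final AtomicBoolean done = new AtomicBoolean(false);
        // Runs at most once, from either the completion path or the timeout path.
        Runnable complete = () -> {
            if (done.compareAndSet(false, true)) {
                System.out.println("respond, pending=" + pending.get());
            }
        };
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(complete, timeoutMs, TimeUnit.MILLISECONDS);
        for (int i = 0; i < partitions; i++) {
            if (pending.decrementAndGet() == 0) {  // the last partition triggers completion
                complete.run();
            }
        }
        timer.shutdown();
        timer.awaitTermination(1, TimeUnit.SECONDS);
    }
}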
DelayedProduceAndFetch.java (new file)
@@ -0,0 +1,53 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A delayed produce/fetch operation that is stored in the produce or fetch purgatory.
*/
class DelayedProduceAndFetch extends DelayedOperation {

private final AtomicInteger topicPartitionNum;
private final Runnable callback;

DelayedProduceAndFetch(long delayMs, AtomicInteger topicPartitionNum, Runnable callback) {
super(delayMs, Optional.empty());
this.topicPartitionNum = topicPartitionNum;
this.callback = callback;
}

@Override
public void onExpiration() {
callback.run();
}

@Override
public void onComplete() {
callback.run();
}

@Override
public boolean tryComplete() {
if (topicPartitionNum.get() <= 0) {
forceComplete();
return true;
} else {
return false;
}
}
}
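For reference, a sketch of how a handler can wire this class into a purgatory, using the KoP delayed-operation API that appears later in this diff; the timeout, partition count, topic names, and callback are placeholders.

// Sketch only; assumes the KoP classes imported elsewhere in this diff are on the classpath.
package io.streamnative.pulsar.handlers.kop;  // same package, since the class is package-private

import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;

class PurgatoryUsageSketch {
    static void watchExample() {
        DelayedOperationPurgatory<DelayedOperation> purgatory =
                DelayedOperationPurgatory.<DelayedOperation>builder()
                        .purgatoryName("produce")
                        .timeoutTimer(SystemTimer.builder().executorName("produce").build())
                        .build();

        AtomicInteger pendingPartitions = new AtomicInteger(2);  // placeholder count
        DelayedProduceAndFetch op = new DelayedProduceAndFetch(
                30_000, pendingPartitions, () -> System.out.println("respond to client"));

        // Watch under one key per topic-partition; the operation completes early once
        // pendingPartitions reaches 0, otherwise the timer force-completes it after 30s.
        List<Object> keys = Stream.of(new TopicPartition("my-topic", 0), new TopicPartition("my-topic", 1))
                .map(DelayedOperationKey.TopicPartitionOperationKey::new)
                .collect(Collectors.toList());
        purgatory.tryCompleteElseWatch(op, keys);
        purgatory.shutdown();  // pair with shutdown, as KafkaRequestHandler.close() does below
    }
}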
KafkaRequestHandler.java
@@ -42,6 +42,10 @@
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -196,6 +200,17 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
// key is the topic(partition), value is the future that indicates whether the PersistentTopic instance of the key
// is found.
private final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap = new ConcurrentHashMap<>();
// DelayedOperation for produce and fetch
private final DelayedOperationPurgatory<DelayedOperation> producePurgatory =
DelayedOperationPurgatory.<DelayedOperation>builder()
.purgatoryName("produce")
.timeoutTimer(SystemTimer.builder().executorName("produce").build())
.build();
private final DelayedOperationPurgatory<DelayedOperation> fetchPurgatory =
DelayedOperationPurgatory.<DelayedOperation>builder()
.purgatoryName("fetch")
.timeoutTimer(SystemTimer.builder().executorName("fetch").build())
.build();

// Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
private final long maxPendingBytes;
@@ -267,6 +282,8 @@ protected void close() {
log.info("currentConnectedGroup remove {}", clientHost);
currentConnectedGroup.remove(clientHost);
}
producePurgatory.shutdown();
fetchPurgatory.shutdown();
}
}

@@ -740,11 +757,33 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
final int numPartitions = produceRequest.partitionRecordsOrFail().size();

final Map<TopicPartition, PartitionResponse> responseMap = new ConcurrentHashMap<>();
final CompletableFuture<Void> produceFuture = new CompletableFuture<>();
// delay produce
final AtomicInteger topicPartitionNum = new AtomicInteger(numPartitions);
int timeoutMs = produceRequest.timeout();
Runnable complete = () -> {
topicPartitionNum.set(0);
// add a timeout error for any topicPartition not already present in responseMap
produceRequest.partitionRecordsOrFail().keySet().forEach(topicPartition -> {
if (!responseMap.containsKey(topicPartition)) {
responseMap.put(topicPartition, new PartitionResponse(Errors.REQUEST_TIMED_OUT));
}
});
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Complete handle produce.", ctx.channel(), produceHar.toString());
}
requestStats.getHandleProduceRequestStats()
.registerFailedEvent(MathUtils.elapsedNanos(startProduceNanos), TimeUnit.NANOSECONDS);
resultFuture.complete(new ProduceResponse(responseMap));
};
BiConsumer<TopicPartition, PartitionResponse> addPartitionResponse = (topicPartition, response) -> {
responseMap.put(topicPartition, response);
if (responseMap.size() == numPartitions) {
produceFuture.complete(null);
// decrement the pending topic-partition count
int restTopicPartitionNum = topicPartitionNum.decrementAndGet();
if (restTopicPartitionNum < 0) {
return;
}
if (restTopicPartitionNum == 0) {
complete.run();
}
};
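Why the restTopicPartitionNum < 0 guard in the hunk above matters: if the timer fires first, complete sets the counter to zero and sends the timeout response; a per-partition callback that arrives later drives the counter negative and must return without running complete again. A standalone illustration (names are illustrative):

// Illustrative sketch of the late-callback guard.
import java.util.concurrent.atomic.AtomicInteger;

class GuardSketch {
    public static void main(String[] args) {
        AtomicInteger pending = new AtomicInteger(3);
        pending.set(0);                        // the timeout path ran first and zeroed the counter
        int rest = pending.decrementAndGet();  // a late partition callback arrives: rest == -1
        System.out.println(rest < 0
                ? "late callback ignored"      // the < 0 guard returns without completing again
                : "would complete");
    }
}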

@@ -809,16 +848,16 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
exceptionConsumer.accept(e);
}
});

produceFuture.thenApply(ignored -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Complete handle produce.", ctx.channel(), produceHar.toString());
}
requestStats.getHandleProduceRequestStats()
.registerFailedEvent(MathUtils.elapsedNanos(startProduceNanos), TimeUnit.NANOSECONDS);
resultFuture.complete(new ProduceResponse(responseMap));
return null;
});
// delay produce
if (timeoutMs <= 0) {
complete.run();
} else {
List<Object> delayedOperationKeys =
produceRequest.partitionRecordsOrFail().keySet().stream()
.map(DelayedOperationKey.TopicPartitionOperationKey::new).collect(Collectors.toList());
DelayedProduceAndFetch delayedProduce = new DelayedProduceAndFetch(timeoutMs, topicPartitionNum, complete);
producePurgatory.tryCompleteElseWatch(delayedProduce, delayedOperationKeys);
}
}

protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,
@@ -1261,7 +1300,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
}

MessageFetchContext fetchContext = MessageFetchContext.get(this);
fetchContext.handleFetch(resultFuture, fetch, transactionCoordinator);
fetchContext.handleFetch(resultFuture, fetch, transactionCoordinator, fetchPurgatory);
}

protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup,
MessageFetchContext.java
@@ -25,8 +25,10 @@
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -97,7 +99,8 @@ public void recycle() {
public CompletableFuture<AbstractResponse> handleFetch(
CompletableFuture<AbstractResponse> fetchResponse,
KafkaHeaderAndRequest fetchRequest,
TransactionCoordinator transactionCoordinator) {
TransactionCoordinator transactionCoordinator,
DelayedOperationPurgatory<DelayedOperation> fetchPurgatory) {
final long startPreparingMetadataNanos = MathUtils.nowInNano();
LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();

@@ -177,7 +180,7 @@ public CompletableFuture<AbstractResponse> handleFetch(
MathUtils.elapsedNanos(startPreparingMetadataNanos), TimeUnit.NANOSECONDS);

readMessages(fetchRequest, partitionCursor, fetchResponse, responseData,
transactionCoordinator, highWaterMarkMap);
transactionCoordinator, highWaterMarkMap, fetchPurgatory);
});

return fetchResponse;
@@ -191,13 +194,14 @@ private void readMessages(KafkaHeaderAndRequest fetch,
CompletableFuture<AbstractResponse> resultFuture,
LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData,
TransactionCoordinator tc,
Map<TopicPartition, Long> highWaterMarkMap) {
Map<TopicPartition, Long> highWaterMarkMap,
DelayedOperationPurgatory<DelayedOperation> fetchPurgatory) {
AtomicInteger bytesRead = new AtomicInteger(0);
Map<TopicPartition, List<Entry>> entryValues = new ConcurrentHashMap<>();

final long startReadingTotalMessagesNanos = MathUtils.nowInNano();
readMessagesInternal(fetch, cursors, bytesRead, entryValues, resultFuture, responseData, tc, highWaterMarkMap,
startReadingTotalMessagesNanos);
startReadingTotalMessagesNanos, fetchPurgatory);
}

private void readMessagesInternal(KafkaHeaderAndRequest fetch,
@@ -208,11 +212,44 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData,
TransactionCoordinator tc,
Map<TopicPartition, Long> highWaterMarkMap,
long startReadingTotalMessagesNanos) {
long startReadingTotalMessagesNanos,
DelayedOperationPurgatory<DelayedOperation> fetchPurgatory) {

AtomicInteger entriesRead = new AtomicInteger(0);
// do the real read here; the read callback puts the cursor back into KafkaTopicConsumerManager.
Map<TopicPartition, CompletableFuture<List<Entry>>> readFutures = readAllCursorOnce(cursors);
// delay fetch
FetchRequest fetchRequest = (FetchRequest) fetch.getRequest();
List<DecodeResult> decodeResults = new ArrayList<>();
final AtomicInteger topicPartitionNum = new AtomicInteger(fetchRequest.fetchData().size());
int timeoutMs = fetchRequest.maxWait();
Runnable complete = () -> {
topicPartitionNum.set(0);
// add a timeout error for any topicPartition not already present in responseData
fetchRequest.fetchData().keySet().forEach(topicPartition -> {
if (!responseData.containsKey(topicPartition)) {
responseData.put(topicPartition, new FetchResponse.PartitionData<>(
Errors.REQUEST_TIMED_OUT,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY));
}
});
resultFuture.complete(
new ResponseCallbackWrapper(
new FetchResponse(
Errors.NONE,
responseData,
((Integer) THROTTLE_TIME_MS.defaultValue),
fetchRequest.metadata().sessionId()),
() -> {
// release the batched ByteBuf if necessary
decodeResults.forEach(DecodeResult::release);
}));
this.recycle();
};
CompletableFuture.allOf(readFutures.values().stream().toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {

@@ -293,24 +330,20 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,

int maxBytes = request.maxBytes();
int minBytes = request.minBytes();
int waitTime = request.maxWait(); // in ms
// if endTime <= 0, then no time wait, wait for minBytes.
long endTime = waitTime > 0 ? System.currentTimeMillis() + waitTime : waitTime;

int allSize = bytesRead.get();

if (log.isDebugEnabled()) {
log.debug("Request {}: One round read {} entries, "
+ "allSize/maxBytes/minBytes/endTime: {}/{}/{}/{}",
+ "allSize/maxBytes/minBytes: {}/{}/{}",
fetch.getHeader(), entriesRead.get(),
allSize, maxBytes, minBytes, new Date(endTime));
allSize, maxBytes, minBytes);
}

// all partitions read no entry, return earlier;
// reach minBytes or maxBytes, return (time-based waiting is now handled by the fetch purgatory);
if ((allSize == 0 && entriesRead.get() == 0)
|| (endTime > 0 && endTime <= System.currentTimeMillis())
|| allSize > minBytes
|| allSize > maxBytes){
if (log.isDebugEnabled()) {
@@ -320,7 +353,6 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
requestHandler.requestStats.getTotalMessageReadStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(startReadingTotalMessagesNanos), TimeUnit.NANOSECONDS);

List<DecodeResult> decodeResults = new ArrayList<>();
responseValues.entrySet().forEach(responseEntries -> {
final PartitionData partitionData;
TopicPartition kafkaPartition = responseEntries.getKey();
@@ -400,28 +432,35 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
decodeResult.getRecords());
}
responseData.put(kafkaPartition, partitionData);
// decrement the pending topic-partition count
int restTopicPartitionNum = topicPartitionNum.decrementAndGet();
if (restTopicPartitionNum < 0) {
return;
}
if (restTopicPartitionNum == 0) {
complete.run();
}
});

resultFuture.complete(
new ResponseCallbackWrapper(
new FetchResponse(
Errors.NONE,
responseData,
((Integer) THROTTLE_TIME_MS.defaultValue),
((FetchRequest) fetch.getRequest()).metadata().sessionId()),
() -> {
// release the batched ByteBuf if necessary
decodeResults.forEach(DecodeResult::release);
}));
this.recycle();
// delay fetch
if (timeoutMs <= 0) {
complete.run();
} else {
List<Object> delayedOperationKeys =
fetchRequest.fetchData().keySet().stream()
.map(DelayedOperationKey.TopicPartitionOperationKey::new)
.collect(Collectors.toList());
DelayedProduceAndFetch delayedFetch = new DelayedProduceAndFetch(timeoutMs,
topicPartitionNum, complete);
fetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedOperationKeys);
}
} else {
if (log.isDebugEnabled()) {
log.debug("Request {}: Read time or size not reach, do another round of read before return.",
fetch.getHeader());
}
// need to do another round of reads
readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture, responseData,
tc, highWaterMarkMap, startReadingTotalMessagesNanos);
tc, highWaterMarkMap, startReadingTotalMessagesNanos, fetchPurgatory);
}
});
}
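On the client side, this delayed-fetch path is driven by the consumer's fetch settings: maxWait() corresponds to fetch.max.wait.ms and minBytes to fetch.min.bytes. An illustrative consumer configuration that exercises it (the address, group id, and values are placeholders):

// Illustrative Kafka consumer settings; the broker holds the fetch in the purgatory
// until fetch.min.bytes accumulate or fetch.max.wait.ms elapses.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

class DelayedFetchClientSketch {
    static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");               // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);  // becomes maxWait() on the broker
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);   // minBytes: early-completion threshold
        return new KafkaConsumer<>(props);
    }
}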
DelayedOperationKey.java
@@ -16,6 +16,7 @@
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.kafka.common.TopicPartition;

/**
* Delayed operation key.
@@ -86,12 +87,12 @@ public String keyLabel() {
@RequiredArgsConstructor
class TopicPartitionOperationKey implements DelayedOperationKey {

private final String topic;
private final int partition;
private final TopicPartition topicPartition;

@Override
public String keyLabel() {
return String.format("%s-%d", topic, partition);
return String.format("%s-%d", topicPartition.topic(),
topicPartition.partition());
}
}
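For reference, the key format the new TopicPartition-based keyLabel() produces; a small sketch assuming these classes are on the classpath:

// keyLabel() for partition 2 of "my-topic" yields "my-topic-2" — the label the
// produce and fetch purgatories watch per topic-partition.
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import org.apache.kafka.common.TopicPartition;

class KeyLabelSketch {
    public static void main(String[] args) {
        DelayedOperationKey key =
                new DelayedOperationKey.TopicPartitionOperationKey(new TopicPartition("my-topic", 2));
        System.out.println(key.keyLabel());  // prints: my-topic-2
    }
}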

