Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
add produce/fetch timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
dockerzhang committed May 11, 2021
1 parent d8f2bbd commit 0d364ed
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.apache.kafka.common.requests.ResponseHeader;
Expand All @@ -63,11 +65,13 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
protected final RequestStats requestStats;
@Getter
protected final KafkaServiceConfiguration kafkaConfig;
private long requestTimeoutMs;

/**
 * Creates the decoder for one Kafka protocol connection.
 *
 * @param statsLogger sink used to build the per-request {@code RequestStats}
 * @param kafkaConfig broker-side configuration; supplies the queue capacity
 *                    and the default request timeout
 */
// NOTE(review): requestTimeoutMs starts as the configured default but is
// later overwritten by per-request values (ProduceRequest.timeout() /
// FetchRequest.maxWait() in channelRead), making it mutable shared state on
// this handler instance — a client-supplied timeout from one request then
// governs expiry of ALL queued responses. Confirm the handler is confined to
// a single channel/event-loop thread and that this cross-request bleed is
// intended.
public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig) {
this.requestStats = new RequestStats(statsLogger);
this.kafkaConfig = kafkaConfig;
// Bounded queue sized from config; caps in-flight requests per connection.
this.requestQueue = new FakeArrayBlockingQueue(kafkaConfig.getMaxQueuedRequests());
// Default expiry used by writeAndFlushResponseToClient until a
// produce/fetch request substitutes its own timeout.
this.requestTimeoutMs = kafkaConfig.getRequestTimeoutMs();
}

@Override
Expand Down Expand Up @@ -200,6 +204,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
break;
case PRODUCE:
handleProduceRequest(kafkaHeaderAndRequest, responseFuture);
// replace default request timeout
requestTimeoutMs = ((ProduceRequest) kafkaHeaderAndRequest.getRequest()).timeout();
break;
case FIND_COORDINATOR:
handleFindCoordinatorRequest(kafkaHeaderAndRequest, responseFuture);
Expand All @@ -215,6 +221,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
break;
case FETCH:
handleFetchRequest(kafkaHeaderAndRequest, responseFuture);
// replace default request timeout
requestTimeoutMs = ((FetchRequest) kafkaHeaderAndRequest.getRequest()).maxWait();
break;
case JOIN_GROUP:
handleJoinGroupRequest(kafkaHeaderAndRequest, responseFuture);
Expand Down Expand Up @@ -300,7 +308,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
final CompletableFuture<AbstractResponse> responseFuture = responseAndRequest.getResponseFuture();
final long nanoSecondsSinceCreated = responseAndRequest.nanoSecondsSinceCreated();
final boolean expired =
(nanoSecondsSinceCreated > TimeUnit.MILLISECONDS.toNanos(kafkaConfig.getRequestTimeoutMs()));
(nanoSecondsSinceCreated > TimeUnit.MILLISECONDS.toNanos(requestTimeoutMs));
if (!responseFuture.isDone() && !expired) {
// case 1: responseFuture is not completed or expired, stop polling responses from responseQueue
break;
Expand All @@ -316,7 +324,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
// case 2: responseFuture is expired
if (expired) {
log.error("[{}] request {} is not completed for {} ns (> {} ms)",
channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs());
channel, request.getHeader(), nanoSecondsSinceCreated, requestTimeoutMs);
responseFuture.complete(null);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -294,23 +293,20 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
int maxBytes = request.maxBytes();
int minBytes = request.minBytes();
int waitTime = request.maxWait(); // in ms
// if endTime <= 0, then no time wait, wait for minBytes.
long endTime = waitTime > 0 ? System.currentTimeMillis() + waitTime : waitTime;

int allSize = bytesRead.get();

if (log.isDebugEnabled()) {
log.debug("Request {}: One round read {} entries, "
+ "allSize/maxBytes/minBytes/endTime: {}/{}/{}/{}",
+ "allSize/maxBytes/minBytes: {}/{}/{}",
fetch.getHeader(), entriesRead.get(),
allSize, maxBytes, minBytes, new Date(endTime));
allSize, maxBytes, minBytes);
}

// all partitions read no entry, return earlier;
// reach maxTime, return;
// reach minBytes if no endTime, return;
if ((allSize == 0 && entriesRead.get() == 0)
|| (endTime > 0 && endTime <= System.currentTimeMillis())
|| allSize > minBytes
|| allSize > maxBytes){
if (log.isDebugEnabled()) {
Expand Down

0 comments on commit 0d364ed

Please sign in to comment.