Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix KoP causing Kafka Errors.REQUEST_TIMED_OUT when consuming multiple TopicPartitions in one consumer request #654

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
Expand Down Expand Up @@ -90,6 +91,7 @@ protected MessageFetchContext newObject(Handle<MessageFetchContext> handle) {
// The fetch request this context serves; tryComplete() compares its fetchData() size
// against the number of partition responses collected so far.
private FetchRequest fetchRequest;
// Header of the incoming request; the test-only factory leaves it null.
private RequestHeader header;
// Future handed in by the caller of the factory methods (presumably completed with the
// fetch response -- confirm in complete()). It is reset to null in recycle(), which is why
// tryComplete() null-checks it first.
private volatile CompletableFuture<AbstractResponse> resultFuture;
// One-shot guard: created as false by the factory methods and flipped to true with
// compareAndSet in tryComplete(), so complete() is invoked by at most one caller.
// Reset to null in recycle().
private AtomicBoolean hasComplete;

// recycler and get for this object
public static MessageFetchContext get(KafkaRequestHandler requestHandler,
Expand All @@ -107,6 +109,26 @@ public static MessageFetchContext get(KafkaRequestHandler requestHandler,
context.fetchRequest = (FetchRequest) kafkaHeaderAndRequest.getRequest();
context.header = kafkaHeaderAndRequest.getHeader();
context.resultFuture = resultFuture;
context.hasComplete = new AtomicBoolean(false);
return context;
}

/**
 * Test-only factory. Takes a context from the recycler and wires in just the
 * fetch request and the result future; every other collaborator is left null
 * (and the read-entries limit is 0).
 *
 * @param fetchRequest the request whose partition count drives completion
 * @param resultFuture the future stored on the returned context
 * @return the initialized context
 */
public static MessageFetchContext getForTest(FetchRequest fetchRequest,
                                             CompletableFuture<AbstractResponse> resultFuture) {
    final MessageFetchContext ctx = RECYCLER.get();

    // Inputs supplied by the test.
    ctx.fetchRequest = fetchRequest;
    ctx.resultFuture = resultFuture;

    // Fresh per-request state.
    ctx.responseData = new ConcurrentHashMap<>();
    ctx.decodeResults = new ConcurrentLinkedQueue<>();
    ctx.hasComplete = new AtomicBoolean(false);
    ctx.maxReadEntriesNum = 0;

    // Collaborators that unit tests do not provide.
    ctx.requestHandler = null;
    ctx.topicManager = null;
    ctx.statsLogger = null;
    ctx.tc = null;
    ctx.clientHost = null;
    ctx.header = null;

    return ctx;
}

Expand All @@ -127,9 +149,22 @@ private void recycle() {
fetchRequest = null;
header = null;
resultFuture = null;
hasComplete = null;
recyclerHandle.recycle(this);
}

/**
 * Test-only hook. Stores a response for {@code topicPartition} that carries only
 * the given error code (invalid-offset sentinels, empty records) and then calls
 * {@code tryComplete()}.
 *
 * @param topicPartition the partition the response is recorded for
 * @param errors the error code placed in the response
 */
public void addErrorPartitionResponseForTest(TopicPartition topicPartition, Errors errors) {
    // Sentinel offsets: this response carries no real log position.
    final long highWatermark = FetchResponse.INVALID_HIGHWATERMARK;
    final long lastStableOffset = FetchResponse.INVALID_LAST_STABLE_OFFSET;
    final long logStartOffset = FetchResponse.INVALID_LOG_START_OFFSET;

    responseData.put(
            topicPartition,
            new PartitionData<>(errors, highWatermark, lastStableOffset, logStartOffset, null, MemoryRecords.EMPTY));
    tryComplete();
}

private void addErrorPartitionResponse(TopicPartition topicPartition, Errors errors) {
responseData.put(topicPartition, new PartitionData<>(
errors,
Expand All @@ -142,7 +177,8 @@ private void addErrorPartitionResponse(TopicPartition topicPartition, Errors err
}

private void tryComplete() {
if (responseData.size() >= fetchRequest.fetchData().size()) {
if (resultFuture != null && responseData.size() >= fetchRequest.fetchData().size()
&& hasComplete.compareAndSet(false, true)) {
complete();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import static org.testng.Assert.assertFalse;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

/**
 * Concurrency regression test for {@link MessageFetchContext}.
 *
 * <p>Two threads each record the response of one partition of a two-partition
 * fetch request. The resulting response must not contain
 * {@link Errors#REQUEST_TIMED_OUT}.
 */
public class MessageFetchContextTest {

    /** Upper bound for waiting on the fetch result; an incomplete future fails the test instead of hanging. */
    private static final long COMPLETION_TIMEOUT_SECONDS = 5;

    private static final Map<TopicPartition, FetchRequest.PartitionData> fetchData = Maps.newHashMap();

    private static CompletableFuture<AbstractResponse> resultFuture = null;

    // Built while fetchData is still empty; setup() fills the same map instance afterwards.
    // NOTE(review): this relies on FetchRequest keeping a reference to the map rather than
    // copying it -- confirm against the Kafka client version in use.
    FetchRequest fetchRequest = FetchRequest.Builder
            .forConsumer(0, 0, fetchData).build();

    private static final String topicName = "test-fetch";
    private static final TopicPartition tp1 = new TopicPartition(topicName, 1);
    private static final TopicPartition tp2 = new TopicPartition(topicName, 2);

    private static volatile MessageFetchContext messageFetchContext = null;

    /** Registers both partitions in the request and creates a fresh future and context for each test. */
    @BeforeMethod
    protected void setup() throws Exception {
        fetchData.put(tp1, null);
        fetchData.put(tp2, null);
        resultFuture = new CompletableFuture<>();
        messageFetchContext = MessageFetchContext.getForTest(fetchRequest, resultFuture);
    }

    /**
     * Adds the responses for both partitions from two concurrent threads and waits
     * for both threads to finish.
     */
    private void startThreads() throws Exception {
        Thread run1 = new Thread(() -> {
            // The call is synchronous, so checking it once is enough: if a race is still
            // present, this test will eventually turn flaky and expose it.
            messageFetchContext.addErrorPartitionResponseForTest(tp1, Errors.NONE);
        });

        Thread run2 = new Thread(() -> {
            // Same reasoning as in run1: a single call is sufficient.
            messageFetchContext.addErrorPartitionResponseForTest(tp2, Errors.NONE);
        });

        run1.start();
        run2.start();
        run1.join();
        run2.join();
    }

    /**
     * Runs the concurrent scenario and collects the error codes of the fetch response
     * into {@code errorsSet}.
     *
     * @param errorsSet holder of the set that receives the response's error codes
     * @throws Exception if the result future does not complete within the timeout,
     *                   completes exceptionally, or the callback itself fails
     */
    private void startAndGetResult(AtomicReference<Set<Errors>> errorsSet)
            throws Exception {

        BiConsumer<AbstractResponse, Throwable> action = (response, exception) -> {
            if (exception != null) {
                // Nothing to collect. The dependent future stays failed, so the timed
                // get() below rethrows the failure instead of silently passing.
                return;
            }
            ResponseCallbackWrapper responseCallbackWrapper = (ResponseCallbackWrapper) response;
            errorsSet.get().addAll(responseCallbackWrapper.errorCounts().keySet());
        };

        startThreads();
        // Wait on the dependent future so that a never-completed result, an exceptional
        // result, or an exception thrown inside the callback all fail the test. Without
        // this the assertion would run against an empty set and pass vacuously.
        resultFuture.whenComplete(action).get(COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    // Exercise the real completion logic in MessageFetchContext so that future changes
    // to that class are still covered by this check.
    // The completed result must not contain a REQUEST_TIMED_OUT error.
    @Test
    public void testHandleFetchSafe() throws Exception {
        AtomicReference<Set<Errors>> errorsSet = new AtomicReference<>(new HashSet<>());
        startAndGetResult(errorsSet);
        assertFalse(errorsSet.get().contains(Errors.REQUEST_TIMED_OUT));
    }

}