This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix KoP returning Kafka Errors.REQUEST_TIMED_OUT when consuming multiple TopicPartitions in one consumer request (#654)

Fixes #604 

### Motivation
When a consumer fetches multiple TopicPartitions in one request, `tryComplete()` in `MessageFetchContext.handleFetch()` can hit a race condition.
A topicPartition that has already been removed from `responseData` may then be removed a second time, which causes `Errors.REQUEST_TIMED_OUT`.

The ReadEntries and `CompletableFuture.complete` operations for each partition run on different threads of the BookKeeperClientWorker-OrderedExecutor thread pool. When a partition has data to read, reading and decoding take a variable amount of time, so the threads rarely collide. When nothing is being written to the partition and the consumer has already consumed all existing data, the consumer keeps sending empty fetch requests, and the problem can then be reproduced reliably.
Steps to reproduce reliably:

- A single broker hosts two partition leaders for one topic.
- Nothing is being written to the topic, and the consumers have already consumed the old data.
- The consumer client keeps sending Fetch requests to the broker.
- Very soon the server returns error_code=7 and the client goes down.

The sequence of events is:

1. One fetch request covers two partitions and is served by two threads. The data read is an empty set, so no protocol conversion takes place.
2. The BookKeeperClientWorker-OrderedExecutor-25-0 thread adds test_kop_222-1 to responseData while the BookKeeperClientWorker-OrderedExecutor-23-0 thread adds test_kop_222-3 to responseData.
3. At this point responseData.size() >= fetchRequest.fetchData().size() holds, and because tryComplete has no synchronization, both threads enter it at the same time.
4. Both threads run fetchRequest.fetchData().keySet().forEach concurrently, so responseData.remove(topicPartition) is called more than once for the same partition. The later call gets a null partitionData, which causes the REQUEST_TIMED_OUT error.

![image](https://user-images.githubusercontent.com/35599757/129462871-37fbfc6f-1603-4da8-9815-95a278195936.png)
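
The effect of the double removal described above can be shown without any threads. The following is a minimal sketch, not KoP code: `DoubleRemoveSketch`, its string-keyed map, and `drainAndCountMissing` are hypothetical stand-ins for `responseData` and the completion loop. It runs the removal loop twice in a row, which is what two racing threads effectively do between them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DoubleRemoveSketch {
    // Hypothetical stand-in for the per-request response map (partition name -> data).
    public static final Map<String, String> responseData = new ConcurrentHashMap<>();

    // Mimics one pass of the completion loop: remove every requested partition
    // and count how many removals returned null.
    public static int drainAndCountMissing(List<String> requested) {
        int missing = 0;
        for (String topicPartition : requested) {
            String partitionData = responseData.remove(topicPartition);
            if (partitionData == null) {
                // Per the description above, this is the case that ends up
                // being reported as REQUEST_TIMED_OUT.
                missing++;
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("test_kop_222-1", "test_kop_222-3");
        responseData.put("test_kop_222-1", "empty");
        responseData.put("test_kop_222-3", "empty");
        System.out.println(drainAndCountMissing(requested)); // prints 0: first pass finds both entries
        System.out.println(drainAndCountMissing(requested)); // prints 2: second pass finds nothing
    }
}
```

`ConcurrentHashMap` keeps each individual `remove` safe, but it cannot stop the whole loop from being executed twice; that has to be prevented by the caller.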

### Modifications
Guard `MessageFetchContext.tryComplete` with an `AtomicBoolean` (`hasComplete`) so that `complete()` runs only once per request.
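
The guard in this commit relies on `AtomicBoolean.compareAndSet`. Below is a minimal sketch of that idea under simplified assumptions: `CompleteOnceSketch`, the `completions` counter standing in for `complete()`, and the `race` helper are all hypothetical and not part of KoP.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CompleteOnceSketch {
    private final AtomicBoolean hasComplete = new AtomicBoolean(false);
    // Stand-in for complete(): counts how many callers got past the guard.
    private final AtomicInteger completions = new AtomicInteger();

    void tryComplete() {
        // Only the first caller flips false -> true; every later caller fails the CAS.
        if (hasComplete.compareAndSet(false, true)) {
            completions.incrementAndGet();
        }
    }

    // Releases `threads` workers at once and returns how many of them completed.
    public static int race(int threads) {
        CompleteOnceSketch context = new CompleteOnceSketch();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all workers up before the race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                context.tryComplete();
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return context.completions.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // prints 1
    }
}
```

A compare-and-set flag lets the losing threads return immediately instead of blocking on a lock, which suits callbacks running on a shared worker pool.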
wenbingshen authored Aug 16, 2021
1 parent 2274524 commit df436e3
Showing 2 changed files with 137 additions and 1 deletion.
@@ -39,6 +39,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
@@ -90,6 +91,7 @@ protected MessageFetchContext newObject(Handle<MessageFetchContext> handle) {
    private FetchRequest fetchRequest;
    private RequestHeader header;
    private volatile CompletableFuture<AbstractResponse> resultFuture;
    private AtomicBoolean hasComplete;

    // recycler and get for this object
    public static MessageFetchContext get(KafkaRequestHandler requestHandler,
@@ -107,6 +109,26 @@ public static MessageFetchContext get(KafkaRequestHandler requestHandler,
        context.fetchRequest = (FetchRequest) kafkaHeaderAndRequest.getRequest();
        context.header = kafkaHeaderAndRequest.getHeader();
        context.resultFuture = resultFuture;
        context.hasComplete = new AtomicBoolean(false);
        return context;
    }

    // only used for unit tests
    public static MessageFetchContext getForTest(FetchRequest fetchRequest,
                                                 CompletableFuture<AbstractResponse> resultFuture) {
        MessageFetchContext context = RECYCLER.get();
        context.responseData = new ConcurrentHashMap<>();
        context.decodeResults = new ConcurrentLinkedQueue<>();
        context.requestHandler = null;
        context.maxReadEntriesNum = 0;
        context.topicManager = null;
        context.statsLogger = null;
        context.tc = null;
        context.clientHost = null;
        context.fetchRequest = fetchRequest;
        context.header = null;
        context.resultFuture = resultFuture;
        context.hasComplete = new AtomicBoolean(false);
        return context;
    }

@@ -127,9 +149,22 @@ private void recycle() {
        fetchRequest = null;
        header = null;
        resultFuture = null;
        hasComplete = null;
        recyclerHandle.recycle(this);
    }

    // only used for unit tests
    public void addErrorPartitionResponseForTest(TopicPartition topicPartition, Errors errors) {
        responseData.put(topicPartition, new PartitionData<>(
                errors,
                FetchResponse.INVALID_HIGHWATERMARK,
                FetchResponse.INVALID_LAST_STABLE_OFFSET,
                FetchResponse.INVALID_LOG_START_OFFSET,
                null,
                MemoryRecords.EMPTY));
        tryComplete();
    }

    private void addErrorPartitionResponse(TopicPartition topicPartition, Errors errors) {
        responseData.put(topicPartition, new PartitionData<>(
                errors,
@@ -142,7 +177,8 @@ private void addErrorPartitionResponse(TopicPartition topicPartition, Errors err
    }

     private void tryComplete() {
-        if (responseData.size() >= fetchRequest.fetchData().size()) {
+        if (resultFuture != null && responseData.size() >= fetchRequest.fetchData().size()
+                && hasComplete.compareAndSet(false, true)) {
             complete();
         }
     }
@@ -0,0 +1,100 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import static org.testng.Assert.assertFalse;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

public class MessageFetchContextTest {

    private static final Map<TopicPartition, FetchRequest.PartitionData> fetchData = Maps.newHashMap();

    private static CompletableFuture<AbstractResponse> resultFuture = null;

    FetchRequest fetchRequest = FetchRequest.Builder
            .forConsumer(0, 0, fetchData).build();

    private static final String topicName = "test-fetch";
    private static final TopicPartition tp1 = new TopicPartition(topicName, 1);
    private static final TopicPartition tp2 = new TopicPartition(topicName, 2);

    private static volatile MessageFetchContext messageFetchContext = null;

    @BeforeMethod
    protected void setup() throws Exception {
        fetchData.put(tp1, null);
        fetchData.put(tp2, null);
        resultFuture = new CompletableFuture<>();
        messageFetchContext = MessageFetchContext.getForTest(fetchRequest, resultFuture);
    }

    private void startThreads() throws Exception {
        Thread run1 = new Thread(() -> {
            // Because the method is synchronized, checking it once is enough:
            // if a problem remains, this will eventually become a flaky test;
            // if it never becomes flaky, we can keep it as is.
            messageFetchContext.addErrorPartitionResponseForTest(tp1, Errors.NONE);
        });

        Thread run2 = new Thread(() -> {
            // As described in the comment in run1, checking once is enough.
            messageFetchContext.addErrorPartitionResponseForTest(tp2, Errors.NONE);
        });

        run1.start();
        run2.start();
        run1.join();
        run2.join();
    }

    private void startAndGetResult(AtomicReference<Set<Errors>> errorsSet)
            throws Exception {

        BiConsumer<AbstractResponse, Throwable> action = (response, exception) -> {
            ResponseCallbackWrapper responseCallbackWrapper = (ResponseCallbackWrapper) response;
            Set<Errors> currentErrors = errorsSet.get();
            currentErrors.addAll(responseCallbackWrapper.errorCounts().keySet());
            errorsSet.set(currentErrors);
        };

        startThreads();
        resultFuture.whenComplete(action);
    }

    // Exercise the actual modified code path in MessageFetchContext,
    // so that future changes to MessageFetchContext
    // cannot reintroduce the problem unnoticed.
    // We need to ensure that the completed resultFuture carries no REQUEST_TIMED_OUT error.
    @Test
    public void testHandleFetchSafe() throws Exception {
        AtomicReference<Set<Errors>> errorsSet = new AtomicReference<>(new HashSet<>());
        startAndGetResult(errorsSet);
        assertFalse(errorsSet.get().contains(Errors.REQUEST_TIMED_OUT));
    }

}
