This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix KoP will cause Kafka Errors.REQUEST_TIMED_OUT when consume multi TopicPartition in one consumer request #654

Merged

Conversation

wenbingshen
Contributor

Fixes #604

Motivation

When consuming multiple TopicPartitions in one request, tryComplete() in MessageFetchContext.handleFetch() can hit a race condition:
a topicPartition already removed from responseData may be removed again, which causes Errors.REQUEST_TIMED_OUT

The readEntries and CompletableFuture.complete operations for each partition are executed by different threads in the BookKeeperClientWorker OrderedExecutor thread pool. When a partition has data to read, the read and decode steps take an uncertain amount of time, so the race is hard to hit; but when no data is being written to the partition and the consumer has already consumed everything, the consumer keeps issuing empty fetch requests, and in that state the bug reproduces stably.
Stable steps to reproduce:

A single broker holds two partition leaders for one topic;
The topic is not being written to, and the consumer has already consumed the old data;
The consumer client keeps sending Fetch requests to the broker;
Very soon the server returns error_code=7 and the client goes down.
1、One fetch request, two partitions, two threads. The data read is an empty set, so no protocol conversion happens.
2、The BookKeeperClientWorker-OrderedExecutor-25-0 thread adds test_kop_222-1 to responseData while the BookKeeperClientWorker-OrderedExecutor-23-0 thread adds test_kop_222-3 to responseData.
3、Now responseData.size() >= fetchRequest.fetchData().size(); because tryComplete has no synchronization, both threads pass the check and enter complete at the same time.
4、Both threads traverse fetchRequest.fetchData().keySet().forEach concurrently, so the same partition is removed from responseData more than once; the second responseData.remove(topicPartition) returns a null partitionData, which is mapped to the REQUEST_TIMED_OUT error.
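The double-remove in step 4 can be sketched with a minimal, hypothetical model (class and field names are simplified stand-ins, not the real KoP code): whichever thread calls remove second gets null, which is what surfaces as REQUEST_TIMED_OUT.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal model of the race: two threads both pass the size
// check in tryComplete() and both run the removal loop in complete().
public class FetchRaceDemo {
    static final Map<String, String> responseData = new ConcurrentHashMap<>();

    // Mirrors one iteration of the unsynchronized complete() loop:
    // the second remover of the same partition gets null.
    static String completeOnce(String topicPartition) {
        return responseData.remove(topicPartition);
    }

    public static void main(String[] args) {
        responseData.put("test_kop_222-1", "empty-records");
        // Thread A and thread B both entered complete():
        System.out.println(completeOnce("test_kop_222-1") != null); // true
        System.out.println(completeOnce("test_kop_222-1") == null); // true: null -> REQUEST_TIMED_OUT
    }
}
```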

![image](https://user-images.githubusercontent.com/35599757/129462871-37fbfc6f-1603-4da8-9815-95a278195936.png)

Modifications

Add a synchronization lock to MessageFetchContext.tryComplete
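The shape of the fix can be sketched as follows. This is a simplified, self-contained model (SafeFetchContext, addResponse, and the String payloads are illustrative names, not KoP's actual API): the size check and the completion now run under one lock, so only a single thread can observe the "all partitions collected" condition.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the fixed MessageFetchContext (hypothetical names).
public class SafeFetchContext {
    private final Map<String, String> responseData = new ConcurrentHashMap<>();
    private final int expectedPartitions;
    private volatile CompletableFuture<Map<String, String>> resultFuture = new CompletableFuture<>();

    public SafeFetchContext(int expectedPartitions) {
        this.expectedPartitions = expectedPartitions;
    }

    // Called once per partition from different OrderedExecutor threads.
    public void addResponse(String partition, String data) {
        responseData.put(partition, data);
        tryComplete();
    }

    // The guard and complete() now run under one lock, so exactly one
    // thread completes; a later caller sees the recycled state and returns.
    private synchronized void tryComplete() {
        if (resultFuture == null) {
            return; // already completed and recycled
        }
        if (responseData.size() >= expectedPartitions) {
            resultFuture.complete(Map.copyOf(responseData));
            resultFuture = null; // mimic recycling the context
        }
    }

    public boolean isCompleted() {
        return resultFuture == null;
    }

    public static void main(String[] args) throws InterruptedException {
        SafeFetchContext ctx = new SafeFetchContext(2);
        Thread t1 = new Thread(() -> ctx.addResponse("test_kop_222-1", "empty"));
        Thread t2 = new Thread(() -> ctx.addResponse("test_kop_222-3", "empty"));
        t1.start(); t2.start(); t1.join(); t2.join();
        System.out.println(ctx.isCompleted()); // prints true
    }
}
```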

…Context.handleFetch(), tryComplete() may enter race condition

When one topicPartition removed from responseData, maybe removed again, which will cause Errors.REQUEST_TIMED_OUT
@wenbingshen
Contributor Author

@BewareMyPower Hi, Yunze brother, what do you think of this issue? PTAL. :)

Collaborator

@BewareMyPower BewareMyPower left a comment


Great find. Just leaving some suggestions.

  1. We use TestNG in KoP, so please replace JUnit with TestNG.
  2. MessageFetchContextTest doesn't rely on KopProtocolHandlerTestBase, so it should be put under kafka-impl/tests instead of the tests/ directory.
  3. MessageFetchContextTest doesn't test MessageFetchContext directly; it looks like a simulation. I.e., if MessageFetchContext were modified in the future, this test would still pass, so it couldn't protect the code.

@wenbingshen
Contributor Author

@BewareMyPower Thanks very much for your review. I will switch it to TestNG and move it to the correct directory, and I will consider modifying the test cases to protect the code. Thanks. :)

@wenbingshen
Contributor Author

@BewareMyPower Hi, Yunze brother, I have addressed your comment. PTAL :)

@BewareMyPower
Collaborator

It looks like your test cannot pass, see kop mvn build check and kafka-impl test / build (pull_request).

[INFO] Running io.streamnative.pulsar.handlers.kop.MessageFetchContextTest
Error: The operation was canceled.

@BewareMyPower
Collaborator

If it's not convenient to add the test, it will be acceptable to not add the test.

The race condition happens when the complete() method is called by two threads. It's because the responseData.size() check and responseData.put() are not one atomic operation.

@wenbingshen
Contributor Author

@BewareMyPower
Take two partitions as an example. After tryComplete adds the synchronization lock:

The first situation:
After the first thread's responseData.put, the second thread immediately does its own responseData.put. The first thread then enters tryComplete, the condition (responseData.size() >= fetchRequest.fetchData().size()) is satisfied, and it enters complete, which recycles many object resources and sets responseData to null. When the second thread then enters tryComplete, it hits a null pointer exception.

The second case:
After the first thread's responseData.put, the first thread enters tryComplete first and the condition (responseData.size() >= fetchRequest.fetchData().size()) is not met, so it does not enter complete. Only then does the second thread do its responseData.put; now the condition is met, complete executes, and everything runs normally.

Therefore, we need a null check on responseData in tryComplete.
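The first situation can be sketched with a small, hypothetical model (RecycleGuardDemo and its members are illustrative names, not the real KoP classes): once the winning thread has recycled the context, a late caller must hit a guard instead of a NullPointerException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the recycle race: thread 1 completes and recycles
// the context (responseData becomes null) before thread 2 re-enters
// tryComplete, so the guard must check for the recycled state first.
public class RecycleGuardDemo {
    Map<String, String> responseData = new HashMap<>();

    // What complete() effectively does when it recycles the context.
    void recycle() {
        responseData = null;
    }

    // Returns false instead of throwing when the context was already recycled.
    synchronized boolean tryComplete(int expectedPartitions) {
        if (responseData == null) {
            return false; // already completed by another thread
        }
        return responseData.size() >= expectedPartitions;
    }

    public static void main(String[] args) {
        RecycleGuardDemo ctx = new RecycleGuardDemo();
        ctx.recycle();
        System.out.println(ctx.tryComplete(0)); // prints false: guard, not NPE
    }
}
```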


…micBoolean to avoid thread blocking & fix test failed
@BewareMyPower
Collaborator

You're right, I think

        if (resultFuture == null) {
            // the context has been recycled
            return;
        }

in the complete() method should be moved to tryComplete().

@wenbingshen
Contributor Author

@BewareMyPower Regarding the Codacy Static Code Analysis error, what should I do? Please teach me. :)

@wenbingshen
Contributor Author

@BewareMyPower All checks have passed, PTAL :)

@BewareMyPower
Collaborator

The test can be simplified. MessageFetchContextTest compares MessageFetchContext against MessageFetchContextTest itself: when isSafe is false, addErrorPartitionResponse only operates on the fields of MessageFetchContextTest, so that behavior is not related to any production code.

IMO, only testHandleFetchSafe is required, because if MessageFetchContext is not thread safe, this test will fail. We don't need to add a contrast implementation to show what the wrong implementation is.
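The "complete exactly once" property such a test should exercise can be sketched in plain Java (in KoP the real test is written with TestNG; this class, its names, and the thread count are illustrative assumptions): many threads race on the check-then-act, and a thread-safe guard must let exactly one of them complete.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stress check: N threads race to "complete" a shared context;
// the synchronized check-then-act must succeed exactly once.
public class CompleteOnceStress {
    public static int completions(int threads) {
        AtomicInteger count = new AtomicInteger();
        Object lock = new Object();
        boolean[] done = {false};
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock) { // the fix: guard the check-then-act
                    if (!done[0]) {
                        done[0] = true;
                        count.incrementAndGet();
                    }
                }
                finish.countDown();
            }).start();
        }
        start.countDown();
        try {
            finish.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(completions(8)); // prints 1
    }
}
```

A TestNG version would carry the same body in an @Test method, optionally with invocationCount to repeat the race many times.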

@wenbingshen
Contributor Author

> The test can be simplified. MessageFetchContextTest compares MessageFetchContext and MessageFetchContextTest itself. When isSafe is false, addErrorPartitionResponse only operates on the fields of MessageFetchContextTest, this behavior is not related to any code.
>
> IMO, only testHandleFetchSafe is required because if MessageFetchContext is not thread safe, this test will fail. We don't need to add a contrast implementation to show what the wrong implementation is.

@BewareMyPower Gotcha! I have addressed your comment. PTAL

@BewareMyPower
Collaborator

I still have questions about the test. When I modified the tryComplete() back to the original implementation:

    private void tryComplete() {
        if (responseData.size() >= fetchRequest.fetchData().size()) {
            complete();
        }
    }

The test can still pass in my local env.

@wenbingshen
Contributor Author

@BewareMyPower I set invocationCount = 5000 locally and reproduced REQUEST_TIMED_OUT once and an empty this.responseData three times. I didn't set invocationCount = 5000 in the PR because I think that if the code has concurrency problems, it will definitely surface as a flaky test in future code submissions. What do you think should be done?


@BewareMyPower
Collaborator

Okay, it makes sense.

@BewareMyPower BewareMyPower merged commit df436e3 into streamnative:master Aug 16, 2021
BewareMyPower pushed a commit that referenced this pull request Aug 19, 2021
…TopicPartition in one consumer request (#654)
Successfully merging this pull request may close these issues.

[BUG] KoP will cause Kafka Errors.REQUEST_TIMED_OUT when consume multi TopicPartition in one comsume request