Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix many non-durable cursors created on a topic with multiple groups #695

Conversation

BewareMyPower
Copy link
Collaborator

Motivation

KoP's KafkaTopicConsumerManager (aka TCM) maintains some non-durable cursors and the associated offsets for a consumer on a specific topic. If the fetch offset doesn't exist in TCM, TCM will create a non-durable cursor whose position is associated with the offset. Each time a message is consumed, the offset and cursor pair will be removed.

Currently there's a global map consumerTopicManagers whose key is topic name and value is the future of TCM. However, for a topic with multiple consumer groups (subscriptions), all consumers share the same TCM. There's a great possibility that different consumers fetch different offsets concurrently from the same TCM. In this case, a lot of non-durable cursors could be created.

Modifications

Add a singleton class KafkaTopicConsumerManagerCache to manage TCMs. The internal cache has two keys. The first key is the topic name, the second key is the remote address to identify different consumers on the same topic.

To ensure only one non-durable cursor is created for a TCM if no reconnection happened, this PR adds a field numCreatedCursors to record the total count of non-durable cursor creations. testCursorCountForMultiGroups was added to verify this behavior, it creates 5 consumers to consume the same topic with different group id in parallel. After consuming completed, check all TCMs to ensure each TCM's numCreatedCursors is 1.

@BewareMyPower BewareMyPower merged commit ddd443e into streamnative:master Sep 1, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-many-cursors branch September 1, 2021 14:02
BewareMyPower added a commit that referenced this pull request Sep 1, 2021
…695)

KoP's `KafkaTopicConsumerManager` (aka TCM) maintains some non-durable cursors and the associated offsets for a consumer on a specific topic. If the fetch offset doesn't exist in TCM, TCM will create a non-durable cursor whose position is associated with the offset. Each time a message is consumed, the offset and cursor pair will be removed.

Currently there's a global map `consumerTopicManagers` whose key is topic name and value is the future of TCM. However, for a topic with multiple consumer groups (subscriptions), all consumers share the same TCM. There's a great possibility that different consumers fetch different offsets concurrently from the same TCM. In this case, a lot of non-durable cursors could be created.

Add a singleton class `KafkaTopicConsumerManagerCache` to manage TCMs. The internal cache has two keys. The first key is the topic name, the second key is the remote address to identify different consumers on the same topic.

To ensure only one non-durable cursor is created for a TCM if no reconnection happened, this PR adds a field `numCreatedCursors` to record the total count of non-durable cursor creations. `testCursorCountForMultiGroups` was added to verify this behavior, it creates 5 consumers to consume the same topic with different group id in parallel. After consuming completed, check all TCMs to ensure each TCM's `numCreatedCursors` is 1.

Solve the conflict by #692 that
updates the `ctx` field of `ServerCnx`.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants