
[fix][broker]Consumer can't consume messages because there has two sames topics in one broker #17526

Merged

Conversation

poorbarcode
Contributor

@poorbarcode poorbarcode commented Sep 7, 2022

Motivation

With the transaction feature enabled, we send and receive messages and, at the same time, execute the admin API unload namespace 1000 times. Then the problem occurs: the consumer cannot receive any messages, and there is no error log. After that we tried the admin API get topic stats, and the response showed that only producers were registered on the topic and no consumers were registered on it, while the consumer state was Ready on the client. This means that the state of the consumer is inconsistent between the broker and the client.

Locating the problem

Then we found the problem: two PersistentTopic instances with the same name were registered on one broker node; the consumer was registered on one of them (call it topic-c), and the producer was registered on the other (call it topic-p). At this point, when we send messages, the data flow looks like this:

client: producer sends a message

broker: handle cmd-send

broker: find the topic by name, it is "topic-p"

broker: find all subscriptions registered on "topic-p"

broker: found one subscription, but it has no consumers registered

broker: no need to send the message to the client

But the consumer is actually registered on the other topic, topic-c, so the consumer could not receive any messages.

Reproduce

How do two topics with the same name get registered on the same broker node?

Run transaction buffer recover, admin unload namespace, client create consumer, and client create producer at the same time; the process flows like this (the problem occurs at step 11):

| Time | transaction buffer recover | admin unload namespace | client create consumer | client create producer |
|------|----------------------------|------------------------|------------------------|------------------------|
| 1    | TB recover                 |                        |                        |                        |
| 2    | TB recover failure         | topic.unload           |                        |                        |
| 3    | topic.close(false)         | topic.close(true)      |                        |                        |
| 4    | brokerService.topics.remove(topicName) |            |                        |                        |
| 5    | remove topic finish        |                        | lookup                 |                        |
| 6    |                            |                        | create topic-c         |                        |
| 7    |                            |                        | consumer registered on topic-c |                |
| 8    |                            | brokerService.topics.remove(topic) |            |                        |
| 9    |                            | remove topic-c finish  |                        | lookup                 |
| 10   |                            |                        |                        | create topic-p         |
| 11   |                            |                        |                        | producer registered on topic-p |
  • Each column represents an individual process, e.g. client create consumer, client create producer.
  • Multiple processes run at the same time, and all of them affect brokerService.topics.
  • The Time column only indicates the order of the steps, not actual time.
  • The important steps are explained below:

step 3

Even if the persistent topic property isClosingOrDeleting has already been changed to true, close can still be executed once more, see line 1247:

public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
    CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    lock.writeLock().lock();
    try {
        // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
        // forcefully wants to close managed-ledger without waiting all resources to be closed.
        if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
            fenceTopicToCloseOrDelete();
        } else {

Whether close executes depends on two predicates: the topic is not already closing or deleting, or @param closeWithoutWaitingClientDisconnect is true. This means that topic.close can be executed re-entrantly when @param closeWithoutWaitingClientDisconnect is true, and in the implementation of the admin API unload namespace the parameter closeWithoutWaitingClientDisconnect is exactly true:

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) {
    return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true);
}

So when transaction buffer recover and admin unload namespace are executed at the same time, and the transaction buffer recovery fails before the unload, the topic will be removed from brokerService.topics twice.
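As a minimal illustration of that check (a hypothetical standalone class, not the Pulsar source), the predicate !isClosingOrDeleting || closeWithoutWaitingClientDisconnect lets a second close(true) run even after a first close has already fenced the topic:

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: mirrors only the guard condition quoted above.
class CloseGuardSketch {
    private final AtomicBoolean isClosingOrDeleting = new AtomicBoolean(false);

    boolean tryClose(boolean closeWithoutWaitingClientDisconnect) {
        // Same predicate as the snippet from PersistentTopic#close: the second
        // operand bypasses the fence that the first close already set.
        if (!isClosingOrDeleting.get() || closeWithoutWaitingClientDisconnect) {
            isClosingOrDeleting.set(true);
            return true; // the close body (including the topics-map removal) runs
        }
        return false;
    }

    public static void main(String[] args) {
        CloseGuardSketch topic = new CloseGuardSketch();
        System.out.println(topic.tryClose(false)); // transaction buffer recover failure: true, fences the topic
        System.out.println(topic.tryClose(true));  // admin unload namespace: true again, so close runs a second time
    }
}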

step-4 / step-8

Because the current implementation of BrokerService.removeTopicFromCache uses map.remove(key) rather than map.remove(key, value), the call can remove any value stored under that key, even if it is not the one the caller intended to remove.

To sum up, we should make these two changes:

  • Make the method topic.close non-reentrant, and also prevent re-entrance between topic.close and topic.delete.
  • Use map.remove(key, value) instead of map.remove(key) in the implementation of BrokerService.removeTopicFromCache. This change applies to both scenarios: topic.close and topic.delete (see the sketch below).
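To illustrate the second change, here is a minimal self-contained sketch (topic names and values are hypothetical; this is not the PR's code) of why ConcurrentHashMap.remove(key, value) is the safe variant for removeTopicFromCache:

import java.util.concurrent.ConcurrentHashMap;

public class TopicCacheRemoveSketch {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> topics = new ConcurrentHashMap<>();
        Object oldTopic = new Object(); // the instance a stale close()/delete() still references
        Object newTopic = new Object(); // the instance a concurrent lookup has re-created

        topics.put("persistent://tenant/ns/topic", newTopic);

        // Unsafe: map.remove(key) would evict newTopic even though the caller
        // only meant to remove its own, already-closed instance.
        // topics.remove("persistent://tenant/ns/topic");

        // Safe: map.remove(key, value) is a no-op here, because the cached value
        // is not the caller's instance.
        boolean removed = topics.remove("persistent://tenant/ns/topic", oldTopic);
        System.out.println(removed);                                            // false
        System.out.println(topics.containsKey("persistent://tenant/ns/topic")); // true: newTopic survives
    }
}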

Modifications

Documentation

  • doc-required

  • doc-not-needed

  • doc

  • doc-complete

Matching PR in forked repository

PR in forked repository (all checks passed):

@poorbarcode
Contributor Author

This PR should merge into these branches:

  • branch-2.8
  • branch-2.9
  • branch-2.10
  • branch-2.11
  • master

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 7, 2022
@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

Contributor

@liangyepianzhou liangyepianzhou left a comment


Good work!

if (!createTopicFuture.isDone()) {
    return CompletableFuture.completedFuture(null);
}
return createTopicFuture.thenCompose(topicOptional -> {
Contributor


The createTopicFuture might be completed with an exception?

Contributor Author


This is a good point.

I have covered this logic branch: if the future in the cache completed exceptionally, the topic instance in the cache is not the same as the @param topic, so the delete will return success.
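A minimal sketch of that branch (hypothetical helper and parameter names, not the PR's actual code): a createTopicFuture that completed exceptionally cannot hold the caller's topic instance, so the removal can complete successfully without evicting anything:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class RemoveFromCacheSketch {
    static CompletableFuture<Void> removeIfSameInstance(
            CompletableFuture<Optional<Object>> createTopicFuture, Object topic) {
        if (!createTopicFuture.isDone()) {
            // Still being created: it cannot hold the caller's (already created) topic instance.
            return CompletableFuture.completedFuture(null);
        }
        return createTopicFuture
                // A future that completed exceptionally yields no topic instance here.
                .exceptionally(ex -> Optional.empty())
                .thenAccept(topicOptional -> {
                    if (topicOptional.isPresent() && topicOptional.get() == topic) {
                        // only in this case would the cache entry actually be evicted
                    }
                    // otherwise there is nothing to do and the removal completes successfully
                });
    }
}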

@poorbarcode poorbarcode force-pushed the fix/topic_repeat_registry_split_1 branch from d7475f6 to 33e4f11 September 14, 2022 15:57
@poorbarcode
Contributor Author

rebase master

@poorbarcode poorbarcode requested review from codelipenghui and removed request for Technoboy- and gaoran10 September 14, 2022 15:59
@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@codelipenghui codelipenghui merged commit 260f5c6 into apache:master Sep 21, 2022
@poorbarcode poorbarcode deleted the fix/topic_repeat_registry_split_1 branch September 21, 2022 14:36
Technoboy- pushed a commit that referenced this pull request Sep 26, 2022
@Technoboy- Technoboy- modified the milestones: 2.12.0, 2.11.0 Sep 26, 2022
Jason918 pushed a commit that referenced this pull request Sep 26, 2022
…mes topics in one broker (#17526)

(cherry picked from commit 260f5c6)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 28, 2022
…mes topics in one broker (apache#17526)

(cherry picked from commit 260f5c6)
(cherry picked from commit ddd642e)
congbobo184 pushed a commit that referenced this pull request Nov 14, 2022
…mes topics in one broker (#17526)

(cherry picked from commit 260f5c6)
@congbobo184 congbobo184 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Nov 14, 2022
congbobo184 pushed a commit that referenced this pull request Nov 26, 2022
…mes topics in one broker (#17526)

(cherry picked from commit 260f5c6)
lhotari added a commit to lhotari/pulsar that referenced this pull request Jun 7, 2023
… race conditions

- solution was introduced in apache#17526
- however, it was accidentally replaced with a call to the incorrect
  method signature in apache#17736
Contributor

@heesung-sn heesung-sn left a comment


Do we use the same thread to update states of the same pulsar resource?

I think we can make the same thread update the same resource to minimize update conflicts.

One idea is to use resource update task queues and have one thread consume and run the tasks for the same topic.

For example,
// topic op task publish
TopicRemoveTask task = new TopicRemoveTask();
int qid = hash(topic);
queues(qid).add(task); // we can have a set data structure to dedup the same topic operations

// topic op task consume
TopicTask task = queues(this.thread.id).poll();
run(task);
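A minimal runnable sketch of that idea (hypothetical class, not Pulsar code): hash the topic name to one of a fixed set of single-threaded executors, so that all operations on the same topic run in submission order and never race with each other:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerTopicSerializer {
    private final ExecutorService[] lanes;

    PerTopicSerializer(int parallelism) {
        lanes = new ExecutorService[parallelism];
        for (int i = 0; i < parallelism; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    void submit(String topic, Runnable task) {
        // Same topic -> same lane -> tasks execute one at a time, in order.
        int lane = Math.floorMod(topic.hashCode(), lanes.length);
        lanes[lane].execute(task);
    }

    void shutdown() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
    }

    public static void main(String[] args) {
        PerTopicSerializer serializer = new PerTopicSerializer(4);
        serializer.submit("persistent://tenant/ns/topic", () -> System.out.println("remove topic from cache"));
        serializer.submit("persistent://tenant/ns/topic", () -> System.out.println("re-create topic"));
        serializer.shutdown();
    }
}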
