[pulsar-broker] clean up active consumer on already closed connection #13196
@rdhabalia: Thanks for your contribution. Do we need to update the docs for this PR?
        consumer.close();
    }
} catch (Exception be) {
    log.info("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());
Any reason this log is info level?
It's only informational and doesn't have to be critical, so the log level is info.
I just want to make sure that if an exception ends up here, the problem you mentioned won't occur, since "subsequent consumer creation requests keep failing" is definitely worth logging as an error.
To me, it seems like a band-aid. I think we should look at how we landed in this situation. Perhaps we should have cleaned up the consumer at connection close itself. If there is a race condition, we should fix the race condition itself.
@Jason918 changed to error log level.
@pkumar-singh this part handles resiliency to avoid consumer unavailability due to any kind of race condition.
@rdhabalia is this a bug fix? (do not need to update docs?)
Yes, a bug fix.
ping. PR is ready to merge.
ping
consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
// cleanup consumer if connection is already closed
if (consumer != null && !consumer.cnx().isActive()) {
    consumer.close();
In my understanding, this consumer should be closed elsewhere in the normal situation. This means there is a chance that `consumer.close` will be called multiple times. I think we should add a closed-state check in `consumer.close` to avoid other unexpected issues.
There is no need to check the closed state: if the consumer's connection is not active and the consumer still presents itself as the active consumer, it requires cleanup by closing it immediately.
> if the consumer's connection is not active and the consumer still presents itself as the active consumer, it requires cleanup by closing it immediately.

I get what you are trying to do, and I am +1 for this.
But do you agree that `consumer.close` may be called in two threads at the same time?
One is here, and the other may be in `org.apache.pulsar.broker.service.ServerCnx#channelInactive`.
It won't happen in this code path. However, in any case it would be safe to call close on an already closed consumer.
> However, in any case it would be safe to call close on an already closed consumer.

Great to know. Thx
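The thread states that calling close on an already closed consumer is safe, but does not show how. One common way to get that property is an atomic "closed" guard, sketched below. `IdempotentConsumerSketch` and its cleanup counter are hypothetical stand-ins for illustration and are not Pulsar's actual `Consumer` implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a broker-side consumer; not Pulsar's Consumer class. */
class IdempotentConsumerSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    /** Runs the cleanup at most once, however many threads call close(). */
    void close() {
        // compareAndSet(false, true) succeeds for exactly one caller.
        if (!closed.compareAndSet(false, true)) {
            return; // already closed (or being closed) by another caller
        }
        // Placeholder for the real cleanup, e.g. removing the consumer
        // from its dispatcher.
        cleanupRuns.incrementAndGet();
    }

    int cleanupRuns() {
        return cleanupRuns.get();
    }

    /** Calls close() from two threads and reports how often cleanup ran. */
    static int closeFromTwoThreads() {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        Thread a = new Thread(consumer::close);
        Thread b = new Thread(consumer::close);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.cleanupRuns();
    }
}
```

With a guard like this, the subscribe path and a connection-inactive callback could both call `close()` without the cleanup running twice.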
ping.
LGTM
ping. Can someone please review the PR?
        latch.countDown();
    }).subscribe();
} catch (Exception e) {
    // Ok
Can we perform some checks on this exception? At least match the Java class.
Otherwise the test can pass even for other kinds of bugs.
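A minimal sketch of the suggestion above: instead of swallowing any `Exception`, the test checks that the failure has the expected class. The thread does not say which exception the test expects, so `ConsumerBusySketchException`, `SubscribeCall`, and `failsWith` are hypothetical names introduced only for illustration.

```java
/** Hypothetical placeholder for whichever exception the test expects. */
class ConsumerBusySketchException extends Exception {
    ConsumerBusySketchException(String message) {
        super(message);
    }
}

class ExceptionCheckSketch {
    /** Hypothetical stand-in for the subscribe call made in the test. */
    interface SubscribeCall {
        void run() throws Exception;
    }

    /**
     * Returns true only if the call fails with an exception of the expected
     * class. A successful call or an unrelated exception returns false, so
     * other kinds of bugs no longer pass silently.
     */
    static boolean failsWith(Class<? extends Exception> expected, SubscribeCall call) {
        try {
            call.run();
            return false; // no exception at all
        } catch (Exception e) {
            return expected.isInstance(e);
        }
    }
}
```

In a real test the same idea is usually expressed with the test framework's own expected-exception assertion rather than a hand-written helper.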
…#13196)

### Motivation

We have frequently seen an issue with exclusive subscriptions where the broker doesn't clean up consumers on a closed connection. As a result, the exclusive consumer keeps failing with `ConsumerBusyException` even though no consumer is actually connected to the broker.

The log example below shows that the connection is closed but the broker still couldn't clean up the consumer, due to a race condition when the consumer disconnects quickly after creating the subscription. Subsequent consumer creation requests then keep failing.

```
23:30:16.896 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1
:
23:30:17.223 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Closed consumer, consumerId=20
:
23:30:17.291 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Created new subscription for 25
:
23:30:17.301 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.11.12.13:60851
:
23:30:17.302 [pulsar-io-23-42] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://my-prop/my-cluster/ns/topic, name=sub1}, consumerId=21, consumerName=c2302, address=/10.11.12.13:60851}
:
23:30:17.302 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Created subscription on topic persistent://my-prop/my-cluster/ns/topic / sub1
:
:
:
23:30:17.496 [pulsar-io-23-36] WARN org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already connected
23:30:17.885 [pulsar-io-23-36] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1
23:30:17.886 [pulsar-io-23-36] WARN org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already connected
23:30:18.637 [pulsar-io-23-36] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1
```

### Modification

The broker should clean up the consumer if its connection is already closed and allow a consumer to reconnect as the active consumer.

(cherry picked from commit 496afa7)
Motivation
We have frequently seen an issue with exclusive subscriptions where the broker doesn't clean up consumers on a closed connection. As a result, the exclusive consumer keeps failing with `ConsumerBusyException` even though no consumer is actually connected to the broker.
The log example above shows that the connection is closed but the broker still couldn't clean up the consumer, due to a race condition when the consumer disconnects quickly after creating the subscription. Subsequent consumer creation requests then keep failing.
Modification
The broker should clean up the consumer if its connection is already closed and allow a consumer to reconnect as the active consumer.
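The modification can be illustrated with a small model of an exclusive subscription. This is a sketch under simplifying assumptions, not the broker code: `Connection`, `Consumer`, and `ExclusiveSubscription` are hypothetical classes, and a `false` return value stands in for the `ConsumerBusyException` described in the motivation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StaleConsumerCleanupSketch {
    /** Hypothetical connection that only tracks whether it is still open. */
    static final class Connection {
        private final AtomicBoolean active = new AtomicBoolean(true);

        boolean isActive() {
            return active.get();
        }

        void disconnect() {
            active.set(false);
        }
    }

    /** Hypothetical consumer bound to one connection. */
    static final class Consumer {
        final String name;
        final Connection cnx;

        Consumer(String name, Connection cnx) {
            this.name = name;
            this.cnx = cnx;
        }
    }

    /** Hypothetical exclusive subscription: at most one active consumer. */
    static final class ExclusiveSubscription {
        private Consumer activeConsumer;

        /** Returns true if the candidate became the active consumer, false if busy. */
        synchronized boolean addConsumer(Consumer candidate) {
            // Clean up an active consumer whose connection is already closed,
            // so a stale entry cannot block new consumers forever.
            if (activeConsumer != null && !activeConsumer.cnx.isActive()) {
                activeConsumer = null;
            }
            if (activeConsumer != null) {
                return false; // models the "consumer busy" failure
            }
            activeConsumer = candidate;
            return true;
        }

        synchronized String activeConsumerName() {
            return activeConsumer == null ? null : activeConsumer.name;
        }
    }
}
```

In this model, a second consumer is rejected while the first consumer's connection is open, and accepted once that connection has been closed, even if nothing else removed the first consumer in the meantime.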