Drain mode: PC keeps polling messages after closeDrainFirst #552

Closed
dannpopescu opened this issue Feb 21, 2023 · 4 comments

Comments

@dannpopescu

Hello everyone. First of all, thanks for building this library.

I want to discuss something I stumbled upon while experimenting with the different modes for shutting down the PC. In my tests, DRAIN mode doesn't work as expected: the PC keeps polling for messages even after closeDrainFirst has been called.

I'm using version 0.5.2.4. Below are some excerpts from my test logs; the full logs are at https://justpaste.it/cj138.

First I initialize the PC:

14:12:36.770 [main] INFO  i.c.p.i.AbstractParallelEoSStreamProcessor -- Confluent Parallel Consumer initialise... groupId: test-group-id, Options: ParallelConsumerOptions(consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6492fab5, producer=null, managedExecutorService=java:comp/DefaultManagedExecutorService, managedThreadFactory=java:comp/DefaultManagedThreadFactory, allowEagerProcessingDuringTransactionCommit=false, commitLockAcquisitionTimeout=PT5M, produceLockAcquisitionTimeout=PT1M, commitInterval=PT0.3S, ordering=KEY, commitMode=PERIODIC_CONSUMER_SYNC, maxConcurrency=100, defaultMessageRetryDelay=PT1S, retryDelayProvider=null, sendTimeout=PT10S, offsetCommitTimeout=PT10S, batchSize=1, thresholdForTimeSpendInQueueWarning=PT10S, maxFailureHistory=10)

It polls 300 messages in batches of 10, then pauses polling and processes them for a while:

14:12:36.888 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll completed normally (after timeout of PT2S) and returned 10...
…
14:12:36.971 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Pausing subs
14:12:36.971 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Subscriptions are paused: true

Then comes the instruction to close and drain:

14:12:38.799 [main] INFO  i.c.p.i.AbstractParallelEoSStreamProcessor -- Signaling to close...
...
14:12:38.802 [pc-control] DEBUG i.c.p.i.AbstractParallelEoSStreamProcessor -- Signaling to drain...

But the PC continues to poll:

14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll completed normally (after timeout of PT2S) and returned 10...

From what I understand so far, once the PC is set to DRAINING, the partitions assigned to the consumer are paused. My current assumption is that BrokerPollSystem resumes the partitions in managePauseOfSubscription() because that implementation considers only the throttling mechanism and ignores the draining state:

14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll completed normally (after timeout of PT2S) and returned 10...
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Poll completed
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Got 10 records in poll result
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Pausing subs
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Resuming consumer, waking up
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Subscriptions are paused: false
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: draining
14:12:40.164 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll starting with timeout: PT2S
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll completed normally (after timeout of PT2S) and returned 10...
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Poll completed
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Got 10 records in poll result
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Pausing subs
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Resuming consumer, waking up
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Subscriptions are paused: false
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.BrokerPollSystem -- Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: draining
14:12:40.165 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll starting with timeout: PT2S
14:12:40.171 [pc-broker-poll] DEBUG i.c.p.internal.ConsumerManager -- Poll completed normally (after timeout of PT2S) and returned 10...
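To make the suspected cause concrete, here is a minimal, hypothetical model of the pause/resume decision. It is not the library's actual managePauseOfSubscription() code: RunState, shouldPollThrottleOnly, shouldPollStateAware and the queue limit are assumptions for illustration. The first method looks only at the throttle, which would produce the "Pausing subs" / "Resuming consumer" pattern seen above even in the draining state; the second also refuses to resume once the processor has left RUNNING.

```java
/**
 * Hypothetical model of the broker-poll pause/resume decision discussed in this issue.
 * NOT Parallel Consumer's real BrokerPollSystem code; all names here are assumptions.
 */
public class PauseDecision {

    // Assumed subset of run states (the logs print "Run state: draining").
    public enum RunState { RUNNING, DRAINING, CLOSING, CLOSED }

    // Suspected behaviour: only the throttle (how much work is queued) is consulted,
    // so as soon as the queue drops below the limit, polling resumes, even while draining.
    public static boolean shouldPollThrottleOnly(RunState state, int queued, int maxQueued) {
        return queued < maxQueued;
    }

    // State-aware variant: never resume fetching once the processor is no longer RUNNING;
    // draining should only finish records that were already polled.
    public static boolean shouldPollStateAware(RunState state, int queued, int maxQueued) {
        return state == RunState.RUNNING && queued < maxQueued;
    }

    public static void main(String[] args) {
        int assumedLimit = 300; // hypothetical throttle limit, for illustration only
        // Draining with an empty queue: the throttle-only check would resume polling.
        System.out.println(shouldPollThrottleOnly(RunState.DRAINING, 0, assumedLimit)); // true
        System.out.println(shouldPollStateAware(RunState.DRAINING, 0, assumedLimit));   // false
    }
}
```

Whether an actual fix takes this shape is not confirmed here; the sketch only shows why a throttle-only check cannot keep subscriptions paused during a drain.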

What do you think: is this a bug, or is something wrong with my setup?

@dannpopescu
Author

Here is my humble attempt to write a test for this issue and a possible fix: dannpopescu@4c17e89

Some notes:

  • I've used a few Thread.sleep calls in the test because I couldn't find a way to access and assert the relevant PC state.
  • Some existing tests fail on my laptop even on the master branch, so I can't be sure this fix doesn't break anything.
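As a possible alternative to fixed Thread.sleep calls, a test can poll a condition it is able to observe (for example, a counter incremented by the test's own processing function) until the condition holds or a deadline passes. The WaitUntil helper below is a hypothetical, stdlib-only sketch of that idea; libraries such as Awaitility provide the same pattern.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical test helper: poll a condition until it holds or the timeout expires,
 * instead of sleeping for a fixed amount of time.
 */
public class WaitUntil {

    public static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        // Overflow-safe comparison of nanoTime values.
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition becomes true after about 200 ms; the helper returns soon after.
        boolean ok = waitUntil(() -> System.currentTimeMillis() - start >= 200,
                Duration.ofSeconds(2), Duration.ofMillis(20));
        System.out.println(ok); // true
    }
}
```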

@antonmos
Contributor

@dannpopescu, is this still an issue? I am considering using PC, and this could be a showstopper.

@dannpopescu
Author

@antonmos there's no fix for it yet.

In the meantime I've found another bug in this flow: if there are in-flight messages and the PC's internal queue is empty, closing the PC in DRAIN mode interrupts the in-flight messages. AFAIU this happens because the PC transitions from DRAINING to CLOSING prematurely (it doesn't check whether any messages are still in flight). So even though you tell the PC to close and drain, you end up with the don't-drain behavior, which IMO has issues of its own (see #559).
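The missing check can be sketched as a hypothetical state-transition function. This is not the PC's real control loop; the enum and method names are assumptions. Moving from DRAINING to CLOSING as soon as the queue is empty reproduces the premature transition described above, whereas also requiring zero in-flight records keeps the PC draining until the workers finish.

```java
/**
 * Hypothetical sketch of the DRAINING -> CLOSING transition check.
 * NOT Parallel Consumer's real control loop; names are assumptions.
 */
public class DrainTransition {

    public enum RunState { RUNNING, DRAINING, CLOSING, CLOSED }

    // Described behaviour: only the internal queue is checked (inFlight is ignored),
    // so the state moves to CLOSING while records are still being processed.
    public static RunState nextStatePremature(RunState state, int queued, int inFlight) {
        if (state == RunState.DRAINING && queued == 0) {
            return RunState.CLOSING;
        }
        return state;
    }

    // The drain is complete only when nothing is queued AND nothing is in flight.
    public static RunState nextStateSafe(RunState state, int queued, int inFlight) {
        if (state == RunState.DRAINING && queued == 0 && inFlight == 0) {
            return RunState.CLOSING;
        }
        return state;
    }

    public static void main(String[] args) {
        // Empty queue, but 5 records are still being processed by worker threads.
        System.out.println(nextStatePremature(RunState.DRAINING, 0, 5)); // CLOSING
        System.out.println(nextStateSafe(RunState.DRAINING, 0, 5));      // DRAINING
    }
}
```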

I've created a repo with some tests if anyone is interested: https://github.com/dannpopescu/parallel-consumer-shutdown-poc

eddyv mentioned this issue Jun 27, 2023
@rkolesnev
Contributor

@dannpopescu, @antonmos: we just released version 0.5.2.6 of PC, and both this issue and #559 are fixed in it.

Those fixes also address in-flight messages being killed by the pool shutdown when closing in DRAIN mode: on the transition to CLOSING, in-flight messages are now allowed to complete (up to the shutdown timeout).
In addition, the drain timeout and a separate shutdown timeout are now configurable through the Parallel Consumer options, so the maximum allowed time can be set independently for each phase.
Closing the issue.
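The "let in-flight work complete, up to the shutdown timeout" behavior described above follows a standard java.util.concurrent pattern. The sketch below is a hypothetical, stdlib-only illustration of that pattern, not the PC's implementation; the pool size and timeout values are assumptions. shutdown() stops accepting new work, awaitTermination() waits for running tasks up to the timeout, and shutdownNow() interrupts only whatever is still running after that.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of a graceful worker-pool shutdown with a timeout.
 * Not Parallel Consumer's implementation.
 */
public class GracefulPoolShutdown {

    /** Returns true if all submitted tasks finished within shutdownTimeout. */
    public static boolean shutdownGracefully(ExecutorService pool, Duration shutdownTimeout)
            throws InterruptedException {
        pool.shutdown(); // no new tasks; already-submitted tasks keep running
        boolean finished = pool.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (!finished) {
            pool.shutdownNow(); // timeout exceeded: interrupt the remaining tasks
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(100); // simulated record processing
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interrupted by shutdownNow()
                }
            });
        }
        boolean clean = shutdownGracefully(pool, Duration.ofSeconds(5));
        System.out.println(clean + " " + completed.get()); // true 4
    }
}
```

Splitting the drain phase from this shutdown phase, each with its own timeout, matches the separate configuration described in the comment above.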
