Kafka consumer gets stuck after exceeding max.poll.interval.ms #344

Open
3 of 7 tasks
im-abeer opened this issue Jun 24, 2019 · 38 comments

Comments

@im-abeer

Description

When the consumer does not receive a message for 5 minutes (the default value of max.poll.interval.ms, 300000 ms), the consumer comes to a halt without exiting the program. The consumer process hangs and does not consume any more messages.

The following error message gets logged:

MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group

I see that ErrMaxPollExceeded is defined here, but I am unable to find where it is raised.

If any such error is raised, why does the program not exit?

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version(master) and confluent_kafka.libversion(1.0.0)):
  • Apache Kafka broker version: v1.1.0
  • Client configuration: { "bootstrap.servers": "my.kafka.host", "group.id": "my.group.id", "auto.offset.reset": "earliest", "enable.auto.commit": false }
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

Unless you are using the channel consumer (which you shouldn't use), you need to call Poll() or ReadMessage() at least every max.poll.interval.ms-1.

@edenhill
Contributor

See the "max.poll.interval.ms is enforced" chapter here: https://github.com/edenhill/librdkafka/releases/v1.0.0

@keyan

keyan commented Aug 9, 2019

Hello @edenhill, I’m running into an issue similar to the original poster's. I’m using a -1 timeout but calling ReadMessage() in an infinite loop, e.g.:

for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
                // Log the error and poll again.
                log.Printf("consumer error: %v", err)
                continue
        }
        _ = msg // handle the message here
}

My producer stopped writing messages for a few minutes and I logged this:

Application maximum poll interval (300000ms) exceeded by 164ms

Subsequently my producer was back up but the consumer seemed to be hanging on ReadMessage(-1) indefinitely.

According to the doc you linked to:

requires the application to call rd_kafka_consumer_poll()/rd_kafka_poll() at least every max.poll.interval.ms. Failure to do so will make the consumer automatically leave the group […] and not rejoin the group until the application has called ..poll() again

I’d expect that my consumer did indeed leave the group, but the subsequent call to ReadMessage() should have made the consumer rejoin the group and continue to see new messages.


Is this a configuration issue? Would using a shorter timeout for ReadMessage() resolve this?

Or is this a manifestation of confluentinc/librdkafka#2266?

librdkafka version: 1.0.0
confluent-kafka-go version: v1.0.0
Client configuration: ["auto.offset.reset": "earliest", "enable.auto.commit": false]

@edenhill
Contributor

This looks like confluentinc/librdkafka@80e9b1e which is fixed in librdkafka 1.1.0.

@csrgxtu

csrgxtu commented Jun 13, 2020

I am having this issue too. How can it be fixed?

@szalapski

I am having this issue with librdkafka 1.5.0, exactly as keyan said. Can anyone help?

@rogaha

rogaha commented Oct 31, 2020

Having this issue as well with v1.4.2. Any hints?

@rogaha

rogaha commented Oct 31, 2020

my code:

for {
	select {
	case <-ctx.Done():
		err = kc.Close()
		if err != nil {
			r.Logger.Error(err)
			panic(err)
		}
		r.Logger.Info("Done")
		return nil
	default:
		msg, err := kc.ReadMessage(100 * time.Millisecond)
		if err == nil {
			if err := p.Perform(ctx, msg.Value); err != nil {
				r.Logger.Error(err)
			}
		}
	}
}

@keyan

keyan commented Oct 31, 2020

To clarify, are you all seeing that your consumer won't rejoin on the subsequent ReadMessage() call? Just seeing the poll interval exceeded message is not abnormal or unexpected.

FWIW, after upgrading to the v1.1.0 client and also changing from a -1 to a sane large timeout, I stopped seeing rejoining issues. But leaving the consumer group still happens, as is expected.

@jgao54

jgao54 commented Dec 15, 2020

I'm using v1.5.2 and also calling ReadMessage(-1) in an infinite loop, and the consumer does not rejoin after leaving the group. I worked around it by setting the timeout to less than max.poll.interval.ms instead of -1, but I wonder why it's not rejoining as expected.
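
The workaround of replacing -1 with a finite timeout below max.poll.interval.ms can be sketched as a small helper. This is only an illustration: boundedPollTimeoutMs is a hypothetical name, not part of confluent-kafka-go, and capping at half the interval is an assumption rather than a documented rule. The thread does not establish that a bounded timeout avoids the rejoin problem on every version.

```go
package main

import (
	"fmt"
	"time"
)

// boundedPollTimeoutMs is a hypothetical helper (not part of confluent-kafka-go).
// It maps a requested poll timeout in milliseconds to one that is always finite
// and shorter than max.poll.interval.ms. A negative request stands for
// "block forever", as in ReadMessage(-1).
func boundedPollTimeoutMs(maxPollIntervalMs, requestedMs int) int {
	// Assumption: half the interval leaves headroom for message processing.
	limit := maxPollIntervalMs / 2
	if limit < 1 {
		limit = 1
	}
	if requestedMs < 0 || requestedMs > limit {
		return limit
	}
	return requestedMs
}

func main() {
	const maxPollIntervalMs = 300000 // default max.poll.interval.ms, as in the log lines in this thread

	for _, requested := range []int{-1, 100, 600000} {
		ms := boundedPollTimeoutMs(maxPollIntervalMs, requested)
		// The resulting Duration is what would be passed to ReadMessage.
		fmt.Printf("requested=%dms -> use %v\n", requested, time.Duration(ms)*time.Millisecond)
	}
}
```

With this shape, a timeout returned by ReadMessage is an ordinary event: the loop simply calls ReadMessage again, so the poll call is repeated well within the interval even when the topic is idle.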

@rogaha

rogaha commented Feb 24, 2021

I'm using v1.5.2 and the issue persists. The worker is "dead" after receiving the "leave group" error:

%4|1614171022.957|MAXPOLL|rdkafka#consumer-10| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 123ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1614171022.960|MAXPOLL|rdkafka#consumer-8| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 481ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1614171022.980|MAXPOLL|rdkafka#consumer-2| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 423ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1614171023.957|MAXPOLL|rdkafka#consumer-3| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 500ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1614171024.447|MAXPOLL|rdkafka#consumer-7| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 84ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1614171025.955|MAXPOLL|rdkafka#consumer-5| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 195ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1614171025.961|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 55ms (adjust max.poll.interval.ms for long-running message processing): leaving group

Restarting the worker fixes the problem. It seems to be a queue management issue.

@edenhill
Contributor

What means of consuming messages are you using? Channel consumer? ReadMessage? Poll?

@rogaha

rogaha commented Feb 24, 2021

ReadMessage

@edenhill
Contributor

And you are calling ReadMessage() more often than the max.poll.interval.ms (300s)?

@rogaha

rogaha commented Feb 24, 2021

yes:

for {
		select {
		case <-ctx.Done():
			err = kc.Close()
			if err != nil {
				r.Logger.Error(err)
				panic(err)
			}
			r.Logger.Info("Done")
			return nil
		default:
			msg, err := kc.ReadMessage(100 * time.Millisecond)

@gabrielhartmann

gabrielhartmann commented Jun 3, 2021

@edenhill Why is this issue closed? I'm seeing the same thing as everyone else, also calling ReadMessage quickly in an infinite loop.

I personally see this issue associated with a broker rebalance. On another thread prior to the stuck consumer symptom everyone else reports I see this:

msg="failed to commit message with error: Broker: Group rebalance in progress" consumer=default

@AdbReverse

I am having the same issue as well

@AdbReverse

There is something weird in the code.
Is it possible that we are constantly getting an ev=nil return value from Poll()?
If so, the function would be stuck in an infinite loop.

@anatolebeuzon

anatolebeuzon commented Jul 13, 2021

Same issue here. If for some reason Poll() has not been called recently, the app receives ErrMaxPollExceeded, as expected. However, once the app resumes calling Poll(), no event is ever returned. The app needs to call consumer.Assign() again to resume processing events.

@edenhill this seems like a bug? It's contrary to what the "max.poll.interval.ms is enforced" chapter here suggests: https://github.com/edenhill/librdkafka/releases/v1.0.0

Failure to do so will make the consumer automatically leave the group, causing a group rebalance,
and not rejoin the group until the application has called ..poll() again, triggering yet another group rebalance.

@GOODLIFEZW

I am having this issue too. How can it be fixed? "%4|1627976105.775|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 29ms (adjust max.poll.interval.ms for long-running message processing): leaving group"

@askie

askie commented Oct 20, 2021

+1

@guotie

guotie commented Dec 1, 2021

+1

@guotie

guotie commented Dec 1, 2021

confluent-kafka-go: 1.5.2

@bry00

bry00 commented Dec 15, 2021

+1

@edenhill
Contributor

Please try to reproduce this on the latest release (1.8.2) with debug=cgrp enabled.
Thanks

@edenhill edenhill reopened this Dec 15, 2021
@kashifgrocerkey

kashifgrocerkey commented Dec 15, 2021

I got this same issue on 1.8.2. It happened in production, where I don't have debug=cgrp enabled.
This is what I got last night with 1.8.2:

%4|1639508773.124|MAXPOLL|rdkafka#consumer-4| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 196ms (adjust max.poll.interval.ms for long-running message processing): leaving group

@SoniCoder

Encountering this too ... randomly appears even after poll of 0.1 ms:

2021-12-27 04:53:05,259 - some error in message: <cimpl.Message object at 0x000001B7B16D9140> code: -147 errorstr: Application maximum poll interval (300000ms) exceeded by 307ms

@namdiag

namdiag commented Jan 23, 2022

I also encounter this error often, on librdkafka 1.7.0. I just hope I can detect this error and restart the K8s pod, though.
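
One way to detect the condition from outside the client is to match the MAXPOLL log text, since several commenters report seeing the log line without receiving an error from Poll(). The following is a minimal sketch under that assumption: parseMaxPoll is a hypothetical helper, the pattern is taken from the log lines quoted in this thread, and how the log output is captured and fed to it is left open.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// maxPollRe matches the message text seen in the MAXPOLL log lines in this thread.
var maxPollRe = regexp.MustCompile(`Application maximum poll interval \((\d+)ms\) exceeded by (\d+)ms`)

// parseMaxPoll is a hypothetical helper. It returns the configured interval and
// the overshoot (both in ms) when the line is a max-poll-exceeded message.
func parseMaxPoll(line string) (int, int, bool) {
	m := maxPollRe.FindStringSubmatch(line)
	if m == nil {
		return 0, 0, false
	}
	interval, err := strconv.Atoi(m[1])
	if err != nil {
		return 0, 0, false
	}
	exceeded, err := strconv.Atoi(m[2])
	if err != nil {
		return 0, 0, false
	}
	return interval, exceeded, true
}

func main() {
	line := `%4|1639508773.124|MAXPOLL|rdkafka#consumer-4| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 196ms (adjust max.poll.interval.ms for long-running message processing): leaving group`
	if interval, exceeded, ok := parseMaxPoll(line); ok {
		fmt.Printf("max poll interval %dms exceeded by %dms\n", interval, exceeded)
		// A supervisor could exit non-zero here so that the orchestrator restarts the pod.
	}
}
```

Matching log text is brittle if the message wording changes between librdkafka versions, so checking for kafka.ErrMaxPollExceeded in the events returned by Poll() remains the first thing to try where it is actually delivered.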

@afreeland

afreeland commented Mar 21, 2022

Have also found this issue happening in production on 1.8.2. It is usually identified when lag starts randomly spiking and pods need to be restarted. What is strange is that in dev/qa environments, which see less traffic and definitely have potential for longer times between messages, I never see this particular error. It appears only in high-throughput environments.

Also, when it does get stuck, it seems like there are no active members...so no clients assigned to partitions. Curious if this is the same behavior others are seeing?

@edenhill
Contributor

It would be great to have a reproduction with debug=cgrp enabled so we can figure out what is going on.
Thanks

@vasilistefanenko

It would be great to have a reproduction with debug=cgrp enabled so we can figure out what is going on.
Thanks

%7|1653565854.500|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic topic-name [0]: stored offset 807046, committed offset 807014: setting stored offset 807046 for commit
%7|1653565854.500|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Committing offsets for 1 partition(s) with generation-id 31 in join-state steady: manual
%7|1653565854.548|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: OffsetCommit for 1 partition(s) in join-state steady: manual: returned: Success
%7|1653565854.548|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
%7|1653565854.548|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 1 partition(s):
%7|1653565854.548|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  topic-name [0] offset STORED
%7|1653565854.548|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1653565854.548|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1653565854.548|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1653565854.548|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "group-name": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1653565855.417|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565858.427|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565861.433|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565864.439|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565867.451|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565870.460|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565873.467|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565876.479|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565879.489|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565882.497|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565885.504|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565888.513|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565891.518|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565894.521|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565897.532|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565900.534|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565903.545|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565906.551|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565909.554|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565912.558|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565915.564|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565918.565|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565921.571|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565924.574|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565927.581|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565930.592|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565933.596|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565936.600|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565939.603|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565942.605|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565945.608|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565948.613|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565951.615|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565954.617|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565957.620|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565960.622|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565963.626|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565966.631|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565969.636|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565972.640|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565975.646|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565978.648|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565981.654|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565984.664|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565987.671|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565990.680|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565993.688|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565996.690|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653565999.697|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566002.705|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566005.711|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566008.716|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566011.722|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566014.732|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566017.742|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566020.751|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566023.757|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566026.759|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566029.764|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566032.772|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566035.778|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566038.784|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566041.789|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566044.794|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566047.805|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566050.812|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566053.815|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566056.822|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566059.824|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566062.832|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566065.839|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566068.842|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566071.845|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566074.851|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566077.856|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566080.863|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566083.867|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566086.873|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566089.882|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566092.887|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566095.890|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566098.896|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566101.904|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566104.916|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566107.923|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566110.924|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566113.929|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566116.935|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566119.936|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566122.940|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566125.950|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566128.958|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566131.962|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566134.967|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566137.968|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566140.975|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566143.977|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566146.984|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566149.988|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%7|1653566152.996|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1025: Heartbeat for group "group-name" generation id 31
%4|1653566154.719|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 171ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%7|1653566154.719|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "group-name": updating member id "rdkafka-bb3f2850-cb06-4479-81d3-31070f201620" -> ""
%7|1653566154.719|LEAVE|rdkafka#consumer-1| [thrd:main]: Group "group-name": leave (in state up)
%7|1653566154.719|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "group-name" initiating rebalance (EAGER) in state up (join-state steady) with 1 assigned partition(s) (lost): max.poll.interval.ms exceeded
%7|1653566154.719|LOST|rdkafka#consumer-1| [thrd:main]: Group "group-name": current assignment of 1 partition(s) lost: max.poll.interval.ms exceeded: revoking assignment and rejoining
%7|1653566154.719|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "group-name" changed join state steady -> wait-unassign-call (state up)
%7|1653566154.719|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "group-name": delegating revoke of 1 partition(s) to application on queue rd_kafka_cgrp_new: max.poll.interval.ms exceeded
%7|1653566154.719|PAUSE|rdkafka#consumer-1| [thrd:main]: Pausing fetchers for 1 assigned partition(s): rebalance
%7|1653566154.719|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "group-name": clearing group assignment
%7|1653566154.757|LEAVEGROUP|rdkafka#consumer-1| [thrd:main]: LeaveGroup response received in state up

@ideasculptor
Contributor

Add us to the set of people who are definitely seeing this problem despite calling Poll far more frequently than max.poll.interval.ms. In our case, we implemented a heartbeat that gets emitted from the loop that contains Poll() and have a separate thread that will alert if the heartbeat stops. We are seeing max poll interval exceeded and getting kicked out of the consumer group even though the heartbeat is continuous.

Additionally, we are also checking for kafka errors in the poll results, specifically looking for the kafka.ErrMaxPollExceeded error code. So we are definitely calling Poll every 100ms and emitting a heartbeat from that loop. We are definitely NOT receiving the MaxPollExceeded error in the poll results even when we are kicked out of the consumer group for apparently exceeding the max poll interval. The implication is that there is a failure between calling consumer.Poll() in the Go package and the Go package actually calling poll() in librdkafka.

Not only that, but we are using a logger which emits JSON, and the only evidence we have of the error occurring is the log message emitted to stdout via the client, which is NOT wrapped in JSON. So we are calling Poll() but we are never receiving the error that can only be received via poll(), and that error is that we are not calling poll() even though we know we are. There is clearly a bug inside the Go consumer.Poll() before the actual call to librdkafka's poll() function, and it is not generating any useful output to the caller.

One hypothesis we are about to test is that this is caused by linking dynamically to librdkafka when doing a musl build when using an alpine container, which might explain why no one at confluent seems able to reproduce this behaviour when so many of us are seeing it.
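
The heartbeat arrangement described in this comment can be sketched roughly as follows. This is not the commenter's actual code: pollWatchdog and its methods are hypothetical names, and timestamps are passed in as Unix milliseconds to keep the logic easy to test. As the comment itself argues, a healthy heartbeat only shows that the application loop is running; it does not prove that librdkafka's own poll timer was reset.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pollWatchdog is a hypothetical type. It records when the poll loop last ran
// and reports whether the gap since then has grown past a limit.
type pollWatchdog struct {
	lastBeatMs int64 // Unix milliseconds of the most recent beat; accessed atomically
	limitMs    int64 // longest tolerated gap between beats
}

func newPollWatchdog(limitMs, nowMs int64) *pollWatchdog {
	return &pollWatchdog{lastBeatMs: nowMs, limitMs: limitMs}
}

// Beat is meant to be called from the poll loop on every iteration.
func (w *pollWatchdog) Beat(nowMs int64) {
	atomic.StoreInt64(&w.lastBeatMs, nowMs)
}

// Stale is meant to be called from a separate goroutine. It reports whether the
// poll loop has been silent for longer than the limit.
func (w *pollWatchdog) Stale(nowMs int64) bool {
	return nowMs-atomic.LoadInt64(&w.lastBeatMs) > w.limitMs
}

// nowMs returns the current wall-clock time in Unix milliseconds.
func nowMs() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}

func main() {
	// A 1s limit is an arbitrary choice, far below the 300000ms default interval.
	w := newPollWatchdog(1000, nowMs())
	w.Beat(nowMs()) // the poll loop would do this after each Poll() call
	fmt.Println("stale right after a beat:", w.Stale(nowMs()))
}
```

Recording the gap between beats, rather than the latency of each Poll() call, matches what max.poll.interval.ms measures: the time between consecutive polls.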

@bothra90

bothra90 commented Mar 24, 2023

We've run into this issue as well - the consumer gets hung when it's working with a topic/partition with a huge backlog. We could work around this by handling RevokedPartitions event in the rebalance callback like so:

var rebalanceCb func(c *kafka.Consumer, e kafka.Event) error
rebalanceCb = func(c *kafka.Consumer, e kafka.Event) error {
	zap.L().Info("Got kafka partition rebalance event: ", zap.String("topic", topic), zap.String("consumer", c.String()), zap.String("event", e.String()))
	switch e.(type) {
	case kafka.RevokedPartitions:
		// Resubscribe to the topic to get new partitions assigned.
		err := c.SubscribeTopics([]string{topic}, rebalanceCb)
		if err != nil {
			return err
		}
	}
	return nil
}

And FWIW, we also run the consumer on arm64 machines with dynamically linked librdkafka.

@chenrulongmaster

We are also facing the same issue. Using @bothra90's solution makes the application work properly.

@supernomad

Alright, we are seeing this issue as well, and have tried all of the available solutions in this issue. We consistently get this error even if we don't do anything at all with the message after polling and have a tight loop calling many hundreds of times a second. Clearly something is wrong in the underlying library interaction between Go <> librdkafka 🤔

FWIW, we are running on x86_64 machines inside Debian-based containers. We have metrics emitted for every time we call Poll, so we know exactly how often we are calling the method, and we also have a histogram tracking how long we wait in the poll. As mentioned above, we are calling hundreds of times a second and are seeing single-digit ms latency for the call at the p99 quantile. We also have the max.poll.interval.ms value set to 20 minutes just to confirm without a shadow of a doubt that we are calling multiple times in the interval.

The rebalance fix above "works" in that we re-subscribe and start collecting messages again; however, this causes a ton of thrashing in our cluster, so it's not ideal.

@kovetskiy

Alright we are seeing this issue as well, and have tried all of the available solutions in this issue. We consistently get this error even if we don't do anything at all with the message after polling and have a tight loop calling many hundreds of times a second.... Clearly something wrong in the underlying library interaction here between go <> librdkafka 🤔

fwiw we are running on x86_64 machines inside of debian based containers. We have metrics emitted for every time we call Poll so we know exactly how often we are calling the method, and also have a histogram for tracking how long we wait in the poll. As mentioned above we are calling 100's of times a second and are seeing single digit ms latency for the call at the p99 quantile. We also have the max.poll.interval.ms value set to 20minutes just to confirm without a shadow of a doubt we are calling multiple times in the interval.

The rebalance fix above "works" in that we re-subscribe and start collecting messages again, however this causes a ton of thrashing in our cluster, so its not ideal.

Do you happen to adjust the amount of bytes you fetch in your consumers? There is an idea that it could be related; see this comment: zendesk/racecar#288 (comment)

@bandiawy

bandiawy commented Jun 3, 2023

Add us to the set of people who are definitely seeing this problem despite calling Poll far more frequently than max.poll.interval.ms. In our case, we implemented a heartbeat that gets emitted from the loop that contains Poll() and have a separate thread that will alert if the heartbeat stops. We are seeing max poll interval exceeded and getting kicked out of the consumer group even though the heartbeat is continuous. Additionally, we are also checking for kafka errors in the poll results, specifically looking for the kafka.ErrMaxPollExceeded error code. So we are definitely calling Poll every 100ms and emitting a heartbeat from that loop. We are definitely NOT receiving MaxPollExceeded error in the poll results even when we are kicked out of the consumer group for apparently exceeding the max poll interval. The implication is that there is a failure between calling consumer.Poll() in the go package and the go package actually calling poll() in librdkafka. Not only that, but we are using a logger which emits json and the only evidence we have of the error occurring is the log message emitted to stdout via the client, which is NOT wrapped in json. So we are calling Poll() but we are never receiving the error that can only be received via poll(), and that error is that we are not calling poll() even though we know we are. There is clearly a bug inside the go consumer.Poll() before the actual call to librdkafka's poll() function which is not generating any useful output to the caller.

One hypothesis we are about to test is that this is caused by linking dynamically to librdkafka when doing a musl build when using an alpine container, which might explain why no one at confluent seems able to reproduce this behaviour when so many of us are seeing it.

I found that upgrading to v2.1.1, which fixed #980, resolves this problem.

@akshatraika-moment

We are seeing this same issue in 2.4.0 @edenhill

We poll from kafka in a busy loop, so we should see the error in the Poll return, right? We don't get any rebalance revoke event on this either.
