-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka client not able to connect to broker, after broker is abruptly shut down. #1511
Comments
Logs: hese are the message i see after that change |
Are you getting any more logs after this last one? Is this somewhat easy for you to reproduce? If so, can you try latest librdkafka master? |
rdkafka_debug.txt
Description
The broker is running on a kubernetes cluster. The node has bunch of different docker pods. One of them is running the kafka broker. On deleting the pod which is running the broker, we see that the client shows the producer thread in the CONNECT state, even after the pod is bring up after sometime, the producer is not able to move to UP state, and keep trying to connect.
NAME READY STATUS RESTARTS AGE IP NODE
kafka-broker 1/1 Running 0 4h 10.32.0.12 localhost.localdomain
...
..
How to reproduce
Summary of the rdkafka_dump_stats
rd_kafka_t 0x436a5270: rdkafka#producer-1
producer.msg_cnt 0 (0 bytes)
rk_rep reply queue: 0 ops
brokers:
rd_kafka_broker_t 0x40af6b20: :0/internal NodeId -1 in state UP (for 8121.787s)
refcnt 2
outbuf_cnt: 0 waitresp_cnt: 0
0 messages sent, 0 bytes, 0 errors, 0 timeouts
0 messages received, 0 bytes, 0 errors
0 messageset transmissions were retried
0 toppars:
rd_kafka_broker_t 0x44a044f0: ssl://10.6.100.23:9093/1001 NodeId 1001 in state CONNECT (for 7903.902s)
refcnt 194
outbuf_cnt: 0 waitresp_cnt: 0
590 messages sent, 252442 bytes, 0 errors, 0 timeouts
590 messages received, 50294 bytes, 1 errors
0 messageset transmissions were retried
192 toppars:
THUNDER_SLB_L4_STATS [8] leader ssl://10.6.100.23:9093/1001
Summary of kafka threads bt.
[Switching to thread 2 (Thread 0x7f0fd2060700 (LWP 19634))]#0 0x00007f13124a1383 in poll () from /lib64/libc.so.6
(gdb) bt
#0 0x00007f13124a1383 in poll () from /lib64/libc.so.6
#1 0x00007f1323964a58 in rd_kafka_transport_poll (rktrans=0x7f0fc8014540, tmout=999) at rdkafka_transport.c:1457
#2 0x00007f1323964322 in rd_kafka_transport_io_serve (rktrans=0x7f0fc8014540, timeout_ms=999) at rdkafka_transport.c:1316
#3 0x00007f132394923c in rd_kafka_broker_serve (rkb=0x44a044f0, abs_timeout=532931874971) at rdkafka_broker.c:2263
#4 0x00007f132394937d in rd_kafka_broker_ua_idle (rkb=0x44a044f0, timeout_ms=1000) at rdkafka_broker.c:2325
#5 0x00007f132394f52c in rd_kafka_broker_thread_main (arg=0x44a044f0) at rdkafka_broker.c:3176
#6 0x00007f132399e24f in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#7 0x00007f132adc9aa1 in start_thread () from /lib64/libpthread.so.0
#8 0x00007f13124aabcd in clone () from /lib64/libc.so.6
(gdb) thread 3
[Switching to thread 3 (Thread 0x7f0fd2861700 (LWP 19633))]#0 0x00007f132adcda5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
(gdb) bt
#0 0x00007f132adcda5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007f132399e33c in cnd_timedwait_ms (cnd=0x436ab2c8, mtx=0x436ab2a0, timeout_ms=) at tinycthread.c:501
#2 0x00007f132396954c in rd_kafka_q_pop_serve (rkq=0x436ab2a0, timeout_ms=999, version=0, cb_type=RD_KAFKA_Q_CB_RETURN, callback=0, opaque=0x0) at rdkafka_queue.c:364
#3 0x00007f1323969640 in rd_kafka_q_pop (rkq=0x436ab2a0, timeout_ms=999, version=0) at rdkafka_queue.c:395
#4 0x00007f13239491a6 in rd_kafka_broker_serve (rkb=0x40af6b20, abs_timeout=532931699192) at rdkafka_broker.c:2239
#5 0x00007f132394937d in rd_kafka_broker_ua_idle (rkb=0x40af6b20, timeout_ms=1000) at rdkafka_broker.c:2325
#6 0x00007f132394f5ad in rd_kafka_broker_thread_main (arg=0x40af6b20) at rdkafka_broker.c:3195
#7 0x00007f132399e24f in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#8 0x00007f132adc9aa1 in start_thread () from /lib64/libpthread.so.0
#9 0x00007f13124aabcd in clone () from /lib64/libc.so.6
(gdb) thread 4
[Switching to thread 4 (Thread 0x7f0fd3062700 (LWP 19632))]#0 0x00007f132adcda5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
(gdb) bt
#0 0x00007f132adcda5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007f132399e33c in cnd_timedwait_ms (cnd=0x436a5de8, mtx=0x436a5dc0, timeout_ms=) at tinycthread.c:501
#2 0x00007f1323969753 in rd_kafka_q_serve (rkq=0x436a5dc0, timeout_ms=1000, max_cnt=0, cb_type=RD_KAFKA_Q_CB_CALLBACK, callback=0, opaque=0x0) at rdkafka_queue.c:440
#3 0x00007f13239360e6 in rd_kafka_thread_main (arg=0x436a5270) at rdkafka.c:1229
#4 0x00007f132399e24f in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#5 0x00007f132adc9aa1 in start_thread () from /lib64/libpthread.so.0
#6 0x00007f13124aabcd in clone () from /lib64/libc.so.6
(gdb)
Checklist
Please provide the following information:
https://github.com/edenhill/librdkafka/releases/tag/v0.11.1
kafka-0.10.2.1
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
rd_kafka_conf_set(conf, "api.version.request", "1", NULL, 0); //to support kafka streams we need to set this to 1.
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000", NULL, 0);
rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);
rd_kafka_conf_set(conf, "batch.num.messages", "100000", NULL, 0); //batch the max number of messages in a sec
rd_kafka_conf_set(conf, "message.max.bytes", "100000000", NULL, 0); //batch the max bytes in the messageSet
Linux alaksh 3.19.0 Bugfix in example code #1 SMP Thu Nov 2 02:27:37 GMT 2017 x86_64 x86_64 x86_64 GNU/Linux
debug=..
as necessary) from librdkafkait is critical issue, as kafka producer cannot recover.
The text was updated successfully, but these errors were encountered: