Bottleneck in publishing on a tight while loop #30

Closed
cmccarthy1 opened this issue Jun 5, 2020 · 9 comments

@cmccarthy1
Contributor

cmccarthy1 commented Jun 5, 2020

Internally raised issue

Describe the bug

  • Publishing data on a tight while loop without polling for delivery reports works as expected (it hits queue full after 100k messages are sent).
  • Polling after every publish, however, blocks in a C function call (rd_kafka_poll) within 3k messages. Increasing the system buffer size does not appear to change how quickly this behaviour arises.

To Reproduce

consumer script

\l kfk.q

kfk_cfg:(!) . flip(
  (`metadata.broker.list;`localhost:9011);
  (`group.id;`0);
  (`queue.buffering.max.ms;`2);
  (`enable.partition.eof;`0)
  );
client:.kfk.Consumer[kfk_cfg];

data:();
.kfk.consumecb:{[msg]
  msg[`data]:"c"$msg[`data];
  msg[`rcvtime]:.z.p;
  data,::enlist msg;}

.kfk.Sub[client;`random;enlist .kfk.PARTITION_UA];

producer script

\l kfk.q
kfk_cfg:(!) . flip(
  (`metadata.broker.list; `localhost:9011);
  (`queue.buffering.max.ms;`10)
  );
producer:.kfk.Producer[kfk_cfg];

random:.kfk.Topic[producer;`random;()!()];

n:0;
run:{
  .kfk.Pub[random;.kfk.PARTITION_UA; raze string -8!n+:1;""];
  //.kfk.Poll[producer; 1; 100];
  show .kfk.OutQLen producer;
  };

show "Publishing...";
while[1b; run[]];

Expected behavior
Producer should not block in this scenario.

Screenshots
No applicable screenshots to explain this scenario further

Desktop (please complete the following information):
Behaviour has been seen in a variety of Linux environments and on macOS, so it should be reproducible across multiple environments.

Additional context
No applicable additional context

cmccarthy1 changed the title from "Bottleneck in consumption on a tight while loop" to "Bottleneck in publishing on a tight while loop" on Jun 5, 2020
@sshanks-kx
Contributor

Reproduced. Remember to change the broker connection details in both scripts.
Uncommenting the poll line shows the issue.

@sshanks-kx
Contributor

sshanks-kx commented Jun 10, 2020

Thread with issue

#0  0x00007f8c9d42154d in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f8c9d423d3c in _L_cond_lock_847 () from /lib64/libpthread.so.0
#2  0x00007f8c9d423bd1 in __pthread_mutex_cond_lock () from /lib64/libpthread.so.0
#3  0x00007f8c9d41eeb4 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#4  0x00007f8c96bc1dd5 in cnd_timedwait_ms () from /lib64/librdkafka.so.1
#5  0x00007f8c96b8cbf2 in rd_kafka_q_serve () from /lib64/librdkafka.so.1
#6  0x00007f8c96e2052e in pollClient (rk=0x17ee7a0, timeout=1, maxcnt=<optimized out>) at kfk.c:531

All threads

(gdb) thread apply all bt

Thread 4 (Thread 0x7f8c95b11700 (LWP 82)):
#0  0x00007f8c9d41ede2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f8c96bc1dd5 in cnd_timedwait_ms () from /lib64/librdkafka.so.1
#2  0x00007f8c96b8cbf2 in rd_kafka_q_serve () from /lib64/librdkafka.so.1
#3  0x00007f8c96b5d424 in rd_kafka_thread_main () from /lib64/librdkafka.so.1
#4  0x00007f8c96bc1b47 in _thrd_wrapper_function () from /lib64/librdkafka.so.1
#5  0x00007f8c9d41aea5 in start_thread () from /lib64/libpthread.so.0
#6  0x00007f8c9d1438dd in clone () from /lib64/libc.so.6

Thread 3 (Thread 0x7f8c95310700 (LWP 83)):
#0  0x00007f8c9d41ede2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f8c96bc1dd5 in cnd_timedwait_ms () from /lib64/librdkafka.so.1
#2  0x00007f8c96b8c9d0 in rd_kafka_q_pop_serve () from /lib64/librdkafka.so.1
#3  0x00007f8c96b7475f in rd_kafka_broker_ops_serve () from /lib64/librdkafka.so.1
#4  0x00007f8c96b74821 in rd_kafka_broker_serve () from /lib64/librdkafka.so.1
#5  0x00007f8c96b74cdf in rd_kafka_broker_ua_idle () from /lib64/librdkafka.so.1
#6  0x00007f8c96b7637b in rd_kafka_broker_thread_main () from /lib64/librdkafka.so.1
#7  0x00007f8c96bc1b47 in _thrd_wrapper_function () from /lib64/librdkafka.so.1
#8  0x00007f8c9d41aea5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007f8c9d1438dd in clone () from /lib64/libc.so.6

Thread 2 (Thread 0x7f8c94b0f700 (LWP 84)):
#0  0x00007f8c9d4216fd in write () from /lib64/libpthread.so.0
#1  0x00007f8c96b6644b in rd_kafka_q_io_event () from /lib64/librdkafka.so.1
#2  0x00007f8c96b6f8c9 in rd_kafka_dr_msgq () from /lib64/librdkafka.so.1
#3  0x00007f8c96b959a4 in rd_kafka_handle_Produce () from /lib64/librdkafka.so.1
#4  0x00007f8c96b88af6 in rd_kafka_buf_callback () from /lib64/librdkafka.so.1
#5  0x00007f8c96b6ca0d in rd_kafka_recv () from /lib64/librdkafka.so.1
#6  0x00007f8c96b85f28 in rd_kafka_transport_io_event () from /lib64/librdkafka.so.1
#7  0x00007f8c96b748b8 in rd_kafka_broker_serve () from /lib64/librdkafka.so.1
#8  0x00007f8c96b761f1 in rd_kafka_broker_thread_main () from /lib64/librdkafka.so.1
#9  0x00007f8c96bc1b47 in _thrd_wrapper_function () from /lib64/librdkafka.so.1
#10 0x00007f8c9d41aea5 in start_thread () from /lib64/libpthread.so.0
#11 0x00007f8c9d1438dd in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x7f8c9df56000 (LWP 81)):
#0  0x00007f8c9d42154d in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f8c9d423d3c in _L_cond_lock_847 () from /lib64/libpthread.so.0
#2  0x00007f8c9d423bd1 in __pthread_mutex_cond_lock () from /lib64/libpthread.so.0
#3  0x00007f8c9d41eeb4 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#4  0x00007f8c96bc1dd5 in cnd_timedwait_ms () from /lib64/librdkafka.so.1
#5  0x00007f8c96b8cbf2 in rd_kafka_q_serve () from /lib64/librdkafka.so.1
#6  0x00007f8c96e2052e in pollClient (rk=0x17ee7a0, timeout=1, maxcnt=<optimized out>) at kfk.c:531
#7  0x00007f8c96e20656 in kfkPoll (x=0x7f8c99002a30, y=0x7f8c99000bf0, z=0x7f8c99001500) at kfk.c:564

@sshanks-kx
Contributor

Retrying with latest librdkafka...

@sshanks-kx
Copy link
Contributor

Tried with the latest librdkafka release: no difference.

Reference: current Red Hat 7 package
yum install librdkafka-devel
.kfk.Version
722175i
.kfk.VersionSym[]
`0.11.4

Newest release
cd /source/kafka
wget https://github.com/edenhill/librdkafka/archive/v1.4.2.tar.gz
tar xvf v1.4.2.tar.gz --strip-components=1
./configure
make
make install

.kfk.Version
17040127i
.kfk.VersionSym[]
`1.4.2

@sshanks-kx
Contributor

Ref: https://github.com/edenhill/librdkafka/blob/master/examples/producer.c
This example is built as part of the librdkafka build. Changing its code to publish on an infinite loop runs fine. Run the example with:
./producer broker:29092 random

@sshanks-kx sshanks-kx self-assigned this Jun 10, 2020
@sshanks-kx
Contributor

Temporarily removing
rd_kafka_queue_io_event_enable(rd_kafka_queue_get_main(rk),spair[1],"X",1);
stops the hang.

@sshanks-kx
Contributor

I think I can see the potential root of the problem now.
It's like the opposite of #37.

In this case, the main kdb+ thread is spending all of its time on the publish loop. The poll lets Kafka events happen, and when they do, librdkafka tries to tell kdb+ to do work via a file descriptor (this happens every time the queue gets emptied). kdb+ never gets to service that descriptor (do its housekeeping and react to data waiting to be read) because it is spending 100% of its time publishing.
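A minimal, self-contained C sketch of the mechanism described above, assuming the wake-up channel behaves like a Unix socketpair whose write end (`spair[1]` in the line removed earlier) receives a one-byte "X" per notification. The exact socket type kfk.c uses is an assumption here, and the helpers `fill_until_full` and `drain` are hypothetical, not kfk.c or librdkafka API. The backtraces are consistent with this picture: Thread 2 is inside write() from rd_kafka_q_io_event, while Thread 1 (kfkPoll) waits on a lock in rd_kafka_q_serve. The sketch makes the write end non-blocking so that "buffer full" shows up as EAGAIN instead of hanging the demo.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Switch fd to non-blocking so a full buffer is reported as EAGAIN
 * instead of parking the calling thread in write(). */
static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Hypothetical helper: write one-byte "X" wake-ups into wfd, as a notifier
 * would when nobody reads the other end, until the kernel buffer is full
 * (or cap writes succeed). Returns how many wake-ups fit, or -1 on error. */
long fill_until_full(int wfd, long cap) {
    long n = 0;
    if (set_nonblocking(wfd) < 0) return -1;
    while (n < cap) {
        ssize_t w = write(wfd, "X", 1);
        if (w == 1) { n++; continue; }
        if (w < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return n;
        return -1; /* unexpected error */
    }
    return n;
}

/* Hypothetical helper: drain all pending wake-up bytes from rfd, which is
 * the work the busy main thread never gets to do. Returns bytes read. */
long drain(int rfd) {
    char buf[4096];
    long total = 0;
    if (set_nonblocking(rfd) < 0) return -1;
    for (;;) {
        ssize_t r = read(rfd, buf, sizeof buf);
        if (r > 0) { total += r; continue; }
        if (r == 0) return total; /* peer closed */
        if (errno == EAGAIN || errno == EWOULDBLOCK) return total;
        return -1;
    }
}
```

Usage: create a pair with `socketpair(AF_UNIX, SOCK_STREAM, 0, sv)`, call `fill_until_full(sv[1], ...)`, and a further write to `sv[1]` fails with EAGAIN; after `drain(sv[0])` writes succeed again. With a blocking descriptor, that failed write would instead block the notifying thread, which is why a reader that never gets scheduled can stall the writer.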

I expect that changing the example above to publish a fixed amount each time on a timer (or similar) would stop it hanging, though this isn't what the user wants to do.

Will work on a change.

@sshanks-kx
Contributor

To check: whether returning an error code from pub is appropriate. The code could surface queue full (and similar errors) so that the user can take action.
ref:
https://librdkafka.dpldocs.info/deimos.rdkafka.rd_kafka_produce.html
https://github.com/edenhill/librdkafka/blob/master/examples/producer.c
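One way a caller could act on a queue-full return code, modelled on the retry loop in librdkafka's examples/producer.c (which calls rd_kafka_poll and retries when produce reports RD_KAFKA_RESP_ERR__QUEUE_FULL). To keep the sketch self-contained it uses a hypothetical in-memory mock instead of librdkafka: `mock_producer`, `mock_publish`, `mock_poll` and `publish_with_backpressure` are illustrative names, not kfk.c or librdkafka API. The mock's `capacity` stands in for librdkafka's queue.buffering.max.messages, whose default of 100000 lines up with the 100k queue-full point reported above.

```c
/* Hypothetical mock of a producer's outbound queue. capacity plays the
 * role of librdkafka's queue.buffering.max.messages (default 100000). */
typedef struct {
    long in_flight; /* messages awaiting a delivery report */
    long capacity;  /* maximum messages allowed in flight */
} mock_producer;

typedef enum { PUB_OK = 0, PUB_QUEUE_FULL = 1 } pub_err;

/* Enqueue one message, or report queue full without blocking. */
static pub_err mock_publish(mock_producer *p) {
    if (p->in_flight >= p->capacity) return PUB_QUEUE_FULL;
    p->in_flight++;
    return PUB_OK;
}

/* Stand-in for rd_kafka_poll(): serve up to max delivery reports,
 * freeing that many queue slots. Returns the number served. */
static long mock_poll(mock_producer *p, long max) {
    long served = p->in_flight < max ? p->in_flight : max;
    p->in_flight -= served;
    return served;
}

/* Publish count messages. On queue full, poll to serve delivery reports
 * and retry, rather than spinning or blocking. Returns the number of polls
 * needed, or -1 if the arguments could never free a slot. */
long publish_with_backpressure(mock_producer *p, long count, long poll_batch) {
    long polls = 0;
    if (poll_batch < 1 || p->capacity < 1) return -1;
    for (long i = 0; i < count; i++) {
        while (mock_publish(p) == PUB_QUEUE_FULL) {
            mock_poll(p, poll_batch);
            polls++;
        }
    }
    return polls;
}
```

With a capacity of 100, publishing 250 messages and serving 100 reports per poll needs two polls and leaves 50 messages in flight. The design choice is that polling happens only when the queue reports full, so the publish loop gives the library time to make progress without polling after every message.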

@sshanks-kx
Contributor

linked to confluentinc/librdkafka#2932

sshanks-kx added a commit to sshanks-kx/kafka that referenced this issue Jun 15, 2020

2 participants