how to improve kafka producer throughput? #137

Closed
whitelilis opened this issue Mar 3, 2014 · 4 comments

Comments

@whitelilis

Hi all,
When I use the sync producer, messages are sent slowly.
When I use the async producer to handle lots of messages, it works faster. I can see that Python spawns another process (to buffer and flush?), and the memory it uses keeps growing. Eventually the system OOM killer kills one of the processes, then another.
I guess this is because I generate messages faster than Kafka can accept them. Am I right? If so, how can I improve throughput? Thanks.

@wizzat
Collaborator

wizzat commented Mar 3, 2014

I would say that there are several solutions to this:

  • If throughput is more important than integrity, it may be worth turning off server acks (req_acks=False).
  • If integrity is more important than throughput, it may be worth leaving the producer in sync mode.
  • If the messages come in "batches" it may be worth setting a maxsize on the multiprocessing Queue. This would mean that the main app will eventually block under sustained high load, but at least the OOM killer won't murder your process.
  • A pass with a profiler is a good idea. I have an open pull request which should increase performance by ~5-10% (anecdotally).
  • It may be worth creating more than one process or scaling out to multiple producer machines.
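To make the profiler suggestion concrete, here is a minimal sketch using the standard library's cProfile. Note that `build_messages` is a hypothetical stand-in for whatever code generates and serializes your messages; it is not part of kafka-python, and your hot spots may be somewhere else entirely (for example in the send path).

```python
import cProfile
import io
import pstats


def build_messages(n):
    # Hypothetical workload: replace with your real message-building code.
    return [("message-%d" % i).encode("utf-8") for i in range(n)]


profiler = cProfile.Profile()
profiler.enable()
messages = build_messages(10000)
profiler.disable()

# Collect the five most expensive entries by cumulative time into a string.
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
print(report)
```

If most of the time turns out to be spent building messages rather than sending them, tuning the producer is unlikely to help much.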

@whitelilis
Author

hi, wizzat,
Thanks very much for your answer. I am already using req_acks=False and multiprocessing. What do you mean by 'setting a maxsize on the multiprocessing Queue'? How can I do that?
Some of my code looks like:
    import multiprocessing

    pool = []
    for p in range(int(options.proc_num)):
        pool.append(multiprocessing.Process(target=expand_common.mq_worker, args=(doing_queue_name, callback)))

    for p in pool:
        p.start()
    for p in pool:
        p.join()

@wizzat
Collaborator

wizzat commented Mar 4, 2014

Setting the maxsize on the multiprocessing Queue would involve a code change to Kafka-Python to accept an async queue size and pass it into the Queue constructor (producer:122).
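For illustration only, the change described above might look roughly like the sketch below. `AsyncProducerSketch` and its `async_queue_maxsize` parameter are hypothetical names invented for this example; this is not kafka-python's actual Producer class or API. The only real behavior relied on here is that `multiprocessing.Queue(maxsize)` blocks `put()` once `maxsize` items are waiting, and raises `queue.Full` if a timeout expires.

```python
import queue  # this module is named Queue on Python 2
from multiprocessing import Queue


class AsyncProducerSketch(object):
    """Hypothetical stand-in showing where a queue size limit could be passed in."""

    def __init__(self, async_queue_maxsize=0):
        # A maxsize of 0 (or less) means the queue is effectively unbounded,
        # which is the behavior that lets memory grow without limit.
        self.queue = Queue(async_queue_maxsize)

    def send(self, topic, message, timeout=None):
        # Blocks while the queue is full. With timeout=None it waits forever;
        # with a timeout it gives up and reports failure instead.
        try:
            self.queue.put((topic, message), block=True, timeout=timeout)
            return True
        except queue.Full:
            return False


producer = AsyncProducerSketch(async_queue_maxsize=2)
accepted = [producer.send("my-topic", b"payload", timeout=0.1) for _ in range(3)]
```

Nothing is draining the queue in this sketch, so the third `send` gives up after the timeout. In a real producer a background process would be consuming from the queue, and the caller would simply slow down to the rate at which messages are flushed.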

@whitelilis
Author

Thank you.
In the end, using batch_send mode with one client per thread solved my problem.
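For anyone landing here later, the general idea of batching plus one client per thread can be sketched as follows. This is not kafka-python's batch_send implementation: `BatchingSender`, `sender_for_this_thread`, and the `send_batch` callable are hypothetical names for this example, and `send_batch` stands in for whatever actually delivers a list of messages to Kafka.

```python
import threading


class BatchingSender(object):
    """Hypothetical helper: buffers messages and hands them off in groups."""

    def __init__(self, send_batch, batch_size=20):
        self.send_batch = send_batch  # callable that takes a list of messages
        self.batch_size = batch_size
        self.buffer = []

    def send(self, message):
        self.buffer.append(message)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # Deliver whatever is buffered, including a final partial batch.
        if self.buffer:
            self.send_batch(list(self.buffer))
            self.buffer = []


_local = threading.local()


def sender_for_this_thread(send_batch, batch_size=20):
    """Return a sender owned by the calling thread, creating it on first use."""
    if not hasattr(_local, "sender"):
        _local.sender = BatchingSender(send_batch, batch_size)
    return _local.sender


batches = []
sender = sender_for_this_thread(batches.append, batch_size=3)
for i in range(7):
    sender.send(i)
sender.flush()
```

Because each thread gets its own sender and buffer, no locking is needed around `send`, at the cost of one connection per thread.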
