Gevent and KafkaClient.copy patch? #271

Closed
jshaw86 opened this issue Dec 4, 2014 · 5 comments
jshaw86 (Contributor) commented Dec 4, 2014

This is referencing #145. I'm still seeing copy fail when monkey patching and then constructing a multiprocess producer. If this was punted on, that's cool; I can look at adding my own multiprocessing or threading pool to scale my upstream socket reads, but gevent is nicer for this pattern.

versions:
gevent==1.0.1
kafka-python==0.9.2

Works:

    >>> from kafka.client import KafkaClient
    >>> from kafka.producer import SimpleProducer
    >>> kclient = KafkaClient("broker1,broker2,broker3")
    No handlers could be found for logger "kafka"
    >>> kproducer = SimpleProducer(kclient,batch_send=True, batch_send_every_n=100,  batch_send_every_t=10)

Fails:

    >>> import gevent.monkey
    >>> gevent.monkey.patch_all() 
    >>> kclient = KafkaClient("broker1,broker2,broker3")
    >>> kproducer = SimpleProducer(kclient,batch_send=True, batch_send_every_n=100, batch_send_every_t=10)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.2-py2.7.egg/kafka/producer.py", line 240, in __init__
        batch_send_every_t)
      File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.2-py2.7.egg/kafka/producer.py", line 143, in __init__
        self.client.copy(),
      File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.2-py2.7.egg/kafka/client.py", line 235, in copy
        c = copy.deepcopy(self)
      File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
        y = _reconstruct(x, rv, 1, memo)
      File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
        state = deepcopy(state, memo)
      File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
        y = copier(x, memo)
      File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
        y[deepcopy(key, memo)] = deepcopy(value, memo)
      File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
        y = copier(x, memo)
      File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
        y[deepcopy(key, memo)] = deepcopy(value, memo)
      File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
        y = _reconstruct(x, rv, 1, memo)
      File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
        state = deepcopy(state, memo)
      File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
        y = copier(x, memo)
      File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
        y[deepcopy(key, memo)] = deepcopy(value, memo)
      File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
        y = _reconstruct(x, rv, 1, memo)
      File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
        state = deepcopy(state, memo)
      File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
        y = copier(x, memo)
      File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
        y[deepcopy(key, memo)] = deepcopy(value, memo)
      File "/usr/lib/python2.7/copy.py", line 182, in deepcopy
        rv = reductor(2)
    TypeError: cannot serialize 'Hub' object
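
The failure itself doesn't look specific to kafka-python: once monkey.patch_all() has run, the client's patched sockets (reachable through its connection dict) apparently end up referencing the gevent Hub, and greenlet objects refuse to be serialized, which is what copy.deepcopy trips over. A minimal standalone sketch of the same failure; Holder is just an illustrative stand-in for any object that ends up referencing the Hub:

    import copy
    import gevent

    class Holder(object):
        def __init__(self):
            # Any direct or indirect reference to the gevent Hub is enough
            # to break copy.deepcopy.
            self.hub = gevent.get_hub()

    copy.deepcopy(Holder())   # TypeError: cannot serialize 'Hub' object
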
jshaw86 (Contributor, Author) commented Dec 10, 2014

So I got around this by passing a batch of messages into kproducer.send_messages, effectively batching on my own. This is fine with me; I'll leave the ticket open since it's still broken, but it's not urgent for me at this point.
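
For reference, a rough sketch of that workaround: create the producer without batch_send (so client.copy() is never called) and batch in the caller. read_upstream(), the topic name, and the batch size of 100 are placeholders:

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    kclient = KafkaClient("broker1,broker2,broker3")
    # No batch_send: the producer never spawns a worker, so client.copy() is never called.
    kproducer = SimpleProducer(kclient)

    batch = []
    for msg in read_upstream():       # hypothetical upstream message source
        batch.append(msg)
        if len(batch) >= 100:         # mirrors batch_send_every_n above
            kproducer.send_messages("my-topic", *batch)
            batch = []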

coffenbacher commented:

Yep, this is broken for me too. +1. I worked around it with async=False for now.
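
That workaround presumably reads something like the following (topic and payload are placeholders); with async=False the send happens on the calling greenlet, so the producer never needs to copy the client:

    kproducer = SimpleProducer(kclient, async=False)   # synchronous sends, no background worker
    kproducer.send_messages("my-topic", "payload")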


edenhill commented Jul 1, 2015

It'd be good if this was fixed to allow proper use in gevent-based applications.


jackyfkc commented Jul 7, 2015

In my opinion, gevent only works well (i.e. increases throughput) when there are lots of independent connections, since each green thread can then use its own connection. However, kafka-python does not maintain a connection pool per broker, so with the current kafka-python, gevent only helps when publishing messages to different brokers (each broker having its own connection) at the same time.

We'd be better off implementing a connection pool if we want to use gevent to increase throughput. Another option would be to integrate with Tornado.
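
Until such a pool exists, the closest approximation under gevent is probably to give each greenlet its own client, and therefore its own broker connections. A rough sketch of that pattern; the broker list, topic, worker count, and message count are all placeholders:

    import gevent
    import gevent.monkey
    gevent.monkey.patch_all()

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    def publish(worker_id):
        # Each greenlet owns its client, so connections are never shared
        # across greenlets.
        client = KafkaClient("broker1,broker2,broker3")
        producer = SimpleProducer(client)   # synchronous, so client.copy() is never called
        for i in range(10):
            producer.send_messages("my-topic", "worker %d message %d" % (worker_id, i))
        client.close()

    gevent.joinall([gevent.spawn(publish, n) for n in range(4)])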

dpkp (Owner) commented Jan 30, 2016

Most of the inner workings of kafka-python have changed, so I'm going to close this and only keep open a single 'work with kombu' issue.

dpkp closed this as completed Jan 30, 2016