getting "This socket is already used by another greenlet" #709

Closed
vershininm opened this issue May 31, 2016 · 4 comments

Comments

@vershininm

Hi,
I know you are not testing kafka-python with gevent, but it seems to work quite well with it.
We are currently experiencing only one issue: from time to time we get the following exception:

  File "brain/series/writer.py", line 555, in submit
    kafka_async_producer.send('seriesupdates', su)
  File "kafka/producer/kafka.py", line 414, in send
    self._sender.wakeup()
  File "kafka/producer/sender.py", line 293, in wakeup
    self._client.wakeup()
  File "kafka/client_async.py", line 686, in wakeup
    if self._wake_w.send(b'x') != 1:
  File "gevent/_socket2.py", line 325, in send
    self._wait(self._write_event)
  File "gevent/_socket2.py", line 173, in _wait
    raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))

Could you please look into this? Perhaps a thread lock around the wakeup call would solve it?

Thanks!

@BlackRider97

@dribler How are you using kafka-python with gevent?
Can you give me sample code for a producer with greenlet threads?

@vershininm
Author

vershininm commented Jun 3, 2016

We don't actually use greenlets explicitly. Our producer is a web app and we use uwsgi to run it:

uwsgi --socket=0.0.0.0:9000 --wsgi-file app.py --loop gevent  --master --gevent 250 ....

The first line of app.py is:

from gevent.monkey import patch_all; patch_all()

We use a single KafkaProducer instance per uwsgi worker.

I was able to reproduce the same issue with the following:

from gevent.monkey import patch_all
patch_all()

from gevent import spawn, sleep
from kafka import KafkaProducer
import time

producer = KafkaProducer(....)

def produce():
    time.sleep(5)   # didn't look deep into this, but doesn't work w/o it
    for i in range(1000):
        producer.send('test', str(i) * 1024)
    print('finished')

g1 = spawn(produce)
g2 = spawn(produce)

g1.join(), g2.join()

As a quick fix, a simple threading.Lock around the wakeup send worked for me, so I have a temporary solution:

    def wakeup(self):
        with self._wakeup_lock:
            if self._wake_w.send(b'x') != 1:
                log.warning('Unable to send to wakeup socket!')
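
The snippet above does not show where the lock is created. A minimal, self-contained sketch of the same idea follows. It assumes the wakeup channel is a socket pair and that the lock is created once in the constructor; the class name WakeChannel and the drain/close helpers are hypothetical stand-ins for illustration, not kafka-python code. Under patch_all() the standard threading.Lock should be replaced by gevent's cooperative lock, so the same code is expected to serialize greenlets as well as threads.

```python
import logging
import socket
import threading

log = logging.getLogger(__name__)


class WakeChannel:
    """Hypothetical stand-in for a client's wakeup socket pair."""

    def __init__(self):
        # One end is written by callers, the other is read by the poll loop.
        self._wake_r, self._wake_w = socket.socketpair()
        # Created once, so every caller of wakeup() shares the same lock.
        self._wakeup_lock = threading.Lock()

    def wakeup(self):
        # Only one caller at a time may be inside send() on the write socket.
        with self._wakeup_lock:
            if self._wake_w.send(b'x') != 1:
                log.warning('Unable to send to wakeup socket!')

    def drain(self):
        # What a poll loop would do: read whatever wakeup bytes are pending.
        return self._wake_r.recv(1024)

    def close(self):
        self._wake_r.close()
        self._wake_w.close()
```

The lock only covers the write side. The read side is assumed to have a single reader (the sender loop), which is why it needs no lock in this sketch.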

@jianbin-wei
Contributor

We also got an exception like the following, with kafka-python 0.9.5.

2016-07-18 16:22:25,952 ERROR       ns_producer             send:94 25344   140033434666816 trid=542f6ed5-f2d2-4fba-8e98-e57c97fdf517   failed to send message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0xa056b90>> re-init producer
Traceback (most recent call last):
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/kafka/producer/keyed.py", line 43, in send_messages
    return self._send_messages(topic, partition, *msg, key=key)
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/kafka/producer/base.py", line 411, in _send_messages
    fail_on_error=self.sync_fail_on_error
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/kafka/client.py", line 610, in send_produce_request
    resps = self._send_broker_aware_request(payloads, encoder, decoder)
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/kafka/client.py", line 247, in _send_broker_aware_request
    response = conn.recv(requestId)
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/kafka/conn.py", line 164, in recv
    resp = self._read_bytes(4)
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/kafka/conn.py", line 97, in _read_bytes
    data = self._sock.recv(min(bytes_left, 4096))
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/gevent/_socket2.py", line 280, in recv
    self._wait(self._read_event)
  File "/opt/ns/nsenv/local/lib/python2.7/site-packages/gevent/_socket2.py", line 173, in _wait
    raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0xa056b90>>
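
In this traceback the contended object is the broker connection socket read in conn.recv, not the wakeup socket, so a lock inside wakeup() would not apply to this code path. One possible application-level workaround, sketched here under the assumption that a single producer is shared by several greenlets, is to serialize calls into it. The helper name serialized is hypothetical, and whether this covers every path that touches the socket has not been verified.

```python
import functools
import threading


def serialized(func, lock=None):
    """Return a wrapper that lets only one caller at a time into func.

    Hypothetical helper. With gevent's patch_all() applied first,
    threading.Lock is expected to be the cooperative gevent lock.
    """
    lock = lock or threading.Lock()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with lock:
            return func(*args, **kwargs)

    return wrapper


# Usage sketch (producer is your own shared instance):
#   send = serialized(producer.send_messages)
#   send(topic, key, message)
```

The cost is throughput: callers queue behind each other while one request is in flight, which may or may not be acceptable for a given workload.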

@dpkp
Owner

dpkp commented Jul 21, 2016 via email
