diff --git a/.gitignore b/.gitignore index cc1d06f81..dbbd340fa 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ build dist MANIFEST env +*~ diff --git a/kafka/client.py b/kafka/client.py index a683fe006..10a79cf9e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -38,6 +38,9 @@ def __init__(self, hosts, client_id=CLIENT_ID, self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] self.load_metadata_for_topics() # bootstrap with all metadata + def create_connection(self, host, port): + return KafkaConnection(host, port, timeout=self.timeout) + ################## # Private API # ################## @@ -56,8 +59,7 @@ def _get_conn_for_broker(self, broker): Get or create a connection to a broker """ if (broker.host, broker.port) not in self.conns: - self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, timeout=self.timeout) + self.conns[(broker.host, broker.port)] = self.create_connection(broker.host, broker.port) return self._get_conn(broker.host, broker.port) diff --git a/kafka/conn.py b/kafka/conn.py index 7538e8d3b..e609a327b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -54,11 +54,9 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() self.host = host self.port = port - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) + self._sock = None self.timeout = timeout - self._sock.settimeout(self.timeout) - self._dirty = False + self.reinit() def __repr__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) diff --git a/kafka/green/__init__.py b/kafka/green/__init__.py new file mode 100644 index 000000000..798ba5108 --- /dev/null +++ b/kafka/green/__init__.py @@ -0,0 +1,11 @@ +from kafka import * +from kafka.green.producer import _Producer, _SimpleProducer, _KeyedProducer +from kafka.green.conn import _KafkaConnection +from kafka.green.client import _KafkaClient +Producer=_Producer +SimpleProducer=_SimpleProducer 
+KeyedProducer=_KeyedProducer +KafkaConnection=_KafkaConnection +KafkaClient=_KafkaClient diff --git a/kafka/green/client.py b/kafka/green/client.py new file mode 100644 index 000000000..0e6575bd3 --- /dev/null +++ b/kafka/green/client.py @@ -0,0 +1,17 @@ +from kafka.client import KafkaClient, DEFAULT_SOCKET_TIMEOUT_SECONDS + +from .conn import _KafkaConnection + +class _KafkaClient(KafkaClient): + + def __init__(self, hosts, client_id=KafkaClient.CLIENT_ID, + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + super(_KafkaClient, self).__init__(hosts=hosts, client_id=client_id, timeout=timeout) + + def copy(self): + # have to override this since copy.deepcopy cannot serialize + # a gevent.socket + return _KafkaClient(['{}:{}'.format(entry[0], entry[1]) for entry in self.hosts], self.client_id, self.timeout) + + def create_connection(self, host, port): + return _KafkaConnection(host, port, timeout=self.timeout) diff --git a/kafka/green/conn.py b/kafka/green/conn.py new file mode 100644 index 000000000..a89058ee3 --- /dev/null +++ b/kafka/green/conn.py @@ -0,0 +1,21 @@ +import gevent.socket as socket + +from kafka.conn import KafkaConnection + +class _KafkaConnection(KafkaConnection): + """ + Gevent version of kafka.KafkaConnection class. Uses + gevent.socket instead of socket.socket. 
+ """ + def __init__(self, host, port, timeout=10): + super(_KafkaConnection, self).__init__(host, port, timeout) + + def reinit(self): + """ + Re-initialize the socket connection + """ + self.close() + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((self.host, self.port)) + self._sock.settimeout(self.timeout) + self._dirty = False diff --git a/kafka/green/producer.py b/kafka/green/producer.py new file mode 100644 index 000000000..be39b3284 --- /dev/null +++ b/kafka/green/producer.py @@ -0,0 +1,33 @@ +from kafka.producer import Producer, _send_upstream, STOP_ASYNC_PRODUCER, BATCH_SEND_MSG_COUNT, BATCH_SEND_DEFAULT_INTERVAL, SimpleProducer, KeyedProducer + +import gevent +from gevent.queue import Queue + +class _ProducerMixin(object): + + def _setup_async(self, batch_send_every_n, batch_send_every_t): + if self.async: + self.queue = Queue() # Messages are sent through this queue + self.job = gevent.spawn(_send_upstream, + self.queue, + self.client.copy(), + batch_send_every_t, + batch_send_every_n, + self.req_acks, + self.ack_timeout) + + def stop(self, timeout=1): + if self.async: + self.queue.put((STOP_ASYNC_PRODUCER, None)) + self.job.join(timeout) + if self.job.dead is False: + self.job.kill() + +class _Producer(_ProducerMixin, Producer): + pass + +class _SimpleProducer(_ProducerMixin, SimpleProducer): + pass + +class _KeyedProducer(_ProducerMixin, KeyedProducer): + pass diff --git a/kafka/producer.py b/kafka/producer.py index 12a293401..fa33ab5eb 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -117,7 +117,9 @@ def __init__(self, client, async=False, self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout + self._setup_async(batch_send_every_n, batch_send_every_t) + def _setup_async(self, batch_send_every_n, batch_send_every_t): if self.async: self.queue = Queue() # Messages are sent through this queue self.proc = Process(target=_send_upstream, diff --git a/setup.py b/setup.py index 
417613553..5969066f7 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ def run(self): tests_require=["tox"], cmdclass={"test": Tox}, - packages=["kafka"], + packages=["kafka", "kafka.green"], author="David Arthur", author_email="mumrah@gmail.com",