From a2fbbf688402a33b48ade378b4fe6a1e1efd6ee6 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 25 May 2016 16:04:41 +0200 Subject: [PATCH 1/4] Py3: use bytes for Message payload and key --- confluent_kafka/src/Producer.c | 4 ++-- confluent_kafka/src/confluent_kafka.c | 10 +++++----- confluent_kafka/src/confluent_kafka.h | 9 +++++---- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c index 438642051..c69ca07dc 100644 --- a/confluent_kafka/src/Producer.c +++ b/confluent_kafka/src/Producer.c @@ -370,8 +370,8 @@ static PyMethodDef Producer_methods[] = { "message has been succesfully delivered or permanently fails delivery.\n" "\n" " :param str topic: Topic to produce message to\n" - " :param str value: Message payload\n" - " :param str key: Message key\n" + " :param str|bytes value: Message payload\n" + " :param str|bytes key: Message key\n" " :param int partition: Partition to produce to, elses uses the " "configured partitioner.\n" " :param func on_delivery(err,msg): Delivery report callback to call " diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index a39a0c4b4..e85ef2025 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -339,12 +339,12 @@ static PyMethodDef Message_methods[] = { { "value", (PyCFunction)Message_value, METH_NOARGS, " :returns: message value (payload) or None if not available.\n" - " :rtype: str or None\n" + " :rtype: str|bytes or None\n" "\n" }, { "key", (PyCFunction)Message_key, METH_NOARGS, " :returns: message key or None if not available.\n" - " :rtype: str or None\n" + " :rtype: str|bytes or None\n" "\n" }, { "topic", (PyCFunction)Message_topic, METH_NOARGS, @@ -486,10 +486,10 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) { self->topic = cfl_PyUnistr( _FromString(rd_kafka_topic_name(rkm->rkt))); if (rkm->payload) - self->value = cfl_PyUnistr(_FromStringAndSize(rkm->payload, - rkm->len)); + self->value = cfl_PyBin(_FromStringAndSize(rkm->payload, + rkm->len)); if (rkm->key) - self->key = cfl_PyUnistr( + self->key = cfl_PyBin( _FromStringAndSize(rkm->key, rkm->key_len)); self->partition = rkm->partition; diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h index e8b15c90a..b96e88129 100644 --- a/confluent_kafka/src/confluent_kafka.h +++ b/confluent_kafka/src/confluent_kafka.h @@ -41,12 +41,12 @@ * ****************************************************************************/ -#ifdef PY3 +#ifdef PY3 /* Python 3 */ /** * @brief Binary type, use as cfl_PyBin(_X(A,B)) where _X() is the type-less - * suffix of a PyBinary/Str_X() function + * suffix of a PyBytes/Str_X() function */ -#define cfl_PyBin(X) PyBinary ## X +#define cfl_PyBin(X) PyBytes ## X /** * @brief Unicode type, same usage as PyBin() @@ -62,7 +62,8 @@ * @returns Unicode Python string object */ #define cfl_PyObject_Unistr(X) PyObject_Str(X) -#else + +#else /* Python 2 */ /* See comments above */ #define cfl_PyBin(X) PyString ## X From 1652550528b3fcd3ec2909a74039ba23ce50a320 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 25 May 2016 16:05:26 +0200 Subject: [PATCH 2/4] Improved README --- README.md | 82 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 578679b76..51349e583 100644 --- a/README.md +++ b/README.md @@ -1,40 +1,94 @@ Confluent's Apache Kafka client for Python ========================================== +Confluent's Kafka client for Python wraps the librdkafka C library, providing +full Kafka protocol support at great performance and reliability. -Prerequisites -=============== +The Python bindings provides a high-level Producer and Consumer with support +for the balanced consumer groups of Apache Kafka 0.9. - librdkafka >=0.9.1 (or master>=2016-04-13) - py.test (pip install pytest) +See the [API documentation](http://docs.confluent.io/3.0.0/clients/confluent-kafka-python/index.html) for more info. -Build +Usage ===== - python setup.by build +**Producer:** + + from confluent_kafka import Producer + + p = Producer({'bootstrap.servers': 'mybroker,mybroker2'}) + for data in some_data_source: + p.produce('mytopic', data.encode('utf-8')) + p.flush() + + +**High-level Consumer:** + + from confluent_kafka import Consumer + + c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup', + 'default.topic.config': {'auto.offset.reset': 'smallest'}}) + c.subscribe(['mytopic']) + while running: + msg = c.poll() + if not msg.error(): + print('Received message: %s' % msg.value().decode('utf-8')) + c.close() + + + +See [examples](examples) for more examples. + + + +Prerequisites +============= + + * Python >= 2.7 or Python 3.x + * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.1 Install ======= -Preferably in a virtualenv: + +**Install from PyPi:** + + pip install confluent-kafka + + +**Install from source / tarball:** pip install . -Run unit-tests -============== +Build +===== + + python setup.by build + + + + +Tests +===== + + +**Run unit-tests:** py.test +**NOTE**: Requires py.test, install by `pip install pytest` -Run integration tests -===================== -**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'. + +**Run integration tests:** examples/integration_test.py +**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'. + + Generate documentation @@ -51,7 +105,3 @@ or: Documentation will be generated in `docs/_build/` -Examples -======== - -See [examples](examples) From 1eab59620324a473ff909a5c23963561f2bc8d1e Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 25 May 2016 23:11:39 +0200 Subject: [PATCH 3/4] Corrected API doc link --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 51349e583..790f855c2 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ full Kafka protocol support at great performance and reliability. The Python bindings provides a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9. -See the [API documentation](http://docs.confluent.io/3.0.0/clients/confluent-kafka-python/index.html) for more info. +See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info. Usage From 99b641372e0eb60d32766afd3cc717c3bcca3b72 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 26 May 2016 09:03:29 +0200 Subject: [PATCH 4/4] Updated README --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 790f855c2..c1fda7ca0 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,15 @@ Confluent's Apache Kafka client for Python ========================================== Confluent's Kafka client for Python wraps the librdkafka C library, providing -full Kafka protocol support at great performance and reliability. +full Kafka protocol support with great performance and reliability. The Python bindings provides a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9. See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info. +**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0) + Usage =====