Skip to content

Py3: use bytes for Message payload and key #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 68 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,96 @@
Confluent's Apache Kafka client for Python
==========================================

Confluent's Kafka client for Python wraps the librdkafka C library, providing
full Kafka protocol support with great performance and reliability.

Prerequisites
===============
The Python bindings provides a high-level Producer and Consumer with support
for the balanced consumer groups of Apache Kafka 0.9.

librdkafka >=0.9.1 (or master>=2016-04-13)
py.test (pip install pytest)
See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.

**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)

Build

Usage
=====

python setup.by build
**Producer:**

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
for data in some_data_source:
p.produce('mytopic', data.encode('utf-8'))
p.flush()


**High-level Consumer:**

from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe(['mytopic'])
while running:
msg = c.poll()
if not msg.error():
print('Received message: %s' % msg.value().decode('utf-8'))
c.close()



See [examples](examples) for more examples.



Prerequisites
=============

* Python >= 2.7 or Python 3.x
* [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.1



Install
=======
Preferably in a virtualenv:

**Install from PyPi:**

pip install confluent-kafka


**Install from source / tarball:**

pip install .


Run unit-tests
==============
Build
=====

python setup.by build




Tests
=====


**Run unit-tests:**

py.test

**NOTE**: Requires py.test, install by `pip install pytest`

Run integration tests
=====================
**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.

**Run integration tests:**

examples/integration_test.py <kafka-broker>

**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.




Generate documentation
Expand All @@ -51,7 +107,3 @@ or:
Documentation will be generated in `docs/_build/`


Examples
========

See [examples](examples)
4 changes: 2 additions & 2 deletions confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ static PyMethodDef Producer_methods[] = {
"message has been succesfully delivered or permanently fails delivery.\n"
"\n"
" :param str topic: Topic to produce message to\n"
" :param str value: Message payload\n"
" :param str key: Message key\n"
" :param str|bytes value: Message payload\n"
" :param str|bytes key: Message key\n"
" :param int partition: Partition to produce to, elses uses the "
"configured partitioner.\n"
" :param func on_delivery(err,msg): Delivery report callback to call "
Expand Down
10 changes: 5 additions & 5 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,12 @@ static PyMethodDef Message_methods[] = {

{ "value", (PyCFunction)Message_value, METH_NOARGS,
" :returns: message value (payload) or None if not available.\n"
" :rtype: str or None\n"
" :rtype: str|bytes or None\n"
"\n"
},
{ "key", (PyCFunction)Message_key, METH_NOARGS,
" :returns: message key or None if not available.\n"
" :rtype: str or None\n"
" :rtype: str|bytes or None\n"
"\n"
},
{ "topic", (PyCFunction)Message_topic, METH_NOARGS,
Expand Down Expand Up @@ -486,10 +486,10 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
self->topic = cfl_PyUnistr(
_FromString(rd_kafka_topic_name(rkm->rkt)));
if (rkm->payload)
self->value = cfl_PyUnistr(_FromStringAndSize(rkm->payload,
rkm->len));
self->value = cfl_PyBin(_FromStringAndSize(rkm->payload,
rkm->len));
if (rkm->key)
self->key = cfl_PyUnistr(
self->key = cfl_PyBin(
_FromStringAndSize(rkm->key, rkm->key_len));

self->partition = rkm->partition;
Expand Down
9 changes: 5 additions & 4 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
*
****************************************************************************/

#ifdef PY3
#ifdef PY3 /* Python 3 */
/**
* @brief Binary type, use as cfl_PyBin(_X(A,B)) where _X() is the type-less
* suffix of a PyBinary/Str_X() function
* suffix of a PyBytes/Str_X() function
*/
#define cfl_PyBin(X) PyBinary ## X
#define cfl_PyBin(X) PyBytes ## X

/**
* @brief Unicode type, same usage as PyBin()
Expand All @@ -62,7 +62,8 @@
* @returns Unicode Python string object
*/
#define cfl_PyObject_Unistr(X) PyObject_Str(X)
#else

#else /* Python 2 */

/* See comments above */
#define cfl_PyBin(X) PyString ## X
Expand Down