Skip to content

using async in names is deprecated on python 3.6 #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions confluent_kafka/kafkatest/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def on_commit (self, err, partitions):
self.send(d)


def do_commit (self, immediate=False, async=None):
def do_commit (self, immediate=False, asynchronous=None):
""" Commit every 1000 messages or whenever there is a consume timeout
or immediate. """
if (self.use_auto_commit or
Expand All @@ -145,17 +145,17 @@ def do_commit (self, immediate=False, async=None):
if self.consumed_msgs_at_last_commit < self.consumed_msgs:
self.send_records_consumed(immediate=True)

if async is None:
if asynchronous is None:
async_mode = self.use_async_commit
else:
async_mode = async
async_mode = asynchronous

self.dbg('Committing %d messages (Async=%s)' %
(self.consumed_msgs - self.consumed_msgs_at_last_commit,
async_mode))

try:
self.consumer.commit(async=async_mode)
self.consumer.commit(asynchronous=async_mode)
except KafkaException as e:
if e.args[0].code() == KafkaError._WAIT_COORD:
self.dbg('Ignoring commit failure, still waiting for coordinator')
Expand Down Expand Up @@ -279,7 +279,7 @@ def to_dict (self):
vc.dbg('Closing consumer')
vc.send_records_consumed(immediate=True)
if not vc.use_auto_commit:
vc.do_commit(immediate=True, async=False)
vc.do_commit(immediate=True, asynchronous=False)

vc.consumer.close()

Expand Down
20 changes: 9 additions & 11 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
rd_kafka_resp_err_t err;
PyObject *msg = NULL, *offsets = NULL, *async_o = NULL;
rd_kafka_topic_partition_list_t *c_offsets;
int async = 1;
static char *kws[] = { "message", "offsets", "async",NULL };
int asynchronous = 1;
static char *kws[] = { "message", "offsets", "asynchronous",NULL };

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kws,
&msg, &offsets, &async_o))
Expand All @@ -233,7 +233,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
}

if (async_o)
async = PyObject_IsTrue(async_o);
asynchronous = PyObject_IsTrue(async_o);


if (offsets) {
Expand All @@ -259,11 +259,11 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,

} else {
c_offsets = NULL;
err = rd_kafka_commit(self->rk, NULL, async);
err = rd_kafka_commit(self->rk, NULL, asynchronous);
}

err = rd_kafka_commit(self->rk, c_offsets, async);

err = rd_kafka_commit(self->rk, c_offsets, asynchronous);

if (c_offsets)
rd_kafka_topic_partition_list_destroy(c_offsets);
Expand Down Expand Up @@ -455,7 +455,7 @@ static PyMethodDef Consumer_methods[] = {
"\n"
},
{ "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
".. py:function:: commit([message=None], [offsets=None], [async=True])\n"
".. py:function:: commit([message=None], [offsets=None], [asynchronous=True])\n"
"\n"
" Commit a message or a list of offsets.\n"
"\n"
Expand All @@ -464,11 +464,11 @@ static PyMethodDef Consumer_methods[] = {
"\n"
" :param confluent_kafka.Message message: Commit message's offset+1.\n"
" :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n"
" :param bool async: Asynchronous commit, return immediately.\n"
" :param bool asynchronous: Asynchronous commit, return immediately.\n"
" :rtype: None\n"
" :raises: KafkaException\n"
"\n"
},
},
{ "committed", (PyCFunction)Consumer_committed,
METH_VARARGS|METH_KEYWORDS,
".. py:function:: committed(partitions, [timeout=None])\n"
Expand Down Expand Up @@ -717,5 +717,3 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,

return (PyObject *)self;
}


4 changes: 2 additions & 2 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ def verify_consumer():

if (msg.offset() % 5) == 0:
# Async commit
c.commit(msg, async=True)
c.commit(msg, asynchronous=True)
elif (msg.offset() % 4) == 0:
c.commit(msg, async=False)
c.commit(msg, asynchronous=False)

msgcnt += 1
if msgcnt >= max_msgcnt:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def dummy_assign_revoke (consumer, partitions):

kc.unassign()

kc.commit(async=True)
kc.commit(asynchronous=True)

try:
kc.commit(async=False)
kc.commit(asynchronous=False)
except KafkaException as e:
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._NO_OFFSET)

Expand Down