diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/confluent_kafka/kafkatest/verifiable_consumer.py
index 3e01367db..c97432678 100755
--- a/confluent_kafka/kafkatest/verifiable_consumer.py
+++ b/confluent_kafka/kafkatest/verifiable_consumer.py
@@ -132,7 +132,7 @@ def on_commit (self, err, partitions):
 
         self.send(d)
 
-    def do_commit (self, immediate=False, async=None):
+    def do_commit (self, immediate=False, asynchronous=None):
         """ Commit every 1000 messages or whenever there is a consume timeout
             or immediate. """
         if (self.use_auto_commit or
@@ -145,17 +145,17 @@ def do_commit (self, immediate=False, async=None):
         if self.consumed_msgs_at_last_commit < self.consumed_msgs:
             self.send_records_consumed(immediate=True)
 
-        if async is None:
+        if asynchronous is None:
             async_mode = self.use_async_commit
         else:
-            async_mode = async
+            async_mode = asynchronous
 
         self.dbg('Committing %d messages (Async=%s)' %
                  (self.consumed_msgs - self.consumed_msgs_at_last_commit,
                   async_mode))
 
         try:
-            self.consumer.commit(async=async_mode)
+            self.consumer.commit(asynchronous=async_mode)
         except KafkaException as e:
             if e.args[0].code() == KafkaError._WAIT_COORD:
                 self.dbg('Ignoring commit failure, still waiting for coordinator')
@@ -279,7 +279,7 @@ def to_dict (self):
         vc.dbg('Closing consumer')
         vc.send_records_consumed(immediate=True)
         if not vc.use_auto_commit:
-            vc.do_commit(immediate=True, async=False)
+            vc.do_commit(immediate=True, asynchronous=False)
 
         vc.consumer.close()
 
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
index 80e136a9e..ed1a7f010 100644
--- a/confluent_kafka/src/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
@@ -219,8 +219,8 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
         rd_kafka_resp_err_t err;
         PyObject *msg = NULL, *offsets = NULL, *async_o = NULL;
         rd_kafka_topic_partition_list_t *c_offsets;
-        int async = 1;
-        static char *kws[] = { "message", "offsets", "async",NULL };
+        int asynchronous = 1;
+        static char *kws[] = { "message", "offsets", "asynchronous",NULL };
 
         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kws,
                                          &msg, &offsets, &async_o))
@@ -233,7 +233,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
         }
 
         if (async_o)
-                async = PyObject_IsTrue(async_o);
+                asynchronous = PyObject_IsTrue(async_o);
 
         if (offsets) {
 
@@ -259,11 +259,11 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
         } else {
                 c_offsets = NULL;
-                err = rd_kafka_commit(self->rk, NULL, async);
+                err = rd_kafka_commit(self->rk, NULL, asynchronous);
         }
-
-        err = rd_kafka_commit(self->rk, c_offsets, async);
+
+        err = rd_kafka_commit(self->rk, c_offsets, asynchronous);
 
         if (c_offsets)
                 rd_kafka_topic_partition_list_destroy(c_offsets);
@@ -455,7 +455,7 @@ static PyMethodDef Consumer_methods[] = {
           "\n"
         },
         { "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
-          ".. py:function:: commit([message=None], [offsets=None], [async=True])\n"
+          ".. py:function:: commit([message=None], [offsets=None], [asynchronous=True])\n"
           "\n"
           "  Commit a message or a list of offsets.\n"
           "\n"
@@ -464,11 +464,11 @@ static PyMethodDef Consumer_methods[] = {
           "\n"
           "  :param confluent_kafka.Message message: Commit message's offset+1.\n"
          "  :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n"
-          "  :param bool async: Asynchronous commit, return immediately.\n"
+          "  :param bool asynchronous: Asynchronous commit, return immediately.\n"
           "  :rtype: None\n"
           "  :raises: KafkaException\n"
           "\n"
-        }, 
+        },
         { "committed", (PyCFunction)Consumer_committed, METH_VARARGS|METH_KEYWORDS,
           ".. py:function:: committed(partitions, [timeout=None])\n"
@@ -717,5 +717,3 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 
         return (PyObject *)self;
 }
-
-
diff --git a/examples/integration_test.py b/examples/integration_test.py
index f2c5d43bf..8e0bc0d0e 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -244,9 +244,9 @@ def verify_consumer():
 
             if (msg.offset() % 5) == 0:
                 # Async commit
-                c.commit(msg, async=True)
+                c.commit(msg, asynchronous=True)
             elif (msg.offset() % 4) == 0:
-                c.commit(msg, async=False)
+                c.commit(msg, asynchronous=False)
 
             msgcnt += 1
             if msgcnt >= max_msgcnt:
diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py
index 8d538123e..3306f36a6 100644
--- a/tests/test_Consumer.py
+++ b/tests/test_Consumer.py
@@ -41,10 +41,10 @@ def dummy_assign_revoke (consumer, partitions):
 
     kc.unassign()
 
-    kc.commit(async=True)
+    kc.commit(asynchronous=True)
 
     try:
-        kc.commit(async=False)
+        kc.commit(asynchronous=False)
     except KafkaException as e:
         assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._NO_OFFSET)
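
Note on the rename: async is a reserved keyword as of Python 3.7, so calls like commit(async=True) no longer parse there, which is what this patch addresses. For reference, a minimal usage sketch of the renamed keyword follows. It is not part of the patch, and the broker address, group id, and topic name are placeholders.

# Sketch only: exercising Consumer.commit() with the 'asynchronous' keyword
# introduced by this patch. Broker/group/topic values are assumed placeholders.
from confluent_kafka import Consumer, KafkaException

c = Consumer({'bootstrap.servers': 'localhost:9092',   # placeholder broker
              'group.id': 'commit-kwarg-demo',         # placeholder group
              'enable.auto.commit': False})
c.subscribe(['test'])                                  # placeholder topic

msg = c.poll(1.0)
if msg is not None and not msg.error():
    # Non-blocking commit of this message's offset+1.
    c.commit(msg, asynchronous=True)

try:
    # Blocking commit of the currently consumed offsets before shutdown;
    # raises KafkaException (e.g. _NO_OFFSET) if there is nothing to commit.
    c.commit(asynchronous=False)
except KafkaException as e:
    print('final commit failed: %s' % e)

c.close()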