From 6cc477dcbaff158f60014a946a485c07e8e38195 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Mon, 19 Feb 2018 13:44:49 +0100 Subject: [PATCH] Rename async -> asynchronous arguments (but keep async for now) #21 --- confluent_kafka/kafkatest/verifiable_consumer.py | 12 ++++++------ confluent_kafka/src/Consumer.c | 16 ++++++++-------- examples/integration_test.py | 10 +++++----- tests/test_Consumer.py | 6 +++--- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/confluent_kafka/kafkatest/verifiable_consumer.py index c335bcc53..846f95d31 100755 --- a/confluent_kafka/kafkatest/verifiable_consumer.py +++ b/confluent_kafka/kafkatest/verifiable_consumer.py @@ -97,7 +97,7 @@ def on_revoke(self, consumer, partitions): # Send final consumed records prior to rebalancing to make sure # latest consumed is in par with what is going to be committed. self.send_records_consumed(immediate=True) - self.do_commit(immediate=True, async=False) + self.do_commit(immediate=True, asynchronous=False) self.assignment = list() self.assignment_dict = dict() self.send_assignment('revoked', partitions) @@ -133,7 +133,7 @@ def on_commit(self, err, partitions): self.send(d) - def do_commit(self, immediate=False, async=None): + def do_commit(self, immediate=False, asynchronous=None): """ Commit every 1000 messages or whenever there is a consume timeout or immediate. """ if (self.use_auto_commit @@ -146,10 +146,10 @@ def do_commit(self, immediate=False, async=None): if self.consumed_msgs_at_last_commit < self.consumed_msgs: self.send_records_consumed(immediate=True) - if async is None: + if asynchronous is None: async_mode = self.use_async_commit else: - async_mode = async + async_mode = asynchronous self.dbg('Committing %d messages (Async=%s)' % (self.consumed_msgs - self.consumed_msgs_at_last_commit, @@ -159,7 +159,7 @@ def do_commit(self, immediate=False, async=None): while True: try: self.dbg('Commit') - offsets = self.consumer.commit(async=async_mode) + offsets = self.consumer.commit(asynchronous=async_mode) self.dbg('Commit done: offsets %s' % offsets) if not async_mode: @@ -300,7 +300,7 @@ def to_dict(self): vc.dbg('Closing consumer') vc.send_records_consumed(immediate=True) if not vc.use_auto_commit: - vc.do_commit(immediate=True, async=False) + vc.do_commit(immediate=True, asynchronous=False) vc.consumer.close() diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c index fb737831b..838182f08 100644 --- a/confluent_kafka/src/Consumer.c +++ b/confluent_kafka/src/Consumer.c @@ -369,13 +369,13 @@ Consumer_offset_commit_return_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, static PyObject *Consumer_commit (Handle *self, PyObject *args, - PyObject *kwargs) { - + PyObject *kwargs) { rd_kafka_resp_err_t err; PyObject *msg = NULL, *offsets = NULL, *async_o = NULL; rd_kafka_topic_partition_list_t *c_offsets; int async = 1; - static char *kws[] = { "message", "offsets", "async",NULL }; + static char *kws[] = { "message", "offsets", + "async", "asynchronous", NULL }; rd_kafka_queue_t *rkqu = NULL; struct commit_return commit_return; PyThreadState *thread_state; @@ -386,8 +386,8 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args, return NULL; } - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kws, - &msg, &offsets, &async_o)) + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kws, + &msg, &offsets, &async_o, &async_o)) return NULL; if (msg && offsets) { @@ -1123,7 +1123,7 @@ static PyMethodDef Consumer_methods[] = { "\n" }, { "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS, - ".. py:function:: commit([message=None], [offsets=None], [async=True])\n" + ".. py:function:: commit([message=None], [offsets=None], [asynchronous=True])\n" "\n" " Commit a message or a list of offsets.\n" "\n" @@ -1133,14 +1133,14 @@ static PyMethodDef Consumer_methods[] = { "\n" " :param confluent_kafka.Message message: Commit message's offset+1.\n" " :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n" - " :param bool async: Asynchronous commit, return None immediately. " + " :param bool asynchronous: Asynchronous commit, return None immediately. " "If False the commit() call will block until the commit succeeds or " "fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition will need to be checked for success.\n" " :rtype: None|list(TopicPartition)\n" " :raises: KafkaException\n" " :raises: RuntimeError if called on a closed consumer\n" "\n" - }, + }, { "committed", (PyCFunction)Consumer_committed, METH_VARARGS|METH_KEYWORDS, ".. py:function:: committed(partitions, [timeout=None])\n" diff --git a/examples/integration_test.py b/examples/integration_test.py index d1e3200a4..1fd38bd2f 100755 --- a/examples/integration_test.py +++ b/examples/integration_test.py @@ -308,7 +308,7 @@ def verify_avro(): (msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value(), tstype, timestamp)) - c.commit(msg, async=False) + c.commit(msg, asynchronous=False) # Close consumer c.close() @@ -507,9 +507,9 @@ def print_wmark(consumer, parts): if (msg.offset() % 5) == 0: # Async commit - c.commit(msg, async=True) + c.commit(msg, asynchronous=True) elif (msg.offset() % 4) == 0: - offsets = c.commit(msg, async=False) + offsets = c.commit(msg, asynchronous=False) assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets) assert offsets[0].offset == msg.offset()+1, \ 'expected offset %d to be committed, not %s' % \ @@ -679,9 +679,9 @@ def verify_batch_consumer(): if (msg.offset() % 5) == 0: # Async commit - c.commit(msg, async=True) + c.commit(msg, asynchronous=True) elif (msg.offset() % 4) == 0: - offsets = c.commit(msg, async=False) + offsets = c.commit(msg, asynchronous=False) assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets) assert offsets[0].offset == msg.offset()+1, \ 'expected offset %d to be committed, not %s' % \ diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index dfb1a4fd2..b7fdbc624 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -84,10 +84,10 @@ def dummy_assign_revoke(consumer, partitions): kc.unassign() - kc.commit(async=True) + kc.commit(asynchronous=True) try: - kc.commit(async=False) + kc.commit(asynchronous=False) except KafkaException as e: assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._NO_OFFSET) @@ -163,7 +163,7 @@ def commit_cb(cs, err, ps): if cs.once: # Try commit once try: - c.commit(async=False) + c.commit(asynchronous=False) except KafkaException as e: print('commit failed with %s (expected)' % e) assert e.args[0].code() == KafkaError._NO_OFFSET