
Rename async -> asynchronous arguments (but keep async for now) #21 #319

Merged: 1 commit, Feb 22, 2018
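The rename matters because async becomes a reserved word in Python 3.7 and can no longer be written as a bare keyword argument; this PR teaches the binding to accept asynchronous while still honoring the old name. A minimal sketch of the two call styles; the broker address, topic name and group id are placeholders:

    from confluent_kafka import Consumer

    c = Consumer({'bootstrap.servers': 'localhost:9092',  # placeholder broker
                  'group.id': 'example-group'})           # placeholder group
    c.subscribe(['example-topic'])                        # placeholder topic

    msg = c.poll(timeout=10.0)
    if msg is not None and msg.error() is None:
        # New spelling introduced by this PR:
        c.commit(msg, asynchronous=False)
        # The old spelling still works for now, but from Python 3.7 on
        # 'async' is a reserved word and can only be passed by unpacking:
        # c.commit(msg, **{'async': False})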
12 changes: 6 additions & 6 deletions confluent_kafka/kafkatest/verifiable_consumer.py
@@ -97,7 +97,7 @@ def on_revoke(self, consumer, partitions):
# Send final consumed records prior to rebalancing to make sure
# latest consumed is in par with what is going to be committed.
self.send_records_consumed(immediate=True)
-        self.do_commit(immediate=True, async=False)
+        self.do_commit(immediate=True, asynchronous=False)
self.assignment = list()
self.assignment_dict = dict()
self.send_assignment('revoked', partitions)
@@ -133,7 +133,7 @@ def on_commit(self, err, partitions):

self.send(d)

-    def do_commit(self, immediate=False, async=None):
+    def do_commit(self, immediate=False, asynchronous=None):
""" Commit every 1000 messages or whenever there is a consume timeout
or immediate. """
if (self.use_auto_commit
@@ -146,10 +146,10 @@ def do_commit(self, immediate=False, asynchronous=None):
if self.consumed_msgs_at_last_commit < self.consumed_msgs:
self.send_records_consumed(immediate=True)

-        if async is None:
+        if asynchronous is None:
async_mode = self.use_async_commit
else:
-            async_mode = async
+            async_mode = asynchronous

self.dbg('Committing %d messages (Async=%s)' %
(self.consumed_msgs - self.consumed_msgs_at_last_commit,
@@ -159,7 +159,7 @@
while True:
try:
self.dbg('Commit')
-                offsets = self.consumer.commit(async=async_mode)
+                offsets = self.consumer.commit(asynchronous=async_mode)
self.dbg('Commit done: offsets %s' % offsets)

if not async_mode:
@@ -300,7 +300,7 @@ def to_dict(self):
vc.dbg('Closing consumer')
vc.send_records_consumed(immediate=True)
if not vc.use_auto_commit:
-        vc.do_commit(immediate=True, async=False)
+        vc.do_commit(immediate=True, asynchronous=False)

vc.consumer.close()

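The asynchronous=None default above is a tri-state switch: None defers to the consumer-wide use_async_commit setting, while an explicit True or False forces the mode for that one call. A standalone sketch of the pattern, with CommitHelper and resolve_async_mode as hypothetical names:

    class CommitHelper(object):
        def __init__(self, use_async_commit=True):
            self.use_async_commit = use_async_commit

        def resolve_async_mode(self, asynchronous=None):
            # None means "use the configured default"; an explicit boolean
            # overrides the consumer-wide setting for this call only.
            if asynchronous is None:
                return self.use_async_commit
            return asynchronous

    helper = CommitHelper(use_async_commit=True)
    assert helper.resolve_async_mode() is True                     # configured default
    assert helper.resolve_async_mode(asynchronous=False) is False  # per-call override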
16 changes: 8 additions & 8 deletions confluent_kafka/src/Consumer.c
@@ -369,13 +369,13 @@ Consumer_offset_commit_return_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,


static PyObject *Consumer_commit (Handle *self, PyObject *args,
-                                  PyObject *kwargs) {
-
+                                  PyObject *kwargs) {
rd_kafka_resp_err_t err;
PyObject *msg = NULL, *offsets = NULL, *async_o = NULL;
rd_kafka_topic_partition_list_t *c_offsets;
int async = 1;
-        static char *kws[] = { "message", "offsets", "async",NULL };
+        static char *kws[] = { "message", "offsets",
+                               "async", "asynchronous", NULL };
rd_kafka_queue_t *rkqu = NULL;
struct commit_return commit_return;
PyThreadState *thread_state;
@@ -386,8 +386,8 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
return NULL;
}

-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kws,
-                                         &msg, &offsets, &async_o))
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kws,
+                                         &msg, &offsets, &async_o, &async_o))
Review comment: That's quite nice, does it just overwrite based on which one is set?

Author reply (Contributor): Yep, this works since ParseTupleAndKeywords returns borrowed references and does not increase the refcount for each matched object (which wouldn't work here).
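In other words, both keyword names parse into the same C variable, and because "asynchronous" comes after "async" in the kws array, a value passed under the new name lands last and wins when both are supplied. A rough pure-Python model of that precedence (parse_commit_kwargs is hypothetical, not part of the binding):

    def parse_commit_kwargs(**kwargs):
        # Mirrors the C parser: both names fill the same slot, and the
        # later kws[] entry ("asynchronous") overwrites the earlier one.
        async_mode = True  # matches 'int async = 1' in Consumer_commit
        for name in ('async', 'asynchronous'):
            if name in kwargs:
                async_mode = bool(kwargs[name])
        return async_mode

    assert parse_commit_kwargs() is True
    assert parse_commit_kwargs(**{'async': False}) is False
    assert parse_commit_kwargs(**{'async': True, 'asynchronous': False}) is False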

return NULL;

if (msg && offsets) {
@@ -1123,7 +1123,7 @@ static PyMethodDef Consumer_methods[] = {
"\n"
},
{ "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
-          ".. py:function:: commit([message=None], [offsets=None], [async=True])\n"
+          ".. py:function:: commit([message=None], [offsets=None], [asynchronous=True])\n"
"\n"
" Commit a message or a list of offsets.\n"
"\n"
@@ -1133,14 +1133,14 @@
"\n"
" :param confluent_kafka.Message message: Commit message's offset+1.\n"
" :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n"
-          " :param bool async: Asynchronous commit, return None immediately. "
+          " :param bool asynchronous: Asynchronous commit, return None immediately. "
"If False the commit() call will block until the commit succeeds or "
"fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition will need to be checked for success.\n"
" :rtype: None|list(TopicPartition)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
-        },
+        },
{ "committed", (PyCFunction)Consumer_committed,
METH_VARARGS|METH_KEYWORDS,
".. py:function:: committed(partitions, [timeout=None])\n"
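One subtlety in the docstring above deserves emphasis: even when a synchronous commit returns an offsets list, individual partitions may still have failed, so each returned entry must be checked. A hedged sketch, assuming a live consumer c and that the per-partition error surfaces as the TopicPartition.error attribute in the Python API:

    offsets = c.commit(asynchronous=False)  # blocks, returns committed offsets
    for tp in offsets or []:
        # The docstring's ".err field" is exposed as .error on TopicPartition.
        if tp.error is not None:
            print('commit failed for %s [%d]: %s'
                  % (tp.topic, tp.partition, tp.error))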
10 changes: 5 additions & 5 deletions examples/integration_test.py
@@ -308,7 +308,7 @@ def verify_avro():
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), tstype, timestamp))

-    c.commit(msg, async=False)
+    c.commit(msg, asynchronous=False)

# Close consumer
c.close()
@@ -507,9 +507,9 @@ def print_wmark(consumer, parts):

if (msg.offset() % 5) == 0:
# Async commit
-            c.commit(msg, async=True)
+            c.commit(msg, asynchronous=True)
elif (msg.offset() % 4) == 0:
-            offsets = c.commit(msg, async=False)
+            offsets = c.commit(msg, asynchronous=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
'expected offset %d to be committed, not %s' % \
@@ -679,9 +679,9 @@ def verify_batch_consumer():

if (msg.offset() % 5) == 0:
# Async commit
-            c.commit(msg, async=True)
+            c.commit(msg, asynchronous=True)
elif (msg.offset() % 4) == 0:
-            offsets = c.commit(msg, async=False)
+            offsets = c.commit(msg, asynchronous=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
'expected offset %d to be committed, not %s' % \
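The assertions above encode Kafka's commit convention: a committed offset names the next message to consume, which is why committing a message stores msg.offset() + 1, and why only the synchronous form returns the offsets list. A compact restatement, assuming c and msg as in the tests:

    r = c.commit(msg, asynchronous=True)          # fire-and-forget, returns None
    assert r is None
    offsets = c.commit(msg, asynchronous=False)   # blocks, returns committed offsets
    assert offsets[0].offset == msg.offset() + 1  # committed = next offset to read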
6 changes: 3 additions & 3 deletions tests/test_Consumer.py
@@ -84,10 +84,10 @@ def dummy_assign_revoke(consumer, partitions):

kc.unassign()

-    kc.commit(async=True)
+    kc.commit(asynchronous=True)

try:
-        kc.commit(async=False)
+        kc.commit(asynchronous=False)
except KafkaException as e:
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._NO_OFFSET)

@@ -163,7 +163,7 @@ def commit_cb(cs, err, ps):
if cs.once:
# Try commit once
try:
-                c.commit(async=False)
+                c.commit(asynchronous=False)
except KafkaException as e:
print('commit failed with %s (expected)' % e)
assert e.args[0].code() == KafkaError._NO_OFFSET
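The commit_cb helper above exercises the on_commit hook: for asynchronous commits the outcome arrives later, via a callback registered in the consumer config and invoked from poll(). A minimal sketch; broker address and group id are placeholders:

    from confluent_kafka import Consumer

    def on_commit(err, partitions):
        # Invoked from poll() with the outcome of asynchronous commits.
        if err is not None:
            print('commit failed: %s' % err)
        else:
            print('committed: %s' % partitions)

    c = Consumer({'bootstrap.servers': 'localhost:9092',  # placeholder
                  'group.id': 'example-group',            # placeholder
                  'on_commit': on_commit})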