Commit

Address the changes requested by the review.

stephan-hof committed Dec 16, 2022
1 parent 9a73aa8 commit 97b3a87

Showing 3 changed files with 111 additions and 46 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,8 @@
- Added `consumer.memberid()` for getting member id assigned to
the consumer in a consumer group (#1154).
- Added Python 3.11 wheels
- Added `consumer.io_event_enable()` to integrate with event loops like asyncio (#1448)


## v1.9.2
73 changes: 65 additions & 8 deletions examples/linux_asyncio_consumer.py
@@ -1,3 +1,43 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# This example shows the integration of confluent_kafka.Consumer with asyncio.
#
# This file contains a class called 'AsyncConsumer' which offers the following
# methods from 'confluent_kafka.Consumer': assign, subscribe, poll.
# The advantage is that poll is defined as 'async def poll()', allowing it to
# be used like this: msg = await consumer.poll()
#
# Under the hood it uses 'confluent_kafka.Consumer.io_event_enable' to avoid
# - usage of threads
# - busy loops around confluent_kafka.Consumer.poll(timeout=0)
# 'io_event_enable' makes the Consumer write to a filedescriptor whenever a
# new message is ready. Hence, to wait for new messages, we simply let asyncio
# wait on that filedescriptor.


# FIXME: This example uses
# * eventfd as the filedescriptor - Linux only
# * asyncio.add_reader - which is (up to now) not supported by the
#   Windows ProactorEventLoop. See the following page for more details:
#   https://docs.python.org/3.11/library/asyncio-platforms.html
# Under Windows/macOS, try socketpair or pipes as an alternative, as
# sketched below.
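# A rough sketch of that alternative (an assumption, not part of this
# example): create a non-blocking socketpair, hand the write end's fd to
# 'io_event_enable' and let asyncio watch the read end. On Windows this
# still requires a SelectorEventLoop, since ProactorEventLoop lacks
# add_reader.
#
#   import socket
#
#   def make_notify_pair(loop, on_ready):
#       rsock, wsock = socket.socketpair()
#       rsock.setblocking(False)
#       wsock.setblocking(False)
#       loop.add_reader(rsock, on_ready)
#       # pass wsock.fileno() and a one-byte payload to io_event_enable()
#       return rsock, wsock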

import argparse
import asyncio
import os
@@ -12,18 +52,22 @@
from confluent_kafka import Consumer
from confluent_kafka import KafkaException


assert sys.platform == 'linux', "This example is Linux-only because of eventfd"


class AsyncConsumer:
def __init__(self, config, logger):
"""Construct a Consumer usable within asyncio.
:param config: A configuration dict for this Consumer
:param logger: A Python logger instance.
"""

self.consumer = Consumer(config, logger=logger)

# Sorry Windows/macOS: try something with socketpair or pipe.
self.eventfd = os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK)

# This is channel how librdkafka notifies asyncio.
# This is the channel through which the consumer notifies asyncio.
self.loop = asyncio.get_running_loop()
self.loop.add_reader(self.eventfd, self.__eventfd_ready)
self.consumer.io_event_enable(self.eventfd, struct.pack('@q', 1))
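# eventfd writes must be exactly 8 bytes, hence the payload
# struct.pack('@q', 1): a native int64 that adds 1 to the eventfd counter.
# The __eventfd_ready callback (in the collapsed lines below) is presumed
# to drain that counter, e.g. via os.eventfd_read(self.eventfd).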
@@ -39,6 +83,7 @@ def __init__(self, config, logger):

@staticmethod
def close_eventd(loop, eventfd):
"""Internal helper method. Not part of the public API."""
loop.remove_reader(eventfd)
os.close(eventfd)

@@ -59,6 +104,19 @@ def assign(self, *args, **kwargs):
self.consumer.assign(*args, **kwargs)

async def poll(self, timeout=0):
"""Consumes a single message, calls callbacks and returns events.
It is defined a 'async def' and returns an awaitable object a
caller needs to deal with to get the result.
See https://docs.python.org/3/library/asyncio-task.html#awaitables
Which makes it safe (and mandatory) to call it directly in an asyncio
coroutine like this: `msg = await consumer.poll()`
If timeout > 0: Wait at most X seconds for a message.
Returns `None` if no message arrives in time.
If timeout <= 0: Endless wait for a message.
"""
if timeout > 0:
try:
return await asyncio.wait_for(self._poll_no_timeout(), timeout)
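# '_poll_no_timeout' lives in the collapsed lines below; a plausible
# sketch (an assumption, not the actual implementation):
#
#   async def _poll_no_timeout(self):
#       while (msg := self.consumer.poll(0)) is None:
#           await self._wakeup()  # hypothetical waiter resolved by
#                                 # __eventfd_ready
#       return msg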
@@ -104,13 +162,12 @@ async def main():
config = {
'bootstrap.servers': arguments.bootstrap_servers,
'group.id': arguments.group_id,
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest'
}

if arguments.client_status:
if arguments.client_stats:
config['stats_cb'] = stats_cb
config['statistics.interval.ms'] = arguments.client_status
config['statistics.interval.ms'] = arguments.client_stats

consumer = AsyncConsumer(config, logger)
consumer.subscribe(arguments.topic, on_assign=print_assignment)
@@ -128,7 +185,7 @@ async def main():
msg.topic(),
msg.partition(),
msg.offset(),
str(msg.key())
msg.key()
))
print(msg.value())
finally:
@@ -140,7 +197,7 @@ def parse_args():
parser.add_argument(
'-T',
metavar='interval',
dest='client_status',
dest='client_stats',
type=int,
default=None
)
82 changes: 44 additions & 38 deletions src/confluent_kafka/src/Consumer.c
@@ -1127,32 +1127,41 @@ Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) {
static PyObject *
Consumer_io_event_enable(Handle *self, PyObject *args, PyObject *kwargs)
{
int fd = 0;
const char* data = NULL;
Py_ssize_t data_size = 0;
static char *kws[] = {"fd", "payload", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "iy#", kws,
&fd, &data, &data_size))
{
return NULL;
}
int fd = 0;
const char* data = NULL;
Py_ssize_t data_size = 0;
static char *kws[] = {"fd", "payload", NULL};

if (fd <= 0) {
return PyErr_Format(PyExc_ValueError, "fd outside range: %i", fd);
}
if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
return NULL;
}

if (data_size <= 0) {
return PyErr_Format(PyExc_ValueError, "payload cannot be empty");
}
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "iy#", kws, &fd,
&data, &data_size)) {
return NULL;
}

if (self->u.Consumer.rkqu == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Consumer Queue not available");
return NULL;
}
if (fd < 0) {
return PyErr_Format(PyExc_ValueError,
"fd outside range: %i", fd);
}

if (data_size <= 0) {
return PyErr_Format(PyExc_ValueError,
"payload cannot be empty");
}

rd_kafka_queue_io_event_enable(self->u.Consumer.rkqu, fd, data, data_size);
Py_RETURN_NONE;
if (self->u.Consumer.rkqu == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer Queue not available");
return NULL;
}

rd_kafka_queue_io_event_enable(self->u.Consumer.rkqu,
fd, data, data_size);

Py_RETURN_NONE;
}

static PyMethodDef Consumer_methods[] = {
@@ -1507,22 +1516,19 @@ static PyMethodDef Consumer_methods[] = {
"send_offsets_to_transaction() API.\n"
"\n"
},
{
"io_event_enable",
(PyCFunction) Consumer_io_event_enable,
METH_VARARGS | METH_KEYWORDS,
".. py:function:: io_event_enable(fd, payload)\n"
"\n"
"This method basically calls ``rd_kafka_queue_io_event_enable``\n"
"\n"
"Enable IO event triggering.\n"
"To ease integration with IO based polling loops this API\n"
"allows an application to create a separate file-descriptor\n"
"that librdkafka will write ``payload`` to whenever a new\n"
"element is enqueued on a previously empty queue.\n"
" :param int fd: The filedescrptor librdkafka writes to.\n"
" :param bytes payload: The payload librdkakfa writes to fd.\n"
" :returns: ``None``\n"
{ "io_event_enable",
(PyCFunction) Consumer_io_event_enable,
METH_VARARGS | METH_KEYWORDS,
".. py:function:: io_event_enable(fd, payload)\n"
"\n"
"Enable IO event triggering.\n"
"To ease integration with IO based polling loops this API\n"
"allows an application to create a separate file-descriptor\n"
"that the consumer will write ``payload`` to whenever a new\n"
"element is enqueued on a previously empty queue.\n"
" :param int fd: The filedescriptor the consumer writes to.\n"
" :param bytes payload: The payload the consumer writes to fd.\n"
" :returns: ``None``\n"
},
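/* Usage sketch from Python (mirroring examples/linux_asyncio_consumer.py):
 *
 *   consumer.io_event_enable(eventfd, struct.pack('@q', 1))
 *
 * After this call, the payload is written to the fd whenever the consumer
 * queue transitions from empty to non-empty. */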

{ NULL }