From 69b79ad1b53d5e9058710ced63c42ebb1da2d9ec Mon Sep 17 00:00:00 2001 From: stephan-hof Date: Fri, 16 Dec 2022 19:25:07 +0100 Subject: [PATCH] Address the changes requested by the review. --- CHANGELOG.md | 5 ++ examples/linux_asyncio_consumer.py | 73 +++++++++++++++++++++++--- src/confluent_kafka/src/Consumer.c | 82 ++++++++++++++++-------------- 3 files changed, 114 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d51cdffa..4a44f0ec2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Confluent's Python client for Apache Kafka +## Next Version + + - Added `consumer.io_event_enable` to integrate with event-loops like asyncio. + (#1448) + ## v2.1.1 v2.1.1 is a maintenance release with the following fixes and enhancements: diff --git a/examples/linux_asyncio_consumer.py b/examples/linux_asyncio_consumer.py index 5f6538173..3f5d8ba81 100644 --- a/examples/linux_asyncio_consumer.py +++ b/examples/linux_asyncio_consumer.py @@ -1,3 +1,43 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2022 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This example shows the integration of confluent_kafka.Consumer with asyncio. +# +# This file contains a class called 'AsyncConsumer' which offers the following +# methods from 'confluent_kafka.Consumer': assign, subscribe, poll +# With the advantage that the poll method is defined as 'async def poll()'. 
+# Allowing it to be used like this: msg = await consumer.poll()
+#
+# Under the hood it uses 'confluent_kafka.Consumer.io_event_enable' to avoid
+# - usage of threads
+# - busy loops around confluent_kafka.Consumer.poll(timeout=0)
+# 'io_event_enable' makes the Consumer write to a filedescriptor in case a new
+# message is ready. Hence to wait for new messages, we simply let asyncio wait
+# on that filedescriptor.
+
+
+# FIXME: This example uses
+# * eventfd as the filedescriptor - linux only
+# * asyncio.add_reader - Which is (up to now) not supported by the
+#   Windows ProactorEventLoop. See the following page for more details:
+#   https://docs.python.org/3.11/library/asyncio-platforms.html
+# Under Windows/macOS, try socketpair or pipes as an alternative.
+
 import argparse
 import asyncio
 import os
@@ -12,18 +52,22 @@
 from confluent_kafka import Consumer
 from confluent_kafka import KafkaException
 
-
 assert sys.platform == 'linux', "This example is linux only, cause of eventfd"
 
 
 class AsyncConsumer:
     def __init__(self, config, logger):
+        """Construct a Consumer usable within asyncio.
+
+        :param config: A configuration dict for this Consumer
+        :param logger: A python logger instance.
+        """
+
         self.consumer = Consumer(config, logger=logger)
 
-        # Sorry Windows/MacOX try something with socketpair or pipe.
         self.eventfd = os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK)
 
-        # This is channel how librdkafka notifies asyncio.
+        # This is the channel through which the consumer notifies asyncio.
         self.loop = asyncio.get_running_loop()
         self.loop.add_reader(self.eventfd, self.__eventfd_ready)
         self.consumer.io_event_enable(self.eventfd, struct.pack('@q', 1))
@@ -39,6 +83,7 @@ def __init__(self, config, logger):
 
     @staticmethod
     def close_eventd(loop, eventfd):
+        """Internal helper method. 
Not part of the public API."""
         loop.remove_reader(eventfd)
         os.close(eventfd)
 
@@ -59,6 +104,19 @@ def assign(self, *args, **kwargs):
         self.consumer.assign(*args, **kwargs)
 
     async def poll(self, timeout=0):
+        """Consumes a single message, calls callbacks and returns events.
+
+        It is defined as 'async def' and returns an awaitable object a
+        caller needs to deal with to get the result.
+        See https://docs.python.org/3/library/asyncio-task.html#awaitables
+
+        Which makes it safe (and mandatory) to call it directly in an asyncio
+        coroutine like this: `msg = await consumer.poll()`
+
+        If timeout > 0: Wait at most `timeout` seconds for a message.
+        Returns `None` if no message arrives in time.
+        If timeout <= 0: Endless wait for a message.
+        """
         if timeout > 0:
             try:
                 return await asyncio.wait_for(self._poll_no_timeout(), timeout)
@@ -104,13 +162,12 @@ async def main():
     config = {
         'bootstrap.servers': arguments.bootstrap_servers,
         'group.id': arguments.group_id,
-        'session.timeout.ms': 6000,
         'auto.offset.reset': 'earliest'
     }
 
-    if arguments.client_status:
+    if arguments.client_stats:
         config['stats_cb'] = stats_cb
-        config['statistics.interval.ms'] = arguments.client_status
+        config['statistics.interval.ms'] = arguments.client_stats
 
     consumer = AsyncConsumer(config, logger)
     consumer.subscribe(arguments.topic, on_assign=print_assignment)
@@ -128,7 +185,7 @@ async def main():
                     msg.topic(),
                     msg.partition(),
                     msg.offset(),
-                    str(msg.key())
+                    msg.key()
                 ))
                 print(msg.value())
     finally:
@@ -140,7 +197,7 @@ def parse_args():
     parser.add_argument(
         '-T',
         metavar='interval',
-        dest='client_status',
+        dest='client_stats',
         type=int,
         default=None
     )
diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c
index 67409cc80..44d9a8dd1 100644
--- a/src/confluent_kafka/src/Consumer.c
+++ b/src/confluent_kafka/src/Consumer.c
@@ -1142,32 +1142,41 @@ Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) {
 
 static PyObject *
 Consumer_io_event_enable(Handle *self, PyObject 
*args, PyObject *kwargs) { - int fd = 0; - const char* data = NULL; - Py_ssize_t data_size = 0; - static char *kws[] = {"fd", "payload", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "iy#", kws, - &fd, &data, &data_size)) - { - return NULL; - } + int fd = 0; + const char* data = NULL; + Py_ssize_t data_size = 0; + static char *kws[] = {"fd", "payload", NULL}; - if (fd <= 0) { - return PyErr_Format(PyExc_ValueError, "fd outside range: %i", fd); - } + if (!self->rk) { + PyErr_SetString(PyExc_RuntimeError, "Consumer closed"); + return NULL; + } - if (data_size <= 0) { - return PyErr_Format(PyExc_ValueError, "payload cannot be empty"); - } + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "iy#", kws, &fd, + &data, &data_size)) { + return NULL; + } - if (self->u.Consumer.rkqu == NULL) { - PyErr_SetString(PyExc_RuntimeError, "Consumer Queue not available"); - return NULL; - } + if (fd < 0) { + return PyErr_Format(PyExc_ValueError, + "fd outside range: %i", fd); + } + + if (data_size <= 0) { + return PyErr_Format(PyExc_ValueError, + "payload cannot be empty"); + } - rd_kafka_queue_io_event_enable(self->u.Consumer.rkqu, fd, data, data_size); - Py_RETURN_NONE; + if (self->u.Consumer.rkqu == NULL) { + PyErr_SetString(PyExc_RuntimeError, + "Consumer Queue not available"); + return NULL; + } + + rd_kafka_queue_io_event_enable(self->u.Consumer.rkqu, + fd, data, data_size); + + Py_RETURN_NONE; } static PyMethodDef Consumer_methods[] = { @@ -1525,22 +1534,19 @@ static PyMethodDef Consumer_methods[] = { { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, set_sasl_credentials_doc }, - { - "io_event_enable", - (PyCFunction) Consumer_io_event_enable, - METH_VARARGS | METH_KEYWORDS, - ".. 
py:function:: io_event_enable(fd, payload)\n" - "\n" - "This method basically calls ``rd_kafka_queue_io_event_enable``\n" - "\n" - "Enable IO event triggering.\n" - "To ease integration with IO based polling loops this API\n" - "allows an application to create a separate file-descriptor\n" - "that librdkafka will write ``payload`` to whenever a new\n" - "element is enqueued on a previously empty queue.\n" - " :param int fd: The filedescrptor librdkafka writes to.\n" - " :param bytes payload: The payload librdkakfa writes to fd.\n" - " :returns: ``None``\n" + { "io_event_enable", + (PyCFunction) Consumer_io_event_enable, + METH_VARARGS | METH_KEYWORDS, + ".. py:function:: io_event_enable(fd, payload)\n" + "\n" + "Enable IO event triggering.\n" + "To ease integration with IO based polling loops this API\n" + "allows an application to create a separate file-descriptor\n" + "that the consumer will write ``payload`` to whenever a new\n" + "element is enqueued on a previously empty queue.\n" + " :param int fd: The filedescriptor the consumer writes to.\n" + " :param bytes payload: The payload the consumer writes to fd.\n" + " :returns: ``None``\n" }, { NULL }