Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Consumer.io_event_enable #1448

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

stephan-hof
Copy link
Contributor

Hello,

this pull requests adds a method called io_event_enable to the Consumer. It simply wraps rd_kafka_queue_io_event_enable from librdkafka.

My goal is to make integration with async frameworks easier. Using it allows the async framework to be immediately notified when a new message arrived in the queue. Avoiding,

  • the use of threads
  • busy loops around Consumer.poll(timeout=0)

I'm not entirely sure about the linux_asyncio_consumer.py.
Should it be part of the confluent-kafka repository or not? My thinking was to show how io_event_enable could be used.

Update:
Now that I checked the issue tracker somebody already came up with the idea: #185 (comment) so consider this pull-request as an implementation of it.

@stephan-hof stephan-hof requested a review from a team as a code owner October 16, 2022 16:39
@CLAassistant
Copy link

CLAassistant commented Oct 16, 2022

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@stephan-hof stephan-hof force-pushed the features/io_event_enable branch 2 times, most recently from 8fbac8d to 9a73aa8 Compare December 16, 2022 12:05
@edenhill
Copy link
Contributor

I'm not entirely sure about the linux_asyncio_consumer.py.

Having this as an example is good 👍

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good stuff

@@ -0,0 +1,164 @@
import argparse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add confluent copyright header (see other files), update year to 2022


assert sys.platform == 'linux', "This example is linux only, cause of eventfd"


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good with a bit of text here (or after the license) which explains what this example is for, and why.

def __init__(self, config, logger):
self.consumer = Consumer(config, logger=logger)

# Sorry Windows/MacOX try something with socketpair or pipe.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Sorry Windows/MacOX try something with socketpair or pipe.
# Sorry Windows/MacOX try something with socketpair or pipe.
Suggested change
# Sorry Windows/MacOX try something with socketpair or pipe.
# FIXME: Windows/MacOX try something with socketpair or pipe.

self.consumer.assign(*args, **kwargs)

async def poll(self, timeout=0):
if timeout > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warrants a doc string, since with async it differs from the usual poll

config = {
'bootstrap.servers': arguments.bootstrap_servers,
'group.id': arguments.group_id,
'session.timeout.ms': 6000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a recommended value, it will give poor robustness, so please remove.

Py_ssize_t data_size = 0;
static char *kws[] = {"fd", "payload", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "iy#", kws,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see other functions that check self->rk first.

METH_VARARGS | METH_KEYWORDS,
".. py:function:: io_event_enable(fd, payload)\n"
"\n"
"This method basically calls ``rd_kafka_queue_io_event_enable``\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We try to abstract the underlying C library, so this doesn't tell the user much, so skip this line altogether.

"Enable IO event triggering.\n"
"To ease integration with IO based polling loops this API\n"
"allows an application to create a separate file-descriptor\n"
"that librdkafka will write ``payload`` to whenever a new\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"that librdkafka will write ``payload`` to whenever a new\n"
"that the consumer will write ``payload`` to whenever a new\n"

"allows an application to create a separate file-descriptor\n"
"that librdkafka will write ``payload`` to whenever a new\n"
"element is enqueued on a previously empty queue.\n"
" :param int fd: The filedescrptor librdkafka writes to.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
" :param int fd: The filedescrptor librdkafka writes to.\n"
" :param int fd: The filedescrptor the consumer writes to.\n"

"that librdkafka will write ``payload`` to whenever a new\n"
"element is enqueued on a previously empty queue.\n"
" :param int fd: The filedescrptor librdkafka writes to.\n"
" :param bytes payload: The payload librdkakfa writes to fd.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
" :param bytes payload: The payload librdkakfa writes to fd.\n"
" :param bytes payload: The payload the consumer writes to fd.\n"

@stephan-hof
Copy link
Contributor Author

Many thanks for looking at this pull request. I made a commit trying to address the changes requested by the review. I'm wondering now what is the next step on my side. Should I mark the conversations as 'resolved'?
Or should I simply wait for the next review.

@edenhill
Copy link
Contributor

We're currently in release code freeze, but will revisit this PR in January when the upcoming release is out.
Thank you

@wbarnha
Copy link

wbarnha commented Apr 17, 2023

Any update on this PR? I'd like to use Confluent Kafka's asynchronous methods in https://github.com/faust-streaming/faust

@stephan-hof stephan-hof force-pushed the features/io_event_enable branch from 97b3a87 to cd0f5b2 Compare April 17, 2023 18:05
@stephan-hof
Copy link
Contributor Author

Thanks for the ping. In the hope to speed-up things I rebased the branch on latest master.

@wbarnha
Copy link

wbarnha commented Jun 20, 2023

Bumping for posterity. Is the code freeze over @edenhill?

We would really like this functionality to be implemented to use in Faust and many other asynchronous Kafka applications!

@harubi
Copy link

harubi commented Sep 27, 2023

It would be amazing to have :) @edenhill

@stephan-hof
Copy link
Contributor Author

It appears to me that @edenhill is not very active in the last months (hope you are well).

@pranavrth I see that you keep this project alive. So is this pull request something you cold have a look at please?
If you like I can make a push to solve the conflicts on CHANGELOG.md

@woile
Copy link

woile commented Jan 15, 2024

Is there any other maintainer who can take a look at this 🙏🏻 ? This is a huge win for async kafka

@aviadr1
Copy link

aviadr1 commented Jan 6, 2025

bump any chance to have asyncio in confluent-kafka?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants