
confluent-kafka-python vs aiokafka for Asynchronous applications #1723

Open
srisudarsan opened this issue Mar 14, 2024 · 7 comments

@srisudarsan

srisudarsan commented Mar 14, 2024

Description

NOTE: This is a question and not an issue

I am building an application with FastAPI that also needs to consume messages from Kafka, so we want to run the consumer asynchronously.

I read https://www.confluent.io/blog/kafka-python-asyncio-integration/ and understood that confluent-kafka-python can be used asynchronously. While exploring other libraries, I came across aiokafka. While trying to understand the differences between the two, I found an issue with an interesting comment from one of that repo's members: aio-libs/aiokafka#665 (comment)

Given that comment, I can't tell whether confluent-kafka-python offers complete async support, as mentioned in #185.

Can you please help me clarify this so I can move forward?

@emasab
Contributor

emasab commented Mar 20, 2024

Hello, we have an example using asyncio.
You have to use a separate thread to call poll() (producer/consumer) or consume() (consumer), and make sure all callbacks are wrapped with call_soon_threadsafe so that they run back on the event loop thread.
We'll be working on an API that makes asyncio and gevent integration easier.
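Below is a minimal sketch of that pattern: a blocking poll() runs on a dedicated thread, and every delivery callback hands its result back to the event loop with call_soon_threadsafe. It follows the spirit of the producer in the Confluent asyncio blog post. So that it runs without a broker, `FakeProducer` is a hypothetical stand-in that mimics only the relevant `confluent_kafka.Producer` behavior: produce() enqueues the message, and delivery callbacks `(err, msg)` fire inside poll() on the polling thread. With a real client you would pass `confluent_kafka.Producer(config)` instead and call flush() before closing.

```python
import asyncio
import queue
import threading


class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer (no broker needed)."""

    def __init__(self):
        # Thread-safe queue: produce() and poll() run on different threads.
        self._pending = queue.Queue()

    def produce(self, topic, value=None, on_delivery=None):
        self._pending.put((topic, value, on_delivery))

    def poll(self, timeout):
        # Serve at most one delivery report, invoking its callback on *this* thread.
        try:
            topic, value, callback = self._pending.get(timeout=timeout)
        except queue.Empty:
            return 0
        if callback is not None:
            callback(None, (topic, value))  # (err, msg), as confluent_kafka does
        return 1


def _settle(fut, result=None, exc=None):
    # Runs on the event-loop thread. The caller may already have cancelled fut.
    if fut.done():
        return
    if exc is not None:
        fut.set_exception(exc)
    else:
        fut.set_result(result)


class AsyncProducer:
    def __init__(self, producer, loop):
        self._producer = producer
        self._loop = loop
        self._cancelled = False
        self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        # The blocking poll() lives on its own thread, so the event loop never waits on it.
        while not self._cancelled:
            self._producer.poll(0.1)

    def produce(self, topic, value):
        fut = self._loop.create_future()

        def on_delivery(err, msg):
            # Called on the poll thread: hop back to the loop thread before touching fut.
            exc = Exception(str(err)) if err is not None else None
            self._loop.call_soon_threadsafe(_settle, fut, msg, exc)

        self._producer.produce(topic, value, on_delivery=on_delivery)
        return fut

    def close(self):
        self._cancelled = True
        self._poll_thread.join()


async def main():
    producer = AsyncProducer(FakeProducer(), asyncio.get_running_loop())
    try:
        return await producer.produce("events", b"hello")
    finally:
        producer.close()


result = asyncio.run(main())
print(result)  # ('events', b'hello')
```

In this sketch, close() blocks the event loop for up to one poll timeout (0.1 s) while the poll thread exits.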

@mdespriee

mdespriee commented Apr 3, 2024

The example is great, but there's no async consumer demonstrated.
The thing is, async consuming is much trickier than producing.

I've ended up writing this.
[WARNING] This code only handles a subset of the possible states of a kafka consumer, and has only been used in a simple use-case (no consumer groups, no offset commits, no transactions...). There may be weird async edge-cases not properly handled. Use at your own risk. [/]

```python
import asyncio
import logging
import time
import uuid
from typing import Any, AsyncIterator, Callable, TypeVar

from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger(__name__)

T = TypeVar("T")
V = TypeVar("V")


class AsyncKafkaConsumer:
    def __init__(
        self,
        bootstrap_servers: str,
        sasl_username: str,
        sasl_password: str,
        topic: str,
        key_deserializer: Callable[[bytes], str],
        value_deserializer: Callable[[bytes], V],
        group_id: str | None = None,
        offset_reset: str = "earliest",
    ):
        self.topic = topic
        # Without an explicit group id, each instance gets its own random group.
        # group.id must be a string, hence str() around the UUID.
        group_id = group_id or str(uuid.uuid4())
        self._consumer = Consumer(
            {
                # see: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
                "bootstrap.servers": bootstrap_servers,
                "sasl.mechanism": "PLAIN",
                "security.protocol": "SASL_SSL",
                "sasl.username": sasl_username,
                "sasl.password": sasl_password,
                "error_cb": error_cb,
                "auto.offset.reset": offset_reset,
                # Offsets are only stored when store_offsets() is called explicitly;
                # auto-commit (on by default) then commits only those stored offsets.
                "enable.auto.offset.store": False,
                "group.id": group_id,
            }
        )
        # on_assign runs on whichever thread calls consume(), i.e. the executor thread.
        self._consumer.subscribe(
            [topic], on_assign=lambda c, p: logger.info(f"Assigned partitions {p}")
        )
        self._stop = False
        self._key_deserializer = key_deserializer
        self._value_deserializer = value_deserializer
        self._loop: asyncio.AbstractEventLoop | None = None

    async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T:
        # Run a blocking client call on the default thread pool so the event loop stays free.
        return await self._loop.run_in_executor(None, func, *args)

    async def consume(
        self,
        timeout_s: float | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> AsyncIterator[tuple[str, V]]:
        self._loop = loop or asyncio.get_running_loop()
        stop_at = time.monotonic() + (float("inf") if timeout_s is None else timeout_s)

        logger.debug("Starting consume")

        # inspired from https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/consumer.py
        # and https://github.com/Aiven-Open/karapace/blob/26e8354e55619f9e7498101ee5a97930a4991b64/karapace/kafka/consumer.py#L200
        try:
            while not self._stop and time.monotonic() < stop_at:
                # Blocks for at most 1 s on a worker thread and returns up to 100 messages.
                messages = await self._run_in_executor(self._consumer.consume, 100, 1.0)
                for msg in messages:
                    if msg.error():
                        if msg.error().retriable():
                            logger.warning(f"Got {msg.error()} - Will retry")
                            continue
                        logger.error(f"Got {msg.error()}")
                        raise KafkaException(msg.error())

                    try:
                        key = self._key_deserializer(msg.key())
                        value = self._value_deserializer(msg.value())
                    except Exception as e:
                        logger.error(f"Cannot deserialize message {msg=}", exc_info=e)
                        continue

                    logger.debug(
                        f"Received record with key={key} from topic {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
                    )
                    yield key, value

                    # Store the offset only after the caller has processed the record
                    # (at-least-once): if the caller stops iterating here, it is not stored.
                    await self._run_in_executor(self._consumer.store_offsets, msg)
        finally:
            # close() commits stored offsets and leaves the group. It is only called here,
            # after the loop has exited. Caveat: if this task is cancelled while consume()
            # is still running on the executor thread, close() can overlap with it.
            self._consumer.close()

    def stop(self):
        # Only set the flag: the loop notices it within ~1 s and closes the consumer itself,
        # so close() never races with a consume() call running on another thread.
        self._stop = True


def error_cb(err: KafkaError) -> None:
    """The error callback is used for generic client errors.

    These errors are generally to be considered informational as the
    client will automatically try to recover from all errors, and no
    extra action is typically required by the application. For this
    example however, we terminate the application if the client is
    unable to connect to any broker (_ALL_BROKERS_DOWN) and on
    authentication errors (_AUTHENTICATION).
    """
    logger.error(f"Client error: {err}")
    if err.code() in (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION):
        # Any exception raised from this callback will be re-raised from the
        # triggering poll(), consume() or flush() call.
        raise KafkaException(err)
```

Regarding the FastAPI part, we have something like this:

```python
# Start consume() as a background task from a startup hook (e.g. @app.on_event("startup")),
# via asyncio.create_task(consume(sub)): awaiting it directly would block startup forever.
# UpdateNotifSubscriber is our own wrapper around the AsyncKafkaConsumer above.
# Note that FastAPI's Depends() does not work for on_event() hooks.

async def consume(sub: UpdateNotifSubscriber):
    async for _key, value in sub.consume():
        await handle_update(value)

async def handle_update(update: UpdateMessage):
    ...
```
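A startup hook is awaited before the app starts serving, so an endless `async for` loop has to run as a background task rather than be awaited inline. Newer FastAPI versions also accept a `lifespan` async context manager (`FastAPI(lifespan=lifespan)`) in place of `on_event`. The sketch below shows that shape in plain asyncio, so it runs without FastAPI installed. It rests on a few assumptions: `fake_subscription()` is a hypothetical stand-in for `sub.consume()`, and `handle_update` simply records the values it receives.

```python
import asyncio
import contextlib
from typing import AsyncIterator

handled: list[str] = []


async def fake_subscription() -> AsyncIterator[tuple[str, str]]:
    # Hypothetical stand-in for sub.consume(): an endless stream of (key, value).
    i = 0
    while True:
        await asyncio.sleep(0.01)
        yield f"k{i}", f"v{i}"
        i += 1


async def handle_update(value: str) -> None:
    handled.append(value)


async def consume_forever() -> None:
    async for _key, value in fake_subscription():
        await handle_update(value)


@contextlib.asynccontextmanager
async def lifespan(app):
    # Startup: run the consumer loop in the background instead of awaiting it.
    task = asyncio.create_task(consume_forever())
    try:
        yield
    finally:
        # Shutdown: cancel the loop and wait for it to unwind.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


async def main() -> list[str]:
    async with lifespan(None):  # FastAPI would pass the app instance here
        await asyncio.sleep(0.1)  # the app would be serving requests meanwhile
    return list(handled)


result = asyncio.run(main())
```

Cancelling the task raises CancelledError inside the generator's pending `await`, so the subscription generator and the `async for` loop both unwind before shutdown completes.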

@pkit

pkit commented Apr 20, 2024

> The thing is, async consuming is much trickier than producing.

Yes, it's infuriating how badly it is supported by Confluent

@pavelschon

pavelschon commented Apr 25, 2024

> The example is great, but there's no async consumer demonstrated. The thing is, async consuming is much trickier than producing.
>
> I've ended up writing this.

Interestingly, we have*) a very similar asyncio wrapper around confluent_kafka.Consumer.
But instead of loop.run_in_executor, we use await asyncio.to_thread(consumer.consume, num_messages, timeout).
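A small sketch of the `asyncio.to_thread` variant. `asyncio.to_thread` (Python 3.9+) runs the call in the loop's default executor, like `loop.run_in_executor(None, ...)`, and additionally copies the current contextvars into the worker. `FakeConsumer` is a hypothetical stand-in for `confluent_kafka.Consumer`, which keeps only the shape of `consume(num_messages, timeout)`: a blocking call that returns a list. The demo checks that another coroutine keeps running while `consume()` is blocked.

```python
import asyncio
import time


class FakeConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer.consume(num_messages, timeout)."""

    def __init__(self, batches):
        self._batches = list(batches)

    def consume(self, num_messages, timeout):
        if self._batches:
            return self._batches.pop(0)[:num_messages]
        time.sleep(timeout)  # nothing available: block until the timeout expires
        return []


async def consume_batch(consumer, num_messages=100, timeout=1.0):
    # The blocking call runs on a worker thread; the event loop stays free.
    return await asyncio.to_thread(consumer.consume, num_messages, timeout)


async def demo():
    events = []
    consumer = FakeConsumer([])  # empty, so consume() blocks for the whole timeout

    async def consume_task():
        events.append(("consumed", await consume_batch(consumer, 100, 0.2)))

    async def ticker():
        for _ in range(3):
            await asyncio.sleep(0.01)
        events.append(("ticked", None))

    await asyncio.gather(consume_task(), ticker())
    return events


events = asyncio.run(demo())
print(events)  # [('ticked', None), ('consumed', [])]
```

The ticker finishes first because the 0.2 s blocking `consume()` runs off the event-loop thread. If it were called directly inside a coroutine, the ticker could not progress until it returned.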

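We use rebalance callbacks in the following way:

```python
import asyncio
from confluent_kafka import Consumer, TopicPartition

loop = asyncio.get_running_loop()  # capture on the event-loop thread
assign_queue: asyncio.Queue[list[TopicPartition]] = asyncio.Queue()

def on_assign(_: Consumer, partitions: list[TopicPartition]) -> None:
    # Called on the consume() thread; asyncio.Queue is not thread-safe, so hop to the loop.
    loop.call_soon_threadsafe(assign_queue.put_nowait, partitions)

consumer.subscribe(topics, on_assign=on_assign)
```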

Then a separate task can await the queue of assigned partitions.
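Here is a runnable sketch of that hand-off. It assumes the callback fires on a worker thread, as confluent_kafka rebalance callbacks fire inside the thread that calls `consume()`. Because `asyncio.Queue` is not thread-safe, the callback schedules `put_nowait` on the loop with `call_soon_threadsafe`. Plain strings stand in for `TopicPartition` objects, and `fake_rebalances()` is a hypothetical simulation of two assignments.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    assign_queue: asyncio.Queue[list[str]] = asyncio.Queue()

    def on_assign(_consumer, partitions):
        # Runs on a worker thread; the queue is only touched on the loop thread.
        loop.call_soon_threadsafe(assign_queue.put_nowait, partitions)

    async def watch_assignments(count):
        # The separate task that awaits newly assigned partitions.
        seen = []
        for _ in range(count):
            seen.append(await assign_queue.get())
        return seen

    def fake_rebalances():
        # Hypothetical: two rebalances, fired from outside the event loop.
        on_assign(None, ["orders-0"])
        on_assign(None, ["orders-1", "orders-2"])

    watcher = asyncio.create_task(watch_assignments(2))
    await asyncio.to_thread(fake_rebalances)
    return await watcher


assignments = asyncio.run(main())
print(assignments)  # [['orders-0'], ['orders-1', 'orders-2']]
```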

*) I can't share the full code because it's proprietary.

@conradogarciaberrotaran

Reposting @mdespriee's code with code highlighting.


@stephan-hof
Contributor

I created pull request #1448, which should make integration with async frameworks such as gevent and asyncio possible (especially on the consumer side).

By exposing librdkafka's event_io_enable, it lets frameworks like asyncio truly wait for new messages asynchronously, without threads.
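Below is a minimal sketch of that idea under stated assumptions. The event loop watches a file descriptor with `loop.add_reader()`. Once the descriptor is readable, the loop drains messages with a non-blocking poll, so no thread is parked in a blocking call. `FakeIOEventQueue` is a hypothetical stand-in for a librdkafka queue with IO events enabled, which writes a byte to a pipe whenever a message arrives. It does not reflect the actual Python API proposed in #1448. `add_reader()` requires a selector-based event loop (the default on Unix).

```python
import asyncio
import collections
import os
import threading


class FakeIOEventQueue:
    """Hypothetical stand-in for a librdkafka queue with IO events enabled:
    each time a message is enqueued, a byte is written to a pipe."""

    def __init__(self):
        self.read_fd, self._write_fd = os.pipe()
        os.set_blocking(self.read_fd, False)
        self._messages = collections.deque()
        self._lock = threading.Lock()

    def deliver(self, msg):
        # Called from a background thread, like librdkafka's internal threads.
        with self._lock:
            self._messages.append(msg)
        os.write(self._write_fd, b"1")

    def poll_nowait(self):
        # Non-blocking: return one message, or None if the queue is empty.
        with self._lock:
            return self._messages.popleft() if self._messages else None

    def close(self):
        os.close(self.read_fd)
        os.close(self._write_fd)


async def receive(q, count):
    loop = asyncio.get_running_loop()
    wakeup = asyncio.Event()
    # The loop's selector watches the fd; wakeup.set runs on the loop thread.
    loop.add_reader(q.read_fd, wakeup.set)
    received = []
    try:
        while len(received) < count:
            await wakeup.wait()
            wakeup.clear()
            try:
                os.read(q.read_fd, 4096)  # drain wake-up bytes before polling
            except BlockingIOError:
                pass
            while (msg := q.poll_nowait()) is not None:
                received.append(msg)
    finally:
        loop.remove_reader(q.read_fd)
    return received


async def main():
    q = FakeIOEventQueue()

    def background_delivery():
        for i in range(3):
            q.deliver(f"m{i}")

    thread = threading.Thread(target=background_delivery)
    thread.start()
    try:
        return await receive(q, 3)
    finally:
        thread.join()
        q.close()


result = asyncio.run(main())
print(result)  # ['m0', 'm1', 'm2']
```

Draining the pipe before polling avoids lost wake-ups. A message enqueued after the drain either gets picked up by the poll that follows, or leaves a byte behind that wakes the loop again.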

@rachitchauhan43

rachitchauhan43 commented Jan 11, 2025

I am referring to this article.

Can someone explain this part of the article about moving poll() onto the event loop?

> It would have been simpler to arrange for non-blocking poll calls to be made on the event loop thread periodically using call_later, but this approach would introduce an additional average latency of (poll period)/2 to each delivery notification.

I don't understand how making non-blocking poll calls on the event loop would introduce an additional average latency of (poll period)/2.
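One way to read the (poll period)/2 figure rests on an assumption. Suppose a delivery report becomes ready at a time uniformly distributed within a poll period $P$, and the application only sees it at the next periodic poll. Let $U$ be the time since the last poll and $W$ the extra wait:

$$
W = P - U,\quad U \sim \mathrm{Uniform}(0, P) \;\Rightarrow\; \mathbb{E}[W] = \frac{1}{P}\int_0^P (P - u)\,du = \frac{P}{2}
$$

For example, polling every 100 ms would add about 50 ms on average, and up to 100 ms in the worst case. A thread blocked in poll() instead returns as soon as a report is ready.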

Also, what would the code look like if the non-blocking poll calls were made on the event loop using call_later?
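A minimal sketch of the call_later approach, under stated assumptions. `FakeProducer` is a hypothetical stand-in for `confluent_kafka.Producer`: its delivery reports are ready immediately but are only served by `poll()`. `PeriodicPoller` is an illustrative name. Because `poll(0)` runs on the loop thread, callbacks can resolve futures directly, without `call_soon_threadsafe`. The trade-off is that each report waits for the next tick.

```python
import asyncio
import collections


class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer: delivery reports
    are only handed to the application when poll() is called."""

    def __init__(self):
        self._ready = collections.deque()

    def produce(self, topic, value, on_delivery):
        # Pretend the broker acked at once; the report waits for the next poll().
        self._ready.append((on_delivery, (topic, value)))

    def poll(self, timeout):
        served = 0
        while self._ready:
            callback, msg = self._ready.popleft()
            callback(None, msg)  # (err, msg)
            served += 1
        return served


class PeriodicPoller:
    def __init__(self, producer, period_s=0.05):
        self._producer = producer
        self._period = period_s
        self._loop = asyncio.get_running_loop()
        self._handle = self._loop.call_later(self._period, self._tick)

    def _tick(self):
        # Non-blocking poll on the loop thread, then reschedule the next tick.
        self._producer.poll(0)
        self._handle = self._loop.call_later(self._period, self._tick)

    def produce(self, topic, value):
        fut = self._loop.create_future()

        def on_delivery(err, msg):
            # Already on the loop thread, so the future can be resolved directly.
            if fut.done():
                return
            if err is not None:
                fut.set_exception(RuntimeError(str(err)))
            else:
                fut.set_result(msg)

        self._producer.produce(topic, value, on_delivery=on_delivery)
        return fut

    def close(self):
        self._handle.cancel()


async def main():
    poller = PeriodicPoller(FakeProducer(), period_s=0.05)
    try:
        return await poller.produce("events", b"x")
    finally:
        poller.close()


result = asyncio.run(main())
print(result)  # ('events', b'x')
```

Even though the fake report is ready immediately, the future only resolves at the next tick, up to `period_s` later. That is the latency the article refers to.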

cc: @mhowlett

9 participants