Add concurrent-between-partitions kafka subscriber #2017

Arseniy-Popov
Contributor

@Arseniy-Popov Arseniy-Popov commented Jan 2, 2025

Description

This PR adds an at-least-once (manual commit) Kafka subscriber that consumes concurrently across partitions and sequentially within each partition. Processing partitions in parallel improves throughput while preserving per-partition message ordering and at-least-once delivery semantics.

Background

Concurrent at-least-once consumption from a single partition is nontrivial with Kafka because Kafka does not support out-of-order commits: committing an offset moves the committed position forward and implicitly commits every earlier message in that partition, including messages that may not have been processed yet. The problem goes away if consumption is sequential within each partition and concurrent only between partitions, since the partition is Kafka's unit of parallelism.
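To make the commit problem concrete, here is a minimal sketch in plain Python. It is a toy model, not a Kafka client: `PartitionLog` and its methods are hypothetical names made up for illustration, and the only behaviour it assumes is the one described above, namely that a commit covers every earlier offset in the partition.

```python
# Toy model of one partition's committed offset. Hypothetical class, not a Kafka API.
class PartitionLog:
    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset of the next message to read after a restart

    def commit(self, offset):
        # Committing the message at `offset` implicitly covers all earlier offsets.
        self.committed = max(self.committed, offset + 1)

    def replay_after_crash(self):
        # After a restart, consumption resumes from the committed position.
        return self.messages[self.committed:]


log = PartitionLog(["m0", "m1", "m2"])

# Two handlers work on the same partition concurrently and m1 finishes first.
log.commit(1)

# The process crashes before the handler for m0 completes.
redelivered = log.replay_after_crash()
print(redelivered)  # ['m2']: m0 is never redelivered, so at-least-once is broken
```

With strictly sequential handling inside the partition, `m0` would always be committed before `m1`, so a crash could only cause redelivery and never a silent skip.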

Type of change

  • New feature (a non-breaking change that adds functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh
  • I have ensured that static analysis tests are passing by running scripts/static-analysis.sh
  • I have included code examples to illustrate the modifications

Example

Assuming a topic is populated with 8 messages spread across 4 partitions in a round-robin manner, this example

import asyncio
import time
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


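@broker.subscriber(
    "topic1",
    group_id="microservice-1",
    auto_commit=False,  # manual commit: offsets are committed via msg.ack()
    max_workers=4,  # one worker per partition in this 4-partition example
)
async def base_handler(message: dict, msg: KafkaMessage, logger: Logger):
    # Simulate 3 seconds of work per message.
    await asyncio.sleep(3)
    logger.debug(
        f"Finished message: {message['id']:>3}, time from publish: {time.time() - float(message['time']) :4.2f}"
    )
    # Acknowledge only after processing, which gives at-least-once delivery.
    await msg.ack()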

will generate the following output:

2025-01-03 14:11:14,821 INFO     - FastStream app starting...
2025-01-03 14:11:14,852 INFO     - topic1 | microservice-1 |            - `BaseHandler` waiting for messages
2025-01-03 14:11:20,796 INFO     -        |                |            - Consumer faststream-0.5.33-7cc1b452-9b78-47ac-bdf2-e5aa6d6263b7 assigned to partitions: frozenset({TopicPartition(topic='topic1', partition=2)})
2025-01-03 14:11:20,797 INFO     -        |                |            - Consumer faststream-0.5.33-9d39e639-e9a0-41d9-9b94-0fad0f68c9bc assigned to partitions: frozenset({TopicPartition(topic='topic1', partition=3)})
2025-01-03 14:11:20,797 INFO     -        |                |            - Consumer faststream-0.5.33-4080e0f3-69d6-42bd-8d12-cfe4af011866 assigned to partitions: frozenset({TopicPartition(topic='topic1', partition=1)})
2025-01-03 14:11:20,797 INFO     -        |                |            - Consumer faststream-0.5.33-10392508-9d8f-4b73-8eca-8d223896a02d assigned to partitions: frozenset({TopicPartition(topic='topic1', partition=0)})
2025-01-03 14:11:20,797 INFO     - FastStream app started successfully! To exit, press CTRL+C
2025-01-03 14:11:27,880 INFO     - topic1 | microservice-1 | 12-1735902 - Received
2025-01-03 14:11:27,884 INFO     - topic1 | microservice-1 | 12-1735902 - Received
2025-01-03 14:11:27,886 INFO     - topic1 | microservice-1 | 12-1735902 - Received
2025-01-03 14:11:27,891 INFO     - topic1 | microservice-1 | 12-1735902 - Received
2025-01-03 14:11:30,883 DEBUG    - topic1 | microservice-1 | 12-1735902 - Finished message:   1, time from publish: 3.01
2025-01-03 14:11:30,891 DEBUG    - topic1 | microservice-1 | 12-1735902 - Finished message:   2, time from publish: 3.01
2025-01-03 14:11:30,895 DEBUG    - topic1 | microservice-1 | 12-1735902 - Finished message:   3, time from publish: 3.01
2025-01-03 14:11:30,895 DEBUG    - topic1 | microservice-1 | 12-1735902 - Finished message:   4, time from publish: 3.01
2025-01-03 14:11:30,905 INFO     - topic1 | microservice-1 | 12-1735902 - Processed
2025-01-03 14:11:30,905 INFO     - topic1 | microservice-1 | 12-1735902 - Processed
2025-01-03 14:11:30,905 INFO     - topic1 | microservice-1 | 12-1735902 - Processed
2025-01-03 14:11:30,905 INFO     - topic1 | microservice-1 | 12-1735902 - Processed
2025-01-03 14:11:30,908 INFO     - topic1 | microservice-1 | 13-1735902 - Received
2025-01-03 14:11:30,908 INFO     - topic1 | microservice-1 | 13-1735902 - Received
2025-01-03 14:11:30,909 INFO     - topic1 | microservice-1 | 13-1735902 - Received
2025-01-03 14:11:30,909 INFO     - topic1 | microservice-1 | 13-1735902 - Received
2025-01-03 14:11:33,913 DEBUG    - topic1 | microservice-1 | 13-1735902 - Finished message:   7, time from publish: 6.02
2025-01-03 14:11:33,914 DEBUG    - topic1 | microservice-1 | 13-1735902 - Finished message:   5, time from publish: 6.03
2025-01-03 14:11:33,914 DEBUG    - topic1 | microservice-1 | 13-1735902 - Finished message:   6, time from publish: 6.02
2025-01-03 14:11:33,915 DEBUG    - topic1 | microservice-1 | 13-1735902 - Finished message:   8, time from publish: 6.02
2025-01-03 14:11:33,920 INFO     - topic1 | microservice-1 | 13-1735902 - Processed
2025-01-03 14:11:33,922 INFO     - topic1 | microservice-1 | 13-1735902 - Processed
2025-01-03 14:11:33,923 INFO     - topic1 | microservice-1 | 13-1735902 - Processed
2025-01-03 14:11:33,923 INFO     - topic1 | microservice-1 | 13-1735902 - Processed
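The timing pattern in the log above (four messages finish together, then the next four) can be reproduced without Kafka. The following is a rough asyncio sketch of the scheduling idea only, not the subscriber's actual implementation: `partition_worker`, the queues, and the shortened `HANDLER_SECONDS` delay are all assumptions made for illustration.

```python
import asyncio
import time

NUM_PARTITIONS = 4
HANDLER_SECONDS = 0.05  # stand-in for the 3 second sleep in the example handler


async def partition_worker(partition, queue, finished):
    # One worker per partition: its messages are handled strictly in order.
    while True:
        msg_id = await queue.get()
        if msg_id is None:  # sentinel: no more messages for this partition
            break
        await asyncio.sleep(HANDLER_SECONDS)
        finished.append((partition, msg_id))


async def main():
    queues = [asyncio.Queue() for _ in range(NUM_PARTITIONS)]
    # Spread 8 messages across 4 partitions in a round-robin manner.
    for msg_id in range(1, 9):
        queues[(msg_id - 1) % NUM_PARTITIONS].put_nowait(msg_id)
    for queue in queues:
        queue.put_nowait(None)

    finished = []
    started = time.monotonic()
    # Workers for different partitions run concurrently.
    await asyncio.gather(
        *(partition_worker(p, q, finished) for p, q in enumerate(queues))
    )
    return finished, time.monotonic() - started


finished, elapsed = asyncio.run(main())
print(finished, f"{elapsed:.2f}s")
```

The total run time is roughly two handler durations rather than eight, which matches the 3 s and 6 s "time from publish" values in the log, while each partition still sees its own messages in order.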

@Arseniy-Popov
Contributor Author

This is still a work in progress; I will mark it as ready for review once it is done.

@Lancetnik Lancetnik added the AioKafka Issues related to `faststream.kafka` module label Jan 2, 2025
@Arseniy-Popov Arseniy-Popov force-pushed the feat/add-kafka-subscriber-concurrent-among-partitions branch from 5b0f54d to e7b31dd Compare January 3, 2025 10:59
@Arseniy-Popov Arseniy-Popov marked this pull request as ready for review January 3, 2025 11:12
@Arseniy-Popov
Contributor Author

@Lancetnik Ready for review.

@Lancetnik
Member

@Arseniy-Popov thanks a lot! I'll only be able to look at it in the next few days, sorry.

@Lancetnik Lancetnik added the enhancement New feature or request label Jan 10, 2025
@Arseniy-Popov
Contributor Author

Arseniy-Popov commented Jan 10, 2025

Previously, a user-supplied consumer rebalance listener could not identify which consumer it was listening to, so I added a default listener factory that accepts a consumer parameter. For backwards compatibility, any user-supplied listener instance takes precedence over the default factory. Partition assignment logging was moved into the new listener so that it applies throughout the consumer's lifecycle and not just on startup.

2025-01-10 23:35:50,270 INFO     - FastStream app starting...
2025-01-10 23:35:50,314 INFO     - topic1 | microservice-1 |            - `BaseHandler` waiting for messages
2025-01-10 23:35:53,429 INFO     -        |                |            - Consumer faststream-0.5.33-98f2e3ec-f766-412c-82b9-bcd82d9cdd6e assigned to partitions: {TopicPartition(topic='topic1', partition=2)}
2025-01-10 23:35:53,430 INFO     -        |                |            - Consumer faststream-0.5.33-661433d3-fca1-41f0-9b50-0f6e7cb4871d assigned to partitions: {TopicPartition(topic='topic1', partition=1)}
2025-01-10 23:35:53,431 INFO     -        |                |            - Consumer faststream-0.5.33-54696ec4-c0a1-406c-8ead-257b465c3032 assigned to partitions: {TopicPartition(topic='topic1', partition=0)}
2025-01-10 23:35:53,431 INFO     -        |                |            - Consumer faststream-0.5.33-d3dc825d-686f-4920-a883-1dd0ffa281c1 assigned to partitions: {TopicPartition(topic='topic1', partition=3)}
2025-01-10 23:35:53,432 INFO     - FastStream app started successfully! To exit, press CTRL+C
Heartbeat failed for group microservice-1 because it is rebalancing
Heartbeat failed for group microservice-1 because it is rebalancing
Heartbeat failed for group microservice-1 because it is rebalancing
Heartbeat failed for group microservice-1 because it is rebalancing
2025-01-10 23:35:59,446 INFO     -        |                |            - Consumer faststream-0.5.33-98f2e3ec-f766-412c-82b9-bcd82d9cdd6e assigned to partitions: set()
2025-01-10 23:35:59,447 INFO     -        |                |            - Consumer faststream-0.5.33-d3dc825d-686f-4920-a883-1dd0ffa281c1 assigned to partitions: set()
2025-01-10 23:35:59,447 INFO     -        |                |            - Consumer faststream-0.5.33-661433d3-fca1-41f0-9b50-0f6e7cb4871d assigned to partitions: {TopicPartition(topic='topic1', partition=3)}
2025-01-10 23:35:59,448 INFO     -        |                |            - Consumer faststream-0.5.33-54696ec4-c0a1-406c-8ead-257b465c3032 assigned to partitions: {TopicPartition(topic='topic1', partition=2)}
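The precedence rule described in this comment can be sketched as follows. This is an illustrative outline only, not the code merged in this PR: `LoggingRebalanceListener` and `pick_listener` are hypothetical names, and the callbacks merely mirror the `on_partitions_revoked` / `on_partitions_assigned` shape of aiokafka's rebalance listener without importing it.

```python
import logging

logger = logging.getLogger("rebalance-sketch")


class LoggingRebalanceListener:
    """Hypothetical default listener that knows which consumer it belongs to."""

    def __init__(self, consumer_id):
        self.consumer_id = consumer_id
        self.assignments = []  # history of assignments, kept for inspection

    def on_partitions_revoked(self, revoked):
        # Nothing to do in this sketch.
        pass

    def on_partitions_assigned(self, assigned):
        # Runs on every rebalance, so logging covers the whole lifecycle.
        self.assignments.append(set(assigned))
        logger.info(
            "Consumer %s assigned to partitions: %s", self.consumer_id, set(assigned)
        )


def pick_listener(consumer_id, user_listener=None, factory=LoggingRebalanceListener):
    # A user-supplied instance wins; otherwise build one bound to this consumer.
    if user_listener is not None:
        return user_listener
    return factory(consumer_id)


listener = pick_listener("consumer-1")
listener.on_partitions_assigned({("topic1", 2)})
listener.on_partitions_assigned(set())  # partition moved away after a rebalance
```

Because the factory receives the consumer identity, each log line can name the consumer it refers to, as in the output above where two consumers end up with `set()` after the rebalance.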

@Lancetnik Lancetnik enabled auto-merge January 12, 2025 16:13
@Lancetnik Lancetnik added this pull request to the merge queue Jan 12, 2025
Merged via the queue into airtai:main with commit dfd7b9a Jan 12, 2025
32 checks passed
Labels
AioKafka Issues related to `faststream.kafka` module enhancement New feature or request
Projects
Status: Done

2 participants