diff --git a/.gitignore b/.gitignore index ec095d1691..c85039ee25 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ venv* htmlcov token .DS_Store +*.egg-info docs/site/ docs/site_build/ diff --git a/.secrets.baseline b/.secrets.baseline index 50f971ce77..051af165cc 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -153,7 +153,7 @@ "filename": "docs/docs/en/release.md", "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", "is_verified": false, - "line_number": 1934, + "line_number": 1958, "is_secret": false } ], @@ -178,5 +178,5 @@ } ] }, - "generated_at": "2024-12-05T15:39:13Z" + "generated_at": "2025-01-10T14:53:01Z" } diff --git a/docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md b/docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md deleted file mode 100644 index 824f6dc2d1..0000000000 --- a/docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.kafka.exceptions.BatchBufferOverflowException diff --git a/docs/docs/en/api/faststream/rabbit/QueueType.md b/docs/docs/en/api/faststream/rabbit/QueueType.md deleted file mode 100644 index 32fded7b16..0000000000 --- a/docs/docs/en/api/faststream/rabbit/QueueType.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.QueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/QueueType.md b/docs/docs/en/api/faststream/rabbit/schemas/QueueType.md deleted file mode 100644 index 49ab8b9abe..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/QueueType.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.QueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md deleted file mode 100644 index 48ddb0825a..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.ClassicQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md deleted file mode 100644 index fe6c0f6768..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.CommonQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md deleted file mode 100644 index 6c8a62838a..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.QueueClassicTypeSpecificArgs diff --git 
a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md deleted file mode 100644 index 3b4bd645f5..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.QueueQuorumTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md deleted file mode 100644 index 1e3694caa3..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.QueueStreamTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md deleted file mode 100644 index 099131fea0..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.QueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md deleted file mode 100644 index 4db347081a..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.QuorumQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md deleted file mode 100644 index 438ba0143f..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.SharedQueueClassicAndQuorumArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md deleted file mode 100644 index 12c12bac47..0000000000 --- a/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.schemas.queue.StreamQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/utils/build_virtual_host.md b/docs/docs/en/api/faststream/rabbit/utils/build_virtual_host.md deleted file mode 100644 index bab956290b..0000000000 --- a/docs/docs/en/api/faststream/rabbit/utils/build_virtual_host.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -# 0.5 - API -# 2 - Release -# 3 - Contributing -# 5 - Template Page -# 10 - Default -search: - boost: 0.5 ---- - -::: faststream.rabbit.utils.build_virtual_host diff --git a/docs/docs/en/getting-started/asgi.md 
b/docs/docs/en/getting-started/asgi.md index 2ece28bd13..8d4acd06e5 100644 --- a/docs/docs/en/getting-started/asgi.md +++ b/docs/docs/en/getting-started/asgi.md @@ -101,7 +101,7 @@ app = AsgiFastStream( !!! tip You do not need to setup all routes using the `asgi_routes=[]` parameter.
- You can use the `#!python app.mount("/healh", asgi_endpoint)` method also. + You can use the `#!python app.mount("/health", asgi_endpoint)` method also. ### AsyncAPI Documentation diff --git a/docs/docs/en/kafka/Subscriber/index.md b/docs/docs/en/kafka/Subscriber/index.md index 831acc814b..8e0f4c511a 100644 --- a/docs/docs/en/kafka/Subscriber/index.md +++ b/docs/docs/en/kafka/Subscriber/index.md @@ -72,3 +72,10 @@ async def base_handler( ): ... ``` + + +## Concurrent processing + +There are two possible modes of concurrent message processing: +- With `auto_commit=False` and `max_workers` > 1, a handler processes all messages concurrently in a at-most-once semantic. +- With `auto_commit=True` and `max_workers` > 1, processing is concurrent between topic partitions and sequential within a partition to ensure reliable at-least-once processing. Maximum concurrency is achieved when total number of workers across all application instances running workers in the same consumer group is equal to the number of partitions in the topic. Increasing worker count beyond that will result in idle workers as not more than one consumer from a consumer group can be consuming from the same partition. diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md index 02d814d290..32d6ad3ebf 100644 --- a/docs/docs/en/release.md +++ b/docs/docs/en/release.md @@ -12,6 +12,30 @@ hide: --- # Release Notes +## 0.5.34 + +### What's Changed + +* fix: when / present in virtual host name and passing as uri by [@pepellsd](https://github.com/pepellsd){.external-link target="_blank"} in [#1979](https://github.com/airtai/faststream/pull/1979){.external-link target="_blank"} +* fix (#2013): allow to create publisher in already connected broker by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#2024](https://github.com/airtai/faststream/pull/2024){.external-link target="_blank"} +* feat: add BatchBufferOverflowException by [@spataphore1337](https://github.com/spataphore1337){.external-link target="_blank"} in [#1990](https://github.com/airtai/faststream/pull/1990){.external-link target="_blank"} +* feat: add static instrumentation info by [@draincoder](https://github.com/draincoder){.external-link target="_blank"} in [#1996](https://github.com/airtai/faststream/pull/1996){.external-link target="_blank"} +* docs: remove reference of "faststream.access" by [@rishabhc32](https://github.com/rishabhc32){.external-link target="_blank"} in [#1995](https://github.com/airtai/faststream/pull/1995){.external-link target="_blank"} +* docs: fixed typo in publishing/test.md by [@AlexPetul](https://github.com/AlexPetul){.external-link target="_blank"} in [#2009](https://github.com/airtai/faststream/pull/2009){.external-link target="_blank"} +* docs: ability to declare queue/exchange binding by [@MagicAbdel](https://github.com/MagicAbdel){.external-link target="_blank"} in [#2011](https://github.com/airtai/faststream/pull/2011){.external-link target="_blank"} +* docs: fix spelling mistake of `/health` by [@herotomg](https://github.com/herotomg){.external-link target="_blank"} in [#2023](https://github.com/airtai/faststream/pull/2023){.external-link target="_blank"} +* docs: update aio-pika external docs URL as it has been moved by [@HybridBit](https://github.com/HybridBit){.external-link target="_blank"} in [#1984](https://github.com/airtai/faststream/pull/1984){.external-link target="_blank"} +* refactor: add type annotations for RabbitQueue and enum for queue type by 
[@pepellsd](https://github.com/pepellsd){.external-link target="_blank"} in [#2002](https://github.com/airtai/faststream/pull/2002){.external-link target="_blank"} + +### New Contributors +* [@HybridBit](https://github.com/HybridBit){.external-link target="_blank"} made their first contribution in [#1984](https://github.com/airtai/faststream/pull/1984){.external-link target="_blank"} +* [@rishabhc32](https://github.com/rishabhc32){.external-link target="_blank"} made their first contribution in [#1995](https://github.com/airtai/faststream/pull/1995){.external-link target="_blank"} +* [@AlexPetul](https://github.com/AlexPetul){.external-link target="_blank"} made their first contribution in [#2009](https://github.com/airtai/faststream/pull/2009){.external-link target="_blank"} +* [@MagicAbdel](https://github.com/MagicAbdel){.external-link target="_blank"} made their first contribution in [#2011](https://github.com/airtai/faststream/pull/2011){.external-link target="_blank"} +* [@herotomg](https://github.com/herotomg){.external-link target="_blank"} made their first contribution in [#2023](https://github.com/airtai/faststream/pull/2023){.external-link target="_blank"} + +**Full Changelog**: [#0.5.33...0.5.34](https://github.com/airtai/faststream/compare/0.5.33...0.5.34){.external-link target="_blank"} + ## 0.5.33 ### What's Changed diff --git a/faststream/__about__.py b/faststream/__about__.py index 2c1ebe109b..cbfea67070 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices.""" -__version__ = "0.5.33" +__version__ = "0.5.34" SERVICE_NAME = f"faststream-{__version__}" diff --git a/faststream/_internal/subscriber/usecase.py b/faststream/_internal/subscriber/usecase.py index 70e7f22525..b2939ec026 100644 --- a/faststream/_internal/subscriber/usecase.py +++ b/faststream/_internal/subscriber/usecase.py @@ -7,6 +7,7 @@ Annotated, Any, Callable, + NamedTuple, Optional, Union, ) @@ -54,26 +55,11 @@ from faststream.response import Response -class _CallOptions: - __slots__ = ( - "decoder", - "dependencies", - "middlewares", - "parser", - ) - - def __init__( - self, - *, - parser: Optional["CustomCallable"], - decoder: Optional["CustomCallable"], - middlewares: Sequence["SubscriberMiddleware[Any]"], - dependencies: Iterable["Dependant"], - ) -> None: - self.parser = parser - self.decoder = decoder - self.middlewares = middlewares - self.dependencies = dependencies +class _CallOptions(NamedTuple): + parser: Optional["CustomCallable"] + decoder: Optional["CustomCallable"] + middlewares: Sequence["SubscriberMiddleware[Any]"] + dependencies: Iterable["Dependant"] class SubscriberUsecase(SubscriberProto[MsgType]): @@ -445,3 +431,18 @@ def get_log_context( return { "message_id": getattr(message, "message_id", ""), } + + def _log( + self, + log_level: int, + message: str, + extra: Optional["AnyDict"] = None, + exc_info: Optional[Exception] = None, + ) -> None: + state = self._state.get() + state.logger_state.logger.log( + log_level, + message, + extra=extra, + exc_info=exc_info, + ) diff --git a/faststream/kafka/broker/registrator.py b/faststream/kafka/broker/registrator.py index 6f804bda02..6059567647 100644 --- a/faststream/kafka/broker/registrator.py +++ b/faststream/kafka/broker/registrator.py @@ -1,4 +1,4 @@ -from collections.abc import Iterable, Sequence +from collections.abc import Collection, Iterable, Sequence from typing import ( TYPE_CHECKING, Annotated, @@ -39,6 +39,7 @@ ) from 
faststream.kafka.subscriber.specified import ( SpecificationBatchSubscriber, + SpecificationConcurrentBetweenPartitionsSubscriber, SpecificationConcurrentDefaultSubscriber, SpecificationDefaultSubscriber, ) @@ -59,6 +60,7 @@ class KafkaRegistrator( "SpecificationBatchSubscriber", "SpecificationDefaultSubscriber", "SpecificationConcurrentDefaultSubscriber", + "SpecificationConcurrentBetweenPartitionsSubscriber", ] ] _publishers: list[ @@ -384,7 +386,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -765,7 +767,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -1146,7 +1148,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -1530,7 +1532,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -1561,7 +1563,14 @@ def subscriber( ] = (), max_workers: Annotated[ int, - Doc("Number of workers to process messages concurrently."), + Doc( + "Maximum number of messages being processed concurrently. With " + "`auto_commit=False` processing is concurrent between partitions and " + "sequential within a partition. With `auto_commit=False` maximum " + "concurrency is achieved when total number of workers across all " + "application instances running workers in the same consumer group " + "is equal to the number of partitions in the topic." + ), ] = 1, no_ack: Annotated[ bool, @@ -1598,6 +1607,7 @@ def subscriber( "SpecificationDefaultSubscriber", "SpecificationBatchSubscriber", "SpecificationConcurrentDefaultSubscriber", + "SpecificationConcurrentBetweenPartitionsSubscriber", ]: sub = create_subscriber( *topics, @@ -1648,7 +1658,14 @@ def subscriber( if batch: subscriber = cast("SpecificationBatchSubscriber", subscriber) elif max_workers > 1: - subscriber = cast("SpecificationConcurrentDefaultSubscriber", subscriber) + if auto_commit: + subscriber = cast( + "SpecificationConcurrentDefaultSubscriber", subscriber + ) + else: + subscriber = cast( + "SpecificationConcurrentBetweenPartitionsSubscriber", subscriber + ) else: subscriber = cast("SpecificationDefaultSubscriber", subscriber) diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index b7f1bf0ec7..3477f48c10 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -55,6 +55,7 @@ ) from faststream.kafka.subscriber.specified import ( SpecificationBatchSubscriber, + SpecificationConcurrentBetweenPartitionsSubscriber, SpecificationConcurrentDefaultSubscriber, SpecificationDefaultSubscriber, ) @@ -2618,12 +2619,20 @@ def subscriber( ] = False, max_workers: Annotated[ int, - Doc("Number of workers to process messages concurrently."), + Doc( + "Maximum number of messages being processed concurrently. With " + "`auto_commit=False` processing is concurrent between partitions and " + "sequential within a partition. With `auto_commit=False` maximum " + "concurrency is achieved when total number of workers across all " + "application instances running workers in the same consumer group " + "is equal to the number of partitions in the topic." 
+ ), ] = 1, ) -> Union[ "SpecificationBatchSubscriber", "SpecificationDefaultSubscriber", "SpecificationConcurrentDefaultSubscriber", + "SpecificationConcurrentBetweenPartitionsSubscriber", ]: subscriber = super().subscriber( *topics, @@ -2678,7 +2687,11 @@ def subscriber( if batch: return cast("SpecificationBatchSubscriber", subscriber) if max_workers > 1: - return cast("SpecificationConcurrentDefaultSubscriber", subscriber) + if auto_commit: + return cast("SpecificationConcurrentDefaultSubscriber", subscriber) + return cast( + "SpecificationConcurrentBetweenPartitionsSubscriber", subscriber + ) return cast("SpecificationDefaultSubscriber", subscriber) @overload # type: ignore[override] diff --git a/faststream/kafka/listener.py b/faststream/kafka/listener.py new file mode 100644 index 0000000000..12b54d36d1 --- /dev/null +++ b/faststream/kafka/listener.py @@ -0,0 +1,88 @@ +import logging +from typing import TYPE_CHECKING, Optional + +from aiokafka import ConsumerRebalanceListener + +from faststream._internal.utils.functions import call_or_await + +if TYPE_CHECKING: + from aiokafka import AIOKafkaConsumer, TopicPartition + + from faststream._internal.basic_types import AnyDict, LoggerProto + + +def make_logging_listener( + *, + consumer: "AIOKafkaConsumer", + logger: Optional["LoggerProto"], + log_extra: "AnyDict", + listener: Optional["ConsumerRebalanceListener"], +) -> Optional["ConsumerRebalanceListener"]: + if logger is None: + return listener + + logging_listener = _LoggingListener( + consumer=consumer, + logger=logger, + log_extra=log_extra, + ) + if listener is None: + return logging_listener + + return _LoggingListenerFacade( + logging_listener=logging_listener, + listener=listener, + ) + + +class _LoggingListener(ConsumerRebalanceListener): + def __init__( + self, + *, + consumer: "AIOKafkaConsumer", + logger: "LoggerProto", + log_extra: "AnyDict", + ) -> None: + self.consumer = consumer + self.logger = logger + self.log_extra = log_extra + + async def on_partitions_revoked(self, revoked: set["TopicPartition"]) -> None: + pass + + async def on_partitions_assigned(self, assigned: set["TopicPartition"]) -> None: + self.logger.log( + logging.INFO, + ( + f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: " + f"{assigned}" + ), + extra=self.log_extra, + ) + if not assigned: + self.logger.log( + logging.WARNING, + ( + f"Consumer in group {self.consumer._group_id} has no partition assignments - topics " + f"{self.consumer._subscription.topics} may have fewer partitions than consumers" + ), + extra=self.log_extra, + ) + + +class _LoggingListenerFacade(ConsumerRebalanceListener): + def __init__( + self, + *, + logging_listener: _LoggingListener, + listener: ConsumerRebalanceListener, + ) -> None: + self.logging_listener = logging_listener + self.listener = listener + + async def on_partitions_revoked(self, revoked: set["TopicPartition"]) -> None: + await call_or_await(self.listener.on_partitions_revoked, revoked) + + async def on_partitions_assigned(self, assigned: set["TopicPartition"]) -> None: + await self.logging_listener.on_partitions_assigned(assigned) + await call_or_await(self.listener.on_partitions_assigned, assigned) diff --git a/faststream/kafka/message.py b/faststream/kafka/message.py index 20fe0d0edd..faf34d608a 100644 --- a/faststream/kafka/message.py +++ b/faststream/kafka/message.py @@ -1,12 +1,13 @@ -from typing import TYPE_CHECKING, Any, Protocol, Union +from typing import Any, Protocol, Union -from aiokafka import TopicPartition as
AIOKafkaTopicPartition +from aiokafka import ( + AIOKafkaConsumer, + ConsumerRecord, + TopicPartition as AIOKafkaTopicPartition, +) from faststream.message import AckStatus, StreamMessage -if TYPE_CHECKING: - from aiokafka import ConsumerRecord - class ConsumerProtocol(Protocol): """A protocol for Kafka consumers.""" @@ -38,6 +39,10 @@ def seek( FAKE_CONSUMER = FakeConsumer() +class KafkaRawMessage(ConsumerRecord): # type: ignore[misc] + consumer: AIOKafkaConsumer + + class KafkaMessage( StreamMessage[ Union[ diff --git a/faststream/kafka/parser.py b/faststream/kafka/parser.py index 936abbc7b0..af543c8083 100644 --- a/faststream/kafka/parser.py +++ b/faststream/kafka/parser.py @@ -1,6 +1,11 @@ -from typing import TYPE_CHECKING, Any, Optional, cast - -from faststream.kafka.message import FAKE_CONSUMER, ConsumerProtocol, KafkaMessage +from typing import TYPE_CHECKING, Any, Optional, Union, cast + +from faststream.kafka.message import ( + FAKE_CONSUMER, + ConsumerProtocol, + KafkaMessage, + KafkaRawMessage, +) from faststream.message import decode_message if TYPE_CHECKING: @@ -30,7 +35,7 @@ def _setup(self, consumer: ConsumerProtocol) -> None: async def parse_message( self, - message: "ConsumerRecord", + message: Union["ConsumerRecord", "KafkaRawMessage"], ) -> "StreamMessage[ConsumerRecord]": """Parses a Kafka message.""" headers = {i: j.decode() for i, j in message.headers} @@ -44,7 +49,7 @@ async def parse_message( correlation_id=headers.get("correlation_id"), raw_message=message, path=self.get_path(message.topic), - consumer=self._consumer, + consumer=getattr(message, "consumer", self._consumer), ) async def decode_message( diff --git a/faststream/kafka/subscriber/factory.py b/faststream/kafka/subscriber/factory.py index 45edbefea3..5cd2742bf2 100644 --- a/faststream/kafka/subscriber/factory.py +++ b/faststream/kafka/subscriber/factory.py @@ -1,11 +1,12 @@ import warnings -from collections.abc import Iterable, Sequence -from typing import TYPE_CHECKING, Literal, Optional, Union, overload +from collections.abc import Collection, Iterable +from typing import TYPE_CHECKING, Optional, Union from faststream._internal.constants import EMPTY from faststream.exceptions import SetupError from faststream.kafka.subscriber.specified import ( SpecificationBatchSubscriber, + SpecificationConcurrentBetweenPartitionsSubscriber, SpecificationConcurrentDefaultSubscriber, SpecificationDefaultSubscriber, ) @@ -20,101 +21,6 @@ from faststream._internal.types import BrokerMiddleware -@overload -def create_subscriber( - *topics: str, - batch: Literal[True], - batch_timeout_ms: int, - max_records: Optional[int], - # Kafka information - group_id: Optional[str], - listener: Optional["ConsumerRebalanceListener"], - pattern: Optional[str], - connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], - auto_commit: bool, - # Subscriber args - ack_policy: "AckPolicy", - max_workers: int, - no_ack: bool, - no_reply: bool, - broker_dependencies: Iterable["Dependant"], - broker_middlewares: Sequence["BrokerMiddleware[tuple[ConsumerRecord, ...]]"], - # Specification args - title_: Optional[str], - description_: Optional[str], - include_in_schema: bool, -) -> Union[ - "SpecificationDefaultSubscriber", - "SpecificationBatchSubscriber", - "SpecificationConcurrentDefaultSubscriber", -]: ... 
- - -@overload -def create_subscriber( - *topics: str, - batch: Literal[False], - batch_timeout_ms: int, - max_records: Optional[int], - # Kafka information - group_id: Optional[str], - listener: Optional["ConsumerRebalanceListener"], - pattern: Optional[str], - connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], - auto_commit: bool, - # Subscriber args - ack_policy: "AckPolicy", - max_workers: int, - no_ack: bool, - no_reply: bool, - broker_dependencies: Iterable["Dependant"], - broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], - # Specification args - title_: Optional[str], - description_: Optional[str], - include_in_schema: bool, -) -> Union[ - "SpecificationDefaultSubscriber", - "SpecificationBatchSubscriber", - "SpecificationConcurrentDefaultSubscriber", -]: ... - - -@overload -def create_subscriber( - *topics: str, - batch: bool, - batch_timeout_ms: int, - max_records: Optional[int], - # Kafka information - group_id: Optional[str], - listener: Optional["ConsumerRebalanceListener"], - pattern: Optional[str], - connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], - auto_commit: bool, - # Subscriber args - ack_policy: "AckPolicy", - max_workers: int, - no_ack: bool, - no_reply: bool, - broker_dependencies: Iterable["Dependant"], - broker_middlewares: Sequence[ - "BrokerMiddleware[Union[ConsumerRecord, tuple[ConsumerRecord, ...]]]" - ], - # Specification args - title_: Optional[str], - description_: Optional[str], - include_in_schema: bool, -) -> Union[ - "SpecificationDefaultSubscriber", - "SpecificationBatchSubscriber", - "SpecificationConcurrentDefaultSubscriber", -]: ... - - def create_subscriber( *topics: str, batch: bool, @@ -125,15 +31,15 @@ def create_subscriber( listener: Optional["ConsumerRebalanceListener"], pattern: Optional[str], connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], + partitions: Collection["TopicPartition"], auto_commit: bool, # Subscriber args ack_policy: "AckPolicy", max_workers: int, no_ack: bool, no_reply: bool, - broker_dependencies: Iterable["Dependant"], - broker_middlewares: Sequence[ + broker_dependencies: Collection["Dependant"], + broker_middlewares: Collection[ "BrokerMiddleware[Union[ConsumerRecord, tuple[ConsumerRecord, ...]]]" ], # Specification args @@ -144,6 +50,7 @@ def create_subscriber( "SpecificationDefaultSubscriber", "SpecificationBatchSubscriber", "SpecificationConcurrentDefaultSubscriber", + "SpecificationConcurrentBetweenPartitionsSubscriber", ]: _validate_input_for_misconfigure( *topics, @@ -168,6 +75,9 @@ def create_subscriber( if ack_policy is AckPolicy.ACK_FIRST: connection_args["enable_auto_commit"] = True ack_policy = AckPolicy.DO_NOTHING + ack_first = True + else: + ack_first = False if batch: return SpecificationBatchSubscriber( @@ -189,8 +99,26 @@ def create_subscriber( ) if max_workers > 1: - return SpecificationConcurrentDefaultSubscriber( - *topics, + if ack_first: + return SpecificationConcurrentDefaultSubscriber( + *topics, + max_workers=max_workers, + group_id=group_id, + listener=listener, + pattern=pattern, + connection_args=connection_args, + partitions=partitions, + ack_policy=ack_policy, + no_reply=no_reply, + broker_dependencies=broker_dependencies, + broker_middlewares=broker_middlewares, + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ) + + return SpecificationConcurrentBetweenPartitionsSubscriber( + topics[0], max_workers=max_workers, group_id=group_id, listener=listener, @@ -263,22 +191,30 @@ def 
_validate_input_for_misconfigure( ack_policy = AckPolicy.ACK_FIRST if max_workers > 1 and ack_policy is not AckPolicy.ACK_FIRST: - msg = "You can't use `max_workers` option with manual commit mode." - raise SetupError(msg) + if len(topics) > 1: + msg = "You must use a single topic with concurrent manual commit mode." + raise SetupError(msg) - if not group_id and ack_policy is not AckPolicy.ACK_FIRST: - msg = "You must use `group_id` with manual commit mode." - raise SetupError(msg) + if pattern is not None: + msg = "You can not use a pattern with concurrent manual commit mode." + raise SetupError(msg) + + if partitions: + msg = "Manual partition assignment is not supported with concurrent manual commit mode." + raise SetupError(msg) if not topics and not partitions and not pattern: msg = "You should provide either `topics` or `partitions` or `pattern`." raise SetupError(msg) + if topics and partitions: msg = "You can't provide both `topics` and `partitions`." raise SetupError(msg) + if topics and pattern: msg = "You can't provide both `topics` and `pattern`." raise SetupError(msg) + if partitions and pattern: msg = "You can't provide both `partitions` and `pattern`." raise SetupError(msg) diff --git a/faststream/kafka/subscriber/specified.py b/faststream/kafka/subscriber/specified.py index f2d5f70a3a..9ef93a15c6 100644 --- a/faststream/kafka/subscriber/specified.py +++ b/faststream/kafka/subscriber/specified.py @@ -7,6 +7,7 @@ ) from faststream.kafka.subscriber.usecase import ( BatchSubscriber, + ConcurrentBetweenPartitionsSubscriber, ConcurrentDefaultSubscriber, DefaultSubscriber, ) @@ -22,7 +23,7 @@ class SpecificationSubscriber(SpecificationSubscriberMixin): """A class to handle logic and async API operations.""" topics: Iterable[str] - partitions: Iterable["TopicPartition"] # TODO: support partitions + partitions: Iterable["TopicPartition"] _pattern: Optional[str] # TODO: support pattern schema def get_default_name(self) -> str: @@ -72,3 +73,10 @@ class SpecificationConcurrentDefaultSubscriber( ConcurrentDefaultSubscriber, ): pass + + +class SpecificationConcurrentBetweenPartitionsSubscriber( + SpecificationSubscriber, + ConcurrentBetweenPartitionsSubscriber, +): + pass diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index e96489c1e4..1acd9e6e5d 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -1,7 +1,7 @@ from abc import abstractmethod from collections.abc import Iterable, Sequence from itertools import chain -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Callable, Optional, cast import anyio from aiokafka import ConsumerRecord, TopicPartition @@ -18,7 +18,8 @@ MsgType, ) from faststream._internal.utils.path import compile_path -from faststream.kafka.message import KafkaAckableMessage, KafkaMessage +from faststream.kafka.listener import make_logging_listener +from faststream.kafka.message import KafkaAckableMessage, KafkaMessage, KafkaRawMessage from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser from faststream.kafka.publisher.fake import KafkaFakePublisher @@ -29,7 +30,7 @@ from faststream._internal.basic_types import AnyDict from faststream._internal.publisher.proto import BasePublisherProto - from faststream._internal.state import BrokerState + from faststream._internal.state import BrokerState, Pointer from faststream.message import StreamMessage from faststream.middlewares import AckPolicy @@ -79,8 +80,8 @@ def 
__init__( self.group_id = group_id self._pattern = pattern - self.__listener = listener - self.__connection_args = connection_args + self._listener = listener + self._connection_args = connection_args # Setup it later self.client_id = "" @@ -100,7 +101,7 @@ def _setup( # type: ignore[override] broker_parser: Optional["CustomCallable"], broker_decoder: Optional["CustomCallable"], # dependant args - state: "BrokerState", + state: "Pointer[BrokerState]", ) -> None: self.client_id = client_id self.builder = builder @@ -119,7 +120,7 @@ async def start(self) -> None: self.consumer = consumer = self.builder( group_id=self.group_id, client_id=self.client_id, - **self.__connection_args, + **self._connection_args, ) self.parser._setup(consumer) @@ -128,7 +129,12 @@ async def start(self) -> None: consumer.subscribe( topics=self.topics, pattern=self._pattern, - listener=self.__listener, + listener=make_logging_listener( + consumer=consumer, + logger=self._state.get().logger_state.logger.logger, + log_extra=self.get_log_context(None), + listener=self._listener, + ), ) elif self.partitions: @@ -138,7 +144,7 @@ async def start(self) -> None: await super().start() if self.calls: - self.add_task(self._consume()) + self.add_task(self._run_consume_loop(self.consumer)) async def close(self) -> None: await super().close() @@ -153,11 +159,12 @@ async def get_one( *, timeout: float = 5.0, ) -> "Optional[StreamMessage[MsgType]]": - assert self.consumer, "You should start subscriber at first." # nosec B101 assert ( # nosec B101 not self.calls ), "You can't use `get_one` method if subscriber has registered handlers." + assert self.consumer, "You should start subscriber at first." # nosec B101 + raw_messages = await self.consumer.getmany( timeout_ms=timeout * 1000, max_records=1, @@ -191,16 +198,16 @@ def _make_response_publisher( ) @abstractmethod - async def get_msg(self) -> MsgType: + async def get_msg(self, consumer: "AIOKafkaConsumer") -> MsgType: raise NotImplementedError - async def _consume(self) -> None: - assert self.consumer, "You should start subscriber at first." # nosec B101 + async def _run_consume_loop(self, consumer: "AIOKafkaConsumer") -> None: + assert consumer, "You should start subscriber at first." # nosec B101 connected = True while self.running: try: - msg = await self.get_msg() + msg = await self.get_msg(consumer) # pragma: no cover except KafkaError: # noqa: PERF203 @@ -303,9 +310,9 @@ def __init__( broker_dependencies=broker_dependencies, ) - async def get_msg(self) -> "ConsumerRecord": - assert self.consumer, "You should setup subscriber at first." # nosec B101 - return await self.consumer.getone() + async def get_msg(self, consumer: "AIOKafkaConsumer") -> "ConsumerRecord": + assert consumer, "You should setup subscriber at first." # nosec B101 + return await consumer.getone() def get_log_context( self, @@ -323,15 +330,6 @@ def get_log_context( ) -class ConcurrentDefaultSubscriber(ConcurrentMixin["ConsumerRecord"], DefaultSubscriber): - async def start(self) -> None: - await super().start() - self.start_consume_task() - - async def consume_one(self, msg: "ConsumerRecord") -> None: - await self._put_msg(msg) - - class BatchSubscriber(LogicSubscriber[tuple["ConsumerRecord", ...]]): def __init__( self, @@ -389,10 +387,12 @@ def __init__( broker_dependencies=broker_dependencies, ) - async def get_msg(self) -> tuple["ConsumerRecord", ...]: - assert self.consumer, "You should setup subscriber at first." 
# nosec B101 + async def get_msg( + self, consumer: "AIOKafkaConsumer" + ) -> tuple["ConsumerRecord", ...]: + assert consumer, "You should setup subscriber at first." # nosec B101 - messages = await self.consumer.getmany( + messages = await consumer.getmany( timeout_ms=self.batch_timeout_ms, max_records=self.max_records, ) @@ -417,3 +417,112 @@ def get_log_context( topic=topic, group_id=self.group_id, ) + + +class ConcurrentDefaultSubscriber(ConcurrentMixin["ConsumerRecord"], DefaultSubscriber): + async def start(self) -> None: + await super().start() + self.start_consume_task() + + async def consume_one(self, msg: "ConsumerRecord") -> None: + await self._put_msg(msg) + + +class ConcurrentBetweenPartitionsSubscriber(DefaultSubscriber): + consumer_subgroup: list["AIOKafkaConsumer"] + + def __init__( + self, + topic: str, + max_workers: int, + # Kafka information + group_id: Optional[str], + listener: Optional["ConsumerRebalanceListener"], + pattern: Optional[str], + connection_args: "AnyDict", + partitions: Iterable["TopicPartition"], + # Subscriber args + ack_policy: "AckPolicy", + no_reply: bool, + broker_dependencies: Iterable["Dependant"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], + ) -> None: + super().__init__( + topic, + group_id=group_id, + listener=listener, + pattern=pattern, + connection_args=connection_args, + partitions=partitions, + # Subscriber args + ack_policy=ack_policy, + no_reply=no_reply, + broker_dependencies=broker_dependencies, + broker_middlewares=broker_middlewares, + ) + + self.max_workers = max_workers + self.consumer_subgroup = [] + + async def start(self) -> None: + """Start the consumer subgroup.""" + assert self.builder, "You should setup subscriber at first." # nosec B101 + + if self.calls: + self.consumer_subgroup = [ + self.builder( + group_id=self.group_id, + client_id=self.client_id, + **self._connection_args, + ) + for _ in range(self.max_workers) + ] + + else: + # We should create single consumer to support + # `get_one()` and `__aiter__` methods + self.consumer = self.builder( + group_id=self.group_id, + client_id=self.client_id, + **self._connection_args, + ) + self.consumer_subgroup = [self.consumer] + + # Subscribers starting should be called concurrently + # to balance them correctly + async with anyio.create_task_group() as tg: + for c in self.consumer_subgroup: + c.subscribe( + topics=self.topics, + listener=make_logging_listener( + consumer=c, + logger=self._state.get().logger_state.logger.logger, + log_extra=self.get_log_context(None), + listener=self._listener, + ), + ) + + tg.start_soon(c.start) + + # call SubscriberUsecase method + await super(LogicSubscriber, self).start() + + if self.calls: + for c in self.consumer_subgroup: + self.add_task(self._run_consume_loop(c)) + + async def close(self) -> None: + if self.consumer_subgroup: + async with anyio.create_task_group() as tg: + for consumer in self.consumer_subgroup: + tg.start_soon(consumer.stop) + + self.consumer_subgroup = [] + + await super().close() + + async def get_msg(self, consumer: "AIOKafkaConsumer") -> "KafkaRawMessage": + assert consumer, "You should setup subscriber at first." 
# nosec B101 + message = await consumer.getone() + message.consumer = consumer + return cast("KafkaRawMessage", message) diff --git a/pyproject.toml b/pyproject.toml index 9ba8380b18..6973dda0d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,7 +105,7 @@ devdocs = [ types = [ "faststream[optionals]", - "mypy==1.14.0", + "mypy==1.14.1", # mypy extensions "types-Deprecated", "types-PyYAML", @@ -120,19 +120,17 @@ types = [ lint = [ "faststream[types]", - "ruff==0.8.4", - "bandit==1.8.0", - "semgrep==1.101.0", + "ruff==0.9.1", + "bandit==1.8.2", + "semgrep==1.102.0", "codespell==2.3.0", ] test-core = [ - "coverage[toml]==7.6.9; python_version >= '3.9'", - "coverage[toml]==7.6.1; python_version == '3.8'", + "coverage[toml]==7.6.10", "pytest==8.3.4", - "pytest-asyncio==0.25.0; python_version >= '3.9'", - "pytest-asyncio==0.24.0; python_version == '3.8'", - "dirty-equals==0.8.0", + "pytest-asyncio==0.25.2", + "dirty-equals==0.9.0", ] testing = [ diff --git a/scripts/publish.sh b/scripts/publish.sh deleted file mode 100755 index bc704d5e8a..0000000000 --- a/scripts/publish.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -hatch clean -hatch build -hatch publish diff --git a/tests/brokers/base/publish.py b/tests/brokers/base/publish.py index feac1efac4..0d5fe10dc6 100644 --- a/tests/brokers/base/publish.py +++ b/tests/brokers/base/publish.py @@ -578,6 +578,12 @@ async def handler(m): assert event.is_set() assert not mock.called + @pytest.mark.asyncio() + async def test_publisher_after_connect(self, queue: str) -> None: + async with self.patch_broker(self.get_broker()) as br: + # Should pass without error + await br.publisher(queue).publish(None) + @pytest.mark.asyncio() async def test_publisher_after_start( self, diff --git a/tests/brokers/base/testclient.py b/tests/brokers/base/testclient.py index 2076561581..015d92fa8a 100644 --- a/tests/brokers/base/testclient.py +++ b/tests/brokers/base/testclient.py @@ -187,8 +187,6 @@ async def test_broker_with_real_patches_publishers_and_subscribers( async def m(msg) -> None: await publisher.publish(f"response: {msg}") - await test_broker.start() - async with self.patch_broker(test_broker, with_real=True) as br: await br.publish("hello", queue) diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index d9160ddca0..98be7dd194 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -1,14 +1,16 @@ import asyncio +import logging +from typing import Any from unittest.mock import MagicMock, patch import pytest -from aiokafka import AIOKafkaConsumer +from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener +from aiokafka.admin import AIOKafkaAdminClient, NewTopic from aiokafka.structs import RecordMetadata from faststream import AckPolicy from faststream.exceptions import AckMessage -from faststream.kafka import TopicPartition -from faststream.kafka.annotations import KafkaMessage +from faststream.kafka import KafkaBroker, KafkaMessage, TopicPartition from tests.brokers.base.consume import BrokerRealConsumeTestcase from tests.tools import spy_decorator @@ -33,7 +35,7 @@ async def handler(msg) -> None: pattern_event = asyncio.Event() @consume_broker.subscriber(pattern=f"{queue[:-1]}*") - async def pattern_handler(msg) -> None: + async def pattern_handler(msg: Any) -> None: pattern_event.set() async with self.patch_broker(consume_broker) as br: @@ -61,7 +63,7 @@ async def test_consume_batch(self, queue: str) -> None: msgs_queue = asyncio.Queue(maxsize=1) 
@consume_broker.subscriber(queue, batch=True) - async def handler(msg) -> None: + async def handler(msg: Any) -> None: await msgs_queue.put(msg) async with self.patch_broker(consume_broker) as br: @@ -79,7 +81,7 @@ async def handler(msg) -> None: @pytest.mark.asyncio() async def test_consume_batch_headers( self, - mock, + mock: MagicMock, queue: str, ) -> None: event = asyncio.Event() @@ -87,7 +89,7 @@ async def test_consume_batch_headers( consume_broker = self.get_broker(apply_types=True) @consume_broker.subscriber(queue, batch=True) - def subscriber(m, msg: KafkaMessage) -> None: + def subscriber(msg: KafkaMessage) -> None: check = all( ( msg.headers, @@ -164,7 +166,7 @@ async def test_manual_partition_consume( tp1 = TopicPartition(queue, partition=0) @consume_broker.subscriber(partitions=[tp1]) - async def handler_tp1(msg) -> None: + async def handler_tp1(msg: Any) -> None: event.set() async with self.patch_broker(consume_broker) as br: @@ -234,7 +236,7 @@ async def test_consume_ack_by_raise( @consume_broker.subscriber( queue, group_id="test", ack_policy=AckPolicy.REJECT_ON_ERROR ) - async def handler(msg: KafkaMessage): + async def handler(msg: KafkaMessage) -> None: event.set() raise AckMessage @@ -343,6 +345,64 @@ async def handler(msg: KafkaMessage) -> None: assert event.is_set() + @pytest.mark.asyncio() + async def test_consume_without_value( + self, + mock: MagicMock, + queue: str, + event: asyncio.Event, + ) -> None: + consume_broker = self.get_broker() + + @consume_broker.subscriber(queue) + async def handler(msg: bytes) -> None: + event.set() + mock(msg) + + async with self.patch_broker(consume_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task( + br._producer._producer.producer.send(queue, key=b"") + ), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + mock.assert_called_once_with(b"") + + @pytest.mark.asyncio() + async def test_consume_batch_without_value( + self, + mock: MagicMock, + queue: str, + event: asyncio.Event, + ) -> None: + consume_broker = self.get_broker() + + @consume_broker.subscriber(queue, batch=True) + async def handler(msg: list[bytes]) -> None: + event.set() + mock(msg) + + async with self.patch_broker(consume_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task( + br._producer._producer.producer.send(queue, key=b"") + ), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + mock.assert_called_once_with([b""]) + @pytest.mark.asyncio() @pytest.mark.slow() async def test_concurrent_consume(self, queue: str, mock: MagicMock) -> None: @@ -354,7 +414,7 @@ async def test_concurrent_consume(self, queue: str, mock: MagicMock) -> None: args, kwargs = self.get_subscriber_params(queue, max_workers=2) @consume_broker.subscriber(*args, **kwargs) - async def handler(msg) -> None: + async def handler(msg: Any) -> None: mock() if event.is_set(): event2.set() @@ -383,59 +443,216 @@ async def handler(msg) -> None: assert mock.call_count == 2, mock.call_count @pytest.mark.asyncio() - async def test_consume_without_value( + @pytest.mark.slow() + async def test_concurrent_consume_between_partitions( self, - mock: MagicMock, queue: str, - event: asyncio.Event, ) -> None: - consume_broker = self.get_broker() + await create_topic(queue, 3) - @consume_broker.subscriber(queue) - async def handler(msg): - event.set() - mock(msg) + consume_broker = self.get_broker(apply_types=True) - async with self.patch_broker(consume_broker) as br: - await br.start() + event1, event2 = asyncio.Event(), 
asyncio.Event() + + consumers = set() + + @consume_broker.subscriber( + queue, + max_workers=3, + ack_policy=AckPolicy.ACK, + group_id="service_1", + ) + async def handler(message: KafkaMessage) -> None: + nonlocal consumers + consumers.add(getattr(message.raw_message, "consumer", None)) + if event1.is_set(): + event2.set() + else: + event1.set() + + async with self.patch_broker(consume_broker) as broker: + await broker.start() + + await broker.publish("hello1", queue, partition=0) + await broker.publish("hello2", queue, partition=1) await asyncio.wait( ( - asyncio.create_task( - br._producer._producer.producer.send(queue, key=b"") - ), - asyncio.create_task(event.wait()), + asyncio.create_task(event1.wait()), + asyncio.create_task(event2.wait()), ), - timeout=3, + timeout=10, ) - mock.assert_called_once_with(b"") + assert event1.is_set() + assert event2.is_set() + + assert len(consumers) == 2 @pytest.mark.asyncio() - async def test_consume_batch_without_value( + @pytest.mark.slow() + @pytest.mark.parametrize( + "with_explicit_commit", + ( + pytest.param(True, id="manual commit"), + pytest.param(False, id="commit after process"), + ), + ) + async def test_concurrent_consume_between_partitions_commit( + self, + queue: str, + with_explicit_commit: bool, + ) -> None: + await create_topic(queue, 2) + + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber( + queue, + max_workers=3, + ack_policy=AckPolicy.ACK, + group_id="service_1", + ) + async def handler(msg: KafkaMessage) -> None: + await asyncio.sleep(0.7) + if with_explicit_commit: + await msg.ack() + + async with self.patch_broker(consume_broker) as broker: + await broker.start() + + with patch.object( + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + ) as mock: + await asyncio.wait( + ( + asyncio.create_task( + broker.publish("hello1", queue, partition=0) + ), + asyncio.create_task( + broker.publish("hello3", queue, partition=0) + ), + asyncio.create_task( + broker.publish("hello2", queue, partition=1) + ), + asyncio.create_task(asyncio.sleep(1)), + ), + timeout=10, + ) + assert mock.mock.call_count == 2 + + +@pytest.mark.asyncio() +@pytest.mark.slow() +@pytest.mark.kafka() +class TestListener(KafkaTestcaseConfig): + async def test_sync_listener( self, - mock: MagicMock, queue: str, + mock: MagicMock, event: asyncio.Event, ) -> None: consume_broker = self.get_broker() - @consume_broker.subscriber(queue, batch=True) - async def handler(msg): - event.set() - mock(msg) + class CustomListener(ConsumerRebalanceListener): + def on_partitions_revoked(self, revoked: set[str]) -> None: + mock.on_partitions_revoked() - async with self.patch_broker(consume_broker) as br: - await br.start() + def on_partitions_assigned(self, assigned: set[str]) -> None: + mock.on_partitions_assigned() + event.set() - await asyncio.wait( - ( - asyncio.create_task( - br._producer._producer.producer.send(queue, key=b"") - ), - asyncio.create_task(event.wait()), - ), - timeout=3, - ) + consume_broker.subscriber( + queue, + ack_policy=AckPolicy.DO_NOTHING, + group_id="service_1", + listener=CustomListener(), + ) - mock.assert_called_once_with([b""]) + async with self.patch_broker(consume_broker) as broker: + await broker.start() + + await asyncio.wait((asyncio.create_task(event.wait()),), timeout=3.0) + + assert event.is_set() + mock.on_partitions_assigned.assert_called_once() + mock.on_partitions_revoked.assert_called_once() + + async def test_listener_async(self, queue: str, mock: MagicMock) -> None: + consume_broker = 
self.get_broker() + + class CustomListener(ConsumerRebalanceListener): + async def on_partitions_revoked(self, revoked: set[str]) -> None: + mock.on_partitions_revoked() + + async def on_partitions_assigned(self, assigned: set[str]) -> None: + mock.on_partitions_assigned() + + consume_broker.subscriber( + queue, + ack_policy=AckPolicy.DO_NOTHING, + group_id="service_1", + listener=CustomListener(), + ) + + async with self.patch_broker(consume_broker) as broker: + await broker.start() + + mock.on_partitions_assigned.assert_called_once() + mock.on_partitions_revoked.assert_called_once() + + +@pytest.mark.asyncio() +@pytest.mark.slow() +@pytest.mark.kafka() +@pytest.mark.parametrize( + ("overflow_workers"), + ( + pytest.param(True, id="workers > partitions"), + pytest.param(False, id="workers == partitions"), + ), +) +async def test_concurrent_consume_between_partitions_assignment_warning( + queue: str, + overflow_workers: bool, + mock: MagicMock, +) -> None: + max_workers = partitions = 2 + if overflow_workers: + max_workers += 1 + + await create_topic(queue, partitions) + + # arrange broker setup + broker = KafkaBroker(logger=mock, apply_types=False) + + @broker.subscriber( + queue, + max_workers=max_workers, + ack_policy=AckPolicy.DO_NOTHING, + group_id="service_1", + ) + async def handler(msg: Any) -> None: + pass + + # act + async with broker: + await broker.start() + + # assert + warning_calls = [x for x in mock.log.call_args_list if x[0][0] == logging.WARNING] + if overflow_workers: + assert len(warning_calls) == 1 + else: + assert len(warning_calls) == 0 + + +async def create_topic(topic: str, partitions: int) -> None: + admin_client = AIOKafkaAdminClient() + try: + await admin_client.start() + await admin_client.create_topics([ + NewTopic(topic, partitions, 1), + ]) + finally: + await admin_client.close() diff --git a/tests/brokers/kafka/test_misconfigure.py b/tests/brokers/kafka/test_misconfigure.py index 79bd8bdef8..f18c853708 100644 --- a/tests/brokers/kafka/test_misconfigure.py +++ b/tests/brokers/kafka/test_misconfigure.py @@ -1,13 +1,49 @@ +from typing import Any + import pytest from faststream import AckPolicy from faststream.exceptions import SetupError from faststream.kafka import KafkaBroker, TopicPartition from faststream.kafka.subscriber.specified import ( + SpecificationConcurrentBetweenPartitionsSubscriber, SpecificationConcurrentDefaultSubscriber, ) +@pytest.mark.parametrize( + ("args", "kwargs"), + ( + pytest.param( + (), + {}, + id="no destination", + ), + pytest.param( + ("topic",), + {"partitions": [TopicPartition("topic", 1)]}, + id="topic and partitions", + ), + pytest.param( + ("topic",), + {"pattern": ".*"}, + id="topic and pattern", + ), + pytest.param( + (), + { + "partitions": [TopicPartition("topic", 1)], + "pattern": ".*", + }, + id="partitions and pattern", + ), + ), +) +def test_wrong_destination(args: list[str], kwargs: dict[str, Any]) -> None: + with pytest.raises(SetupError): + KafkaBroker().subscriber(*args, **kwargs) + + def test_deprecated_options(queue: str) -> None: broker = KafkaBroker() @@ -34,57 +70,39 @@ def test_deprecated_conflicts_actual(queue: str) -> None: broker.subscriber(queue, no_ack=False, ack_policy=AckPolicy.ACK) -def test_manual_ack_policy_without_group(queue: str) -> None: - broker = KafkaBroker() - - broker.subscriber(queue, group_id="test", ack_policy=AckPolicy.DO_NOTHING) - - with pytest.raises(SetupError): - broker.subscriber(queue, ack_policy=AckPolicy.DO_NOTHING) - - -def test_manual_commit_without_group(queue: str) -> 
None: - broker = KafkaBroker() - - with pytest.warns(DeprecationWarning): - broker.subscriber(queue, group_id="test", auto_commit=False) - - with pytest.raises(SetupError), pytest.warns(DeprecationWarning): - broker.subscriber(queue, auto_commit=False) - - -def test_max_workers_with_manual(queue: str) -> None: - broker = KafkaBroker() - - with pytest.warns(DeprecationWarning): - sub = broker.subscriber(queue, max_workers=3, auto_commit=True) - assert isinstance(sub, SpecificationConcurrentDefaultSubscriber) - - with pytest.raises(SetupError), pytest.warns(DeprecationWarning): - broker.subscriber(queue, max_workers=3, auto_commit=False) - - -def test_max_workers_with_ack_policy(queue: str) -> None: +def test_max_workers_configuration(queue: str) -> None: broker = KafkaBroker() sub = broker.subscriber(queue, max_workers=3, ack_policy=AckPolicy.ACK_FIRST) assert isinstance(sub, SpecificationConcurrentDefaultSubscriber) - with pytest.raises(SetupError): - broker.subscriber(queue, max_workers=3, ack_policy=AckPolicy.REJECT_ON_ERROR) + sub = broker.subscriber(queue, max_workers=3, ack_policy=AckPolicy.REJECT_ON_ERROR) + assert isinstance(sub, SpecificationConcurrentBetweenPartitionsSubscriber) -def test_wrong_destination(queue: str) -> None: - broker = KafkaBroker() - +def test_max_workers_manual_commit_multi_topics_forbidden() -> None: with pytest.raises(SetupError): - broker.subscriber() + KafkaBroker().subscriber( + "queue1", + "queue2", + max_workers=3, + auto_commit=False, + ) - with pytest.raises(SetupError): - broker.subscriber(queue, partitions=[TopicPartition(queue, 1)]) +def test_max_workers_manual_commit_pattern_forbidden() -> None: with pytest.raises(SetupError): - broker.subscriber(partitions=[TopicPartition(queue, 1)], pattern=".*") + KafkaBroker().subscriber( + pattern="pattern", + max_workers=3, + auto_commit=False, + ) + +def test_max_workers_manual_commit_partitions_forbidden() -> None: with pytest.raises(SetupError): - broker.subscriber(queue, pattern=".*") + KafkaBroker().subscriber( + partitions=[TopicPartition(topic="topic", partition=1)], + max_workers=3, + auto_commit=False, + ) diff --git a/tests/opentelemetry/basic.py b/tests/opentelemetry/basic.py index 2b0396b8f7..9d19096829 100644 --- a/tests/opentelemetry/basic.py +++ b/tests/opentelemetry/basic.py @@ -101,7 +101,7 @@ def assert_span( msg: str, parent_span_id: Optional[str] = None, ) -> None: - attrs = span.attributes + attrs = span.attributes or {} assert attrs[SpanAttr.MESSAGING_SYSTEM] == self.messaging_system, attrs[ SpanAttr.MESSAGING_SYSTEM ] @@ -125,9 +125,9 @@ def assert_span( ] if action == Action.PROCESS: - assert attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] == len( - msg, - ), attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] + assert attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] == len(msg), ( + attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] + ) assert attrs[SpanAttr.MESSAGING_OPERATION] == action, attrs[ SpanAttr.MESSAGING_OPERATION ] @@ -138,6 +138,7 @@ def assert_span( ] if parent_span_id: + assert span.parent assert span.parent.span_id == parent_span_id, span.parent.span_id def assert_metrics(