Skip to content

Commit

Permalink
Type-hints + refactoring (#2018)
Browse files Browse the repository at this point in the history
  • Loading branch information
doublehomixide authored Jan 9, 2025
1 parent d03b62d commit 97ff3cb
Show file tree
Hide file tree
Showing 26 changed files with 223 additions and 222 deletions.
2 changes: 1 addition & 1 deletion faststream/_internal/fastapi/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def solve_faststream_dependency(
**kwargs,
)
values, errors, background = (
solved_result.values, # noqa: PD011
solved_result.values,
solved_result.errors,
solved_result.background_tasks,
)
Expand Down
6 changes: 3 additions & 3 deletions faststream/_internal/fastapi/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ def wrap_callable_to_fastapi_compatible(
response_model_exclude_none: bool,
state: "DIState",
) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]:
__magic_attr = "__faststream_consumer__"
magic_attr = "__faststream_consumer__"

if getattr(user_callable, __magic_attr, False):
if getattr(user_callable, magic_attr, False):
return user_callable # type: ignore[return-value]

if response_model:
Expand All @@ -105,7 +105,7 @@ def wrap_callable_to_fastapi_compatible(
state=state,
)

setattr(parsed_callable, __magic_attr, True)
setattr(parsed_callable, magic_attr, True)
return wraps(user_callable)(parsed_callable)


Expand Down
7 changes: 6 additions & 1 deletion faststream/_internal/utils/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ async def return_input(x: Any) -> Any:
return x


async def run_in_executor(executor: Optional[Executor], func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
async def run_in_executor(
executor: Optional[Executor],
func: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> T:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, partial(func, *args, **kwargs))
14 changes: 10 additions & 4 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ async def send(

def ack_callback(err: Any, msg: Optional[Message]) -> None:
if err or (msg is not None and (err := msg.error())):
loop.call_soon_threadsafe(result_future.set_exception, KafkaException(err))
loop.call_soon_threadsafe(
result_future.set_exception, KafkaException(err)
)
else:
loop.call_soon_threadsafe(result_future.set_result, msg)

Expand Down Expand Up @@ -357,7 +359,9 @@ async def start(self) -> None:

async def commit(self, asynchronous: bool = True) -> None:
"""Commits the offsets of all messages returned by the last poll operation."""
await run_in_executor(self._thread_pool, self.consumer.commit, asynchronous=asynchronous)
await run_in_executor(
self._thread_pool, self.consumer.commit, asynchronous=asynchronous
)

async def stop(self) -> None:
"""Stops the Kafka consumer and releases all resources."""
Expand All @@ -382,7 +386,7 @@ async def stop(self) -> None:
# Wrap calls to async to make method cancelable by timeout
# We shouldn't read messages and close consumer concurrently
# https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895
# Now it works withouth lock due `ThreadPoolExecutor(max_workers=1)`
# Now it works without lock due `ThreadPoolExecutor(max_workers=1)`
# that makes all calls to consumer sequential
await run_in_executor(self._thread_pool, self.consumer.close)

Expand Down Expand Up @@ -414,7 +418,9 @@ async def seek(self, topic: str, partition: int, offset: int) -> None:
partition=partition,
offset=offset,
)
await run_in_executor(self._thread_pool, self.consumer.seek, topic_partition.to_confluent())
await run_in_executor(
self._thread_pool, self.consumer.seek, topic_partition.to_confluent()
)


def check_msg_error(msg: Optional[Message]) -> Optional[Message]:
Expand Down
4 changes: 2 additions & 2 deletions faststream/middlewares/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
Callable[..., None],
Callable[..., Awaitable[None]],
]
PublishingExceptionHandler: TypeAlias = Callable[..., "Any"]
PublishingExceptionHandler: TypeAlias = Callable[..., Any]

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable["Any"]]
CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable[Any]]
CastedHandlers: TypeAlias = list[
tuple[
type[Exception],
Expand Down
161 changes: 65 additions & 96 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,111 +536,80 @@ async def start(self) -> None:
logger_state.log(f"Set max consumers to {self._max_consumers}")

@override
async def publish( # type: ignore[override]
async def publish(
self,
message: Annotated[
"AioPikaSendableMessage",
Doc("Message body to send."),
] = None,
queue: Annotated[
Union["RabbitQueue", str],
Doc("Message routing key to publish with."),
] = "",
exchange: Annotated[
Union["RabbitExchange", str, None],
Doc("Target exchange to publish message to."),
] = None,
message: "AioPikaSendableMessage" = None,
queue: Union["RabbitQueue", str] = "",
exchange: Union["RabbitExchange", str, None] = None,
*,
routing_key: Annotated[
str,
Doc(
"Message routing key to publish with. "
"Overrides `queue` option if presented.",
),
] = "",
mandatory: Annotated[
bool,
Doc(
"Client waits for confirmation that the message is placed to some queue. "
"RabbitMQ returns message to client if there is no suitable queue.",
),
] = True,
immediate: Annotated[
bool,
Doc(
"Client expects that there is consumer ready to take the message to work. "
"RabbitMQ returns message to client if there is no suitable consumer.",
),
] = False,
timeout: Annotated[
"TimeoutType",
Doc("Send confirmation time from RabbitMQ."),
] = None,
persist: Annotated[
bool,
Doc("Restore the message on RabbitMQ reboot."),
] = False,
reply_to: Annotated[
Optional[str],
Doc(
"Reply message routing key to send with (always sending to default exchange).",
),
] = None,
# message args
correlation_id: Annotated[
Optional[str],
Doc(
"Manual message **correlation_id** setter. "
"**correlation_id** is a useful option to trace messages.",
),
] = None,
headers: Annotated[
Optional["HeadersType"],
Doc("Message headers to store metainformation."),
] = None,
content_type: Annotated[
Optional[str],
Doc(
"Message **content-type** header. "
"Used by application, not core RabbitMQ. "
"Will be set automatically if not specified.",
),
] = None,
content_encoding: Annotated[
Optional[str],
Doc("Message body content encoding, e.g. **gzip**."),
] = None,
expiration: Annotated[
Optional["DateType"],
Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
] = None,
message_id: Annotated[
Optional[str],
Doc("Arbitrary message id. Generated automatically if not presented."),
] = None,
timestamp: Annotated[
Optional["DateType"],
Doc("Message publish timestamp. Generated automatically if not presented."),
] = None,
message_type: Annotated[
Optional[str],
Doc("Application-specific message type, e.g. **orders.created**."),
] = None,
user_id: Annotated[
Optional[str],
Doc("Publisher connection User ID, validated if set."),
] = None,
priority: Annotated[
Optional[int],
Doc("The message priority (0 by default)."),
] = None,
routing_key: str = "",
# publish options
mandatory: bool = True,
immediate: bool = False,
timeout: "TimeoutType" = None,
persist: bool = False,
reply_to: Optional[str] = None,
correlation_id: Optional[str] = None,
# message options
headers: Optional["HeadersType"] = None,
content_type: Optional[str] = None,
content_encoding: Optional[str] = None,
expiration: Optional["DateType"] = None,
message_id: Optional[str] = None,
timestamp: Optional["DateType"] = None,
message_type: Optional[str] = None,
user_id: Optional[str] = None,
priority: Optional[int] = None,
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
"""Publish message directly.
This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
applications or to publish messages from time to time.
Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.
Args:
message:
Message body to send.
queue:
Message routing key to publish with.
exchange:
Target exchange to publish message to.
routing_key:
Message routing key to publish with. Overrides `queue` option if presented.
mandatory:
Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.
immediate:
Client expects that there is consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.
timeout:
Send confirmation time from RabbitMQ.
persist:
Restore the message on RabbitMQ reboot.
reply_to:
Reply message routing key to send with (always sending to default exchange).
correlation_id:
Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
headers:
Message headers to store metainformation.
content_type:
Message **content-type** header. Used by application, not core RabbitMQ. Will be set automatically if not specified.
content_encoding:
Message body content encoding, e.g. **gzip**.
expiration:
Message expiration (lifetime) in seconds (or datetime or timedelta).
message_id:
Arbitrary message id. Generated automatically if not presented.
timestamp:
Message publish timestamp. Generated automatically if not presented.
message_type:
Application-specific message type, e.g. **orders.created**.
user_id:
Publisher connection User ID, validated if set.
priority:
The message priority (0 by default).
Returns:
An optional `aiormq.abc.ConfirmationFrameType` representing the confirmation frame if RabbitMQ is configured to send confirmations.
"""
cmd = RabbitPublishCommand(
message,
Expand Down
11 changes: 5 additions & 6 deletions faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,14 @@ def __init__(
middlewares=middlewares,
)

request_options = dict(message_kwargs)
self.headers = request_options.pop("headers") or {}
self.reply_to = request_options.pop("reply_to", None) or ""
self.timeout = request_options.pop("timeout", None)
self.headers = message_kwargs.pop("headers") or {}
self.reply_to: str = message_kwargs.pop("reply_to", None) or ""
self.timeout = message_kwargs.pop("timeout", None)

message_options, _ = filter_by_dict(MessageOptions, request_options)
message_options, _ = filter_by_dict(MessageOptions, dict(message_kwargs))
self.message_options = message_options

publish_options, _ = filter_by_dict(PublishOptions, request_options)
publish_options, _ = filter_by_dict(PublishOptions, dict(message_kwargs))
self.publish_options = publish_options

self.app_id = None
Expand Down
4 changes: 2 additions & 2 deletions faststream/rabbit/schemas/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ def __init__(
if durable is EMPTY:
durable = True
elif not durable:
_error_msg = "Quorum and Stream queues must be durable"
raise SetupError(_error_msg)
error_msg = "Quorum and Stream queues must be durable"
raise SetupError(error_msg)
elif durable is EMPTY:
durable = False

Expand Down
Loading

0 comments on commit 97ff3cb

Please sign in to comment.