Subscriber with Pydantic BaseModel argument treated as a single field's type rather than the whole body's type #1230
-
I'm new to FastStream and getting started with adding a FastStream-based (aiokafka) Kafka consumer into a FastAPI app. I have another FastStream app that's publishing messages serialized to JSON from a Pydantic BaseModel. In my consumer app, I can successfully do the following where I treat the message body as a string and then manually deserialize into the Pydantic model: """Kafka router and consumers."""
from __future__ import annotations
from faststream.kafka.fastapi import KafkaRouter
from faststream.security import BaseSecurity
from rubin.squarebot.models.kafka import SquarebotSlackMessageValue
from structlog import get_logger
from ..config import config
__all__ = ["kafka_router", "handle_slack_message"]
kafka_security = BaseSecurity(ssl_context=config.kafka.ssl_context)
kafka_router = KafkaRouter(
config.kafka.bootstrap_servers, security=kafka_security
)
@kafka_router.subscriber(
config.message_channels_topic,
group_id=config.consumer_group_id,
)
async def handle_slack_message(
msg: str,
) -> None:
"""Handle a Slack message."""
logger = get_logger(__name__)
message = SquarebotSlackMessageValue.model_validate_json(msg)
logger.info("Slack message text", text=message.text) (the But I understand I should be able to annotate the subscriber's argument with the Pydantic model type and have it automatically deserialize into the
When I do this, though, I get a Pydantic validation error saying that the "msg" field is missing the contents of the
Are there some things I should try to make sure that FastStream is treating the subscriber function argument that's annotated as a Pydantic BaseModel type as representing the whole message body, and not just a field? This discussion as touches on this topic, but my reading is that a single argument annotated as the Pydantic model's type should do what I want. I'm running in Python 3.12.2 with these key library versions:
Thanks for looking at this. FastStream is really cool and I've been overall quite happy with how it simplifies setting up Kafka app within our FastAPI apps. 😄 |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
With such annotation, your message body should be SquarebotSlackMessageValue type, not To consume this one, u should create extra Pydantic model like this class Data(BaseModel):
msg: SquarebotSlackMessageValue And use it as payload annotation |
Beta Was this translation helpful? Give feedback.
Yes, FastStream is able to publish raw pydantic models (as well as regular python types) and sets
content-type
header automatically too