Bug: using routers, publishers do not handle batch return from handlers #1945

Open
yann-combarnous opened this issue Nov 26, 2024 · 6 comments
Labels
bug (Something isn't working), Confluent (Issues related to `faststream.confluent` module)

Comments

@yann-combarnous

Describe the bug
When using KafkaRouter with KafkaRoute publishers, the publisher does not properly handle the tuple returned by the handler function.

How to reproduce
Include source code:

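from faststream import FastStream
from faststream.confluent import KafkaBroker, KafkaRouter, KafkaRoute, KafkaPublisher

broker = KafkaBroker()
app = FastStream(broker)

# Placeholder topic names (the actual names are not given in the report)
topic_in = "topic_in"
topic_out = "topic_out"

async def handler(messages: list[str]):
    # Return a tuple so the batch publisher can emit one message per element
    return tuple(messages)

router = KafkaRouter(
    handlers=(
        KafkaRoute(
            handler,
            topic_in,
            batch=True,
            publishers=(KafkaPublisher(topic_out, batch=True),),
        ),
    )
)
broker.include_router(router)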

Expected behavior
If the handler function receives a batch of 3 messages from topic_in, 3 messages are published to topic_out.

Observed behavior
If the handler function receives a batch of 3 messages from topic_in, 1 message is published to topic_out. Its body is a list of the 3 messages the handler received.
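
To make the difference between the expected and observed behavior concrete, here is a minimal plain-Python sketch. It does not use FastStream at all: `expected_payloads` and `observed_payloads` are hypothetical helper names, and the JSON encoding of the collapsed list is an assumption about how the single message might be serialized, not something confirmed in this report.

```python
import json
from collections.abc import Sequence


def expected_payloads(result: Sequence[str]) -> list[bytes]:
    """Expected: one outgoing message body per element of the returned tuple."""
    return [item.encode() for item in result]


def observed_payloads(result: Sequence[str]) -> list[bytes]:
    """Observed: the whole return value ends up in a single message body.

    JSON is assumed here purely for illustration.
    """
    return [json.dumps(list(result)).encode()]


batch = ("a", "b", "c")
print(len(expected_payloads(batch)))  # 3
print(len(observed_payloads(batch)))  # 1
```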

Environment
Running FastStream 0.5.30 with CPython 3.12.7 on Darwin

@yann-combarnous yann-combarnous added the bug Something isn't working label Nov 26, 2024
@Lancetnik
Member

I tried to reproduce your problem, but it seems to be working fine:

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaPublisher, KafkaRoute, KafkaRouter

broker = KafkaBroker()
app = FastStream(broker)

async def handler(msgs: list[int]):
    print(msgs)
    return tuple(msgs)

router = KafkaRouter(
    handlers=(
        KafkaRoute(
            handler,
            "in",
            batch=True,
            publishers=(KafkaPublisher("out", batch=True),),
        ),
        KafkaRoute(
            handler,
            "out",
            batch=True,
        ),
    )
)
broker.include_router(router)

@app.after_startup
async def _():
    await broker.publish_batch(*list(range(3)), topic="in")

The handler consumes ["0", "1", "2"] each time.

@Lancetnik
Member

Sorry, I should check confluent instead 😄

@Lancetnik Lancetnik added the Confluent Issues related to `faststream.confluent` module label Nov 27, 2024
@spataphore1337
Contributor

spataphore1337 commented Nov 28, 2024

I tested the code on faststream.confluent and everything works.

from faststream import FastStream
from faststream.confluent import KafkaBroker, KafkaRouter, KafkaRoute, KafkaPublisher
import asyncio

broker = KafkaBroker()
app = FastStream(broker)

async def handler(messages):
    print(f'first: {messages}')
    return tuple(messages)


async def second_handler(messages: set[str]):
    print(messages)


router = KafkaRouter(
    handlers=(
        KafkaRoute(
            handler,
            "topic_in_issue_1945",
            batch=True,
            auto_offset_reset="earliest",
            publishers=(KafkaPublisher("topic_out_issue_1945", batch=True),),
        ),
        KafkaRoute(
            second_handler,
            "topic_out_issue_1945",
            batch=True
        )
    )
)
broker.include_router(router)  # register the router's handlers with the broker

@app.after_startup
async def t():
    await asyncio.sleep(5)
    await broker.publish_batch("test batch return", "test second batch return", topic="topic_in_issue_1945")

async def start_app():
    await app.run()

asyncio.run(start_app())

2024-11-28 17:41:15,321 INFO     - FastStream app starting...
2024-11-28 17:41:15,325 INFO     - topic_in_issue_1945  |            - `Handler` waiting for messages
2024-11-28 17:41:15,565 INFO     - topic_out_issue_1945 |            - `SecondHandler` waiting for messages
2024-11-28 17:41:15,816 INFO     - FastStream app started successfully! To exit, press CTRL+C
2024-11-28 17:41:19,090 INFO     - topic_in_issue_1945  | 27-28-1732 - Received
first: ['test batch return', 'test second batch return']
2024-11-28 17:41:19,509 INFO     - topic_in_issue_1945  | 27-28-1732 - Processed
2024-11-28 17:41:19,620 INFO     - topic_out_issue_1945 | 12-13-1732 - Received
{'test batch return', 'test second batch return'}
2024-11-28 17:41:19,620 INFO     - topic_out_issue_1945 | 12-13-1732 - Processed

@yann-combarnous
Author

Thx @spataphore1337, will try and run your reproduction later today.

@yann-combarnous
Author

yann-combarnous commented Nov 28, 2024

@spataphore1337, I compared your code with mine and highlighted the difference below.

second_handler receives a list[str] even though batch is not set on its route, so it defaults to batch=False.

async def second_handler(messages: str):  # was: set[str]
    print(messages)

...

        KafkaRoute(
            second_handler,
            "topic_out_issue_1945",
            # batch=True
        )
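
One way to check this on the consuming side is to look at the raw body of each message received on the output topic. The sketch below is only an illustration: `is_collapsed_batch` is a hypothetical helper, and it assumes the collapsed batch arrives as a JSON array, which this thread does not confirm.

```python
import json


def is_collapsed_batch(raw: bytes) -> bool:
    """Return True if a single message body decodes to a JSON array.

    A True result suggests the whole batch was published as one message
    instead of one message per element.
    """
    try:
        decoded = json.loads(raw)
    except ValueError:
        # Covers both invalid JSON and undecodable bytes.
        return False
    return isinstance(decoded, list)


print(is_collapsed_batch(b'["m1", "m2", "m3"]'))  # True
print(is_collapsed_batch(b"test batch return"))   # False
```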

@yann-combarnous
Author

yann-combarnous commented Nov 29, 2024

Interestingly, when I run this locally (macOS), the behavior is correct: 3 messages in, 3 messages out. When deployed to a Docker container on Linux, I get 3 messages in, 1 list message out. I'm a bit puzzled at the moment, as I cannot reproduce it locally.
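
Since the behavior differs between the two environments, it may help to compare what is actually installed in each. The sketch below prints the Python version, the platform, and the installed versions of `faststream` and `confluent-kafka`. `environment_report` is a hypothetical helper, and it is only an assumption that a version difference is involved; nothing in this thread establishes the cause.

```python
import platform
from importlib.metadata import PackageNotFoundError, version


def environment_report(
    packages: tuple[str, ...] = ("faststream", "confluent-kafka"),
) -> dict[str, str]:
    """Collect interpreter, platform, and package versions for comparison."""
    report = {
        "python": platform.python_version(),
        "platform": platform.platform(),
    }
    for name in packages:
        try:
            report[name] = version(name)
        except PackageNotFoundError:
            report[name] = "not installed"
    return report


for key, value in environment_report().items():
    print(f"{key}: {value}")
```

Running this both locally and inside the Docker container and diffing the output would show whether the two environments resolve to the same package versions.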
