
Bug: Event ack'd after calling KafkaMessage.nack with auto_commit=False #1001

Closed
bradydean opened this issue Nov 28, 2023 · 10 comments · Fixed by #1048 or #1678
Labels
AioKafka Issues related to `faststream.kafka` module bug Something isn't working

Comments

@bradydean

Describe the bug

Events are ack'd after calling nack().

How to reproduce

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker()
app = FastStream(broker)

@broker.subscriber("test", group_id="async_subscriber", auto_commit=False)
async def async_subscriber(message: str, logger: Logger, msg: KafkaMessage):
    logger.info(message)
    await msg.nack()

Expected behavior

After publishing an event to the test topic, the message is logged and consumer does not ack the message.

Observed behavior
The message is logged and it appears the consumer acks the message. In kafka-ui I see the consumer group is 0 messages behind.

Environment

Running FastStream 0.2.15 with CPython 3.11.6 on Linux
@bradydean bradydean added the bug Something isn't working label Nov 28, 2023
@davorrunje davorrunje moved this to Todo in FastStream Nov 28, 2023
@kumaranvpl
Collaborator

Hello @bradydean,

Apologies for the late reply.
Thanks for letting us know about the issue. We were busy releasing Redis support, so we couldn't work on this one right away. If you are interested in that feature, please take a look at https://github.com/airtai/faststream/releases/tag/0.3.0rc0.
We will look into this issue and fix it soon.

@Lancetnik
Member

Interesting... FastStream doesn't call consumer.commit() in this case and correctly passes enable_auto_commit=False to the underlying AIOKafkaConsumer. But the message still gets acked...

@Lancetnik Lancetnik mentioned this issue Dec 13, 2023
@Lancetnik
Member

It seems that with aiokafka >= 0.9 everything works fine. All tests pass on version 0.3.5, so I suppose we can close this.

@github-project-automation github-project-automation bot moved this from Todo to Done in FastStream Dec 13, 2023
@lorenzobenvenuti

lorenzobenvenuti commented Apr 12, 2024

Hi, I'm still experiencing this issue with FastStream 0.47.0.

from typing import Any, Dict
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    KafkaMessage,
    Logger
)

broker = KafkaBroker("localhost:9093")
app = FastStream(broker)

@broker.subscriber("source", group_id="foo", auto_commit=False)
async def async_subscriber(message: Dict[str,Any], logger: Logger, msg: KafkaMessage):
    logger.info(message)
    await msg.nack() # calling this or NOT calling this doesn't change the behavior

If I send some messages with

for i in $(seq 1 20); do echo "${i}:{\"index\":\"${i}\"}" | kafka-console-producer.sh --bootstrap-server localhost:9093  --topic source --property parse.key=true --property key.separator=:; done

I see

2024-04-12 18:08:56,994 INFO     - source | foo | 110-171293 - Received
2024-04-12 18:08:56,995 INFO     - source | foo | 110-171293 - {'index': '1'}
2024-04-12 18:08:56,996 INFO     - source | foo | 110-171293 - Processed
2024-04-12 18:08:58,556 INFO     - source | foo | 111-171293 - Received
2024-04-12 18:08:58,557 INFO     - source | foo | 111-171293 - {'index': '2'}
2024-04-12 18:08:58,557 INFO     - source | foo | 111-171293 - Processed
2024-04-12 18:09:00,131 INFO     - source | foo | 112-171293 - Received
2024-04-12 18:09:00,132 INFO     - source | foo | 112-171293 - {'index': '3'}
2024-04-12 18:09:00,132 INFO     - source | foo | 112-171293 - Processed
2024-04-12 18:09:01,679 INFO     - source | foo | 113-171293 - Received
2024-04-12 18:09:01,680 INFO     - source | foo | 113-171293 - {'index': '4'}
2024-04-12 18:09:01,680 INFO     - source | foo | 113-171293 - Processed
2024-04-12 18:09:03,355 INFO     - source | foo | 114-171293 - Received
2024-04-12 18:09:03,356 INFO     - source | foo | 114-171293 - {'index': '5'}
2024-04-12 18:09:03,356 INFO     - source | foo | 114-171293 - Processed

I was expecting to consume the same message again and again.

Thanks,

lorenzo

(edit: same behavior if I raise a NackMessage exception)

@Lancetnik Lancetnik reopened this Apr 13, 2024
@Lancetnik
Member

I am not sure what is going on: we create AIOKafkaConsumer(..., enable_auto_commit=False) in this case and never call the consumer.commit() method at all.

I have no idea why it commits the offset anyway.
@kumaranvpl do you have any ideas?

@kumaranvpl
Collaborator

@Lancetnik @lorenzobenvenuti
Right now I have no idea why this is happening. Let me try to replicate it locally, and then I will work on a fix.

@lorenzobenvenuti

Thanks @Lancetnik @kumaranvpl . You can reproduce the issue with this application.

@lorenzobenvenuti

I had a quick look at the code. As far as I can see, aiokafka doesn't commit the offset, but it keeps increasing the position in memory (see FetchResult._update_position) regardless of the commit policy. This means that even if the n-th message is not acked, the consumer will still consume messages n+1, n+2, and so on. I don't know if this is the expected behavior: coming from other frameworks (for example Apache Camel), I expected "nack" to seek the offset back to the last committed one (or just the previous one? I guess in a scenario where you ack/nack each message individually, you want to proceed one message at a time).

As a workaround, the consumer can explicitly call seek:

from typing import Any, Dict

from aiokafka import TopicPartition
from faststream.kafka.annotations import KafkaMessage, Logger

@broker.subscriber("source-topic", group_id="foo", auto_commit=False)
async def async_subscriber(body: Dict[str, Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()
    # Rewind the in-memory position so the nacked message is fetched again
    msg.consumer.seek(
        TopicPartition(msg.raw_message.topic, msg.raw_message.partition),
        msg.raw_message.offset,
    )
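The position-vs-committed-offset distinction behind this workaround can be sketched without a broker. The following is a minimal, purely illustrative Python model (all class and method names are hypothetical, not aiokafka's API) of why a nack that doesn't seek still lets the consumer move past the message:

```python
# Toy model of a Kafka consumer's in-memory fetch position vs. its
# committed offset. Hypothetical names for illustration only.

class ModelConsumer:
    def __init__(self) -> None:
        self.position = 0   # in-memory fetch position (advances on every poll)
        self.committed = 0  # last committed offset (moves only on ack/commit)

    def poll(self) -> int:
        """Return the next offset and advance the in-memory position,
        regardless of whether the previous message was acked."""
        offset = self.position
        self.position += 1
        return offset

    def ack(self) -> None:
        # Commit everything consumed so far.
        self.committed = self.position

    def nack_without_seek(self) -> None:
        # The reported behavior: nack() leaves the position untouched.
        pass

    def nack_with_seek(self, offset: int) -> None:
        # The workaround: rewind the position to the nacked offset.
        self.position = offset


c = ModelConsumer()
first = c.poll()          # offset 0
c.nack_without_seek()
second = c.poll()         # offset 1: message 0 is never redelivered
assert (first, second) == (0, 1)

c2 = ModelConsumer()
off = c2.poll()           # offset 0
c2.nack_with_seek(off)    # rewind to the nacked message
assert c2.poll() == off   # the same message is delivered again
```

This mirrors the observed behavior: within a live session the committed offset is irrelevant to redelivery; it is the in-memory position that decides which message the next poll returns.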

@davorrunje davorrunje moved this from Done to Todo in FastStream Apr 29, 2024
@Lancetnik
Member

Lancetnik commented May 6, 2024

@lorenzobenvenuti

I think we can just use something like this:

class KafkaMessage:
    async def nack(self) -> None:
        await self.consumer.seek_to_committed()

It looks correct, but I am not sure about side effects or potential problems.
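One side effect worth noting: seeking to the last committed offset rewinds past every message consumed since the last commit, not just the nacked one. A toy model (hypothetical names, not aiokafka's API) of that difference:

```python
# Toy model of nack() implemented as "seek to the last committed offset".
# If several messages were consumed since the last commit, all of them
# are redelivered, not only the nacked one. Names are illustrative only.

class ModelConsumer:
    def __init__(self) -> None:
        self.position = 0   # in-memory fetch position
        self.committed = 0  # last committed offset

    def poll(self) -> int:
        offset = self.position
        self.position += 1
        return offset

    def nack_seek_to_committed(self) -> None:
        # Rewind to the last *committed* offset, not the nacked message.
        self.position = self.committed


c = ModelConsumer()
consumed = [c.poll(), c.poll(), c.poll()]  # offsets 0, 1, 2, none committed
c.nack_seek_to_committed()                 # nack only message 2...
assert c.poll() == 0                       # ...but delivery restarts at offset 0
```

By contrast, the explicit seek in the workaround above rewinds exactly one message, since it targets the nacked message's own offset.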

@Lancetnik Lancetnik added the AioKafka Issues related to `faststream.kafka` module label May 16, 2024
@davorrunje davorrunje moved this from Todo to In Progress in FastStream Jul 15, 2024
@Lancetnik Lancetnik mentioned this issue Jul 17, 2024
@kumaranvpl kumaranvpl linked a pull request Aug 13, 2024 that will close this issue
@github-project-automation github-project-automation bot moved this from In Progress to Done in FastStream Aug 13, 2024
@kumaranvpl
Collaborator

Hello @bradydean,

This bug has been fixed as part of the https://github.com/airtai/faststream/releases/tag/0.5.18 release.

Here is a sample code snippet:

from typing import Any, Dict
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    KafkaMessage,
    Logger
)
import json
import asyncio

broker = KafkaBroker("localhost:9092")
topic = "kafka-nack-test"

app = FastStream(broker)


@broker.subscriber(topic, group_id="foo", auto_commit=False, auto_offset_reset="earliest")
async def async_subscriber(body: Dict[str,Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()


@app.after_startup
async def publish_something() -> None:
    async def _publish_something() -> None:
        i = 10
        print(f"Sleeping for {i} seconds")
        await asyncio.sleep(i)
        message = {"hi": "there"}
        await broker.publish(message, topic=topic)
        print("Published message: " + json.dumps(message))

    asyncio.create_task(_publish_something())
