
Short metadata refresh time triggers a refetch of already fetched message despite no cluster changes #4249

Closed
mensfeld opened this issue Apr 12, 2023 · 8 comments


mensfeld commented Apr 12, 2023

Hello,

I'm one of the maintainers of the rdkafka-ruby bindings and the author of the Karafka framework (https://github.com/karafka/karafka). I wanted to upgrade librdkafka from 2.0.2 to 2.1.0 in the Ruby bindings, but I noticed an issue that prevents me from doing so.

For some scenarios I use a really short topic.metadata.refresh.interval.ms of 5_000 ms. This is used mainly in development to quickly update consumer and producer metadata state after cluster changes. I noticed that despite no cluster changes and only a few messages being produced, such a short interval causes duplicates on poll almost every time a metadata refresh occurs.

The flow of the code is as follows:

  1. I create a topic with one partition
  2. I produce one message to it (the first message)
  3. I create a consumer that consumes a single message and, after consuming it, produces another message to be consumed
  4. I store all the offsets in an array, and I expect no duplicates

The code below works as expected with librdkafka 2.0.2, and none of my integration tests fail.

Consumer #each just runs a poll and yields the message.
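A minimal sketch of that #each, assuming the usual rdkafka-ruby poll-and-yield loop (the 250 ms poll timeout is illustrative):

def each
  loop do
    message = poll(250) # returns nil if nothing arrives within the timeout
    next unless message

    yield(message)
  end
end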

topic = 'test'
offsets = []

Thread.new do
  # start producing only after the consumer is running, since it consumes from latest
  sleep(5)
  Karafka.producer.produce_sync(topic: topic, payload: '1')
end

config = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': Time.now.to_f.to_s,
  'auto.offset.reset': 'latest',
  'enable.auto.offset.store': 'false',
  'debug': 'all',
  # short refresh interval that triggers the refetch on 2.1.0
  'topic.metadata.refresh.interval.ms': 5_000
}

consumer = Rdkafka::Config.new(config).consumer

consumer.subscribe(topic)

consumer.each do |message|
  # fail on the first duplicate offset
  raise if offsets.include?(message.offset)

  offsets << message.offset

  # produce the next message so the consumption loop keeps going
  Karafka.producer.produce_sync(topic: topic, payload: '1')
end

Note: Karafka.producer is just wrapped librdkafka that dispatches a message and waits for the delivery callback. The producer's metadata refresh interval is set much higher (100 000 ms).
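For context, roughly what such a synchronous produce amounts to with plain rdkafka-ruby (a sketch; the timeout value is illustrative):

handle = producer.produce(topic: topic, payload: '1')
handle.wait(max_wait_timeout: 60) # block until the delivery report arrives (or raise on timeout)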

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 2.1.0
  • Apache Kafka version: 2.8.1 and 3.4.0 from bitnami
  • librdkafka client configuration: presented in the above code snippet
  • Operating system: Linux 5.4.0-146-generic #163-Ubuntu SMP Fri Mar 17 18:26:02 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue - I would consider it critical because a metadata refresh should not cause duplicates on poll without a rebalance. A metadata refresh on a cluster without changes should not cause any such issues.

Logs

Bitnami Kafka logs:

kafka    | [2023-04-12 11:20:46,844] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group 1681298446.83954 in Empty state. Created a new member id rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:46,844] INFO [GroupCoordinator 1]: Preparing to rebalance group 1681298446.83954 in state PreparingRebalance with old generation 0 (__consumer_offsets-14) (reason: Adding new member rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:46,845] INFO [GroupCoordinator 1]: Stabilized group 1681298446.83954 generation 1 (__consumer_offsets-14) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:46,846] INFO [GroupCoordinator 1]: Assignment received from leader rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 for group 1681298446.83954 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:49,098] INFO [GroupMetadataManager brokerId=1] Group 1681298338.3458133 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
kafka    | [2023-04-12 11:20:49,100] INFO [GroupMetadataManager brokerId=1] Group 1681298390.4270868 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
kafka    | [2023-04-12 11:20:49,100] INFO [GroupMetadataManager brokerId=1] Group anything transitioned to Dead in generation 11 (kafka.coordinator.group.GroupMetadataManager)

librdkafka debug all logs + some basic Karafka logs

Full issue log here: https://gist.github.com/mensfeld/9782da56cffc5fd7293000522e4b6744 as it did not fit into the comment

D, [2023-04-12T13:20:51.858815 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] changed fetch state active -> validate-epoch-wait
D, [2023-04-12T13:20:51.858846 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: test2 [0]: querying broker for epoch validation of offset 480 (leader epoch 0): epoch updated from metadata
D, [2023-04-12T13:20:51.858860 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1:   Broker #0/1: 127.0.0.1:9092 NodeId 1
D, [2023-04-12T13:20:51.858875 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: 1/1 requested topic(s) seen in metadata
D, [2023-04-12T13:20:51.858884 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] in state validate-epoch-wait at offset 480 (leader epoch 0) (11/100000 msgs, 0/65536 kb queued, opv 4) is not fetchable: not in active fetch state
D, [2023-04-12T13:20:51.858895 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Removed test2 [0] from fetch list (0 entries, opv 4): not in active fetch state
D, [2023-04-12T13:20:51.858903 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetForLeaderEpochRequest (v2, 48 bytes @ 0, CorrId 15)
D, [2023-04-12T13:20:51.858912 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 136 bytes, CorrId 14, rtt 5.93ms)
D, [2023-04-12T13:20:51.858920 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] MessageSet size 69, error "Success", MaxOffset 481, LSO 481, Ver 4/4
D, [2023-04-12T13:20:51.858929 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enqueue 1 message(s) (1 bytes, 1 ops) on test2 [0] fetch queue (qlen 16, v4, last_offset 480, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
D, [2023-04-12T13:20:51.858938 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received OffsetForLeaderEpochResponse (v2, 37 bytes, CorrId 15, rtt 5.77ms)
D, [2023-04-12T13:20:51.858947 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: test2 [0]: offset and epoch validation succeeded: broker end offset 481 (offset leader epoch 0)
D, [2023-04-12T13:20:51.858955 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] changed fetch state validate-epoch-wait -> active
D, [2023-04-12T13:20:51.858964 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] start fetching at offset 480 (leader epoch 0)
D, [2023-04-12T13:20:51.858972 #418730] DEBUG -- : rdkafka: [thrd:app]: test2 [0] is the new sticky partition
D, [2023-04-12T13:20:51.858981 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0] 1 message(s) in xmit queue (1 added from partition queue)
D, [2023-04-12T13:20:51.863871 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0] 1 message(s) in xmit queue (0 added from partition queue)
D, [2023-04-12T13:20:51.863906 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0]: Produce MessageSet with 1 message(s) (69 bytes, ApiVersion 7, MsgVersion 2, MsgId 3, BaseSeq 2, PID{Id:11,Epoch:0}, uncompressed)
D, [2023-04-12T13:20:51.863919 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Sent ProduceRequest (v7, 123 bytes @ 0, CorrId 6)
D, [2023-04-12T13:20:51.864340 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Received ProduceResponse (v7, 49 bytes, CorrId 6, rtt 0.43ms)
D, [2023-04-12T13:20:51.864366 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0]: MessageSet with 1 message(s) (MsgId 3, BaseSeq 2) delivered
I, [2023-04-12T13:20:51.868928 #418730]  INFO -- : [4e51e22256f1] Sync producing of a message to 'test2' topic took 10.200689999386668 ms
D, [2023-04-12T13:20:51.868963 #418730] DEBUG -- : [4e51e22256f1] {:topic=>"test2", :payload=>"1"}
D, [2023-04-12T13:20:52.850354 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: Topic test2 [0]: fetch decide: updating to version 4 (was 4) at offset 480 (leader epoch 0) (was offset 481 (leader epoch 0))
D, [2023-04-12T13:20:52.850420 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] in state active at offset 480 (leader epoch 0) (1/100000 msgs, 0/65536 kb queued, opv 4) is fetchable
D, [2023-04-12T13:20:52.850444 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Added test2 [0] to fetch list (1 entries, opv 4, 0 messages queued): fetchable
D, [2023-04-12T13:20:52.850472 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic test2 [0] at offset 480 (leader epoch 0, current leader epoch 0, v4)
D, [2023-04-12T13:20:52.850492 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 1/1/1 toppar(s)
D, [2023-04-12T13:20:52.850509 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 95 bytes @ 0, CorrId 16)
/mnt/software/Karafka/karafka/spec/integrations/consumption/loop_with_messages_with_headers_spec.rb:60:in `block in <main>': unhandled exception
  from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:460:in `block in each'
  from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:457:in `loop'
  from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:457:in `each'
  from /mnt/software/Karafka/karafka/spec/integrations/consumption/loop_with_messages_with_headers_spec.rb:59:in `<main>'
I, [2023-04-12T13:20:52.942391 #418730]  INFO -- : [4e51e22256f1] Closing producer took 0.03628700040280819 ms

Exactly the same code under 2.0.2 works as expected and can run for several minutes without any issues. I can reproduce it every single time.

I suspect that it may be related to the metadata cache eviction flow: v2.0.2...v2.1.0#diff-17c2af7f93fd5ac3f7afc7993bcfaa03bf3cb614e522a8dae82e2b077bcfd3beR163


emasab commented Apr 12, 2023

Hi @mensfeld, thanks for the detailed report. Could you tell me if it happens even if you set the sleep to a value different from the metadata refresh, like 1s higher or lower?
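(For reference, the variation asked about here is just the initial delay in the reproduction script above; the values below are illustrative:)

# e.g. 4 s (below the 5 s refresh interval) or 6 s (above it) instead of sleep(5)
sleep(6)
Karafka.producer.produce_sync(topic: topic, payload: '1')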


mensfeld commented Apr 12, 2023

@emasab yes, it does happen if I set it higher or lower, or if I start from earliest and just catch up on messages. The sleep being set to 5 is a coincidence.

Tested now on 1, 2 and 10 seconds. Same effect.

Actually, if I set the sleep to a much higher value than the refresh, things work. It looks like the issue lies with the first metadata refresh when messages have already started being consumed.

@mensfeld

I did one more thing: I created a separate process to produce messages and used the consumer independently of the producer.

If I start producing and consuming messages (in separate processes) BEFORE the initial metadata refresh (after 5 seconds), things get reprocessed. If I wait and only produce (from the separate process) AFTER the first refresh, it works as expected.
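A minimal sketch of such a standalone producer process (a hypothetical helper script; the delay argument decides whether it produces before or after the consumer's first refresh):

# standalone_producer.rb - hypothetical helper, run separately from the consumer
require 'rdkafka'

producer = Rdkafka::Config.new({ 'bootstrap.servers': 'localhost:9092' }).producer
sleep(Integer(ARGV.fetch(0, '0'))) # 0 = produce before the first refresh, e.g. 10 = after it
producer.produce(topic: 'test', payload: '1').wait(max_wait_timeout: 60)
producer.close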

> Hi @mensfeld, thanks for the detailed report.

Will I get a great report badge? ;)


emasab commented Apr 12, 2023

I had seen the fetch response arriving close to the OffsetForLeaderEpochRequest; I think it's the FetchResponse that needs to be discarded when the partition is in state RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT:

D, [2023-04-12T13:20:51.858903 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetForLeaderEpochRequest (v2, 48 bytes @ 0, CorrId 15)
D, [2023-04-12T13:20:51.858912 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 136 bytes, CorrId 14, rtt 5.93ms)

That initial OffsetForLeaderEpochRequest shouldn't be sent at all, but we need it for the moment; removing it requires a refactor that introduces a private rd_kafka_metadata_topic_t struct.

This is independent, though, as offset validation can also be triggered at other moments.

@mensfeld

> I think it's the FetchResponse that needs to be discarded when in state RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT

Probably yes, but I'm not an expert :( so I won't be of much help there. If a patch becomes available, though, I can run my rather heavy integration suite against it :)

emasab added a commit that referenced this issue Apr 18, 2023
emasab added a commit that referenced this issue Apr 18, 2023
emasab closed this as completed in 497b8f2 on Apr 20, 2023

emasab commented Apr 26, 2023

Hi @mensfeld, does version v2.1.1-RC1 solve this issue? I've reproduced it with a test, but I wanted to confirm.

@mensfeld

@emasab I will be able to confirm this in around 3-7 days.


mensfeld commented May 3, 2023

@emasab quick tests show it works as expected. I am not able to test it deeply, though, because I am traveling. However, the things I could test quickly show that the issue is no longer there. Great work.
