Hello, to protect my application from OOMs I want to set `receive.message.max.bytes`. My understanding is that when a message is larger than this limit, librdkafka detects it, disconnects from the broker, and tries to reconnect indefinitely. I'm able to detect such instances by adding an event callback and checking whether the event's `str()` method contains `Receive failed: Invalid response size` (in principle it could be possible to check the `err()` method instead, but in my tests it returns `0` under these circumstances).
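For concreteness, here is a minimal sketch of the kind of callback I mean, using librdkafka's C++ API (the class name, the `oversized_seen_` flag, and the exact substring matched are just my own illustration, not anything official):

```cpp
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Event callback that watches for the log line librdkafka emits when a fetch
// response exceeds receive.message.max.bytes. The class name, the flag and the
// substring match are illustrative only.
class OversizedFetchEventCb : public RdKafka::EventCb {
 public:
  void event_cb(RdKafka::Event &event) override {
    if (event.type() == RdKafka::Event::EVENT_LOG ||
        event.type() == RdKafka::Event::EVENT_ERROR) {
      if (event.str().find("Receive failed: Invalid response size") !=
          std::string::npos)
        oversized_seen_ = true;
    }
  }
  bool oversized_seen_ = false;  // polled by the application's consume loop
};

// Wiring it into the consumer configuration:
//   std::string errstr;
//   RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
//   conf->set("receive.message.max.bytes", "10000000", errstr);  // ~10 MB cap
//   OversizedFetchEventCb event_cb;
//   conf->set("event_cb", &event_cb, errstr);
//   RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
```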
But the question is: assuming I want to ignore these large messages and continue consuming subsequent ones, what is the best way of doing this? Would it be the following:
1. Keep track of the offset of the last delivered message (tracked as `offset_last_delivered_message`).
2. Detect via the event callback that a message is larger than the max allowed size.
3. Stop the consumer.
4. Start the consumer at offset `offset_last_delivered_message + 2`, thereby skipping the large message; maybe it's also possible to seek to `offset_last_delivered_message + 2` instead (see the sketch below).
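For concreteness, a rough sketch of what I imagine step 4 could look like with `RdKafka::KafkaConsumer::seek()` (the topic/partition constants and the helper name are placeholders I made up):

```cpp
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Rough sketch of step 4: seek one past the oversized message on a single
// assigned partition. kTopic/kPartition and the function name are placeholders;
// offset_last_delivered_message is the offset of the last message that
// consume() returned without error.
static const std::string kTopic = "my-topic";  // placeholder topic
static const int32_t kPartition = 0;           // placeholder partition

void resume_past_large_message(RdKafka::KafkaConsumer *consumer,
                               int64_t offset_last_delivered_message) {
  // Relies on contiguous offsets: last delivered + 1 is the oversized message,
  // so last delivered + 2 is the first message after it.
  RdKafka::TopicPartition *tp = RdKafka::TopicPartition::create(
      kTopic, kPartition, offset_last_delivered_message + 2);
  RdKafka::ErrorCode err = consumer->seek(*tp, 5000 /* timeout_ms */);
  if (err != RdKafka::ERR_NO_ERROR)
    std::cerr << "seek failed: " << RdKafka::err2str(err) << std::endl;
  delete tp;
}

// In the consume loop, the offset would be tracked from each delivered message:
//   RdKafka::Message *msg = consumer->consume(1000 /* timeout_ms */);
//   if (msg->err() == RdKafka::ERR_NO_ERROR)
//     offset_last_delivered_message = msg->offset();
```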
This relies on messages being delivered in order, which is guaranteed, so my reading of the code and docs is that this would work. But I wanted to ask:
- Are there any holes in this understanding? Would it indeed work? Or could I end up skipping a message I wasn't intending to skip?
- Is this the recommended way of dealing with messages larger than `receive.message.max.bytes` if I want to skip them?

Thank you