Sarama (probably) incorrectly assumes FetchResponseV4 contains RecordBatches when it contains MessageSets #988
Comments
cc @wladh
So it looks like the correct thing to do is to peek 16 bytes ahead and check the version (magic byte), since it sits at the same offset regardless of the format?
@eapache Exactly, I think that's why they designed it this way. I dug into the Kafka source to see how it's handled there, and it also uses the version (magic byte) for this:
byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
ByteBuffer batchSlice = buffer.slice();
batchSlice.limit(batchSize);
buffer.position(buffer.position() + batchSize);
if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
    throw new CorruptRecordException("Invalid magic found in record: " + magic);
if (magic > RecordBatch.MAGIC_VALUE_V1)
    return new DefaultRecordBatch(batchSlice);
else
    return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
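For reference, a rough Go sketch of the same check, not Sarama's actual code. The names `peekMagic`, `formatFor`, and `magicOffset` are hypothetical and made up for illustration. It assumes the magic byte sits at byte 16 in both layouts: after the 8-byte offset, 4-byte size, and 4-byte CRC in a legacy message, and after the 8-byte base offset, 4-byte batch length, and 4-byte partition leader epoch in a RecordBatch.

```go
package main

import (
	"errors"
	"fmt"
)

// magicOffset is the assumed position of the magic byte in both the legacy
// MessageSet entry and the v2 RecordBatch header (8 + 4 + 4 bytes in).
const magicOffset = 16

var errInsufficientData = errors.New("insufficient data to peek magic byte")

// peekMagic (hypothetical helper) returns the magic byte without consuming
// any input, so the caller can still decode buf from its start afterwards.
func peekMagic(buf []byte) (int8, error) {
	if len(buf) <= magicOffset {
		return 0, errInsufficientData
	}
	return int8(buf[magicOffset]), nil
}

// formatFor (hypothetical helper) mirrors the branching in the Kafka snippet
// above: magic 0 and 1 are legacy messages, magic 2 is a RecordBatch.
func formatFor(magic int8) (string, error) {
	switch {
	case magic < 0 || magic > 2:
		return "", fmt.Errorf("invalid magic found in record: %d", magic)
	case magic > 1:
		return "RecordBatch", nil
	default:
		return "MessageSet", nil
	}
}

func main() {
	// A zeroed 17-byte buffer with magic 1, standing in for the start of a
	// legacy message returned inside a FetchResponse v4.
	legacy := make([]byte, magicOffset+1)
	legacy[magicOffset] = 1

	magic, err := peekMagic(legacy)
	if err != nil {
		panic(err)
	}
	format, err := formatFor(magic)
	if err != nil {
		panic(err)
	}
	fmt.Println(magic, format)
}
```

The point is that the decision comes from the payload itself rather than from the request or response version, which is what the FetchResponse decoding would need here.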
@jurriaan At a guess, I'd think Kafka can return the older format if you upgraded your broker from a previous version to a 0.11+ version. Does that fit your case?
@tcrayford Correct, I forgot to mention that. Before this happened I updated the message version in the broker config from 0.10 to 1.0.
Yeah, it seems I was under the mistaken assumption that the messages would be converted to v2 (it seems the conversion only happens downwards).
No problem. I haven't had time to look at this properly yet, so if you want to grab it that would be great. You probably have more context in that chunk of code still anyway.
I got the error
kafka: insufficient data to decode packet, more bytes expected
while consuming from some partitions. I tracked this down (by placing panics in real_decoder.go, because there were no stack traces) to this line: https://github.com/Shopify/sarama/blob/master/record_batch.go#L153
Somehow the array length was corrupted here. I dumped the raw response from Kafka and compared it to the RecordBatch spec. It didn't match up: the magic number should be 2 according to https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol, but it was 1.
I eventually worked out that this thing Sarama was trying to decode wasn't a RecordBatch but a MessageSet.
It seems that https://github.com/Shopify/sarama/blob/master/fetch_response.go#L83 is not the correct way of determining if the response contains a messageset or a recordbatch.
If I understand the comment in https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java#L94-L95 correctly, it should be possible for FetchResponse v4 to contain the older format.
I worked around this issue by switching the version back to V0_10_0_0 for now.