Log.warn if found a message in kafka topic larger than the maximum fetch size #1443
Conversation
This would mean the spout will get stuck at that particular message and go into infinite worker JVM restarts until the config is updated, wouldn't it?
The spout will not get stuck; it will keep running as if nothing were wrong. Users will not know what happened, but their topology just stops processing Kafka messages.
A MessageSizeTooLargeException will kill the worker JVM (since it is a RuntimeException). When the worker is re-launched, the JVM will be killed again as soon as the spout reaches the faulty offset. This behavior may not be desired by many users who don't want to stall the worker because of one large message. Can you instead log an error, ignore the message, and proceed further? Also, I didn't fully understand the side effect of not throwing an exception. What do you mean by "topology will fetch no data but still be running"? Will it stop fetching data at all?
That means data loss for users, and I am not sure data loss is acceptable. As I mentioned above, ConsumerIterator chooses to throw an exception (MessageSizeTooLargeException, which causes the Kafka consumer to stop working), so I think that may be a good approach.
The spout will keep trying to fetch data, but the response from Kafka contains no valid bytes because of the size limit. The side effect is that data in the Kafka topic piles up while users don't know why their Storm topology has stopped processing messages (there is no data to process). I think you are right that many users don't want to stall the worker because of one large message, but this is the result of an incorrect config (KafkaConfig.fetchSizeBytes); if they want to avoid this situation, they need to set a sufficiently large size limit from the start.
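For illustration, here is a minimal sketch of setting that size limit up front, assuming the storm-kafka 1.x SpoutConfig API; the ZooKeeper address, topic name, spout id, and sizes are placeholders, not values from this PR:

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

public class FetchSizeConfigSketch {
    // Build a SpoutConfig whose fetchSizeBytes comfortably exceeds the largest message
    // the broker will accept, so the stalled-fetch situation described above never arises.
    public static SpoutConfig build() {
        BrokerHosts hosts = new ZkHosts("zkhost:2181");  // placeholder ZK connect string
        SpoutConfig cfg = new SpoutConfig(hosts, "events", "/kafka-spout", "events-spout");
        cfg.fetchSizeBytes = 8 * 1024 * 1024;   // should be >= the broker's maximum message size
        cfg.bufferSizeBytes = 8 * 1024 * 1024;  // keep the consumer buffer in step with the fetch size
        return cfg;
    }
}
```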
@abhishekagarwal87 and @Victor-Wong I am on the fence on this. Having a spout that is stuck forever is really bad, but having it crash, lose data, come back up, and repeat the process, possibly draining the other partitions, feels even worse. I guess if you configured your spouts for at-most-once processing then you got what you asked for, even if it means shooting yourself in the foot, and Storm is fail-fast, so it fits with that philosophy. Please at least update the exception message to indicate what the sizes are actually set to. I think this would make life simpler for the user who is in this situation when they see the error message.
Another alternative might be to give the user the option to skip messages that are too large, and provide a metric to indicate how many messages/bytes have been skipped because of this.
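A rough sketch of that skip-and-count alternative, reusing the variables from the diff below (msgs, config, partition, offset); the skipMessagesTooLarge flag and the skippedMessages metric are hypothetical names, not part of storm-kafka or this PR:

```java
// Hypothetical sketch only: skip an oversized message and count it instead of failing.
// skipMessagesTooLarge and skippedMessages do not exist in storm-kafka; they illustrate the idea.
if (msgs.sizeInBytes() > 0 && msgs.validBytes() == 0) {
    if (config.skipMessagesTooLarge) {              // hypothetical config flag
        skippedMessages.incr();                     // hypothetical CountMetric exposed to the UI
        LOG.warn("Skipping offset {} on {}/{}: message larger than fetchSizeBytes ({} bytes)",
                offset, partition.topic, partition.partition, config.fetchSizeBytes);
        offset += 1;                                // move past the oversized message and keep fetching
    } else {
        throw new RuntimeException("Message at offset " + offset
                + " exceeds fetchSizeBytes (" + config.fetchSizeBytes + " bytes)");
    }
}
```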
It seems we can not get the actual size of the message. I have made some updates according to your advice.
This reverts commit b007f03.
I made some changes; what about logging a warning message instead?
```java
if (msgs.sizeInBytes() > 0 && msgs.validBytes() == 0) {
    LOG.warn(String.format("Found a message larger than the maximum fetch size (%d bytes) of this consumer on topic " +
        "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
        , config.fetchSizeBytes, partition.topic, partition.partition, offset));
```
Nit: Comma at the front of the line is aesthetically displeasing to me (i.e., ugly). ;-)
More importantly, what is the value in msgs.sizeInBytes() if it's not the message size? i.e., in the review comments you said:
It seems we can not get the actual size of the message.
So I wonder what the value is in msgs.sizeInBytes() then?
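As a hedged aside on that question (this reflects how the old kafka.javaapi.message.ByteBufferMessageSet is generally understood to behave, not something stated in this PR): sizeInBytes() reports the size of the raw byte buffer the broker returned, while validBytes() counts only the bytes that form complete messages. When the next message is larger than the fetch size, the broker returns a truncated message, so the buffer is non-empty but holds no complete message, which is exactly the condition the patch checks:

```java
// Sketch of the relationship, assuming ByteBufferMessageSet semantics as described above:
// sizeInBytes() -> bytes returned by the broker, including a possibly truncated trailing message
// validBytes()  -> bytes that form complete, parseable messages
boolean oversizedMessageAtFetchOffset = msgs.sizeInBytes() > 0 && msgs.validBytes() == 0;
```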
We are closing stale Pull Requests to make the list more manageable. Please re-open any Pull Request that has been closed in error. Closes apache#608 Closes apache#639 Closes apache#640 Closes apache#648 Closes apache#662 Closes apache#668 Closes apache#692 Closes apache#705 Closes apache#724 Closes apache#728 Closes apache#730 Closes apache#753 Closes apache#803 Closes apache#854 Closes apache#922 Closes apache#986 Closes apache#992 Closes apache#1019 Closes apache#1040 Closes apache#1041 Closes apache#1043 Closes apache#1046 Closes apache#1051 Closes apache#1078 Closes apache#1146 Closes apache#1164 Closes apache#1165 Closes apache#1178 Closes apache#1213 Closes apache#1225 Closes apache#1258 Closes apache#1259 Closes apache#1268 Closes apache#1272 Closes apache#1277 Closes apache#1278 Closes apache#1288 Closes apache#1296 Closes apache#1328 Closes apache#1342 Closes apache#1353 Closes apache#1370 Closes apache#1376 Closes apache#1391 Closes apache#1395 Closes apache#1399 Closes apache#1406 Closes apache#1410 Closes apache#1422 Closes apache#1427 Closes apache#1443 Closes apache#1462 Closes apache#1468 Closes apache#1483 Closes apache#1506 Closes apache#1509 Closes apache#1515 Closes apache#1520 Closes apache#1521 Closes apache#1525 Closes apache#1527 Closes apache#1544 Closes apache#1550 Closes apache#1566 Closes apache#1569 Closes apache#1570 Closes apache#1575 Closes apache#1580 Closes apache#1584 Closes apache#1591 Closes apache#1600 Closes apache#1611 Closes apache#1613 Closes apache#1639 Closes apache#1703 Closes apache#1711 Closes apache#1719 Closes apache#1737 Closes apache#1760 Closes apache#1767 Closes apache#1768 Closes apache#1785 Closes apache#1799 Closes apache#1822 Closes apache#1824 Closes apache#1844 Closes apache#1874 Closes apache#1918 Closes apache#1928 Closes apache#1937 Closes apache#1942 Closes apache#1951 Closes apache#1957 Closes apache#1963 Closes apache#1964 Closes apache#1965 Closes apache#1967 Closes apache#1968 Closes apache#1971 Closes apache#1985 Closes apache#1986 Closes apache#1998 Closes apache#2031 Closes apache#2032 Closes apache#2071 Closes apache#2076 Closes apache#2108 Closes apache#2119 Closes apache#2128 Closes apache#2142 Closes apache#2174 Closes apache#2206 Closes apache#2297 Closes apache#2322 Closes apache#2332 Closes apache#2341 Closes apache#2377 Closes apache#2414 Closes apache#2469
In Kafka's ConsumerIterator, there is code like this:
```scala
// if we just updated the current chunk and it is empty that means the fetch size is too small!
if(currentDataChunk.messages.validBytes == 0)
  throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
    "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
    .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
```
When the "fetch.message.max.bytes" config is smaller than the actual message size in the topic, ConsumerIterator throws an exception to notify the user.
But in storm-kafka there is no such logic. As a result, if KafkaConfig.fetchSizeBytes is smaller than the actual message size, the topology fetches no data but keeps running.
To prevent this situation, we need to throw a MessageSizeTooLargeException as well.
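For reference, a minimal sketch of the fail-fast check this description proposes, mirroring the ConsumerIterator snippet above; the helper class and parameter names are illustrative only, and the change as merged logs a warning instead of throwing:

```java
import kafka.common.MessageSizeTooLargeException;
import kafka.javaapi.message.ByteBufferMessageSet;

// Sketch of the originally proposed fail-fast behaviour. The class and method names are
// hypothetical; the merged change logs a warning in KafkaUtils rather than throwing.
final class FetchSizeCheck {
    static void ensureCompleteMessageFetched(ByteBufferMessageSet msgs, int fetchSizeBytes,
                                             String topic, int partition, long offset) {
        if (msgs.sizeInBytes() > 0 && msgs.validBytes() == 0) {
            throw new MessageSizeTooLargeException(String.format(
                    "Found a message larger than the maximum fetch size (%d bytes) of this consumer on topic %s "
                    + "partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum "
                    + "message size the broker will allow.",
                    fetchSizeBytes, topic, partition, offset));
        }
    }
}
```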