
KIP-74 / KAFKA-2063: max_partition_fetch_bytes should now be a soft limit #1232

Open
jeffwidman opened this issue Oct 4, 2017 · 4 comments


jeffwidman commented Oct 4, 2017

I noticed that the official Kafka consumer config docs for the max_partition_fetch_bytes param now specify that it's a soft limit so that the consumer can make progress: https://kafka.apache.org/documentation/#newconsumerconfigs

It looks like the change happened as part of KIP-74 / KAFKA-2063, whose implementation landed in Kafka 0.10.1.

Here's some of the relevant code in kafka-python that will need to be updated:

  • This size must be at least as large as the maximum message size
    the server allows or else it is possible for the producer to
    send messages larger than the consumer can fetch. If that
    happens, the consumer can get stuck trying to fetch a large
    message on a certain partition. Default: 1048576.
  ```python
  def _raise_if_record_too_large(self):
      """Check FetchResponses for messages larger than the max per partition.

      Raises:
          RecordTooLargeError: if there is a message larger than fetch size
      """
      if not self._record_too_large_partitions:
          return
      copied_record_too_large_partitions = dict(self._record_too_large_partitions)
      self._record_too_large_partitions.clear()
      raise RecordTooLargeError(
          "There are some messages at [Partition=Offset]: %s "
          " whose size is larger than the fetch size %s"
          " and hence cannot be ever returned."
          " Increase the fetch size, or decrease the maximum message"
          " size the broker will allow.",
          copied_record_too_large_partitions,
          self.config['max_partition_fetch_bytes'])
  ```

This was further amended in Kafka 0.11 as part of KAFKA-5032 which relates to the new V2 message format which @tvoinarovskyi is working on in #1185.

@jeffwidman jeffwidman changed the title KIP-74 / KAFKA-2063: max_partition_fetch_bytes is now a soft limit KIP-74 / KAFKA-2063: max_partition_fetch_bytes should now be a soft limit Oct 4, 2017
@jeffwidman
Copy link
Collaborator Author

jeffwidman commented Oct 4, 2017

I'm also a little confused on this one, because 9c19ea7#diff-835562f30f853295e2dbc151829426f7R69, which implemented part of KIP-74, specifically documents fetch_max_bytes as a soft limit...

So I'm not sure what level of fix is needed here, but I think it's more than just updating the docs for max_partition_fetch_bytes: the code still tries to raise RecordTooLargeError, yet based on my research above the official Kafka docs imply that the Java consumer no longer raises this error (I didn't actually check the Java source).

@tvoinarovskyi I know it's been a few months since you worked on this, but do you remember any other pertinent information?

@tvoinarovskyi

Not much work is needed to support it, as far as I remember. It's a broker change: brokers simply stopped returning partial messages for Fetch v3 and higher. We could upgrade the error to a critical one raised when consumption makes no progress, since that should never happen now. (Java raises a raw KafkaError for the v3+ case of the API call instead of RecordTooLargeError.)
As for the producer, we already have a check that a batch holds at least 1 message, no matter the size.
The documented string for 'fetch_max_bytes' is mostly from the Java client; what confuses you in it?
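The producer-side guarantee mentioned above (a batch always admits at least one message, regardless of its size) can be sketched roughly as follows. The function name and signature here are illustrative, not kafka-python's actual internals:

```python
def try_append(batch, record, max_batch_bytes):
    """Admit `record` into `batch` only if it fits within max_batch_bytes,
    but always admit the first record so that a single oversized message
    cannot stall the producer indefinitely."""
    if not batch:
        # First record is always accepted, even if it exceeds the limit.
        batch.append(record)
        return True
    if sum(len(r) for r in batch) + len(record) > max_batch_bytes:
        return False  # caller should start a new batch
    batch.append(record)
    return True
```

The key design point is the unconditional acceptance of the first record: the size limit bounds batches, not individual messages, so progress is always possible.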

@jeffwidman

> It's a broker change, it just stopped returning partial messages for Fetch of V3 and higher.

Gotcha, I didn't realize the broker behavior changed. So we can't delete _raise_if_record_too_large() like I was hoping because it's still a valid/expected error for brokers < 0.10.1.
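To illustrate the version split described here, a version-gated check might look like the sketch below: keep RecordTooLargeError for brokers speaking Fetch v2 and below (< 0.10.1), and skip it for Fetch v3+, where the broker always returns at least one complete message per partition. The class and attribute names loosely mirror kafka-python's Fetcher but are hypothetical, not the real implementation:

```python
class RecordTooLargeError(Exception):
    pass


class FetcherSketch:
    """Illustrative only; not kafka-python's actual Fetcher."""

    def __init__(self, fetch_api_version, max_partition_fetch_bytes=1048576):
        self._fetch_api_version = fetch_api_version  # negotiated with broker
        self._record_too_large_partitions = {}       # {partition: offset}
        self._max_partition_fetch_bytes = max_partition_fetch_bytes

    def _raise_if_record_too_large(self):
        # Fetch v3+ brokers always make progress past an oversized message,
        # so the stuck-consumer condition cannot occur and no error is raised.
        if self._fetch_api_version >= 3:
            self._record_too_large_partitions.clear()
            return
        if not self._record_too_large_partitions:
            return
        copied = dict(self._record_too_large_partitions)
        self._record_too_large_partitions.clear()
        raise RecordTooLargeError(
            "Messages at [Partition=Offset]: %s exceed the fetch size %s "
            "and can never be returned" % (copied, self._max_partition_fetch_bytes))
```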

> The documented string for 'fetch_max_bytes' is mostly from Java client, what confuses you in it?

It was the opposite. I assumed you'd have to touch the code for max_partition_fetch_bytes in order to implement fetch_max_bytes, so I was confused why the docstring for max_partition_fetch_bytes wasn't updated even though the docstring for fetch_max_bytes matched upstream.

@tvoinarovskyi

> Gotcha, I didn't realize the broker behavior changed. So we can't delete _raise_if_record_too_large() like I was hoping because it's still a valid/expected error for brokers < 0.10.1.

Yup

> It was the opposite. I assumed you'd have to touch the code for max_partition_fetch_bytes in order to implement fetch_max_bytes, so I was confused why the docstring for max_partition_fetch_bytes wasn't updated even though the docstring for fetch_max_bytes matched upstream.

Yeah, my miss there; when I implemented that change I did not fully understand that both options changed behaviour.

Also, there's a strange test (I wrote it %) ) that @dpkp had issues with, which covers this case:

`def test_kafka_consumer_max_bytes_one_msg(self):`
