KIP-74: Add fetch response size limit and implement round-robin on client side #1812
Conversation
@@ -278,6 +279,7 @@ object KafkaConfig {
val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
I wonder if we should deprecate this in favour of a replica.fetch.max.partition.bytes. It seems quite confusing that this one applies to a single partition when every other replica.fetch.* setting applies to the request. Maybe something for later.
gets my vote
It's tempting to do this now while we're still in the KIP window. We'll be less inclined to do it in the future if it requires a new KIP.
What you say is true, but I think it's too late to add this to the KIP as we don't have a great story for deprecating properties.
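For context, a minimal sketch of the distinction being debated; the request-level property name is taken from the KIP and is an assumption here, since the added line is not visible in this hunk.

```scala
// Sketch only: the existing property limits the data fetched from each partition,
// while the new KIP-74 property (name assumed from the KIP) caps the whole response.
object ReplicaFetchLimits {
  val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"                  // per partition
  val ReplicaFetchResponseMaxBytesProp = "replica.fetch.response.max.bytes" // per fetch response
}
```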
Thanks for the PR @nepal. It looks to me like it's going in the right direction. Left some comments/questions and looking forward to the missing bits (when do you think you'll have them?). By the way, I would add one point to your to-do list: tests.
    this(replicaId, maxWait, minBytes, fetchData, ProtoUtils.latestVersion(ApiKeys.FETCH.id), maxBytes);
}

public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData, int version, int maxBytes) {
I suggest making the version the first parameter.
Since the ordering is relevant here, would it be better to use LinkedHashMap in the signature?
Not sure whether it is a good idea to accept only LinkedHashMap in the constructor - it is used for all versions of FetchRequest. Maybe we should just add a comment that FetchRequest will preserve the ordering of fetchData.
Using LinkedHashMap for all versions should be fine, right? The previous versions don't care about ordering, so either way is fine. Or am I missing something?
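A minimal Scala sketch of the compromise being discussed, keeping a plain Map in the signature while preserving the caller's ordering internally (the helper name is hypothetical):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

import org.apache.kafka.common.TopicPartition

object FetchOrdering {
  // Copy the caller's fetch data into a LinkedHashMap so that iteration order, and
  // therefore the partition priority applied under the response size limit, is
  // preserved regardless of which Map implementation the caller passed in.
  def preserveOrder[V](fetchData: JMap[TopicPartition, V]): JMap[TopicPartition, V] =
    new JLinkedHashMap[TopicPartition, V](fetchData)
}
```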
@nepal : Thanks for the patch. A couple of other comments.
@nepal any idea when you will update the PR with the missing bits? Let us know if you need any help making progress on this. Thanks!
I am currently working on the missing bits. I think it will take three days. Maybe we can do some intermediate version of this PR which can be accepted if I am blocking someone?
Thanks for the update @nepal, 3 days is fine. Probably easier to get the one PR in since it's not too complicated.
 * <code>fetch.max.bytes</code>
 */
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. This is not an absolute maximum - if there is a single message which is larger than fetch.max.bytes, it will still be returned.";
Do you think it's worth mentioning here that the consumer may send multiple parallel fetches and that this setting only applies to each fetch individually? In a future KIP, it might make sense to add a max.in.flight.fetches setting so that users have better control over memory. I went ahead and opened a JIRA to collect comments on this: https://issues.apache.org/jira/browse/KAFKA-4133.
Another approach might be KIP-72 which provides a memory pool to control the memory footprint.
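For illustration, a sketch of how the new consumer setting sits next to the existing per-partition limit (the property values shown are only illustrative defaults):

```scala
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

object Kip74ConsumerExample extends App {
  // fetch.max.bytes caps each fetch response as a whole, while max.partition.fetch.bytes
  // still caps every individual partition within it. A consumer fetching from several
  // brokers in parallel can have one response in flight per broker, so total memory use
  // can exceed fetch.max.bytes.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "kip74-example")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("fetch.max.bytes", "52428800")          // whole-response limit (new in KIP-74)
  props.put("max.partition.fetch.bytes", "1048576") // per-partition limit (unchanged)
  val consumer = new KafkaConsumer[String, String](props)
}
```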
Oversized in the context of replication.
KAFKA-2063: Address Jun's comments, remove warnings and add producer test
…ch-response-size

* apache/trunk:
  KAFKA-4093; Cluster Id (KIP-78)
  KAFKA-4173; SchemaProjector should successfully project missing Struct field when target field is optional
  KAFKA-4183; Corrected Kafka Connect's JSON Converter to properly convert from null to logical values
  KAFKA-3776: Unify store and downstream caching in streams
KAFKA-2063: Preserve behaviour of fetch requests version 2 and below and add tests
This PR is complete, so @nepal, can you please remove the WIP from the PR title and update the PR description so it is up to date? The PR title and description become the commit message of the squashed commit that gets merged.
@@ -25,15 +25,12 @@ import kafka.api._
import kafka.cluster.{Partition, Replica}
import kafka.common._
import kafka.controller.KafkaController
import kafka.log.{LogAppendInfo, LogManager}
import kafka.log.{FileMessageSet, LogAppendInfo, LogManager}
unused import FileMessageSet
@@ -19,4 +19,5 @@ package kafka.server

import kafka.message.MessageSet

case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet)
case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet,
                         messageSetIncomplete: Boolean = false)
To be clearer, messageSetIncomplete probably should be firstMessageSetIncomplete?
Agreed, this also came up in a conversation between Jason and myself and I was thinking of renaming it along the lines you suggest. :)
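A sketch of the rename being agreed on here, with everything else exactly as in the diff above:

```scala
package kafka.server

import kafka.message.MessageSet

// The renamed flag makes clear that only the first message set in the partition data
// may have been truncated by the fetch size limit.
case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
                         messageSet: MessageSet,
                         firstMessageSetIncomplete: Boolean = false)
```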
Thanks for the latest patch. Made a pass on the server-side changes. They look good to me. Just a couple of minor comments; both can be addressed in a follow-up patch if needed.
      Runtime.getRuntime.halt(1)
    }
  }

  def warnIfMessageOversized(messageSet: ByteBufferMessageSet, topicAndPartition: TopicAndPartition): Unit = {
This warning may still be useful during rolling upgrade?
I thought that it would fix itself once the rolling upgrade is over, but it's probably safer to keep it for now (as you said). Will add it back and add a comment that it only applies to older brokers.
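A sketch of what keeping the warning could look like, with a comment scoping it to older brokers; the oversized check and the log wording are assumptions for illustration, not the exact code:

```scala
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import kafka.utils.Logging

object OversizedMessageCheck extends Logging {
  def warnIfMessageOversized(messageSet: ByteBufferMessageSet, topicAndPartition: TopicAndPartition): Unit = {
    // A non-empty buffer with no complete message means the first message did not fit
    // in the configured fetch size. With fetch request v3 the broker always returns at
    // least one complete message, so this can only happen while older brokers are still
    // in the cluster, e.g. during a rolling upgrade.
    if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
      error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicAndPartition. " +
        "This only applies to brokers that do not yet support fetch request v3.")
  }
}
```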
 * This class is a useful building block for doing fetch requests where topic partitions have to be rotated via
 * round-robin to ensure fairness and some level of determinism given the existence of a limit on the fetch response
 * size. Because the serialization of fetch requests is more efficient if all partitions for the same topic are grouped
 * together, we do such grouping in the method `set`.
For follow-up: I think it's worth elaborating that this heuristic can diverge a little from the ideal over time, as partitions are fetched in different orders and partition leadership changes.
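A toy sketch of the rotation idea described in the class comment (names and structure are illustrative, and the per-topic grouping done in `set` is omitted for brevity):

```scala
// Partitions that headed the previous fetch are rotated to the back, so no partition
// is permanently starved by the fetch response size limit.
class RoundRobinFetchOrder[P](initial: Seq[P]) {
  private var order: Vector[P] = initial.toVector

  // Order in which partitions should appear in the next fetch request.
  def nextOrder: Seq[P] = order

  // Move a partition that has just been fetched to the end of the rotation.
  def moveToEnd(partition: P): Unit =
    order = order.filterNot(_ == partition) :+ partition
}
```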
/**
 * Create a replica fetch request for the current version.
 */
public static FetchRequest fromReplica(int replicaId, int maxWait, int minBytes, int maxBytes,
Static factory methods are totally the way to go for these objects. Having a name makes it much less error-prone and keeps us from having to jump through hoops to keep argument lists different.
Are you OK with the other constructors remaining as they are for now?
Yeah, I'm fine with it. I was just noting that this is a pattern I'd like to see more of.
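For illustration, the same naming pattern sketched in Scala with hypothetical types; the real class is the Java FetchRequest:

```scala
final case class FetchSettings(replicaId: Int, maxWait: Int, minBytes: Int, maxBytes: Int)

object FetchSettings {
  private val ConsumerReplicaId = -1 // consumers identify themselves with replica id -1

  // Named factories document intent and keep similar argument lists from being mixed up.
  def fromReplica(replicaId: Int, maxWait: Int, minBytes: Int, maxBytes: Int): FetchSettings =
    FetchSettings(replicaId, maxWait, minBytes, maxBytes)

  def fromConsumer(maxWait: Int, minBytes: Int, maxBytes: Int): FetchSettings =
    FetchSettings(ConsumerReplicaId, maxWait, minBytes, maxBytes)
}
```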
  }

  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): Seq[(TopicAndPartition, PartitionFetchInfo)] =
    random.shuffle(requestInfo)
Do you think it would be a little safer to shuffle by topic and by partition separately to keep the fetch request size consistent with older versions?
That's a good idea. I'll do this in a follow-up and include some tests.
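A sketch of what that follow-up might look like, with simplified types (a (topic, partition) pair standing in for TopicAndPartition):

```scala
import scala.util.Random

object FetchShuffle {
  // Shuffle topics, and partitions within each topic, while keeping a topic's partitions
  // adjacent so the request still serializes as compactly as the per-topic grouping of
  // older request versions.
  def shuffleByTopicThenPartition[V](random: Random,
                                     requestInfo: Seq[((String, Int), V)]): Seq[((String, Int), V)] = {
    val byTopic = requestInfo.groupBy { case ((topic, _), _) => topic }
    random.shuffle(byTopic.keys.toSeq).flatMap(topic => random.shuffle(byTopic(topic)))
  }
}
```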
} catch {
  // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
  // is supposed to indicate un-expected failure of a broker in handling a fetch request
  case utpe: UnknownTopicOrPartitionException =>
minor: Would it be worthwhile to combine some of these cases since they are all handled the same?
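A sketch of what combining the identically handled cases could look like; the exception set and the boolean helper are illustrative only:

```scala
import org.apache.kafka.common.errors.{NotLeaderForPartitionException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}

object FetchErrorClassifier {
  // Known per-partition conditions share a single alternative pattern; only genuinely
  // unexpected errors should increment the failed-fetch-request metric.
  def isExpectedFetchError(t: Throwable): Boolean = t match {
    case _: UnknownTopicOrPartitionException |
         _: NotLeaderForPartitionException |
         _: ReplicaNotAvailableException => true
    case _ => false
  }
}
```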
…ch-response-size

* apache/trunk:
  KAFKA-3492; Secure quotas for authenticated users
As per Jun's suggestion.
As per Jason's suggestion
… optimal over time
…t <= 2 Suggested by Jun
…ch-response-size

* apache/trunk:
  MINOR: Update the README.md to include a note about GRADLE_USER_HOME
  KAFKA-4157; Transient system test failure in replica_verification_test.test_replica_lags
  HOTFIX: changed quickstart donwload from 0.10.0.0 to 0.10.0.1
  HOTFIX: Increase timeout for bounce test
KAFKA-2063: Merge conflicts with trunk and address review feedback
One failure in the system test run: test_id: 2016-09-17--001.kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True. It looks unrelated, as we had the same failure in trunk recently (http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-09-17--001.1474133882--apache--trunk--ecc1fb1/report.html). The Jenkins PR builder timed out after 180 minutes while running a Streams test, but all client and core tests passed (I've restarted the build). The tests passed locally.
This commit adds support for version 3 of the FetchRequest API. The KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes. The PR is here: apache/kafka#1812 and the JIRA is here: https://issues.apache.org/jira/browse/KAFKA-2063. We should document the fact that the per-partition limits take precedence (so the returned message may be larger than the requested limit).
This PR is an implementation of KIP-74, which was originally motivated by KAFKA-2063.
Your comments are greatly appreciated.