-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) #10760
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I will review it in the next few days. |
dajac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thomaskwscott Thanks for the PR. I just made a first read over it and I left few high level questions and few nits.
I feel like we need to add more tests to cover the changes done in the PR. I can think of the following places where we might need to add and/or extend tests:
- RequestResponseTest
- ListOffsetsRequestTest
- KafkaAdminClientTest - Here we need to test all the edge cases.
- LogTest - We need to test the changes in Log.
- ListOffsetsRequestTest and/or LogOffsetTest
There might other places as well.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
| Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar, | ||
| latestTimestampSegment.offsetOfMaxTimestampSoFar, | ||
| epochOptional)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we get a maxTimestampSoFar and offsetOfMaxTimestampSoFar which does not correspond to each others? It seems that we have no guarantee here. Is it an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In all cases I can find the 2 are updated together so I think we can assume consistency. For the topic liveness case in the KIP absolute consistency is not required but there will be other cases that will need this (e.g. topic inspection).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should address this as well. I am fine with doing it in a follow-up PR though so we can keep this focused. Ok for you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we file a JIRA as a subtask in the Jira of the KIP to not forget about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure thing, raised: https://issues.apache.org/jira/browse/KAFKA-12981
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
Outdated
Show resolved
Hide resolved
|
@thomaskwscott Thanks for the update. I will review it next week. |
dajac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thomaskwscott Thanks for the update. I left few more comments.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
dajac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thomaskwscott Thanks for the update. I left a few more comments/questions.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListOffsetsRequest.json
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListOffsetsResponse.json
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
dajac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@thomaskwscott The PR looks pretty good. I left a few minor comments.
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
dajac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the patch!
|
Failures are not related: |
|
@vvcephei filed https://issues.apache.org/jira/browse/KAFKA-13002, please take a look and see if it's related to this. |
|
Looking at it now. |
|
We have found the issue. @thomaskwscott is working on the fix. My bad, I missed one case. |
|
Thanks for the quick investigation! |
|
good job |
See https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Tested with new Integration test
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)