-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka time stamp fix #5108
Kafka time stamp fix #5108
Conversation
@@ -99,6 +99,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener | |||
private final LogRateLimiter errLogRateLimiter; | |||
private final ByteDecoder byteDecoder; | |||
private final long maxRetriesOnException; | |||
private final Map<Integer, Long> lastReceivedTimeStampMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final Map<Integer, Long> lastReceivedTimeStampMap; | |
private final Map<Integer, Long> partitionToLastReceivedTimestampMillis; |
Clarifies how we index and that we are storing millis here.
@@ -142,6 +144,23 @@ KafkaTopicConsumerMetrics getTopicMetrics() { | |||
return topicMetrics; | |||
} | |||
|
|||
private <T> long getReceivedTime(final ConsumerRecord<String, T> consumerRecord) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need some unit testing on this logic.
Also, you could make a class that handles this and it would be easier to verify. But, I'll leave that to you.
class PartitionTracker {
<T> Instant getReceivedTime(final ConsumerRecord<String, T> consumerRecord) {
// same logic
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add unit tests but adding a class seems to be little bit too much. Also, we use "PartitionTracker" for tracking the acks at partition level.
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
d0cc522
to
35a168a
Compare
# This is the 1st commit message: additional test coverage Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> # This is the commit message #2: cleaned up JiraOauthConfig file Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> # This is the commit message #3: addressing review comments and simplifying the exception handling Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> # This is the commit message #4: Add external origination time for events created from S3 Object (opensearch-project#5104) Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> # This is the commit message #5: moved the wait block out of the catch block Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> # This is the commit message #6: Renewal logic adjusted Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> # This is the commit message #7: partial Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> fix merge issues Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> update Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> Add getColumnDataTypes method to SchemaManager to get datatype for table columns (opensearch-project#5135) Add getColumnDataTypes method to SchemaManager Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> Add model for table column metadata for Global state (opensearch-project#5136) Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> Rename the KDS source plugin name to "kinesis-data-streams" (opensearch-project#5138) Signed-off-by: Souvik Bose <souvbose@amazon.com> Co-authored-by: Souvik Bose <souvbose@amazon.com> Addressed review comments (opensearch-project#5108) Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> fixes related to source config properties change Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> removed future handling for loop based operations Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> additional test cases Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> addressing review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Jira Service Test coverage Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> jirasourceconfigTest comments Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> introduced RestClient and moved rest template interactions to there. Similar chage on the test cases too Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> backingoff for any kind of exception. Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> restructured constants file Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> JiraSourceTests Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> JiraItemInfo coverage Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> jira service branch coverage Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> branch coverage jira service Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> move add Items to queue logic into JiraItemInfo Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> fixing regex and adding date time formatter Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> Revert "Jira source" re add changes and fix issues Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> unneeded comment Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com> using issue bean methods to simplify the logic Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Description
Pipeline latency is showing up as negative value some times. This fix makes sure that the time received from Kafka consumer record is valid. If it's not valid, it is replaced with last valid value. If last valid value is not present, current time is used.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.