-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Commit offsets at specified intervals #1405
Commit offsets at specified intervals #1405
Conversation
Codecov Report
@@ Coverage Diff @@
## main #1405 +/- ##
============================================
- Coverage 76.24% 71.19% -5.05%
+ Complexity 584 575 -9
============================================
Files 98 99 +1
Lines 3658 3906 +248
Branches 166 160 -6
============================================
- Hits 2789 2781 -8
- Misses 653 909 +256
Partials 216 216
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
d2ad17b
to
73e28f8
Compare
/cherry-pick release-1.0 |
@pierDipi: once the present PR merges, I will cherry-pick it on top of release-1.0 in a new PR and assign it to you. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/cherry-pick release-0.26 |
@pierDipi: once the present PR merges, I will cherry-pick it on top of release-0.26 in a new PR and assign it to you. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/cherry-pick release-0.25 |
@pierDipi: once the present PR merges, I will cherry-pick it on top of release-0.25 in a new PR and assign it to you. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
case UNORDERED -> new UnorderedOffsetManager(consumer, commitHandler); | ||
}; | ||
} | ||
|
||
private static AbstractVerticle getConsumerVerticle(final DeliveryOrder type, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, one Offset mgr is used for the (Un)Ordered Consumers - ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, there is no need to keep 2 separate components and implement the same commit interval logic in 2 places.
The offset tracker overhead in the ordered case is constant.
@@ -178,14 +172,15 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat | |||
Filter.noop(); | |||
|
|||
final var responseHandler = getNoopResponseHandlerOrDefault(egress, () -> getKafkaResponseHandler(vertx, producerConfigs, resource)); | |||
final var commitIntervalMs = Integer.parseInt(String.valueOf(consumerConfigs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we document this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll open an issue in knative/docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
73e28f8
to
d2776e4
Compare
.../knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java
Outdated
Show resolved
Hide resolved
blackhole.consume( | ||
offsetManager.recordReceived(recordsState.records[partition][0]) | ||
); | ||
offsetManager.recordReceived(recordsState.records[partition][0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are they directly consumed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the code to make it compile but I'm inclined to drop these OffsetManager
benchmarks since they are not really useful to catch perf regressions as they are.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
@@ -178,14 +172,15 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat | |||
Filter.noop(); | |||
|
|||
final var responseHandler = getNoopResponseHandlerOrDefault(egress, () -> getKafkaResponseHandler(vertx, producerConfigs, resource)); | |||
final var commitIntervalMs = Integer.parseInt(String.valueOf(consumerConfigs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: matzew, pierDipi The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@pierDipi should we backport this to 0.27 ? (and perhaps 0.26 - if possible) ? |
Prow will do that #1405 (comment) |
and #1405 (comment) |
@pierDipi: new pull request created: #1449 In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
@pierDipi: #1405 failed to apply on top of branch "release-0.26":
In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
@pierDipi: #1405 failed to apply on top of branch "release-0.25":
In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
* Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
* Commit offsets at specified intervals (#1405) * Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * Update codegen Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * ko upgrade fixes Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
* Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
…ons#1405) (knative-extensions#1450) * Commit offsets at specified intervals (knative-extensions#1405) * Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * Update codegen Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * ko upgrade fixes Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
…1405) (#1454) * Commit offsets at specified intervals (#1405) * Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * Update codegen Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * ko upgrade fixes Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
…ons#1405) (knative-extensions#1450) * Commit offsets at specified intervals (knative-extensions#1405) * Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * Update codegen Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * ko upgrade fixes Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
…ons#1405) (knative-extensions#1450) * Commit offsets at specified intervals (knative-extensions#1405) * Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Use final Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * Update codegen Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * ko upgrade fixes Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
Committing offsets at a specified interval reduces Kafka cluster
load and makes the event delivery hundreds of times faster.
auto.commit.interval.ms
is used to determine the commitinterval.
Proposed Changes
Release Note
Docs