-
Notifications
You must be signed in to change notification settings - Fork 472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(stream): make consumer decrement pending number when message is acknowledged. #2352
fix(stream): make consumer decrement pending number when message is acknowledged. #2352
Conversation
266bbff
to
566e419
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! LGTM.
Hi @torwig , Thanks for your suggestion. I have updated the related code. Please take a look. Thanks very much. |
CI failed is unrelated, rerun |
Maybe I need to rebase on unstable branch rather than merge, it involved merge commit in this PR. |
We can squash and merge it in one patch, don't worry about that |
Thanks very much for your warm help. 😊 The merge commit seems to make coverage calculated more code blocks than my commits. Do I need to solve it with rebase in this PR to re-trigger the coverage workflow? |
src/types/redis_stream.cc
Outdated
} | ||
} | ||
if (*acknowledged > 0) { | ||
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value); | ||
group_metadata.pending_number -= *acknowledged; | ||
std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); | ||
batch->Put(stream_cf_handle_, group_key, group_value); | ||
|
||
for (const auto &consumer : consumer_acknowledges) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (const auto &consumer : consumer_acknowledges) { | |
for (const auto &[consumer_name, count] : consumer_acknowledges) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is reported by sonar, maybe we should fix that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, I will update the loop with this syntax.
auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer.first); | ||
std::string consumer_meta_original; | ||
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); | ||
if (s.ok()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if !s.ok() && !s.IsNotFound()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks very for your correction.
It should be an internal error and I think that I need to return the error. This function's return type is a rocksdb::Status
so I don't need to wrap it to another format.
Do I need to add golang integration test for this error handling case? This may be hard to produce a case for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I need to add golang integration test for this error handling case?
Currently it's a bit hard to modify this, especially in integration tests. We can just handle this without testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it! I have corrected my wording above.
This function returns rocksdb::Status
and I think I should return the status immediately in the function when !s.ok() && !s.IsNotFound()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There're two gets here, right? if (s.ok())
in the loop should also adds this checks(Though errors merely happens)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your correction!
Sorry for missing another error handling. I have added it with a new commit.
777f504
712a717
to
777f504
Compare
Would merge if ci passes |
No worry about this flaky test failure, can rerun to avoid blocking the PR. |
Quality Gate failedFailed conditions |
Issue
close #2350
Proposed Changes
pending_number
when consumer group has acknowledged entry.XINFO
.