-
Notifications
You must be signed in to change notification settings - Fork 593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use async compression in kafka client #15920
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Removing review I realize after seeing Rob's review that I only reviewed the first commit.
d1d479d
to
f698063
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, nice !
f698063
to
ea16d32
Compare
CI failure: #15950 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good.
I wonder if it's worth raising a ticket to opportunistically replace uses of build()
with build_async()
where the call is already in an async scope.
src/v/storage/record_batch_builder.h
Outdated
@@ -31,6 +31,7 @@ class record_batch_builder { | |||
std::optional<iobuf>&& value, | |||
std::vector<model::record_header> headers); | |||
virtual model::record_batch build() &&; | |||
virtual ss::future<model::record_batch> build_async() &&; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look like build()
is ever overridden, can these both be made non-virtual to avoid confusion?
ssx::spawn_with_gate( | ||
_gate, [this]() -> ss::future<> { return try_consume(); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick:
ssx::spawn_with_gate( | |
_gate, [this]() -> ss::future<> { return try_consume(); }); | |
ssx::spawn_with_gate(_gate, [this]() { return try_consume(); }); |
if (!consumer_can_run()) { | ||
co_return; | ||
} | ||
|
||
auto batch_record_count = _config.produce_batch_record_count(); | ||
auto batch_size_bytes = _config.produce_batch_size_bytes(); | ||
_consumer(co_await do_consume()); | ||
co_return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick (also, what are you doing with this preview, github?):
if (!consumer_can_run()) { | |
co_return; | |
} | |
auto batch_record_count = _config.produce_batch_record_count(); | |
auto batch_size_bytes = _config.produce_batch_size_bytes(); | |
_consumer(co_await do_consume()); | |
co_return; | |
if (consumer_can_run()) { | |
_consumer(co_await do_consume()); | |
} |
} | ||
|
||
ss::future<> stop() { | ||
try_consume(true); | ||
co_await try_consume(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess rearming it and then cancelling it isn't too bad. In the case where produce_batch_delay == 0
, it may be possible to run another round of try_consume
.
To get the original behaviour:
co_await try_consume(); | |
_timer.set_callback([]() {}); | |
co_await try_consume(); |
} | ||
|
||
/// \brief Validates that the size threshold has been met to trigger produce | ||
inline bool threshold_met() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: I don't think the inline
helps you much here, it's implicitly inline. I suspect you're attempting to hint the compiler to inline the function body, there's a fair chance it will ignore you, as that's not what inline
is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good point. I think this was muscle memory from previous work I've done.
/// | ||
/// Consumer can only run if one is not already running and there are | ||
/// records available | ||
inline bool consumer_can_run() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Same comment about inline
as above.
|
Added async builder for record_batch_builder that can be used to perform asynchronous compression. Signed-off-by: Michael Boquard <michael@redpanda.com>
Updated produce batcher to use async builder that utilizes asynchronous compression. Signed-off-by: Michael Boquard <michael@redpanda.com>
ea16d32
to
d4fc4ac
Compare
Force push
|
new failures in https://buildkite.com/redpanda/redpanda/builds/43489#018cdac2-fb91-4f6e-b3b8-df08aec97500:
|
/backport v23.3.x |
/backport v23.2.x |
/backport v23.1.x |
Failed to create a backport PR to v23.1.x branch. I tried:
|
Failed to create a backport PR to v23.2.x branch. I tried:
|
Older branches will need manual cherrypicks, @michael-redpanda. Thanks for the fix, though! |
yes, sorry was prioritizing v23.3. Will return to this soon. |
Fixes: #15900
Utilize asynchronous compression in kafka client to reduce possibility of oversized allocation.
Backports Required
Release Notes
Improvements