Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async compression in kafka client #15920

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/v/kafka/client/produce_batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ class produce_batcher {
return _client_reqs.back().promise.get_future();
}

model::record_batch consume() {
auto batch = std::exchange(_builder, make_builder()).build();
ss::future<model::record_batch> consume() {
auto batch
= co_await std::exchange(_builder, make_builder()).build_async();
_broker_reqs.emplace_back(batch.record_count());
return batch;
co_return batch;
}

void handle_response(partition_response res) {
Expand Down
64 changes: 44 additions & 20 deletions src/v/kafka/client/produce_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,35 @@ class produce_partition {
produce_partition(const configuration& config, consumer&& c)
: _config{config}
, _batcher{compression_from_str(config.produce_compression_type())}
, _timer{[this]() { try_consume(true); }}
, _timer{[this]() {
ssx::spawn_with_gate(_gate, [this]() { return try_consume(); });
}}
, _consumer{std::move(c)} {}

ss::future<response> produce(model::record_batch&& batch) {
_record_count += batch.record_count();
_size_bytes += batch.size_bytes();
auto fut = _batcher.produce(std::move(batch));
try_consume(false);
arm_consumer();
return fut;
}

void handle_response(response&& res) {
vassert(_in_flight, "handle_response requires a batch in flight");
_batcher.handle_response(std::move(res));
_in_flight = false;
try_consume(false);
arm_consumer();
}

ss::future<> stop() {
try_consume(true);
_timer.set_callback([]() {});
co_await try_consume();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess rearming it and then cancelling it isn't too bad. In the case where produce_batch_delay == 0, it may be possible to run another round of try_consume.

To get the original behaviour:

Suggested change
co_await try_consume();
_timer.set_callback([]() {});
co_await try_consume();

_timer.cancel();
return ss::now();
co_await _gate.close();
}

private:
model::record_batch do_consume() {
ss::future<model::record_batch> do_consume() {
vassert(!_in_flight, "do_consume should not run concurrently");

_in_flight = true;
Expand All @@ -68,34 +71,55 @@ class produce_partition {
return _batcher.consume();
}

bool try_consume(bool timed_out) {
if (_in_flight || _record_count == 0) {
return false;
/// \brief Runs a single consume round when the consumer is allowed to run
///
/// Completes without doing anything when consumer_can_run() is false.
/// Otherwise the batch returned by do_consume() is awaited and handed to
/// _consumer.
ss::future<> try_consume() {
    if (!consumer_can_run()) {
        co_return;
    }
    _consumer(co_await do_consume());
}

auto batch_record_count = _config.produce_batch_record_count();
auto batch_size_bytes = _config.produce_batch_size_bytes();

auto threshold_met = _record_count >= batch_record_count
|| _size_bytes >= batch_size_bytes;
/// \brief Arms the timer that starts the consumer
///
/// Will arm the timer only if the consumer can run and then sets the timer
/// delay depending on whether or not size thresholds have been met
void arm_consumer() {
if (!consumer_can_run()) {
return;
}

if (!timed_out && !threshold_met) {
_timer.cancel();
_timer.arm(_config.produce_batch_delay());
return false;
std::chrono::milliseconds rearm_timer_delay{0};
// If the threshold is met, then use a delay of 0 so the timer fires
// nearly immediately after this call. Otherwise use the produce
// batch delay when arming the timer.
if (!threshold_met()) {
rearm_timer_delay = _config.produce_batch_delay();
}
_timer.cancel();
_timer.arm(rearm_timer_delay);
}

_consumer(do_consume());
return true;
/// \brief Reports whether a size threshold for triggering a produce is met
///
/// True once _record_count has reached the configured
/// produce_batch_record_count, or _size_bytes has reached the configured
/// produce_batch_size_bytes.
bool threshold_met() const {
    const bool record_limit_hit
      = _record_count >= _config.produce_batch_record_count();
    const bool byte_limit_hit
      = _size_bytes >= _config.produce_batch_size_bytes();

    return record_limit_hit || byte_limit_hit;
}

/// \brief Tells whether a consume round may start right now
///
/// A round may start only while no batch is in flight and at least one
/// record has been accumulated.
bool consumer_can_run() const {
    if (_in_flight) {
        return false;
    }
    return _record_count > 0;
}

const configuration& _config;
produce_batcher _batcher{};
ss::timer<> _timer{};
consumer _consumer;
int32_t _record_count{};
int32_t _size_bytes{};
bool _in_flight{};
ss::gate _gate;
};

} // namespace kafka::client
28 changes: 14 additions & 14 deletions src/v/kafka/client/test/produce_batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ struct produce_batcher_context {
produce_futs.push_back(batcher.produce(std::move(batch)));
client_req_offset += count;
}
auto consume() {
auto batch = batcher.consume();
ss::future<int32_t> consume() {
model::record_batch batch = co_await batcher.consume();
auto record_count = batch.record_count();
broker_batches.emplace_back(broker_req_offset, std::move(batch));
broker_req_offset += record_count;
return record_count;
co_return record_count;
}
auto handle_response(kafka::error_code error = kafka::error_code::none) {
auto batch = kc::consume_front(broker_batches);
Expand Down Expand Up @@ -88,64 +88,64 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_single) {
produce_batcher_context ctx;

ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 2);
BOOST_REQUIRE(ctx.consume().get() == 2);
BOOST_REQUIRE(ctx.handle_response() == 2);

auto offsets = ctx.get_response_offsets().get0();
BOOST_REQUIRE(offsets == ctx.expected_offsets);

BOOST_REQUIRE(ctx.consume() == 0);
BOOST_REQUIRE(ctx.consume().get() == 0);
}

SEASTAR_THREAD_TEST_CASE(test_partition_producer_seq) {
produce_batcher_context ctx;

ctx.produce(2);
ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 4);
BOOST_REQUIRE(ctx.consume().get() == 4);
BOOST_REQUIRE(ctx.handle_response() == 4);

ctx.produce(2);
ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 4);
BOOST_REQUIRE(ctx.consume().get() == 4);
BOOST_REQUIRE(ctx.handle_response() == 4);

auto offsets = ctx.get_response_offsets().get0();
BOOST_REQUIRE(offsets == ctx.expected_offsets);

BOOST_REQUIRE(ctx.consume() == 0);
BOOST_REQUIRE(ctx.consume().get() == 0);
}

SEASTAR_THREAD_TEST_CASE(test_partition_producer_overlapped) {
produce_batcher_context ctx;

ctx.produce(2);
ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 4);
BOOST_REQUIRE(ctx.consume().get() == 4);

ctx.produce(2);
ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 4);
BOOST_REQUIRE(ctx.consume().get() == 4);

BOOST_REQUIRE(ctx.handle_response() == 4);
BOOST_REQUIRE(ctx.handle_response() == 4);

auto offsets = ctx.get_response_offsets().get0();
BOOST_REQUIRE(offsets == ctx.expected_offsets);

BOOST_REQUIRE(ctx.consume() == 0);
BOOST_REQUIRE(ctx.consume().get() == 0);
}

SEASTAR_THREAD_TEST_CASE(test_partition_producer_error) {
produce_batcher_context ctx;

ctx.produce(2);
ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 4);
BOOST_REQUIRE(ctx.consume().get() == 4);

ctx.produce(2);
ctx.produce(2);
BOOST_REQUIRE(ctx.consume() == 4);
BOOST_REQUIRE(ctx.consume().get() == 4);

BOOST_REQUIRE(ctx.handle_response() == 4);
BOOST_REQUIRE(
Expand All @@ -166,5 +166,5 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_error) {
BOOST_REQUIRE(
responses[3].error_code == kafka::error_code::not_leader_for_partition);

BOOST_REQUIRE(ctx.consume() == 0);
BOOST_REQUIRE(ctx.consume().get() == 0);
}
10 changes: 10 additions & 0 deletions src/v/kafka/client/test/produce_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "kafka/protocol/produce.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "test_utils/async.h"

#include <seastar/testing/thread_test_case.hh>

Expand All @@ -42,6 +43,11 @@ SEASTAR_THREAD_TEST_CASE(test_produce_partition_record_count) {

auto c_res0_fut = producer.produce(make_batch(model::offset(0), 2));
auto c_res1_fut = producer.produce(make_batch(model::offset(2), 1));

tests::cooperative_spin_wait_with_timeout(5s, [&consumed_batches]() {
return consumed_batches.size() > 0;
}).get();

producer.handle_response(kafka::produce_response::partition{
.partition_index{model::partition_id{42}},
.error_code = kafka::error_code::none,
Expand All @@ -55,6 +61,9 @@ SEASTAR_THREAD_TEST_CASE(test_produce_partition_record_count) {
BOOST_REQUIRE_EQUAL(c_res1.base_offset, model::offset{2});

auto c_res2_fut = producer.produce(make_batch(model::offset(3), 3));
tests::cooperative_spin_wait_with_timeout(5s, [&consumed_batches]() {
return consumed_batches.size() > 1;
}).get();
producer.handle_response(kafka::produce_response::partition{
.partition_index{model::partition_id{42}},
.error_code = kafka::error_code::none,
Expand All @@ -64,4 +73,5 @@ SEASTAR_THREAD_TEST_CASE(test_produce_partition_record_count) {
BOOST_REQUIRE_EQUAL(consumed_batches[1].record_count(), 3);
auto c_res2 = c_res2_fut.get0();
BOOST_REQUIRE_EQUAL(c_res2.base_offset, model::offset{3});
producer.stop().get();
}
38 changes: 31 additions & 7 deletions src/v/storage/record_batch_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,36 @@ model::record_batch record_batch_builder::build() && {
_timestamp = model::timestamp::now();
}

auto header = build_header();

if (_compression != model::compression::none) {
_records = compression::compressor::compress(_records, _compression);
}

internal::reset_size_checksum_metadata(header, _records);
return model::record_batch(
header, std::move(_records), model::record_batch::tag_ctor_ng{});
}

/// Asynchronous counterpart of build(): assembles the final record batch,
/// running the records through the stream compressor when a compression
/// type other than none is configured.
///
/// This is a coroutine that keeps touching the builder's members after it
/// may have suspended, so the builder object has to stay alive until the
/// returned future resolves.
ss::future<model::record_batch> record_batch_builder::build_async() && {
    // Fall back to the current time when no timestamp was set; this happens
    // before the header is built.
    if (!_timestamp) {
        _timestamp = model::timestamp::now();
    }

    auto batch_header = build_header();

    const bool wants_compression = _compression != model::compression::none;
    if (wants_compression) {
        _records = co_await compression::stream_compressor::compress(
          std::move(_records), _compression);
    }

    // Runs after the optional compression step so it sees the final payload.
    internal::reset_size_checksum_metadata(batch_header, _records);

    co_return model::record_batch(
      batch_header, std::move(_records), model::record_batch::tag_ctor_ng{});
}

model::record_batch_header record_batch_builder::build_header() const {
model::record_batch_header header = {
.size_bytes = 0,
.base_offset = _base_offset,
Expand All @@ -85,13 +115,7 @@ model::record_batch record_batch_builder::build() && {
header.attrs.set_transactional_type();
}

if (_compression != model::compression::none) {
_records = compression::compressor::compress(_records, _compression);
}

internal::reset_size_checksum_metadata(header, _records);
return model::record_batch(
header, std::move(_records), model::record_batch::tag_ctor_ng{});
return header;
}

uint32_t record_batch_builder::record_size(
Expand Down
4 changes: 3 additions & 1 deletion src/v/storage/record_batch_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class record_batch_builder {
std::optional<iobuf>&& key,
std::optional<iobuf>&& value,
std::vector<model::record_header> headers);
virtual model::record_batch build() &&;
model::record_batch build() &&;
ss::future<model::record_batch> build_async() &&;
virtual ~record_batch_builder();

void set_producer_identity(int64_t id, int16_t epoch) {
Expand Down Expand Up @@ -81,6 +82,7 @@ class record_batch_builder {
std::vector<model::record_header> headers;
};

model::record_batch_header build_header() const;
uint32_t record_size(int32_t offset_delta, const serialized_record& r);

model::record_batch_type _batch_type;
Expand Down