diff --git a/src/v/kafka/client/produce_batcher.h b/src/v/kafka/client/produce_batcher.h
index 601b0fed365d6..583a8e81c012f 100644
--- a/src/v/kafka/client/produce_batcher.h
+++ b/src/v/kafka/client/produce_batcher.h
@@ -85,10 +85,11 @@ class produce_batcher {
         return _client_reqs.back().promise.get_future();
     }
 
-    model::record_batch consume() {
-        auto batch = std::exchange(_builder, make_builder()).build();
+    ss::future<model::record_batch> consume() {
+        auto batch
+          = co_await std::exchange(_builder, make_builder()).build_async();
         _broker_reqs.emplace_back(batch.record_count());
-        return batch;
+        co_return batch;
     }
 
     void handle_response(partition_response res) {
diff --git a/src/v/kafka/client/produce_partition.h b/src/v/kafka/client/produce_partition.h
index 11a38bee95a0f..ffd8211a8e3b8 100644
--- a/src/v/kafka/client/produce_partition.h
+++ b/src/v/kafka/client/produce_partition.h
@@ -34,14 +34,16 @@ class produce_partition {
     produce_partition(const configuration& config, consumer&& c)
       : _config{config}
       , _batcher{compression_from_str(config.produce_compression_type())}
-      , _timer{[this]() { try_consume(true); }}
+      , _timer{[this]() {
+          ssx::spawn_with_gate(_gate, [this]() { return try_consume(); });
+      }}
       , _consumer{std::move(c)} {}
 
     ss::future<partition_response> produce(model::record_batch&& batch) {
         _record_count += batch.record_count();
         _size_bytes += batch.size_bytes();
         auto fut = _batcher.produce(std::move(batch));
-        try_consume(false);
+        arm_consumer();
         return fut;
     }
 
@@ -49,17 +51,18 @@ class produce_partition {
         vassert(_in_flight, "handle_response requires a batch in flight");
         _batcher.handle_response(std::move(res));
         _in_flight = false;
-        try_consume(false);
+        arm_consumer();
     }
 
     ss::future<> stop() {
-        try_consume(true);
+        _timer.set_callback([]() {});
+        co_await try_consume();
         _timer.cancel();
-        return ss::now();
+        co_await _gate.close();
     }
 
 private:
-    model::record_batch do_consume() {
+    ss::future<model::record_batch> do_consume() {
         vassert(!_in_flight, "do_consume should not run concurrently");
         _in_flight = true;
 
@@ -68,27 +71,47 @@ class produce_partition {
         return _batcher.consume();
     }
 
-    bool try_consume(bool timed_out) {
-        if (_in_flight || _record_count == 0) {
-            return false;
+    ss::future<> try_consume() {
+        if (consumer_can_run()) {
+            _consumer(co_await do_consume());
         }
+    }
 
-        auto batch_record_count = _config.produce_batch_record_count();
-        auto batch_size_bytes = _config.produce_batch_size_bytes();
-
-        auto threshold_met = _record_count >= batch_record_count
-                             || _size_bytes >= batch_size_bytes;
+    /// \brief Arms the timer that starts the consumer
+    ///
+    /// Arms the timer only if the consumer can run, and sets the timer delay
+    /// depending on whether the size thresholds have been met
+    void arm_consumer() {
+        if (!consumer_can_run()) {
+            return;
+        }
 
-        if (!timed_out && !threshold_met) {
-            _timer.cancel();
-            _timer.arm(_config.produce_batch_delay());
-            return false;
+        std::chrono::milliseconds rearm_timer_delay{0};
+        // If the threshold is met, use a delay of zero so the timer fires
+        // nearly immediately after this call. Otherwise arm the timer with
+        // the configured produce batch delay.
+        if (!threshold_met()) {
+            rearm_timer_delay = _config.produce_batch_delay();
         }
+        _timer.cancel();
+        _timer.arm(rearm_timer_delay);
+    }
 
-        _consumer(do_consume());
-        return true;
+    /// \brief Checks whether a size threshold has been met to trigger produce
+    bool threshold_met() const {
+        auto batch_record_count = _config.produce_batch_record_count();
+        auto batch_size_bytes = _config.produce_batch_size_bytes();
+
+        return _record_count >= batch_record_count
+               || _size_bytes >= batch_size_bytes;
     }
 
+    /// \brief Checks whether the consumer can run
+    ///
+    /// The consumer can only run if one is not already running and there are
+    /// records available
+    bool consumer_can_run() const { return !_in_flight && _record_count > 0; }
+
     const configuration& _config;
     produce_batcher _batcher{};
     ss::timer<> _timer{};
@@ -96,6 +119,7 @@ class produce_partition {
     int32_t _record_count{};
     int32_t _size_bytes{};
     bool _in_flight{};
+    ss::gate _gate;
 };
 
 } // namespace kafka::client
diff --git a/src/v/kafka/client/test/produce_batcher.cc b/src/v/kafka/client/test/produce_batcher.cc
index 3da28fece7cff..5644df640814d 100644
--- a/src/v/kafka/client/test/produce_batcher.cc
+++ b/src/v/kafka/client/test/produce_batcher.cc
@@ -44,12 +44,12 @@ struct produce_batcher_context {
         produce_futs.push_back(batcher.produce(std::move(batch)));
         client_req_offset += count;
     }
-    auto consume() {
-        auto batch = batcher.consume();
+    ss::future<int32_t> consume() {
+        model::record_batch batch = co_await batcher.consume();
         auto record_count = batch.record_count();
         broker_batches.emplace_back(broker_req_offset, std::move(batch));
         broker_req_offset += record_count;
-        return record_count;
+        co_return record_count;
     }
     auto handle_response(kafka::error_code error = kafka::error_code::none) {
         auto batch = kc::consume_front(broker_batches);
@@ -88,13 +88,13 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_single) {
     produce_batcher_context ctx;
 
     ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 2);
+    BOOST_REQUIRE(ctx.consume().get() == 2);
     BOOST_REQUIRE(ctx.handle_response() == 2);
 
     auto offsets = ctx.get_response_offsets().get0();
     BOOST_REQUIRE(offsets == ctx.expected_offsets);
 
-    BOOST_REQUIRE(ctx.consume() == 0);
+    BOOST_REQUIRE(ctx.consume().get() == 0);
 }
 
 SEASTAR_THREAD_TEST_CASE(test_partition_producer_seq) {
@@ -102,18 +102,18 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_seq) {
 
     ctx.produce(2);
     ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 4);
+    BOOST_REQUIRE(ctx.consume().get() == 4);
     BOOST_REQUIRE(ctx.handle_response() == 4);
 
     ctx.produce(2);
     ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 4);
+    BOOST_REQUIRE(ctx.consume().get() == 4);
     BOOST_REQUIRE(ctx.handle_response() == 4);
 
     auto offsets = ctx.get_response_offsets().get0();
     BOOST_REQUIRE(offsets == ctx.expected_offsets);
 
-    BOOST_REQUIRE(ctx.consume() == 0);
+    BOOST_REQUIRE(ctx.consume().get() == 0);
 }
 
 SEASTAR_THREAD_TEST_CASE(test_partition_producer_overlapped) {
@@ -121,11 +121,11 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_overlapped) {
 
     ctx.produce(2);
     ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 4);
+    BOOST_REQUIRE(ctx.consume().get() == 4);
 
     ctx.produce(2);
     ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 4);
+    BOOST_REQUIRE(ctx.consume().get() == 4);
 
     BOOST_REQUIRE(ctx.handle_response() == 4);
     BOOST_REQUIRE(ctx.handle_response() == 4);
@@ -133,7 +133,7 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_overlapped) {
     auto offsets = ctx.get_response_offsets().get0();
     BOOST_REQUIRE(offsets == ctx.expected_offsets);
 
-    BOOST_REQUIRE(ctx.consume() == 0);
+    BOOST_REQUIRE(ctx.consume().get() == 0);
 }
 
 SEASTAR_THREAD_TEST_CASE(test_partition_producer_error) {
@@ -141,11 +141,11 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_error) {
 
     ctx.produce(2);
     ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 4);
+    BOOST_REQUIRE(ctx.consume().get() == 4);
 
     ctx.produce(2);
    ctx.produce(2);
-    BOOST_REQUIRE(ctx.consume() == 4);
+    BOOST_REQUIRE(ctx.consume().get() == 4);
 
     BOOST_REQUIRE(ctx.handle_response() == 4);
     BOOST_REQUIRE(
@@ -166,5 +166,5 @@ SEASTAR_THREAD_TEST_CASE(test_partition_producer_error) {
     BOOST_REQUIRE(
       responses[3].error_code == kafka::error_code::not_leader_for_partition);
 
-    BOOST_REQUIRE(ctx.consume() == 0);
+    BOOST_REQUIRE(ctx.consume().get() == 0);
 }
diff --git a/src/v/kafka/client/test/produce_partition.cc b/src/v/kafka/client/test/produce_partition.cc
index 51bf2ea5d9153..c27cbb2a66214 100644
--- a/src/v/kafka/client/test/produce_partition.cc
+++ b/src/v/kafka/client/test/produce_partition.cc
@@ -16,6 +16,7 @@
 #include "kafka/protocol/produce.h"
 #include "model/fundamental.h"
 #include "model/record.h"
+#include "test_utils/async.h"
 
 #include <seastar/testing/thread_test_case.hh>
 
@@ -42,6 +43,11 @@ SEASTAR_THREAD_TEST_CASE(test_produce_partition_record_count) {
 
     auto c_res0_fut = producer.produce(make_batch(model::offset(0), 2));
     auto c_res1_fut = producer.produce(make_batch(model::offset(2), 1));
+
+    tests::cooperative_spin_wait_with_timeout(5s, [&consumed_batches]() {
+        return consumed_batches.size() > 0;
+    }).get();
+
     producer.handle_response(kafka::produce_response::partition{
       .partition_index{model::partition_id{42}},
       .error_code = kafka::error_code::none,
@@ -55,6 +61,9 @@ SEASTAR_THREAD_TEST_CASE(test_produce_partition_record_count) {
     BOOST_REQUIRE_EQUAL(c_res1.base_offset, model::offset{2});
 
     auto c_res2_fut = producer.produce(make_batch(model::offset(3), 3));
+    tests::cooperative_spin_wait_with_timeout(5s, [&consumed_batches]() {
+        return consumed_batches.size() > 1;
+    }).get();
     producer.handle_response(kafka::produce_response::partition{
       .partition_index{model::partition_id{42}},
       .error_code = kafka::error_code::none,
@@ -64,4 +73,5 @@ SEASTAR_THREAD_TEST_CASE(test_produce_partition_record_count) {
     BOOST_REQUIRE_EQUAL(consumed_batches[1].record_count(), 3);
     auto c_res2 = c_res2_fut.get0();
     BOOST_REQUIRE_EQUAL(c_res2.base_offset, model::offset{3});
+    producer.stop().get();
 }
diff --git a/src/v/storage/record_batch_builder.cc b/src/v/storage/record_batch_builder.cc
index cb67b08f01848..43c6b4cef149d 100644
--- a/src/v/storage/record_batch_builder.cc
+++ b/src/v/storage/record_batch_builder.cc
@@ -61,6 +61,36 @@ model::record_batch record_batch_builder::build() && {
         _timestamp = model::timestamp::now();
     }
 
+    auto header = build_header();
+
+    if (_compression != model::compression::none) {
+        _records = compression::compressor::compress(_records, _compression);
+    }
+
+    internal::reset_size_checksum_metadata(header, _records);
+    return model::record_batch(
+      header, std::move(_records), model::record_batch::tag_ctor_ng{});
+}
+
+ss::future<model::record_batch> record_batch_builder::build_async() && {
+    if (!_timestamp) {
+        _timestamp = model::timestamp::now();
+    }
+
+    auto header = build_header();
+
+    if (_compression != model::compression::none) {
+        _records = co_await compression::stream_compressor::compress(
+          std::move(_records), _compression);
+    }
+
+    internal::reset_size_checksum_metadata(header, _records);
+
+    co_return model::record_batch(
+      header, std::move(_records), model::record_batch::tag_ctor_ng{});
+}
+
+model::record_batch_header record_batch_builder::build_header() const {
     model::record_batch_header header = {
       .size_bytes = 0,
       .base_offset = _base_offset,
@@ -85,13 +115,7 @@ model::record_batch record_batch_builder::build() && {
         header.attrs.set_transactional_type();
     }
 
-    if (_compression != model::compression::none) {
-        _records = compression::compressor::compress(_records, _compression);
-    }
-
-    internal::reset_size_checksum_metadata(header, _records);
-    return model::record_batch(
-      header, std::move(_records), model::record_batch::tag_ctor_ng{});
+    return header;
 }
 
 uint32_t record_batch_builder::record_size(
diff --git a/src/v/storage/record_batch_builder.h b/src/v/storage/record_batch_builder.h
index d146fca60a2c9..ee87c662dc088 100644
--- a/src/v/storage/record_batch_builder.h
+++ b/src/v/storage/record_batch_builder.h
@@ -30,7 +30,8 @@ class record_batch_builder {
       std::optional<iobuf>&& key,
      std::optional<iobuf>&& value,
       std::vector<model::record_header> headers);
-    virtual model::record_batch build() &&;
+    model::record_batch build() &&;
+    ss::future<model::record_batch> build_async() &&;
     virtual ~record_batch_builder();
 
     void set_producer_identity(int64_t id, int16_t epoch) {
@@ -81,6 +82,7 @@ class record_batch_builder {
         std::vector<model::record_header> headers;
     };
 
+    model::record_batch_header build_header() const;
     uint32_t record_size(int32_t offset_delta, const serialized_record& r);
 
     model::record_batch_type _batch_type;
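---

Note for reviewers: build_async() mirrors build() but suspends while the
records are compressed (via compression::stream_compressor) instead of
compressing synchronously on the reactor thread; that is what allows
produce_batcher::consume() to become a coroutine. A minimal caller sketch,
assuming the surrounding Redpanda headers and the ss alias for seastar;
finish_batch is an illustrative name, not part of this patch:

    // Illustrative only. build_async() is rvalue-qualified like build(),
    // so the builder is consumed by the call.
    ss::future<model::record_batch>
    finish_batch(storage::record_batch_builder builder) {
        // Yields to the reactor during compression rather than stalling it.
        co_return co_await std::move(builder).build_async();
    }

Call sites that previously wrote std::move(builder).build() on paths with
compression enabled can switch to co_await std::move(builder).build_async().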
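Note for reviewers: produce_partition now pairs the timer with a gate so the
asynchronous try_consume() can be spawned from the timer callback and drained
on shutdown. A condensed sketch of the pattern, assuming
<seastar/core/timer.hh>, <seastar/core/gate.hh>, and ssx/future-util.h;
background_worker and do_work are illustrative names (the real stop() also
runs try_consume() once to flush buffered records before closing the gate):

    class background_worker {
    public:
        background_worker()
          : _timer{[this]() {
              // Timer callbacks must not block: spawn the async work into
              // the gate and return immediately.
              ssx::spawn_with_gate(_gate, [this]() { return do_work(); });
          }} {}

        ss::future<> stop() {
            _timer.set_callback([]() {}); // a late fire becomes a no-op
            _timer.cancel();
            co_await _gate.close(); // waits for any spawned do_work()
        }

    private:
        ss::future<> do_work() { return ss::now(); } // stands in for try_consume()

        ss::timer<> _timer;
        ss::gate _gate;
    };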