[v24.1.x] [CORE-7803] Audit Log Manager: Refactoring - use and reduce retries. #23868

Merged
11 changes: 11 additions & 0 deletions src/v/kafka/client/CMakeLists.txt
@@ -1,3 +1,14 @@
v_cc_library(
NAME kc_record_batcher
HDRS
record_batcher.h
SRCS
record_batcher.cc
DEPS
v::model
v::storage
)

v_cc_library(
NAME kafka_client
SRCS
@@ -17,37 +17,42 @@
#include "storage/record_batch_builder.h"
#include "utils/human.h"

namespace transform::logging {
namespace kafka::client {

namespace detail {
class batcher_impl {
public:
batcher_impl() = delete;
explicit batcher_impl(size_t batch_max_bytes)
: _batch_max_bytes(batch_max_bytes) {}
explicit batcher_impl(
size_t batch_max_bytes, std::optional<ss::logger*> log)
: _batch_max_bytes(batch_max_bytes)
, _log(log.value_or(&kclog)) {
vassert(_log != nullptr, "Injected logger must not be nullptr");
}
~batcher_impl() = default;
batcher_impl(const batcher_impl&) = delete;
batcher_impl& operator=(const batcher_impl&) = delete;
batcher_impl(batcher_impl&&) = delete;
batcher_impl& operator=(batcher_impl&&) = delete;

model::record_batch make_batch_of_one(iobuf k, iobuf v) {
model::record_batch
make_batch_of_one(std::optional<iobuf> k, std::optional<iobuf> v) {
return std::move(bb_init().add_raw_kv(std::move(k), std::move(v)))
.build();
}

void append(iobuf k, iobuf v) {
void append(std::optional<iobuf> k, std::optional<iobuf> v) {
auto record = model::record(
/*attributes*/ {},
/*ts_delta*/ 0,
/*offset_delta*/ std::numeric_limits<int32_t>::max(),
std::move(k),
std::move(v),
std::move(k).value_or(iobuf{}),
std::move(v).value_or(iobuf{}),
/*headers*/ {});
size_t record_size = record.size_bytes();
if (record_size > max_records_bytes()) {
vlog(
tlg_log.info,
_log->info,
"Dropped record: size exceeds configured batch max "
"size: {} > {}",
human::bytes{static_cast<double>(record_size)},
@@ -92,14 +97,14 @@ class batcher_impl {
- static_cast<int64_t>(batch.size_bytes());
if (diff < 0) {
vlog(
tlg_log.debug,
_log->debug,
"Underestimaged batch size {} - {} = {}",
human::bytes{static_cast<double>(batch_size_bytes())},
human::bytes{static_cast<double>(batch.size_bytes())},
diff);
} else {
vlog(
tlg_log.trace,
_log->trace,
"Building record batch. Actual size: {} (estimated: {}, err:{})",
human::bytes{static_cast<double>(batch.size_bytes())},
human::bytes{static_cast<double>(batch_size_bytes())},
@@ -119,19 +124,26 @@
}

size_t _batch_max_bytes;
ss::logger* _log;
storage::record_batch_builder _builder{bb_init()};
ss::chunked_fifo<model::record_batch> _record_batches;
size_t _curr_batch_size{0};
};

} // namespace detail

record_batcher::record_batcher(size_t max_batch_size)
: _impl(std::make_unique<detail::batcher_impl>(max_batch_size)) {}
record_batcher::record_batcher(
size_t max_batch_size, std::optional<ss::logger*> log)
: _impl(std::make_unique<detail::batcher_impl>(max_batch_size, log)) {}

record_batcher::~record_batcher() = default;

void record_batcher::append(iobuf k, iobuf v) {
model::record_batch record_batcher::make_batch_of_one(
std::optional<iobuf> k, std::optional<iobuf> v) {
return _impl->make_batch_of_one(std::move(k), std::move(v));
}

void record_batcher::append(std::optional<iobuf> k, std::optional<iobuf> v) {
_impl->append(std::move(k), std::move(v));
}
size_t record_batcher::total_size_bytes() { return _impl->total_size_bytes(); }
@@ -140,4 +152,4 @@ ss::chunked_fifo<model::record_batch> record_batcher::finish() {
return _impl->finish();
}

} // namespace transform::logging
} // namespace kafka::client
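
For reviewers skimming the moved API: a minimal usage sketch (not part of the PR) of the new optional key/value path. Note the asymmetry visible in the diff: make_batch_of_one forwards the optionals to storage::record_batch_builder::add_raw_kv unchanged, while append substitutes empty iobufs via value_or.

// Hedged sketch, assuming only the interfaces shown in this diff.
#include "kafka/client/record_batcher.h"

#include "bytes/iobuf.h"

// Build a single-record batch with no key: the std::nullopt is passed
// straight through to record_batch_builder::add_raw_kv.
model::record_batch
make_keyless_batch(kafka::client::record_batcher& batcher, iobuf value) {
    return batcher.make_batch_of_one(std::nullopt, std::move(value));
}
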
@@ -11,9 +11,14 @@

#pragma once

#include "bytes/iobuf.h"
#include "model/record.h"

namespace transform::logging {
#include <seastar/util/log.hh>

#include <optional>

namespace kafka::client {

namespace detail {
class batcher_impl;
@@ -31,18 +36,25 @@ class batcher_impl;
class record_batcher {
public:
record_batcher() = delete;
explicit record_batcher(size_t batch_max_bytes);
explicit record_batcher(
size_t batch_max_bytes, std::optional<ss::logger*> log = std::nullopt);
~record_batcher();
record_batcher(const record_batcher&) = delete;
record_batcher& operator=(const record_batcher&) = delete;
record_batcher(record_batcher&&) = delete;
record_batcher& operator=(record_batcher&&) = delete;

/**
* Construct a batch from a single record
*/
model::record_batch
make_batch_of_one(std::optional<iobuf> k, std::optional<iobuf> v);

/**
* Add a record to the current batch, possibly rolling over to a
* new batch if the serialized record would exceed batch_max_bytes.
*/
void append(iobuf k, iobuf v);
void append(std::optional<iobuf> k, std::optional<iobuf> v);

/**
* Return the total size in bytes of all record_batches, exclusive of any
@@ -59,4 +71,4 @@ class record_batcher {
private:
std::unique_ptr<detail::batcher_impl> _impl;
};
} // namespace transform::logging
} // namespace kafka::client
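
Taken together, the header now supports logger injection for callers outside transform logging (e.g. the audit subsystem this PR targets). A hedged end-to-end sketch, with audit_log standing in for whatever logger the caller owns (a placeholder name, not from the PR):

#include "kafka/client/record_batcher.h"

#include "base/units.h"
#include "bytes/iobuf.h"

#include <seastar/util/log.hh>

#include <utility>
#include <vector>

// Placeholder logger; when the second constructor argument is omitted,
// the batcher falls back to the client's own kclog.
static ss::logger audit_log{"audit_example"};

ss::chunked_fifo<model::record_batch>
batch_records(std::vector<std::pair<iobuf, iobuf>> kvs) {
    kafka::client::record_batcher batcher{1_MiB, &audit_log};
    for (auto& [k, v] : kvs) {
        // Rolls over to a new batch when the next record would push the
        // current one past batch_max_bytes; oversized records are dropped
        // and logged through the injected logger.
        batcher.append(std::move(k), std::move(v));
    }
    return batcher.finish();
}
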
23 changes: 23 additions & 0 deletions src/v/kafka/client/test/CMakeLists.txt
@@ -22,6 +22,29 @@ rp_test(
LABELS kafka
)

v_cc_library(
NAME kc_test_utils
HDRS
utils.h
SRCS
utils.cc
DEPS
v::utils
)

rp_test(
UNIT_TEST
GTEST
BINARY_NAME test_kc_record_batcher
SOURCES
record_batcher_test.cc
LIBRARIES
v::gtest_main
v::kc_record_batcher
v::kc_test_utils
LABELS kafka
)

rp_test(
FIXTURE_TEST
BINARY_NAME kafka_client
@@ -10,12 +10,12 @@
*/

#include "base/units.h"
#include "transform/logging/record_batcher.h"
#include "transform/logging/tests/utils.h"
#include "kafka/client/record_batcher.h"
#include "kafka/client/test/utils.h"

#include <gtest/gtest.h>

namespace transform::logging {
namespace kafka::client {

TEST(TransformLoggingRecordBatcherTest, TestMaxBytes) {
constexpr size_t batch_max_bytes = 1_KiB;
@@ -64,4 +64,4 @@ TEST(TransformLoggingRecordBatcherTest, TestReuseBatcher) {
}
}

} // namespace transform::logging
} // namespace kafka::client
27 changes: 27 additions & 0 deletions src/v/kafka/client/test/utils.cc
@@ -0,0 +1,27 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "kafka/client/test/utils.h"

#include "random/generators.h"

namespace kafka::client::testing {

iobuf random_length_iobuf(size_t data_max) {
assert(data_max > 0);
auto sz = random_generators::get_int(size_t{1}, data_max);
auto data = random_generators::gen_alphanum_string(sz);
iobuf b;
b.append(data.data(), data.size());
return b;
}

} // namespace kafka::client::testing
5 changes: 5 additions & 0 deletions src/v/kafka/client/test/utils.h
@@ -11,6 +11,7 @@

#pragma once

#include "bytes/iobuf.h"
#include "reflection/adl.h"
#include "storage/record_batch_builder.h"

@@ -22,3 +23,7 @@ inline model::record_batch make_batch(model::offset offset, size_t count) {
}
return std::move(builder).build();
}

namespace kafka::client::testing {
iobuf random_length_iobuf(size_t data_max);
} // namespace kafka::client::testing
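
A hedged sketch of how the new helper composes with the batcher in a GTest (the test name and sizes are illustrative, not from the PR):

#include "kafka/client/record_batcher.h"
#include "kafka/client/test/utils.h"

#include <gtest/gtest.h>

TEST(RecordBatcherExample, AppendsRandomRecords) {
    kafka::client::record_batcher batcher{1024};
    for (int i = 0; i < 100; ++i) {
        // Each key and value is 1..64 random alphanumeric bytes.
        batcher.append(
          kafka::client::testing::random_length_iobuf(64),
          kafka::client::testing::random_length_iobuf(64));
    }
    auto batches = batcher.finish();
    EXPECT_FALSE(batches.empty());
}
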
6 changes: 5 additions & 1 deletion src/v/redpanda/application.cc
@@ -1684,7 +1684,11 @@ void application::wire_up_redpanda_services(

syschecks::systemd_message("Creating auditing subsystem").get();
construct_service(
audit_mgr, controller.get(), std::ref(*_audit_log_client_config))
audit_mgr,
node_id,
controller.get(),
std::ref(*_audit_log_client_config),
&metadata_cache)
.get();

syschecks::systemd_message("Creating metadata dissemination service").get();
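
For context on the wiring change: construct_service forwards these arguments to the audit manager's constructor on each shard, so the new call site implies a signature roughly like the one below. This is an inferred shape only, with types read off the call site in application.cc; the real declaration lives in the audit subsystem headers, which this diff does not show.

// Inferred shape only, not the PR's actual declaration.
audit_log_manager(
  model::node_id self,
  cluster::controller* controller,
  kafka::client::configuration& client_config,
  ss::sharded<cluster::metadata_cache>* metadata_cache);
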
2 changes: 2 additions & 0 deletions src/v/security/audit/CMakeLists.txt
@@ -11,6 +11,8 @@ v_cc_library(
v::bytes
v::utils
v::config
v::cluster
v::kc_record_batcher
)

add_subdirectory(schemas)