Skip to content

Commit

Permalink
Merge pull request #23861 from IoannisRP/vbotbuildovich/backport-2379…
Browse files Browse the repository at this point in the history
…2-v24.1.x-727

[v24.1.x] kafka: oversized alloc in list_offsets_topic
  • Loading branch information
IoannisRP authored Oct 22, 2024
2 parents 0ee9a52 + 6128f59 commit 08a3936
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/v/kafka/server/handlers/list_offsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "model/fundamental.h"
#include "model/namespace.h"
#include "resource_mgmt/io_priority.h"
#include "ssx/when_all.h"

namespace kafka {

Expand Down Expand Up @@ -193,7 +194,7 @@ static ss::future<list_offset_partition_response> list_offsets_partition(

static ss::future<list_offset_topic_response>
list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) {
std::vector<ss::future<list_offset_partition_response>> partitions;
chunked_vector<ss::future<list_offset_partition_response>> partitions;
partitions.reserve(topic.partitions.size());

const auto* disabled_set
Expand Down Expand Up @@ -232,9 +233,11 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) {
partitions.push_back(std::move(pr));
}

return when_all_succeed(partitions.begin(), partitions.end())
return ssx::when_all_succeed<
chunked_vector<list_offset_partition_response>>(
std::move(partitions))
.then([name = std::move(topic.name)](
std::vector<list_offset_partition_response> parts) mutable {
chunked_vector<list_offset_partition_response> parts) mutable {
return list_offset_topic_response{
.name = std::move(name),
.partitions = chunked_vector<list_offset_partition_response>{
Expand Down Expand Up @@ -341,8 +344,9 @@ list_offsets_handler::handle(request_context ctx, ss::smp_service_group ssg) {

return ss::do_with(std::move(octx), [](list_offsets_ctx& octx) {
auto topics = list_offsets_topics(octx);
return ss::when_all_succeed(topics.begin(), topics.end())
.then([&octx](std::vector<list_offset_topic_response> topics) {
return ssx::when_all_succeed<
chunked_vector<list_offset_topic_response>>(std::move(topics))
.then([&octx](chunked_vector<list_offset_topic_response> topics) {
octx.response.data.topics = {
std::make_move_iterator(topics.begin()),
std::make_move_iterator(topics.end())};
Expand Down
97 changes: 97 additions & 0 deletions src/v/ssx/include/ssx/when_all.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "serde/rw/reservable.h"

#include <seastar/core/future.hh>
#include <seastar/coroutine/as_future.hh>

#include <concepts>
#include <ranges>

namespace ssx {

namespace detail {

template<typename Container, typename... Args>
concept emplace_backable = requires(Container c, Args&&... args) {
c.emplace_back(std::forward<Args>(args)...);
};

} // namespace detail

/// \brief Wait for a range of futures to complete.
///
/// Given a range of futures as input, wait for all of them
/// to resolve, and return a future containing a range with the
/// resolved values of the original futures.
///
/// If any future fails, one of the exceptions will be
/// returned as a failed future.
///
/// \param futures a \c FutureRange containing the futures to wait for.
/// \pre \c futures must be an owning range, i.e. responsible for managing the
/// lifetime of its elements.
///
/// \return a \c future<ResolvedContainer> with all
/// the resolved values of \c futures
template<typename ResolvedContainer, typename FutureRange>
requires seastar::Future<typename FutureRange::value_type>
&& std::ranges::input_range<FutureRange>
&& std::constructible_from<
typename ResolvedContainer::value_type,
typename std::ranges::range_value_t<FutureRange>::value_type>
&& detail::emplace_backable<
ResolvedContainer,
typename std::ranges::range_value_t<FutureRange>::value_type>
seastar::future<ResolvedContainer> when_all_succeed(FutureRange futures) {
ResolvedContainer result{};

if constexpr (serde::Reservable<ResolvedContainer>) {
if constexpr (std::ranges::sized_range<FutureRange>) {
result.reserve(std::ranges::size(futures));
} else if constexpr (std::ranges::forward_range<FutureRange>) {
result.reserve(std::ranges::distance(futures));
} else {
// Do nothing
// Can't estimate the size of the FutureRange
}
}

std::exception_ptr excp;

for (auto& fut : futures) {
auto ready_future = co_await seastar::coroutine::as_future(
std::move(fut));

if (excp) {
ready_future.ignore_ready_future();
continue;
}

if (ready_future.failed()) {
excp = ready_future.get_exception();
continue;
}

result.emplace_back(ready_future.get());
}

if (excp) {
co_return seastar::coroutine::exception(excp);
}

co_return result;
}

} // namespace ssx
1 change: 1 addition & 0 deletions src/v/ssx/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rp_test(
async_algorithm_test.cc
event_test.cc
work_queue_test.cc
when_all_test.cc
LIBRARIES v::gtest_main v::ssx
LABELS ssx
)
Expand Down
198 changes: 198 additions & 0 deletions src/v/ssx/tests/when_all_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "base/seastarx.h"
#include "container/fragmented_vector.h"
#include "ssx/when_all.h"

#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/smp.hh>

#include <boost/container/static_vector.hpp>
#include <fmt/format.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <memory>

using namespace std::chrono_literals;

namespace {

// A small number of items that are easy to parse visually if something goes
// wrong.
constexpr size_t few_items = 5;

template<typename T>
using static_vector = boost::container::static_vector<T, few_items>;

template<typename Container>
Container make_ready_futures(size_t n_items) {
Container futures;
futures.reserve(n_items);
for (size_t i = 0; i < n_items; ++i) {
futures.push_back(ss::make_ready_future<int>(static_cast<int>(i)));
}
return futures;
}

template<typename Container>
Container make_expected(size_t n_items) {
using value_type = typename Container::value_type;
Container expected;
expected.reserve(n_items);
for (size_t i = 0; i < n_items; ++i) {
expected.push_back(static_cast<value_type>(i));
}
return expected;
}

} // namespace

TEST(WhenAllAlgorithm, when_all_basic_testing) {
// all futures ready
{
const auto expected_vector = ::make_expected<std::vector<int>>(
few_items);
auto futures = ::make_ready_futures<std::vector<ss::future<int>>>(
few_items);
auto res = ssx::when_all_succeed<std::vector<int>>(std::move(futures));
EXPECT_EQ(res.get(), expected_vector);
}

// all futures ready - boost::static_vector
{
const auto expected_vector = ::make_expected<static_vector<int>>(
few_items);
auto futures = ::make_ready_futures<static_vector<ss::future<int>>>(
few_items);
auto res = ssx::when_all_succeed<static_vector<int>>(
std::move(futures));
EXPECT_EQ(res.get(), expected_vector);
}

// all futures ready - chunked_vector
{
const auto expected_vector = ::make_expected<chunked_vector<int>>(
few_items);
auto futures = ::make_ready_futures<chunked_vector<ss::future<int>>>(
few_items);
auto res = ssx::when_all_succeed<chunked_vector<int>>(
std::move(futures));
EXPECT_EQ(res.get(), expected_vector);
}

// all futures ready - input/output different containers
{
const auto expected_vector = ::make_expected<chunked_vector<int>>(
few_items);
auto futures = ::make_ready_futures<std::vector<ss::future<int>>>(
few_items);
auto res = ssx::when_all_succeed<chunked_vector<int>>(
std::move(futures));
EXPECT_EQ(res.get(), expected_vector);
}

// all futures ready - input/output different value type
{
const auto expected_vector = ::make_expected<chunked_vector<double>>(
few_items);
auto futures = ::make_ready_futures<std::vector<ss::future<int>>>(
few_items);
auto res = ssx::when_all_succeed<chunked_vector<double>>(
std::move(futures));
EXPECT_EQ(res.get(), expected_vector);
}

// all futures ready - move_only value_type
{
using StringPtr = std::unique_ptr<std::string>;
std::vector<ss::future<StringPtr>> futures;
futures.reserve(2);
futures.push_back(seastar::make_ready_future<StringPtr>(
std::make_unique<std::string>("test string 0")));
futures.push_back(seastar::make_ready_future<StringPtr>(
std::make_unique<std::string>("test string 1")));
auto res = ssx::when_all_succeed<chunked_vector<StringPtr>>(
std::move(futures));
auto resolved = res.get();
EXPECT_EQ(resolved.size(), 2);
EXPECT_EQ(*resolved[0], "test string 0");
EXPECT_EQ(*resolved[1], "test string 1");
}

// exceptional future
{
std::vector<ss::future<int>> futures;
futures.reserve(few_items);
futures.push_back(seastar::make_ready_future<int>(0));
futures.push_back(seastar::make_ready_future<int>(1));
futures.emplace_back(seastar::make_exception_future<int>(
std::runtime_error{"Test Exception"}));
futures.push_back(seastar::make_ready_future<int>(3));
futures.push_back(seastar::make_ready_future<int>(4));
auto res = ssx::when_all_succeed<std::vector<int>>(std::move(futures));
EXPECT_THROW(res.get(), std::runtime_error);
}
}

TEST(WhenAllAlgorithm, when_all_ongoing_futures) {
constexpr auto time_increment = 10ms;

const std::vector<int> expected_std_vector{0, 1, 2, 3, 4};

// wait for futures
{
std::vector<ss::future<int>> futures;
futures.reserve(few_items);
for (size_t i = 0; i < few_items; ++i) {
auto f
= ss::sleep<seastar::lowres_clock>(i * time_increment).then([i] {
return seastar::make_ready_future<int>(static_cast<int>(i));
});
futures.push_back(std::move(f));
}
auto res = ssx::when_all_succeed<std::vector<int>>(std::move(futures));
EXPECT_EQ(res.get(), expected_std_vector);
}
}

// This test is only meaningfull in release mode
#ifdef NDEBUG

TEST(WhenAllAlgorithm, when_all_verify_no_large_allocation) {
// Enougn items to fill 2 fragments in a chunked_vector<int>.
// Also, twice the size of the 128kb expected allocation warning threshold.
constexpr size_t many_items = 2UL * 1024UL * 128UL / sizeof(int);

// Same as the default memory allocation warning threshold
const size_t large_allocation_threshold = 1024UL * 128UL + 1UL;
ss::smp::invoke_on_all([threshold = large_allocation_threshold] {
ss::memory::set_large_allocation_warning_threshold(threshold);
}).get();

// validate that a large vector won't generate a warning
{
const auto expected_vector = ::make_expected<chunked_vector<int>>(
many_items);
auto futures = ::make_ready_futures<chunked_vector<ss::future<int>>>(
many_items);
auto res = ssx::when_all_succeed<chunked_vector<int>>(
std::move(futures));
EXPECT_EQ(res.get(), expected_vector);

const ss::memory::statistics mem = ss::memory::stats();
EXPECT_EQ(mem.large_allocations(), 0);
}
}

#endif

0 comments on commit 08a3936

Please sign in to comment.