From 82880d1254d802320622ad2c41f41dce8af7ed1e Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 4 Nov 2024 15:04:13 +0000 Subject: [PATCH 1/6] cloud_storage: throw exception not a pointer Noticed while manually reviewing code. No change in behavior expected. (cherry picked from commit 6e1e9980215c3fda81a4785f406f4604b0f8eff4) --- src/v/cloud_storage/cache_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 16fe70690e69..da64d34c690a 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -1293,7 +1293,7 @@ ss::future<> cache::put( // trim due to throttling. co_await trim_throttled(); - throw disk_full_error; + std::rethrow_exception(disk_full_error); } // commit write transaction From 3797ba347849554a4ec117c80e8dd807d6037f14 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 4 Nov 2024 16:32:15 +0000 Subject: [PATCH 2/6] test_utils: extract make_throwing_stream as a test_util (cherry picked from commit be77f709bce76f2af759d7d3c3259f6af1aeecde) --- src/v/test_utils/BUILD | 13 ++++++ src/v/test_utils/iostream.h | 42 +++++++++++++++++++ src/v/utils/tests/BUILD | 1 + src/v/utils/tests/input_stream_fanout_test.cc | 24 +---------- 4 files changed, 58 insertions(+), 22 deletions(-) create mode 100644 src/v/test_utils/iostream.h diff --git a/src/v/test_utils/BUILD b/src/v/test_utils/BUILD index 684039b020f5..c9eee9d26f1e 100644 --- a/src/v/test_utils/BUILD +++ b/src/v/test_utils/BUILD @@ -44,3 +44,16 @@ redpanda_cc_library( "@seastar//:testing", ], ) + +redpanda_test_cc_library( + name = "iostream", + hdrs = [ + "iostream.h", + ], + include_prefix = "test_utils", + visibility = ["//visibility:public"], + deps = [ + "//src/v/base", + "@seastar", + ], +) diff --git a/src/v/test_utils/iostream.h b/src/v/test_utils/iostream.h new file mode 100644 index 000000000000..131ca6a9aa30 --- /dev/null +++ b/src/v/test_utils/iostream.h @@ -0,0 +1,42 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "base/seastarx.h" + +#include +#include +#include + +namespace tests { + +/// Create an input stream that throws an exception on first interaction. +template +ss::input_stream make_throwing_stream(Err err) { + struct throwing_stream final : ss::data_source_impl { + explicit throwing_stream(Err e) + : _err(std::move(e)) {} + + ss::future> skip(uint64_t) final { + return get(); + } + + ss::future> get() final { + return ss::make_exception_future>( + std::move(_err)); + } + + Err _err; + }; + auto ds = ss::data_source(std::make_unique(err)); + return ss::input_stream(std::move(ds)); +} + +} // namespace tests diff --git a/src/v/utils/tests/BUILD b/src/v/utils/tests/BUILD index 483c587bb70f..5ce4a0c89003 100644 --- a/src/v/utils/tests/BUILD +++ b/src/v/utils/tests/BUILD @@ -390,6 +390,7 @@ redpanda_cc_btest( "//src/v/bytes:random", "//src/v/random:generators", "//src/v/test_utils:seastar_boost", + "//src/v/test_utils:iostream", "//src/v/utils:stream_utils", "@boost//:test", "@seastar", diff --git a/src/v/utils/tests/input_stream_fanout_test.cc b/src/v/utils/tests/input_stream_fanout_test.cc index 9fd9cc4b2cc7..8d2d86b427cf 100644 --- a/src/v/utils/tests/input_stream_fanout_test.cc +++ b/src/v/utils/tests/input_stream_fanout_test.cc @@ -13,6 +13,7 @@ #include "bytes/iostream.h" #include "bytes/random.h" #include "random/generators.h" +#include "test_utils/iostream.h" #include "utils/stream_utils.h" #include @@ -437,29 +438,8 @@ SEASTAR_THREAD_TEST_CASE(input_stream_fanout_detach_10_size_limit) { test_detached_consumer<10>(4, 1000); } -template -ss::input_stream make_throwing_stream(Err err) { - struct throwing_stream final : ss::data_source_impl { - explicit throwing_stream(Err e) - : _err(std::move(e)) {} - - ss::future> skip(uint64_t) final { - return get(); - } - - ss::future> get() final { - return ss::make_exception_future>( - std::move(_err)); - } - - Err _err; - }; - auto ds = ss::data_source(std::make_unique(err)); - return ss::input_stream(std::move(ds)); -} - SEASTAR_THREAD_TEST_CASE(input_stream_fanout_producer_throw) { - auto is = make_throwing_stream(ss::abort_requested_exception()); + auto is = tests::make_throwing_stream(ss::abort_requested_exception()); auto [s1, s2] = input_stream_fanout<2>(std::move(is), 4, 8); BOOST_REQUIRE_THROW(s1.read().get(), ss::abort_requested_exception); From 7d676dcf4bdcf477fb0bd044326c3095d4f74c9c Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 4 Nov 2024 16:36:26 +0000 Subject: [PATCH 3/6] cst/cache: Test current exception handling during put (cherry picked from commit 5a93114f8a7b1f4a3dc24809a7809e5ab50b2073) --- src/v/cloud_storage/tests/cache_test.cc | 39 +++++++++++++++++++++++++ src/v/utils/tests/BUILD | 2 +- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 8612a2a6d0ec..0fd8cbc253a8 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -18,7 +18,9 @@ #include "random/generators.h" #include "ssx/sformat.h" #include "test_utils/fixture.h" +#include "test_utils/iostream.h" #include "test_utils/scoped_config.h" +#include "utils/directory_walker.h" #include "utils/file_io.h" #include "utils/human.h" @@ -451,6 +453,43 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) { BOOST_REQUIRE_EQUAL(out.size(), 0); } +ss::future count_files(ss::sstring dirname) { + directory_walker walker; + size_t count = 0; + co_await walker.walk( + dirname, + [dirname, &count](const ss::directory_entry& entry) -> ss::future<> { + if (entry.type == ss::directory_entry_type::directory) { + return count_files(fmt::format("{}/{}", dirname, entry.name)) + .then([&count](size_t sub_count) { count += sub_count; }); + } else { + ++count; + return ss::now(); + } + }); + + co_return count; +} + +FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) { + auto s = tests::make_throwing_stream(ss::abort_requested_exception()); + auto reservation = sharded_cache.local().reserve_space(1, 1).get(); + BOOST_CHECK_THROW( + sharded_cache.local().put(KEY, s, reservation).get(), + ss::abort_requested_exception); + vlog(test_log.info, "Put failed as expected"); + + BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0); + BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0); + + // TODO: This is not expected behavior. The temporary file should be cleaned + // up on exception. + vlog(test_log.info, "Counting files in cache directory"); + BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 1); + + vlog(test_log.info, "Test passed"); +} + /** * Validate that .part files and empty directories are deleted if found during * the startup walk of the cache. diff --git a/src/v/utils/tests/BUILD b/src/v/utils/tests/BUILD index 5ce4a0c89003..e9de8a149012 100644 --- a/src/v/utils/tests/BUILD +++ b/src/v/utils/tests/BUILD @@ -389,8 +389,8 @@ redpanda_cc_btest( "//src/v/bytes:iostream", "//src/v/bytes:random", "//src/v/random:generators", - "//src/v/test_utils:seastar_boost", "//src/v/test_utils:iostream", + "//src/v/test_utils:seastar_boost", "//src/v/utils:stream_utils", "@boost//:test", "@seastar", From 6eaab892b98ba66ab155d79da039ae5d196c4841 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 4 Nov 2024 17:57:26 +0000 Subject: [PATCH 4/6] cst/cache: Extract tmp_filepath variable (cherry picked from commit 50fef29c7910e0c5725cea8e3d5aea3e8e8802c8) --- src/v/cloud_storage/cache_service.cc | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index da64d34c690a..1ac821d5396f 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -1233,6 +1233,7 @@ ss::future<> cache::put( ss::this_shard_id(), (++_cnt), cache_tmp_file_extension)); + auto tmp_filepath = dir_path / tmp_filename; ss::file tmp_cache_file; while (true) { @@ -1247,14 +1248,14 @@ ss::future<> cache::put( | ss::open_flags::exclusive; tmp_cache_file = co_await ss::open_file_dma( - (dir_path / tmp_filename).native(), flags); + tmp_filepath.native(), flags); break; } catch (std::filesystem::filesystem_error& e) { if (e.code() == std::errc::no_such_file_or_directory) { vlog( cst_log.debug, "Couldn't open {}, gonna retry", - (dir_path / tmp_filename).native()); + tmp_filepath.native()); } else { throw; } @@ -1297,12 +1298,10 @@ ss::future<> cache::put( } // commit write transaction - auto src = (dir_path / tmp_filename).native(); - auto dest = (dir_path / filename).native(); - - auto put_size = co_await ss::file_size(src); + auto put_size = co_await ss::file_size(tmp_filepath.native()); - co_await ss::rename_file(src, dest); + auto dest = (dir_path / filename).native(); + co_await ss::rename_file(tmp_filepath.native(), dest); // We will now update reservation.wrote_data(put_size, 1); From 8ce28641423d9fca387b1d24338723adee98c73f Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 4 Nov 2024 16:50:24 +0000 Subject: [PATCH 5/6] cst/cache: Refactor error handling during put Prepare for tmp cleanup on failure. (cherry picked from commit b34aab6da144008f56a1eb51e54d60dc9ae2a5cc) --- src/v/cloud_storage/cache_service.cc | 35 +++++++++++++++------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 1ac821d5396f..0b4b3281914a 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -1268,33 +1268,36 @@ ss::future<> cache::put( options.io_priority_class = io_priority; auto out = co_await ss::make_file_output_stream(tmp_cache_file, options); - std::exception_ptr disk_full_error; + std::exception_ptr eptr; + bool no_space_on_device = false; try { co_await ss::copy(data, out) .then([&out]() { return out.flush(); }) .finally([&out]() { return out.close(); }); } catch (std::filesystem::filesystem_error& e) { // For ENOSPC errors, delay handling so that we can do a trim - if (e.code() == std::errc::no_space_on_device) { - disk_full_error = std::current_exception(); - } else { - throw; - } + no_space_on_device = e.code() == std::errc::no_space_on_device; + eptr = std::current_exception(); + } catch (...) { + // For other errors, delay handling so that we can clean up the tmp file + eptr = std::current_exception(); } - if (disk_full_error) { - vlog(cst_log.error, "Out of space while writing to cache"); + if (eptr) { + if (no_space_on_device) { + vlog(cst_log.error, "Out of space while writing to cache"); - // Block further puts from being attempted until notify_disk_status - // reports that there is space available. - set_block_puts(true); + // Block further puts from being attempted until notify_disk_status + // reports that there is space available. + set_block_puts(true); - // Trim proactively: if many fibers hit this concurrently, - // they'll contend for cleanup_sm and the losers will skip - // trim due to throttling. - co_await trim_throttled(); + // Trim proactively: if many fibers hit this concurrently, + // they'll contend for cleanup_sm and the losers will skip + // trim due to throttling. + co_await trim_throttled(); + } - std::rethrow_exception(disk_full_error); + std::rethrow_exception(eptr); } // commit write transaction From af467ef9afc1572b4fe3bdf49fb8f6bd2e5e6216 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 4 Nov 2024 16:52:06 +0000 Subject: [PATCH 6/6] cst/cache: Remove tmp file on put failure (cherry picked from commit e90eafbbfb348f140e10cb0e183edbf6bb270f7d) --- src/v/cloud_storage/cache_service.cc | 17 +++++++++++++++++ src/v/cloud_storage/tests/cache_test.cc | 4 +--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 0b4b3281914a..336080ecc375 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -1283,7 +1284,23 @@ ss::future<> cache::put( eptr = std::current_exception(); } + // If we failed to write to the tmp file, we should delete it, maybe do an + // eager trim, and rethrow the exception. if (eptr) { + if (!_gate.is_closed()) { + auto delete_tmp_fut = co_await ss::coroutine::as_future( + delete_file_and_empty_parents(tmp_filepath.native())); + if ( + delete_tmp_fut.failed() + && !ssx::is_shutdown_exception(delete_tmp_fut.get_exception())) { + vlog( + cst_log.error, + "Failed to delete tmp file {}: {}", + tmp_filepath.native(), + delete_tmp_fut.get_exception()); + } + } + if (no_space_on_device) { vlog(cst_log.error, "Out of space while writing to cache"); diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 0fd8cbc253a8..a697e5917baa 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -482,10 +482,8 @@ FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) { BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0); BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0); - // TODO: This is not expected behavior. The temporary file should be cleaned - // up on exception. vlog(test_log.info, "Counting files in cache directory"); - BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 1); + BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 0); vlog(test_log.info, "Test passed"); }