diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 16fe70690e69a..336080ecc3754 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -1233,6 +1234,7 @@ ss::future<> cache::put( ss::this_shard_id(), (++_cnt), cache_tmp_file_extension)); + auto tmp_filepath = dir_path / tmp_filename; ss::file tmp_cache_file; while (true) { @@ -1247,14 +1249,14 @@ ss::future<> cache::put( | ss::open_flags::exclusive; tmp_cache_file = co_await ss::open_file_dma( - (dir_path / tmp_filename).native(), flags); + tmp_filepath.native(), flags); break; } catch (std::filesystem::filesystem_error& e) { if (e.code() == std::errc::no_such_file_or_directory) { vlog( cst_log.debug, "Couldn't open {}, gonna retry", - (dir_path / tmp_filename).native()); + tmp_filepath.native()); } else { throw; } @@ -1267,42 +1269,59 @@ ss::future<> cache::put( options.io_priority_class = io_priority; auto out = co_await ss::make_file_output_stream(tmp_cache_file, options); - std::exception_ptr disk_full_error; + std::exception_ptr eptr; + bool no_space_on_device = false; try { co_await ss::copy(data, out) .then([&out]() { return out.flush(); }) .finally([&out]() { return out.close(); }); } catch (std::filesystem::filesystem_error& e) { // For ENOSPC errors, delay handling so that we can do a trim - if (e.code() == std::errc::no_space_on_device) { - disk_full_error = std::current_exception(); - } else { - throw; + no_space_on_device = e.code() == std::errc::no_space_on_device; + eptr = std::current_exception(); + } catch (...) { + // For other errors, delay handling so that we can clean up the tmp file + eptr = std::current_exception(); + } + + // If we failed to write to the tmp file, we should delete it, maybe do an + // eager trim, and rethrow the exception. + if (eptr) { + if (!_gate.is_closed()) { + auto delete_tmp_fut = co_await ss::coroutine::as_future( + delete_file_and_empty_parents(tmp_filepath.native())); + if ( + delete_tmp_fut.failed() + && !ssx::is_shutdown_exception(delete_tmp_fut.get_exception())) { + vlog( + cst_log.error, + "Failed to delete tmp file {}: {}", + tmp_filepath.native(), + delete_tmp_fut.get_exception()); + } } - } - if (disk_full_error) { - vlog(cst_log.error, "Out of space while writing to cache"); + if (no_space_on_device) { + vlog(cst_log.error, "Out of space while writing to cache"); - // Block further puts from being attempted until notify_disk_status - // reports that there is space available. - set_block_puts(true); + // Block further puts from being attempted until notify_disk_status + // reports that there is space available. + set_block_puts(true); - // Trim proactively: if many fibers hit this concurrently, - // they'll contend for cleanup_sm and the losers will skip - // trim due to throttling. - co_await trim_throttled(); + // Trim proactively: if many fibers hit this concurrently, + // they'll contend for cleanup_sm and the losers will skip + // trim due to throttling. + co_await trim_throttled(); + } - throw disk_full_error; + std::rethrow_exception(eptr); } // commit write transaction - auto src = (dir_path / tmp_filename).native(); - auto dest = (dir_path / filename).native(); + auto put_size = co_await ss::file_size(tmp_filepath.native()); - auto put_size = co_await ss::file_size(src); - - co_await ss::rename_file(src, dest); + auto dest = (dir_path / filename).native(); + co_await ss::rename_file(tmp_filepath.native(), dest); // We will now update reservation.wrote_data(put_size, 1); diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 8612a2a6d0ec6..a697e5917baa0 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -18,7 +18,9 @@ #include "random/generators.h" #include "ssx/sformat.h" #include "test_utils/fixture.h" +#include "test_utils/iostream.h" #include "test_utils/scoped_config.h" +#include "utils/directory_walker.h" #include "utils/file_io.h" #include "utils/human.h" @@ -451,6 +453,41 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) { BOOST_REQUIRE_EQUAL(out.size(), 0); } +ss::future count_files(ss::sstring dirname) { + directory_walker walker; + size_t count = 0; + co_await walker.walk( + dirname, + [dirname, &count](const ss::directory_entry& entry) -> ss::future<> { + if (entry.type == ss::directory_entry_type::directory) { + return count_files(fmt::format("{}/{}", dirname, entry.name)) + .then([&count](size_t sub_count) { count += sub_count; }); + } else { + ++count; + return ss::now(); + } + }); + + co_return count; +} + +FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) { + auto s = tests::make_throwing_stream(ss::abort_requested_exception()); + auto reservation = sharded_cache.local().reserve_space(1, 1).get(); + BOOST_CHECK_THROW( + sharded_cache.local().put(KEY, s, reservation).get(), + ss::abort_requested_exception); + vlog(test_log.info, "Put failed as expected"); + + BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0); + BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0); + + vlog(test_log.info, "Counting files in cache directory"); + BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 0); + + vlog(test_log.info, "Test passed"); +} + /** * Validate that .part files and empty directories are deleted if found during * the startup walk of the cache. diff --git a/src/v/test_utils/BUILD b/src/v/test_utils/BUILD index 684039b020f56..c9eee9d26f1e2 100644 --- a/src/v/test_utils/BUILD +++ b/src/v/test_utils/BUILD @@ -44,3 +44,16 @@ redpanda_cc_library( "@seastar//:testing", ], ) + +redpanda_test_cc_library( + name = "iostream", + hdrs = [ + "iostream.h", + ], + include_prefix = "test_utils", + visibility = ["//visibility:public"], + deps = [ + "//src/v/base", + "@seastar", + ], +) diff --git a/src/v/test_utils/iostream.h b/src/v/test_utils/iostream.h new file mode 100644 index 0000000000000..131ca6a9aa301 --- /dev/null +++ b/src/v/test_utils/iostream.h @@ -0,0 +1,42 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "base/seastarx.h" + +#include +#include +#include + +namespace tests { + +/// Create an input stream that throws an exception on first interaction. +template +ss::input_stream make_throwing_stream(Err err) { + struct throwing_stream final : ss::data_source_impl { + explicit throwing_stream(Err e) + : _err(std::move(e)) {} + + ss::future> skip(uint64_t) final { + return get(); + } + + ss::future> get() final { + return ss::make_exception_future>( + std::move(_err)); + } + + Err _err; + }; + auto ds = ss::data_source(std::make_unique(err)); + return ss::input_stream(std::move(ds)); +} + +} // namespace tests diff --git a/src/v/utils/tests/BUILD b/src/v/utils/tests/BUILD index 483c587bb70f9..e9de8a149012b 100644 --- a/src/v/utils/tests/BUILD +++ b/src/v/utils/tests/BUILD @@ -389,6 +389,7 @@ redpanda_cc_btest( "//src/v/bytes:iostream", "//src/v/bytes:random", "//src/v/random:generators", + "//src/v/test_utils:iostream", "//src/v/test_utils:seastar_boost", "//src/v/utils:stream_utils", "@boost//:test", diff --git a/src/v/utils/tests/input_stream_fanout_test.cc b/src/v/utils/tests/input_stream_fanout_test.cc index 9fd9cc4b2cc71..8d2d86b427cfe 100644 --- a/src/v/utils/tests/input_stream_fanout_test.cc +++ b/src/v/utils/tests/input_stream_fanout_test.cc @@ -13,6 +13,7 @@ #include "bytes/iostream.h" #include "bytes/random.h" #include "random/generators.h" +#include "test_utils/iostream.h" #include "utils/stream_utils.h" #include @@ -437,29 +438,8 @@ SEASTAR_THREAD_TEST_CASE(input_stream_fanout_detach_10_size_limit) { test_detached_consumer<10>(4, 1000); } -template -ss::input_stream make_throwing_stream(Err err) { - struct throwing_stream final : ss::data_source_impl { - explicit throwing_stream(Err e) - : _err(std::move(e)) {} - - ss::future> skip(uint64_t) final { - return get(); - } - - ss::future> get() final { - return ss::make_exception_future>( - std::move(_err)); - } - - Err _err; - }; - auto ds = ss::data_source(std::make_unique(err)); - return ss::input_stream(std::move(ds)); -} - SEASTAR_THREAD_TEST_CASE(input_stream_fanout_producer_throw) { - auto is = make_throwing_stream(ss::abort_requested_exception()); + auto is = tests::make_throwing_stream(ss::abort_requested_exception()); auto [s1, s2] = input_stream_fanout<2>(std::move(is), 4, 8); BOOST_REQUIRE_THROW(s1.read().get(), ss::abort_requested_exception);