From ca63337fa2301a8f14ba48c72061549498b7abbd Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 17 Sep 2024 19:06:23 -0700 Subject: [PATCH] archival: skip spillover retention if not collectable CONFLICT: - updated test for GTest Compact-only topics should never apply retention. This wasn't the case for the spillover region in tiered storage. Note, for the STM region, we do correctly no-op when trying to apply retention for compacted topics. (cherry picked from commit 26df815465dc2cfd68e716e0936d1bdc99bb248b) --- src/v/archival/ntp_archiver_service.cc | 4 ++ .../tests/cloud_storage_e2e_test.cc | 45 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index a46379871baf..0c0bbd98e979 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -2211,6 +2211,10 @@ ss::future<> ntp_archiver::apply_archive_retention() { } const auto& ntp_conf = _parent.get_ntp_config(); + if (!ntp_conf.is_collectable()) { + vlog(_rtclog.trace, "NTP is not collectable"); + co_return; + } std::optional retention_bytes = ntp_conf.retention_bytes(); std::optional retention_ms = ntp_conf.retention_duration(); diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc index 9892e97b1d71..34714041820d 100644 --- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc +++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc @@ -58,6 +58,51 @@ class e2e_fixture scoped_config test_local_cfg; }; +FIXTURE_TEST(test_spillover_retention_compacted_topic, e2e_fixture) { + test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests") + .set_value(true); + test_local_cfg.get("cloud_storage_spillover_manifest_max_segments") + .set_value(std::make_optional(5)); + test_local_cfg.get("cloud_storage_spillover_manifest_size") + .set_value(std::optional{}); + test_local_cfg.get("log_retention_ms") + .set_value(std::make_optional(1ms)); + const model::topic topic_name("tapioca"); + model::ntp ntp(model::kafka_namespace, topic_name, 0); + + cluster::topic_properties props; + props.shadow_indexing = model::shadow_indexing_mode::full; + props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; + add_topic({model::kafka_namespace, topic_name}, 1, props).get(); + wait_for_leader(ntp).get(); + + const auto records_per_seg = 5; + const auto num_segs = 100; + auto partition = app.partition_manager.local().get(ntp); + auto& archiver = partition->archiver().value().get(); + tests::remote_segment_generator gen(make_kafka_client().get(), *partition); + auto total_records = gen.num_segments(num_segs) + .batches_per_segment(records_per_seg) + .produce() + .get(); + BOOST_REQUIRE_GE(total_records, 500); + BOOST_REQUIRE(archiver.sync_for_tests().get()); + archiver.apply_spillover().get(); + ss::sleep(5s).get(); + archiver.apply_archive_retention().get(); + + tests::kafka_list_offsets_transport lister(make_kafka_client().get()); + lister.start().get(); + + auto offset + = lister.start_offset_for_partition(topic_name, model::partition_id(0)) + .get(); + BOOST_REQUIRE_EQUAL(offset(), 0); + BOOST_REQUIRE_EQUAL( + archiver.manifest().full_log_start_offset().value_or(model::offset{})(), + 0); +} + FIXTURE_TEST(test_produce_consume_from_cloud, e2e_fixture) { test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests") .set_value(true);