Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: fix cloud storage support for delete-records #11579

Merged
merged 22 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3764687
tests: throw on error in delete records utils
andrwng Jun 29, 2023
54bb805
tests: tweak cloud_storage produce utils for local segments
andrwng Jun 29, 2023
c5de046
cloud_storage: be less restrictive in moving start kafka offset
andrwng Jun 17, 2023
4f6fa3b
kafka: clamp the start offset in Kafka layer
andrwng Jun 20, 2023
cb8323e
kafka: plumbing for archival prefix truncate
andrwng Jun 21, 2023
82f9ffc
cluster: make archival listen for prefix_truncate batch
andrwng Jun 22, 2023
25a217f
cloud_storage: fix offset-based archive retention condition
andrwng Jun 20, 2023
682ee06
cluster: remove restrictions on delete-records
andrwng Jun 21, 2023
bbe6ff3
cluster: do not translate log_eviction_stm offset on read replicas
andrwng Jun 30, 2023
61fd5d8
tests: case for syncing start kafka offset override on read replicas
andrwng Jun 30, 2023
bd613bf
tests: add list offsets kafka utils
andrwng Jun 30, 2023
2668cb4
kafka: clamp start offset on read replicas
andrwng Jun 24, 2023
950084b
kafka: reject delete records on read replicas
andrwng Jun 30, 2023
e08bdd2
cloud_storage: account for start override in STM manifest retention
andrwng Jun 22, 2023
530ebec
cloud_storage: small logging improvements
andrwng Jun 29, 2023
1e42144
tests: fixture test for consuming after delete-records
andrwng Jun 15, 2023
fe52e2b
tests: add timestamps to remote_segment_generator
andrwng Jun 29, 2023
bb69227
kafka: bump timequery results up to start override
andrwng Jun 24, 2023
39ebf96
kafka: check for truncated log before returning from fetch
andrwng Jun 24, 2023
3dc5bcf
tests: add offset_for_leader_epoch utils
andrwng Jun 30, 2023
4de15f5
kafka: bump result of leader_epoch_last_offset to start offset
andrwng Jun 24, 2023
f1368e5
ducktape: tests for delete-records
andrwng Jun 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/v/cloud_storage/tests/read_replica_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cloud_storage/spillover_manifest.h"
#include "cloud_storage/tests/produce_utils.h"
#include "cloud_storage/tests/s3_imposter.h"
#include "cloud_storage/types.h"
#include "config/configuration.h"
#include "kafka/server/tests/delete_records_utils.h"
#include "kafka/server/tests/list_offsets_utils.h"
Expand Down Expand Up @@ -122,6 +123,47 @@ FIXTURE_TEST(test_read_replica_basic_sync, read_replica_e2e_fixture) {
}
}

FIXTURE_TEST(
test_read_replica_rejects_delete_records, read_replica_e2e_fixture) {
const model::topic topic_name("tapioca");
model::ntp ntp(model::kafka_namespace, topic_name, 0);
cluster::topic_properties props;
props.shadow_indexing = model::shadow_indexing_mode::full;
props.retention_local_target_bytes = tristate<size_t>(1);
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
wait_for_leader(ntp).get();

auto partition = app.partition_manager.local().get(ntp).get();
auto& archiver = partition->archiver()->get();
BOOST_REQUIRE(archiver.sync_for_tests().get());
archiver.upload_topic_manifest().get();

auto rr_rp = start_read_replica_fixture();
cluster::topic_properties read_replica_props;
read_replica_props.shadow_indexing = model::shadow_indexing_mode::fetch;
read_replica_props.read_replica = true;
read_replica_props.read_replica_bucket = "test-bucket";
rr_rp
->add_topic({model::kafka_namespace, topic_name}, 1, read_replica_props)
.get();
rr_rp->wait_for_leader(ntp).get();

// Send the delete request to the _read replica_. This is not allowed.
tests::kafka_delete_records_transport deleter(
rr_rp->make_kafka_client().get());
deleter.start().get();
BOOST_REQUIRE_EXCEPTION(
deleter
.delete_records_from_partition(
topic_name, model::partition_id(0), model::offset(0), 5s)
.get(),
std::runtime_error,
[](std::runtime_error e) {
return std::string(e.what()).find("policy_violation")
!= std::string::npos;
});
}

FIXTURE_TEST(test_read_replica_delete_records, read_replica_e2e_fixture) {
const model::topic topic_name("tapioca");
model::ntp ntp(model::kafka_namespace, topic_name, 0);
Expand Down
3 changes: 3 additions & 0 deletions src/v/kafka/server/handlers/delete_records.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
return ctx.authorized(security::acl_operation::remove, t.name);
};
const auto is_deletable = [](const cluster::topic_configuration& cfg) {
if (cfg.is_read_replica()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicee , I didn't think of this +1

return false;
}
/// Immitates the logic in ntp_config::is_collectible
if (
!cfg.properties.has_overrides()
Expand Down