CORE-6835 Fix first time compacted segment upload #22812

Merged (3 commits) on Aug 17, 2024
24 changes: 13 additions & 11 deletions src/v/cluster/archival/archival_policy.cc
@@ -185,17 +185,19 @@ archival_policy::lookup_result archival_policy::find_segment(
     const auto& set = log->segments();
     const auto& ntp_conf = log->config();
     auto it = set.lower_bound(start_offset);
-    if (it == set.end() || eligible_for_compacted_reupload(**it)) {
-        // Skip forward if we hit a gap or compacted segment
-        for (auto i = set.begin(); i != set.end(); i++) {
-            const auto& sg = *i;
-            if (start_offset < sg->offsets().get_base_offset()) {
-                // Move last offset forward
-                it = i;
-                start_offset = sg->offsets().get_base_offset();
-                break;
-            }
-        }
+    if (it == set.end() && start_offset < log->offsets().committed_offset) {
+        // The 'start_offset' is in the gap. Normally this shouldn't happen.
+        vlog(
+          archival_log.warn,
+          "Upload policy for {}: can't find segment with base_offset={}",
+          _ntp,
+          start_offset);
+        it = std::find_if(
+          set.begin(),
+          set.end(),
+          [start_offset](const ss::lw_shared_ptr<storage::segment>& s) {
+              return s->offsets().get_base_offset() >= start_offset;
+          });
     }
     if (it == set.end()) {
         vlog(
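Read as a rule, the change above says: when lower_bound() cannot place start_offset on any segment even though start_offset is still below the committed offset (a gap, typically left behind by compaction), the policy now logs a warning and falls back to the first segment whose base offset is at or past start_offset, instead of skipping ahead past compacted segments as the removed loop did. A minimal Python sketch of that fallback; the Segment tuple, the offsets and the containment check are illustrative stand-ins, not the real segment_set semantics:

from typing import NamedTuple, Optional


class Segment(NamedTuple):
    base_offset: int
    last_offset: int


def find_upload_start(segments: list[Segment], start_offset: int,
                      committed_offset: int) -> Optional[Segment]:
    # Primary lookup: the segment that covers start_offset, if any
    # (stand-in for segment_set::lower_bound).
    for seg in segments:
        if seg.base_offset <= start_offset <= seg.last_offset:
            return seg
    # start_offset fell into a gap, but there is still data to upload
    # below the committed offset: take the first segment at or past
    # start_offset rather than giving up.
    if start_offset < committed_offset:
        return next(
            (s for s in segments if s.base_offset >= start_offset), None)
    return None


# Made-up offsets: 100-199 no longer map to any segment, so a start
# offset of 120 resolves to the segment that begins at 200.
segs = [Segment(0, 99), Segment(200, 299)]
assert find_upload_start(segs, 120, committed_offset=299) == Segment(200, 299)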
7 changes: 3 additions & 4 deletions src/v/cluster/archival/tests/ntp_archiver_test.cc
@@ -963,11 +963,10 @@ FIXTURE_TEST(
       .get())
       .candidate;
 
-    // The search is expected to find the next segment after the compacted
-    // segment, skipping the compacted one.
+    // The search is expected to find the first compacted segment.
     BOOST_REQUIRE(!candidate.sources.empty());
-    BOOST_REQUIRE_GT(candidate.starting_offset(), 0);
-    BOOST_REQUIRE_GT(
+    BOOST_REQUIRE_EQUAL(candidate.starting_offset(), 0);
+    BOOST_REQUIRE_EQUAL(
       candidate.sources.front()->offsets().get_base_offset(), model::offset{0});
 }
 
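The flipped assertions encode the same behavioural change from the test's side: the policy no longer skips a compacted segment sitting at the start of the log, so the first upload candidate is expected to start at offset 0 with that compacted segment as its first source. A rough before/after contrast, again as a toy Python model with made-up offsets rather than the archiver's or the fixture's API:

from typing import Callable, NamedTuple, Optional


class Segment(NamedTuple):
    base_offset: int
    last_offset: int


def first_upload_base_old(segments: list[Segment],
                          is_compacted: Callable[[Segment], bool]) -> Optional[int]:
    # Removed behaviour: a leading segment that was already compacted and
    # eligible for re-upload was skipped, so the candidate started after it.
    for seg in segments:
        if not is_compacted(seg):
            return seg.base_offset
    return None


def first_upload_base_new(segments: list[Segment]) -> Optional[int]:
    # New behaviour: the compacted segment is a valid first source, so the
    # candidate starts at the log's first base offset.
    return segments[0].base_offset if segments else None


segs = [Segment(0, 99), Segment(100, 199)]
compacted = {segs[0]}  # pretend the first segment was already compacted
assert first_upload_base_old(segs, lambda s: s in compacted) == 100
assert first_upload_base_new(segs) == 0  # matches the updated BOOST checks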
182 changes: 181 additions & 1 deletion tests/rptest/tests/shadow_indexing_compacted_topic_test.py
@@ -1,15 +1,18 @@
import pprint

from rptest.clients.rpk import RpkPartition, RpkTool
from rptest.services.admin import Admin
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import CloudStorageType, SISettings, make_redpanda_service, LoggingConfig, get_cloud_storage_type
from rptest.services.redpanda import CloudStorageType, SISettings, make_redpanda_service, LoggingConfig, get_cloud_storage_type, MetricsEndpoint
from rptest.tests.end_to_end import EndToEndTest
from rptest.util import wait_until_segments, wait_for_removal_of_n_segments
from rptest.utils.si_utils import BucketView
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import random
import time


class ShadowIndexingCompactedTopicTest(EndToEndTest):
@@ -156,3 +159,180 @@ def compacted_segments_uploaded():
self.logger.info(
f'manifest: {pprint.pformat(s3_snapshot.manifest_for_ntp(self.topic, 0))}'
)


class TSWithAlreadyCompactedTopic(EndToEndTest):
segment_size = 2**20
topics = (TopicSpec(name='panda-topic-no-ts',
partition_count=1,
replication_factor=3,
cleanup_policy='compact',
redpanda_remote_read=False,
redpanda_remote_write=False), )

def __init__(self, test_context, extra_rp_conf={}, environment=None):
super().__init__(test_context)
self.num_brokers = 3
self.si_settings = SISettings(test_context,
cloud_storage_max_connections=5,
fast_uploads=True,
cloud_storage_enable_remote_read=False,
cloud_storage_enable_remote_write=False)
extra_rp_conf.update(
dict(
enable_leader_balancer=False,
partition_autobalancing_mode="off",
group_initial_rebalance_delay=300,
compacted_log_segment_size=self.segment_size,
))
self.redpanda = make_redpanda_service(
context=self.test_context,
num_brokers=self.num_brokers,
si_settings=self.si_settings,
extra_rp_conf=extra_rp_conf,
environment=environment,
log_config=LoggingConfig(default_level='debug'))
self.topic = self.topics[0].name
self._rpk_client = RpkTool(self.redpanda)

def setUp(self):
super().setUp()
self.redpanda.start()
for topic in self.topics:
self._rpk_client.create_topic(
topic.name, topic.partition_count, topic.replication_factor, {
'cleanup.policy': topic.cleanup_policy,
})

def describe_topic(self, topic: str) -> list[RpkPartition]:
description: list[RpkPartition] | None = None

def capture_description_is_ok():
nonlocal description
description = list(self._rpk_client.describe_topic(topic))
return description is not None and len(description) > 0

wait_until(capture_description_is_ok,
timeout_sec=20,
backoff_sec=1,
err_msg=f"failed to get describe_topic {topic}")
assert description is not None, "description is None"
return description

@cluster(num_nodes=4)
def test_initial_upload(self):
"""Test initial upload of the compacted segments. The ntp_archiver
could start from already compacted data with gaps. We should be able
to upload such data without triggering validation errors."""
# This topic has TS turned off. The goal is to produce a lot of data
# and let it get compacted.
topic_spec = self.topics[0]
topic_name = topic_spec.name

self._rpk_client.cluster_config_set("log_compaction_interval_ms",
"9999999")
# actually produce data
self.start_producer(throughput=5000, repeating_keys=2)

for p in range(1000, 10000, 1000):
self.await_num_produced(min_records=p)
# Generate a bunch of config batches and terms
# to prevent a situation when everything except the
# last segment is compacted away. We want to have several
# compacted segments with different terms.
self._transfer_topic_leadership()

wait_until_segments(redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
count=10,
timeout_sec=300)

self.logger.info(
f"Stopping producer after writing up to offsets {self.producer.last_acked_offsets}"
)
self.producer.stop()

# Collect original file sizes
original_per_node_stat = {}
for node in self.redpanda.nodes:
stats = self.redpanda.data_stat(node)
total = sum(
[size for path, size in stats if path.suffix == ".log"])
original_per_node_stat[node] = total
self.logger.info(f"Size before compaction {total}")
assert total > 0, "No data found in the data dir"

# list redpanda dir before compaction
self.redpanda.storage(scan_cache=False)
# wait for compaction to do the job
self._rpk_client.cluster_config_set("log_compaction_interval_ms",
"500")

def compacted():
            # Compare current .log sizes against the originals
worst_ratio = 0.0
for node in self.redpanda.nodes:
new_stats = self.redpanda.data_stat(node)
new_size = sum([
size for path, size in new_stats if path.suffix == ".log"
])
old_size = original_per_node_stat[node]
ratio = new_size / old_size
worst_ratio = max(worst_ratio, ratio)
self.logger.info(
f"Old data dir size: {old_size}, new data dir size: {new_size}, ratio: {ratio}"
)
self.logger.info(
f"Worst compaction ration across the cluster is {worst_ratio}")
return worst_ratio < 0.8

wait_until(compacted,
timeout_sec=120,
backoff_sec=2,
err_msg=f"Segments were not compacted well enough")

# enable TS for the topic
self._rpk_client.alter_topic_config(topic_name, "redpanda.remote.read",
"true")
self._rpk_client.alter_topic_config(topic_name,
"redpanda.remote.write", "true")

# Transfer leadership while the data is uploaded.
for _ in range(0, 10):
self._transfer_topic_leadership()
time.sleep(1)

s3_snapshot = BucketView(self.redpanda, topics=self.topics)
self.logger.info(
f'manifest: {pprint.pformat(s3_snapshot.manifest_for_ntp(topic_name, 0))}'
)
s3_snapshot.assert_at_least_n_uploaded_segments_compacted(
topic_name, partition=0, revision=None, n=1)
self.logger.info(
f'manifest: {pprint.pformat(s3_snapshot.manifest_for_ntp(topic_name, 0))}'
)
# Given that we have multiple terms we shouldn't have only one segment
# even after compaction.
assert len(s3_snapshot.manifest_for_ntp(topic_name, 0)['segments']) > 1

def _transfer_topic_leadership(self):
admin = Admin(self.redpanda)
cur_leader = admin.get_partition_leader(namespace='kafka',
topic=self.topic,
partition=0)
broker_ids = [x['node_id'] for x in admin.get_brokers()]
transfer_to = random.choice([n for n in broker_ids if n != cur_leader])
assert cur_leader != transfer_to, "incorrect partition move in test"
admin.transfer_leadership_to(namespace="kafka",
topic=self.topic,
partition=0,
target_id=transfer_to,
leader_id=cur_leader)

admin.await_stable_leader(self.topic,
partition=0,
namespace='kafka',
timeout_s=60,
backoff_s=2,
check=lambda node_id: node_id == transfer_to)