Merge pull request #22812 from Lazin/fix/upload-compacted-range
CORE-6835 Fix first time compacted segment upload
piyushredpanda authored Aug 17, 2024
2 parents fb9f1b9 + e2fd980 commit 0fe6230
Showing 3 changed files with 197 additions and 16 deletions.
24 changes: 13 additions & 11 deletions src/v/cluster/archival/archival_policy.cc
@@ -185,17 +185,19 @@ archival_policy::lookup_result archival_policy::find_segment(
     const auto& set = log->segments();
     const auto& ntp_conf = log->config();
     auto it = set.lower_bound(start_offset);
-    if (it == set.end() || eligible_for_compacted_reupload(**it)) {
-        // Skip forward if we hit a gap or compacted segment
-        for (auto i = set.begin(); i != set.end(); i++) {
-            const auto& sg = *i;
-            if (start_offset < sg->offsets().get_base_offset()) {
-                // Move last offset forward
-                it = i;
-                start_offset = sg->offsets().get_base_offset();
-                break;
-            }
-        }
+    if (it == set.end() && start_offset < log->offsets().committed_offset) {
+        // The 'start_offset' is in the gap. Normally this shouldn't happen.
+        vlog(
+          archival_log.warn,
+          "Upload policy for {}: can't find segment with base_offset={}",
+          _ntp,
+          start_offset);
+        it = std::find_if(
+          set.begin(),
+          set.end(),
+          [start_offset](const ss::lw_shared_ptr<storage::segment>& s) {
+              return s->offsets().get_base_offset() >= start_offset;
+          });
     }
     if (it == set.end()) {
         vlog(
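As a rough illustration of the fallback introduced above, here is a minimal standalone C++ sketch. The `segment` struct, the helper name `first_segment_at_or_after`, and the offsets are invented for this example and do not reflect Redpanda's actual `segment_set` API; the point is only the behavior of the added `std::find_if` branch: when the requested start offset falls into a gap left by compaction, pick the first segment whose base offset is at or above it rather than reporting that no candidate exists.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for a log segment; only the base offset matters here.
struct segment {
    int64_t base_offset;
};

// Mirrors the std::find_if fallback added in archival_policy.cc: return the
// first segment that starts at or after 'start_offset', or nullptr if none.
const segment* first_segment_at_or_after(
  const std::vector<segment>& segments, int64_t start_offset) {
    auto it = std::find_if(
      segments.begin(), segments.end(), [start_offset](const segment& s) {
          return s.base_offset >= start_offset;
      });
    return it == segments.end() ? nullptr : &*it;
}

int main() {
    // After compaction the local log may begin at offset 100 even though the
    // archiver still asks for offset 0; that is the "gap" the warning above
    // refers to.
    std::vector<segment> segments = {{100}, {350}, {720}};
    if (const segment* s = first_segment_at_or_after(segments, 0)) {
        std::cout << "upload candidate starts at offset " << s->base_offset
                  << "\n";
    }
    return 0;
}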
7 changes: 3 additions & 4 deletions src/v/cluster/archival/tests/ntp_archiver_test.cc
@@ -963,11 +963,10 @@ FIXTURE_TEST(
       .get())
       .candidate;

-    // The search is expected to find the next segment after the compacted
-    // segment, skipping the compacted one.
+    // The search is expected to find the first compacted segment
     BOOST_REQUIRE(!candidate.sources.empty());
-    BOOST_REQUIRE_GT(candidate.starting_offset(), 0);
-    BOOST_REQUIRE_GT(
+    BOOST_REQUIRE_EQUAL(candidate.starting_offset(), 0);
+    BOOST_REQUIRE_EQUAL(
       candidate.sources.front()->offsets().get_base_offset(), model::offset{0});
 }

182 changes: 181 additions & 1 deletion tests/rptest/tests/shadow_indexing_compacted_topic_test.py
@@ -1,15 +1,18 @@
import pprint

from rptest.clients.rpk import RpkPartition, RpkTool
from rptest.services.admin import Admin
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import CloudStorageType, SISettings, make_redpanda_service, LoggingConfig, get_cloud_storage_type
from rptest.services.redpanda import CloudStorageType, SISettings, make_redpanda_service, LoggingConfig, get_cloud_storage_type, MetricsEndpoint
from rptest.tests.end_to_end import EndToEndTest
from rptest.util import wait_until_segments, wait_for_removal_of_n_segments
from rptest.utils.si_utils import BucketView
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import random
import time


class ShadowIndexingCompactedTopicTest(EndToEndTest):
@@ -156,3 +159,180 @@ def compacted_segments_uploaded():
        self.logger.info(
            f'manifest: {pprint.pformat(s3_snapshot.manifest_for_ntp(self.topic, 0))}'
        )


class TSWithAlreadyCompactedTopic(EndToEndTest):
    segment_size = 2**20
    topics = (TopicSpec(name='panda-topic-no-ts',
                        partition_count=1,
                        replication_factor=3,
                        cleanup_policy='compact',
                        redpanda_remote_read=False,
                        redpanda_remote_write=False), )

    def __init__(self, test_context, extra_rp_conf={}, environment=None):
        super().__init__(test_context)
        self.num_brokers = 3
        self.si_settings = SISettings(test_context,
                                      cloud_storage_max_connections=5,
                                      fast_uploads=True,
                                      cloud_storage_enable_remote_read=False,
                                      cloud_storage_enable_remote_write=False)
        extra_rp_conf.update(
            dict(
                enable_leader_balancer=False,
                partition_autobalancing_mode="off",
                group_initial_rebalance_delay=300,
                compacted_log_segment_size=self.segment_size,
            ))
        self.redpanda = make_redpanda_service(
            context=self.test_context,
            num_brokers=self.num_brokers,
            si_settings=self.si_settings,
            extra_rp_conf=extra_rp_conf,
            environment=environment,
            log_config=LoggingConfig(default_level='debug'))
        self.topic = self.topics[0].name
        self._rpk_client = RpkTool(self.redpanda)

    def setUp(self):
        super().setUp()
        self.redpanda.start()
        for topic in self.topics:
            self._rpk_client.create_topic(
                topic.name, topic.partition_count, topic.replication_factor, {
                    'cleanup.policy': topic.cleanup_policy,
                })

    def describe_topic(self, topic: str) -> list[RpkPartition]:
        description: list[RpkPartition] | None = None

        def capture_description_is_ok():
            nonlocal description
            description = list(self._rpk_client.describe_topic(topic))
            return description is not None and len(description) > 0

        wait_until(capture_description_is_ok,
                   timeout_sec=20,
                   backoff_sec=1,
                   err_msg=f"failed to get describe_topic {topic}")
        assert description is not None, "description is None"
        return description

    @cluster(num_nodes=4)
    def test_initial_upload(self):
        """Test initial upload of the compacted segments. The ntp_archiver
        could start from already compacted data with gaps. We should be able
        to upload such data without triggering validation errors."""
        # This topic has TS turned off. The goal is to produce a lot of data
        # and let it get compacted.
        topic_spec = self.topics[0]
        topic_name = topic_spec.name

        self._rpk_client.cluster_config_set("log_compaction_interval_ms",
                                            "9999999")
        # actually produce data
        self.start_producer(throughput=5000, repeating_keys=2)

        for p in range(1000, 10000, 1000):
            self.await_num_produced(min_records=p)
            # Generate a bunch of config batches and terms
            # to prevent a situation when everything except the
            # last segment is compacted away. We want to have several
            # compacted segments with different terms.
            self._transfer_topic_leadership()

        wait_until_segments(redpanda=self.redpanda,
                            topic=self.topic,
                            partition_idx=0,
                            count=10,
                            timeout_sec=300)

        self.logger.info(
            f"Stopping producer after writing up to offsets {self.producer.last_acked_offsets}"
        )
        self.producer.stop()

        # Collect original file sizes
        original_per_node_stat = {}
        for node in self.redpanda.nodes:
            stats = self.redpanda.data_stat(node)
            total = sum(
                [size for path, size in stats if path.suffix == ".log"])
            original_per_node_stat[node] = total
            self.logger.info(f"Size before compaction {total}")
            assert total > 0, "No data found in the data dir"

        # list redpanda dir before compaction
        self.redpanda.storage(scan_cache=False)
        # wait for compaction to do the job
        self._rpk_client.cluster_config_set("log_compaction_interval_ms",
                                            "500")

        def compacted():
            # Compare the current per-node .log sizes against the originals
            worst_ratio = 0.0
            for node in self.redpanda.nodes:
                new_stats = self.redpanda.data_stat(node)
                new_size = sum([
                    size for path, size in new_stats if path.suffix == ".log"
                ])
                old_size = original_per_node_stat[node]
                ratio = new_size / old_size
                worst_ratio = max(worst_ratio, ratio)
                self.logger.info(
                    f"Old data dir size: {old_size}, new data dir size: {new_size}, ratio: {ratio}"
                )
            self.logger.info(
                f"Worst compaction ratio across the cluster is {worst_ratio}")
            return worst_ratio < 0.8

        wait_until(compacted,
                   timeout_sec=120,
                   backoff_sec=2,
                   err_msg=f"Segments were not compacted well enough")

        # enable TS for the topic
        self._rpk_client.alter_topic_config(topic_name, "redpanda.remote.read",
                                            "true")
        self._rpk_client.alter_topic_config(topic_name,
                                            "redpanda.remote.write", "true")

        # Transfer leadership while the data is uploaded.
        for _ in range(0, 10):
            self._transfer_topic_leadership()
            time.sleep(1)

        s3_snapshot = BucketView(self.redpanda, topics=self.topics)
        self.logger.info(
            f'manifest: {pprint.pformat(s3_snapshot.manifest_for_ntp(topic_name, 0))}'
        )
        s3_snapshot.assert_at_least_n_uploaded_segments_compacted(
            topic_name, partition=0, revision=None, n=1)
        self.logger.info(
            f'manifest: {pprint.pformat(s3_snapshot.manifest_for_ntp(topic_name, 0))}'
        )
        # Given that we have multiple terms we shouldn't have only one segment
        # even after compaction.
        assert len(s3_snapshot.manifest_for_ntp(topic_name, 0)['segments']) > 1

    def _transfer_topic_leadership(self):
        admin = Admin(self.redpanda)
        cur_leader = admin.get_partition_leader(namespace='kafka',
                                                topic=self.topic,
                                                partition=0)
        broker_ids = [x['node_id'] for x in admin.get_brokers()]
        transfer_to = random.choice([n for n in broker_ids if n != cur_leader])
        assert cur_leader != transfer_to, "incorrect partition move in test"
        admin.transfer_leadership_to(namespace="kafka",
                                     topic=self.topic,
                                     partition=0,
                                     target_id=transfer_to,
                                     leader_id=cur_leader)

        admin.await_stable_leader(self.topic,
                                  partition=0,
                                  namespace='kafka',
                                  timeout_s=60,
                                  backoff_s=2,
                                  check=lambda node_id: node_id == transfer_to)
