Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 41 additions & 27 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,46 @@ Status CloudCompactionMixin::modify_rowsets() {
return Status::OK();
}

// Fills `ctx.storage_resource` from the input rowsets of this compaction.
//
// `_input_rowsets` is scanned from its last element to its first, and the first rowset met
// with a non-empty resource_id supplies the storage resource. Rowsets without a resource_id
// are expected in two scenarios:
//   1. Hole rowsets compaction: several hole rowsets may lack a storage resource.
//      Example: [0-1, 2-2, 3-3, 4-4, 5-5] where 2-5 are hole rowsets; if 0-1 has no
//      resource_id, then 2-5 have none either.
//   2. Schema change: the new tablet may hold later-version empty rowsets without a
//      resource_id, while middle rowsets get one after historical rowsets are converted.
//
// Returns:
//   - OK with `ctx.storage_resource` assigned, as soon as a rowset with a resource_id is met.
//   - OK with `ctx.storage_resource` left untouched, when no input rowset has a resource_id
//     (this includes an empty input list).
//   - InternalError, when a rowset that has segments but no resource_id is met before any
//     rowset with a resource_id.
//   - Whatever error `remote_storage_resource()` reports (propagated by DORIS_TRY).
Status CloudCompactionMixin::set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx) {
    for (auto it = _input_rowsets.rbegin(); it != _input_rowsets.rend(); ++it) {
        const auto& rs = *it;

        // First rowset (scanning backwards) that knows its resource wins.
        if (!rs->rowset_meta()->resource_id().empty()) {
            ctx.storage_resource = *DORIS_TRY(rs->rowset_meta()->remote_storage_resource());
            return Status::OK();
        }

        // Only hole rowsets or empty rowsets may come without a resource_id; keep scanning.
        if (rs->num_segments() <= 0) {
            continue;
        }

        // A rowset that owns segments must have a valid resource_id.
        auto error_msg = fmt::format(
                "Non-empty rowset must have valid resource_id. "
                "rowset_id={}, version=[{}-{}], is_hole_rowset={}, num_segments={}, "
                "tablet_id={}, table_id={}",
                rs->rowset_id().to_string(), rs->start_version(), rs->end_version(),
                rs->is_hole_rowset(), rs->num_segments(), _tablet->tablet_id(),
                _tablet->table_id());

        // Unit tests exercise this path on purpose, so the DCHECK is compiled out for them.
#ifndef BE_TEST
        DCHECK(false) << error_msg;
#endif

        return Status::InternalError<false>(error_msg);
    }

    return Status::OK();
}

Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
// only do index compaction for dup_keys and unique_keys with mow enabled
if (_enable_inverted_index_compaction && (((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
Expand All @@ -1560,33 +1600,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
}

// Use the storage resource of the previous rowset.
// There are two scenarios where rowsets may not have a storage resource:
// 1. When multiple hole rowsets doing compaction, those rowsets may not have a storage resource.
// case: [0-1, 2-2, 3-3, 4-4, 5-5], 2-5 are hole rowsets.
// 0-1 currently doesn't have a resource_id, so 2-5 also have no resource_id.
// 2. During schema change, new tablet may have some later version empty rowsets without resource_id,
// but middle rowsets get resource_id after historical rowsets are converted.
// We need to iterate backwards to find a rowset with non-empty resource_id.
for (const auto& rowset : std::ranges::reverse_view(_input_rowsets)) {
if (!rowset->rowset_meta()->resource_id().empty()) {
ctx.storage_resource = *DORIS_TRY(rowset->rowset_meta()->remote_storage_resource());
break;
} else {
DCHECK(rowset->is_hole_rowset() || rowset->end_version() == 1)
<< "Non-hole rowset with version != [0-1] must have non-empty resource_id"
<< ", rowset_id=" << rowset->rowset_id() << ", version=["
<< rowset->start_version() << "-" << rowset->end_version() << "]"
<< ", is_hole_rowset=" << rowset->is_hole_rowset()
<< ", tablet_id=" << _tablet->tablet_id();
if (!rowset->is_hole_rowset() && rowset->end_version() != 1) {
return Status::InternalError<false>(
"Non-hole rowset with version != [0-1] must have non-empty resource_id"
", rowset_id={}, version=[{}-{}], is_hole_rowset={}, tablet_id={}",
rowset->rowset_id().to_string(), rowset->start_version(),
rowset->end_version(), rowset->is_hole_rowset(), _tablet->tablet_id());
}
}
}
RETURN_IF_ERROR(set_storage_resource_from_input_rowsets(ctx));

ctx.txn_id = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
std::numeric_limits<int64_t>::max(); // MUST be positive
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ class CloudCompactionMixin : public Compaction {
private:
Status construct_output_rowset_writer(RowsetWriterContext& ctx) override;

Status set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx);

Status execute_compact_impl(int64_t permits);

Status build_basic_info();
Expand Down
190 changes: 189 additions & 1 deletion be/test/cloud/cloud_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "gtest/gtest_pred_impl.h"
#include "json2pb/json_to_pb.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/storage_policy.h"
#include "olap/tablet_meta.h"
#include "util/uid_util.h"

Expand Down Expand Up @@ -192,4 +192,192 @@ TEST_F(CloudCompactionTest, failure_cumu_compaction_tablet_sleep_test) {
ASSERT_EQ(st, Status::OK());
ASSERT_EQ(tablets.size(), 0);
}

// Test helper: builds a rowset from a freshly created RowsetMeta.
//
// version       start/end version written into the rowset meta
// num_segments  segment count stored in the meta
// overlapping   true -> OVERLAPPING, false -> NONOVERLAPPING
// data_size     value stored as the total disk size
//
// Returns the rowset produced by RowsetFactory::create_rowset(), or nullptr when the
// factory reports a non-OK status.
static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
                                     int data_size) {
    auto meta = std::make_shared<RowsetMeta>();
    meta->set_rowset_type(BETA_ROWSET); // important
    meta->_rowset_meta_pb.set_start_version(version.first);
    meta->_rowset_meta_pb.set_end_version(version.second);
    meta->set_num_segments(num_segments);
    if (overlapping) {
        meta->set_segments_overlap(OVERLAPPING);
    } else {
        meta->set_segments_overlap(NONOVERLAPPING);
    }
    meta->set_total_disk_size(data_size);

    RowsetSharedPtr created;
    Status status = RowsetFactory::create_rowset(nullptr, "", meta, &created);
    if (status.ok()) {
        return created;
    }
    return nullptr;
}

// Concrete CloudCompactionMixin for unit tests: it supplies trivial overrides and lets a
// test inject the input rowsets directly.
class TestableCloudCompaction : public CloudCompactionMixin {
public:
    TestableCloudCompaction(CloudStorageEngine& storage_engine, CloudTabletSPtr cloud_tablet)
            : CloudCompactionMixin(storage_engine, cloud_tablet, "test_compaction") {}

    // Fixed name for this test-only compaction.
    std::string_view compaction_name() const override { return "test_compaction"; }

    // Always reports itself as a cumulative compaction.
    ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }

    // Nothing to prepare in tests; always succeeds.
    Status prepare_compact() override { return Status::OK(); }

    // Replaces the compaction's input rowsets with a copy of `rowsets`.
    void set_input_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
        _input_rowsets = rowsets;
    }
};

// Exercises CloudCompactionMixin::set_storage_resource_from_input_rowsets():
//   1. only hole/empty rowsets          -> OK, no storage resource set
//   2. last rowset carries the resource -> OK, that resource is selected
//   3. several rowsets carry resources  -> OK, the one closest to the back is selected
//   4. rowset with segments but no resource_id is met first -> InternalError
//   5. no input rowsets at all          -> OK, no storage resource set
//
// Two file systems with distinct ids are used so that cases 2 and 3 can check WHICH
// rowset supplied the resource, not merely that some resource was set.
TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) {
    // Builds an S3 file system whose id is `id`; all connection settings are dummies.
    auto create_fs = [](const std::string& id) {
        S3Conf s3_conf {.bucket = "bucket",
                        .prefix = "prefix",
                        .client_conf = {
                                .endpoint = "endpoint",
                                .region = "region",
                                .ak = "ak",
                                .sk = "sk",
                                .token = "",
                                .bucket = "",
                                .role_arn = "",
                                .external_id = "",
                        }};
        return io::S3FileSystem::create(std::move(s3_conf), id);
    };

    const std::string resource_id = "10000";
    auto res = create_fs(resource_id);
    ASSERT_TRUE(res.has_value()) << res.error();
    auto fs = res.value();
    StorageResource storage_resource(fs);

    // A second resource with a different id, used to tell the candidates apart in case 3.
    const std::string other_resource_id = "10001";
    auto other_res = create_fs(other_resource_id);
    ASSERT_TRUE(other_res.has_value()) << other_res.error();
    auto other_fs = other_res.value();
    StorageResource other_storage_resource(other_fs);

    CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta);
    TestableCloudCompaction compaction(_engine, tablet);

    // Test case 1: All rowsets are empty (num_segments = 0) - should succeed
    {
        std::vector<RowsetSharedPtr> rowsets;

        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
        ASSERT_TRUE(rowset1 != nullptr);
        rowset1->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
        rowsets.push_back(rowset1);

        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
        ASSERT_TRUE(rowset2 != nullptr);
        rowset2->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
        rowsets.push_back(rowset2);

        compaction.set_input_rowsets(rowsets);

        RowsetWriterContext ctx;
        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
        ASSERT_TRUE(st.ok()) << st.to_string();
        // No storage resource should be set since no rowset has resource_id
        ASSERT_FALSE(ctx.storage_resource.has_value());
    }

    // Test case 2: Backward iteration - last rowset has resource_id
    {
        std::vector<RowsetSharedPtr> rowsets;

        // First rowset: empty, no resource_id
        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
        ASSERT_TRUE(rowset1 != nullptr);
        rowset1->set_hole_rowset(true);
        rowsets.push_back(rowset1);

        // Second rowset: empty, no resource_id
        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
        ASSERT_TRUE(rowset2 != nullptr);
        rowset2->set_hole_rowset(true);
        rowsets.push_back(rowset2);

        // Third rowset: has resource_id (should be found during backward iteration)
        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
        ASSERT_TRUE(rowset3 != nullptr);
        rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
        rowsets.push_back(rowset3);

        compaction.set_input_rowsets(rowsets);

        RowsetWriterContext ctx;
        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
        ASSERT_TRUE(st.ok()) << st.to_string();
        // Storage resource should be set from rowset3
        ASSERT_TRUE(ctx.storage_resource.has_value());
        ASSERT_EQ(ctx.storage_resource->fs->id(), resource_id);
    }

    // Test case 3: Multiple rowsets with resource_id - should use the last one (backward iteration)
    {
        std::vector<RowsetSharedPtr> rowsets;

        // First rowset: has resource_id `other_resource_id` (must NOT be selected)
        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
        ASSERT_TRUE(rowset1 != nullptr);
        rowset1->rowset_meta()->set_remote_storage_resource(other_storage_resource);
        rowsets.push_back(rowset1);

        // Second rowset: empty, no resource_id
        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
        ASSERT_TRUE(rowset2 != nullptr);
        rowset2->set_hole_rowset(true);
        rowsets.push_back(rowset2);

        // Third rowset: has a different resource_id (should be used due to backward iteration)
        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
        ASSERT_TRUE(rowset3 != nullptr);
        rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
        rowsets.push_back(rowset3);

        compaction.set_input_rowsets(rowsets);

        RowsetWriterContext ctx;
        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
        ASSERT_TRUE(st.ok()) << st.to_string();
        // Storage resource should be set from rowset3 (last one with resource_id),
        // not from rowset1.
        ASSERT_TRUE(ctx.storage_resource.has_value());
        ASSERT_EQ(ctx.storage_resource->fs->id(), resource_id);
        ASSERT_NE(ctx.storage_resource->fs->id(), other_resource_id);
    }

    // Test case 4: Non-empty rowset in the middle without resource_id - should fail
    {
        std::vector<RowsetSharedPtr> rowsets;

        // First rowset: has resource_id
        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
        ASSERT_TRUE(rowset1 != nullptr);
        rowset1->rowset_meta()->set_remote_storage_resource(storage_resource);
        rowsets.push_back(rowset1);

        // Second rowset: non-empty but no resource_id (invalid)
        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 2, false, 41);
        ASSERT_TRUE(rowset2 != nullptr);
        // Intentionally don't set resource_id
        rowsets.push_back(rowset2);

        // Third rowset: empty, no resource_id
        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 0, false, 41);
        ASSERT_TRUE(rowset3 != nullptr);
        rowset3->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
        rowsets.push_back(rowset3);

        compaction.set_input_rowsets(rowsets);

        RowsetWriterContext ctx;
        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
        ASSERT_TRUE(st.is<ErrorCode::INTERNAL_ERROR>());
        ASSERT_TRUE(st.to_string().find("Non-empty rowset must have valid resource_id") !=
                    std::string::npos)
                << st.to_string();
    }

    // Test case 5: Empty input rowsets - should succeed
    {
        std::vector<RowsetSharedPtr> rowsets; // Empty vector

        compaction.set_input_rowsets(rowsets);

        RowsetWriterContext ctx;
        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
        ASSERT_TRUE(st.ok()) << st.to_string();
        // No storage resource should be set
        ASSERT_FALSE(ctx.storage_resource.has_value());
    }
}
} // namespace doris
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 2 3 4 5 6.6 1.7 9 a b c 2021-10-30 2021-10-30T00:00

-- !sql2 --
1 2 3 4 5 6.6 1.7 9 a b c 2021-10-30 2021-10-30T00:00

-- !sql3 --
1 2 3 4 5 6.6 1.7 9 a b c 2021-10-30 2021-10-30T00:00
2 2 3 4 5 6.6 1.7 9 a b c 2021-10-30 2021-10-30T00:00

Loading
Loading