diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 74f86d062f11cd..9dd8dd5d2f8b36 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1551,6 +1551,46 @@ Status CloudCompactionMixin::modify_rowsets() {
     return Status::OK();
 }
 
+Status CloudCompactionMixin::set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx) {
+    // Set storage resource from input rowsets by iterating backwards to find the first rowset
+    // with non-empty resource_id. This handles two scenarios:
+    // 1. Hole rowsets compaction: Multiple hole rowsets may lack storage resource.
+    //    Example: [0-1, 2-2, 3-3, 4-4, 5-5] where 2-5 are hole rowsets.
+    //    If 0-1 lacks resource_id, then 2-5 also lack resource_id.
+    // 2. Schema change: New tablet may have later version empty rowsets without resource_id,
+    //    but middle rowsets get resource_id after historical rowsets are converted.
+    // We iterate backwards to find the most recent rowset with valid resource_id.
+
+    for (const auto& rowset : std::ranges::reverse_view(_input_rowsets)) {
+        const auto& resource_id = rowset->rowset_meta()->resource_id();
+
+        if (!resource_id.empty()) {
+            ctx.storage_resource = *DORIS_TRY(rowset->rowset_meta()->remote_storage_resource());
+            return Status::OK();
+        }
+
+        // Validate that non-empty rowsets (num_segments > 0) must have valid resource_id
+        // Only hole rowsets or empty rowsets are allowed to have empty resource_id
+        if (rowset->num_segments() > 0) {
+            auto error_msg = fmt::format(
+                    "Non-empty rowset must have valid resource_id. "
+                    "rowset_id={}, version=[{}-{}], is_hole_rowset={}, num_segments={}, "
+                    "tablet_id={}, table_id={}",
+                    rowset->rowset_id().to_string(), rowset->start_version(), rowset->end_version(),
+                    rowset->is_hole_rowset(), rowset->num_segments(), _tablet->tablet_id(),
+                    _tablet->table_id());
+
+#ifndef BE_TEST
+            DCHECK(false) << error_msg;
+#endif
+
+            return Status::InternalError(error_msg);
+        }
+    }
+
+    return Status::OK();
+}
+
 Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
     // only do index compaction for dup_keys and unique_keys with mow enabled
     if (_enable_inverted_index_compaction && (((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
@@ -1560,33 +1600,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
     }
 
     // Use the storage resource of the previous rowset.
-    // There are two scenarios where rowsets may not have a storage resource:
-    // 1. When multiple hole rowsets doing compaction, those rowsets may not have a storage resource.
-    //    case: [0-1, 2-2, 3-3, 4-4, 5-5], 2-5 are hole rowsets.
-    //    0-1 currently doesn't have a resource_id, so 2-5 also have no resource_id.
-    // 2. During schema change, new tablet may have some later version empty rowsets without resource_id,
-    //    but middle rowsets get resource_id after historical rowsets are converted.
-    // We need to iterate backwards to find a rowset with non-empty resource_id.
-    for (const auto& rowset : std::ranges::reverse_view(_input_rowsets)) {
-        if (!rowset->rowset_meta()->resource_id().empty()) {
-            ctx.storage_resource = *DORIS_TRY(rowset->rowset_meta()->remote_storage_resource());
-            break;
-        } else {
-            DCHECK(rowset->is_hole_rowset() || rowset->end_version() == 1)
-                    << "Non-hole rowset with version != [0-1] must have non-empty resource_id"
-                    << ", rowset_id=" << rowset->rowset_id() << ", version=["
-                    << rowset->start_version() << "-" << rowset->end_version() << "]"
-                    << ", is_hole_rowset=" << rowset->is_hole_rowset()
-                    << ", tablet_id=" << _tablet->tablet_id();
-            if (!rowset->is_hole_rowset() && rowset->end_version() != 1) {
-                return Status::InternalError(
-                        "Non-hole rowset with version != [0-1] must have non-empty resource_id"
-                        ", rowset_id={}, version=[{}-{}], is_hole_rowset={}, tablet_id={}",
-                        rowset->rowset_id().to_string(), rowset->start_version(),
-                        rowset->end_version(), rowset->is_hole_rowset(), _tablet->tablet_id());
-            }
-        }
-    }
+    RETURN_IF_ERROR(set_storage_resource_from_input_rowsets(ctx));
 
     ctx.txn_id = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
                  std::numeric_limits<int64_t>::max(); // MUST be positive
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index f981f0107522aa..57ce98bef15aae 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -236,6 +236,12 @@ class CloudCompactionMixin : public Compaction {
 private:
     Status construct_output_rowset_writer(RowsetWriterContext& ctx) override;
 
+protected:
+    // Declared protected (not private) so unit tests can reach it through a
+    // derived helper class; callers inside this class are unaffected.
+    Status set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx);
+
+private:
     Status execute_compact_impl(int64_t permits);
 
     Status build_basic_info();
diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp
index 0e81b2c49f81e9..67ba84021aca4c 100644
--- a/be/test/cloud/cloud_compaction_test.cpp
+++ b/be/test/cloud/cloud_compaction_test.cpp
@@ -26,11 +26,11 @@
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud/cloud_tablet_mgr.h"
-#include "gtest/gtest_pred_impl.h"
 #include "json2pb/json_to_pb.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/storage_policy.h"
 #include "olap/tablet_meta.h"
 #include "util/uid_util.h"
 
@@ -192,4 +192,194 @@ TEST_F(CloudCompactionTest, failure_cumu_compaction_tablet_sleep_test) {
     ASSERT_EQ(st, Status::OK());
     ASSERT_EQ(tablets.size(), 0);
 }
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
+                                     int data_size) {
+    auto rs_meta = std::make_shared<RowsetMeta>();
+    rs_meta->set_rowset_type(BETA_ROWSET); // important
+    rs_meta->_rowset_meta_pb.set_start_version(version.first);
+    rs_meta->_rowset_meta_pb.set_end_version(version.second);
+    rs_meta->set_num_segments(num_segments);
+    rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+    rs_meta->set_total_disk_size(data_size);
+    RowsetSharedPtr rowset;
+    Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+    if (!st.ok()) {
+        return nullptr;
+    }
+    return rowset;
+}
+
+class TestableCloudCompaction : public CloudCompactionMixin {
+public:
+    TestableCloudCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
+            : CloudCompactionMixin(engine, tablet, "test_compaction") {}
+
+    // Re-expose the protected base-class method so the test can call it directly.
+    using CloudCompactionMixin::set_storage_resource_from_input_rowsets;
+
+    // Set input rowsets for testing
+    void set_input_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+        _input_rowsets = rowsets;
+    }
+
+    Status prepare_compact() override { return Status::OK(); }
+
+    ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }
+
+    std::string_view compaction_name() const override { return "test_compaction"; }
+};
+
+TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) {
+    S3Conf s3_conf {.bucket = "bucket",
+                    .prefix = "prefix",
+                    .client_conf = {
+                            .endpoint = "endpoint",
+                            .region = "region",
+                            .ak = "ak",
+                            .sk = "sk",
+                            .token = "",
+                            .bucket = "",
+                            .role_arn = "",
+                            .external_id = "",
+                    }};
+
+    std::string resource_id = "10000";
+    auto res = io::S3FileSystem::create(std::move(s3_conf), resource_id);
+    ASSERT_TRUE(res.has_value()) << res.error();
+    auto fs = res.value();
+    StorageResource storage_resource(fs);
+
+    CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta);
+    TestableCloudCompaction compaction(_engine, tablet);
+
+    // Test case 1: All rowsets are empty (num_segments = 0) - should succeed
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        rowset1->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
+        rowsets.push_back(rowset1);
+
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        rowset2->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
+        rowsets.push_back(rowset2);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // No storage resource should be set since no rowset has resource_id
+        ASSERT_FALSE(ctx.storage_resource.has_value());
+    }
+
+    // Test case 2: Backward iteration - last rowset has resource_id
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        // First rowset: empty, no resource_id
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        rowset1->set_hole_rowset(true);
+        rowsets.push_back(rowset1);
+
+        // Second rowset: empty, no resource_id
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        rowset2->set_hole_rowset(true);
+        rowsets.push_back(rowset2);
+
+        // Third rowset: has resource_id (should be found during backward iteration)
+        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
+        ASSERT_TRUE(rowset3 != nullptr);
+        rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
+        rowsets.push_back(rowset3);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // Storage resource should be set from rowset3
+        ASSERT_TRUE(ctx.storage_resource.has_value());
+    }
+
+    // Test case 3: Multiple rowsets with resource_id - should use the last one (backward iteration)
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        // First rowset: has resource_id
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        StorageResource first_resource(fs);
+        rowset1->rowset_meta()->set_remote_storage_resource(first_resource);
+        rowsets.push_back(rowset1);
+
+        // Second rowset: empty, no resource_id
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        rowset2->set_hole_rowset(true);
+        rowsets.push_back(rowset2);
+
+        // Third rowset: has different resource_id (should be used due to backward iteration)
+        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
+        ASSERT_TRUE(rowset3 != nullptr);
+        rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
+        rowsets.push_back(rowset3);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // Storage resource should be set from rowset3 (last one with resource_id)
+        ASSERT_TRUE(ctx.storage_resource.has_value());
+    }
+
+    // Test case 4: Non-empty rowset in the middle without resource_id - should fail
+    {
+        std::vector<RowsetSharedPtr> rowsets;
+
+        // First rowset: has resource_id
+        RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
+        ASSERT_TRUE(rowset1 != nullptr);
+        rowset1->rowset_meta()->set_remote_storage_resource(storage_resource);
+        rowsets.push_back(rowset1);
+
+        // Second rowset: non-empty but no resource_id (invalid)
+        RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 2, false, 41);
+        ASSERT_TRUE(rowset2 != nullptr);
+        // Intentionally don't set resource_id
+        rowsets.push_back(rowset2);
+
+        // Third rowset: empty, no resource_id
+        RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 0, false, 41);
+        ASSERT_TRUE(rowset3 != nullptr);
+        rowset3->set_hole_rowset(true); // Mark as hole rowset since num_segments=0
+        rowsets.push_back(rowset3);
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_FALSE(st.ok());
+        ASSERT_TRUE(st.to_string().find("Non-empty rowset must have valid resource_id") !=
+                    std::string::npos)
+                << st.to_string();
+    }
+
+    // Test case 5: Empty input rowsets - should succeed
+    {
+        std::vector<RowsetSharedPtr> rowsets; // Empty vector
+
+        compaction.set_input_rowsets(rowsets);
+
+        RowsetWriterContext ctx;
+        Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        // No storage resource should be set
+        ASSERT_FALSE(ctx.storage_resource.has_value());
+    }
+}
 } // namespace doris
diff --git a/regression-test/data/compaction/test_compaction_with_empty_rowset.out b/regression-test/data/compaction/test_compaction_with_empty_rowset.out
new file mode 100644
index 00000000000000..8f98a92a26c2d0
--- /dev/null
+++ b/regression-test/data/compaction/test_compaction_with_empty_rowset.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	2	3	4	5	6.6	1.7	9	a	b	c	2021-10-30	2021-10-30T00:00
+
+-- !sql2 --
+1	2	3	4	5	6.6	1.7	9	a	b	c	2021-10-30	2021-10-30T00:00
+
+-- !sql3 --
+1	2	3	4	5	6.6	1.7	9	a	b	c	2021-10-30	2021-10-30T00:00
+2	2	3	4	5	6.6	1.7	9	a	b	c	2021-10-30	2021-10-30T00:00
+
diff --git a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
new file mode 100644
index 00000000000000..6f6f869917d903
--- /dev/null
+++ b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_compaction_with_empty_rowset", "p0") {
+    def tableName = "test_compaction_with_empty_rowset"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(11) NULL,
+            `k2` tinyint(4) NULL,
+            `k3` smallint(6) NULL,
+            `k4` int(30) NULL,
+            `k5` largeint(40) NULL,
+            `k6` float NULL,
+            `k7` double NULL,
+            `k8` decimal(9, 0) NULL,
+            `k9` char(10) NULL,
+            `k10` varchar(1024) NULL,
+            `k11` text NULL,
+            `k12` date NULL,
+            `k13` datetime NULL
+        ) ENGINE=OLAP
+        unique KEY(k1, k2, k3)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "enable_unique_key_merge_on_write" = "true"
+        );
+    """
+
+    for (int i = 0; i < 10; i++) {
+        sql """ insert into ${tableName} values (1, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+                'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }
+
+    qt_sql """ select * from ${tableName} order by k1, k2, k3 """
+
+
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+    def replicaNum = get_table_replica_num(tableName)
+    logger.info("get table replica num: " + replicaNum)
+
+    // trigger compactions for all tablets in ${tableName}
+    trigger_and_wait_compaction(tableName, "cumulative")
+    int rowCount = 0
+    for (def tablet in tablets) {
+        String tablet_id = tablet.TabletId
+        def (code, out, err) = curl("GET", tablet.CompactionStatus)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        for (String rowset in (List<String>) tabletJson.rowsets) {
+            rowCount += Integer.parseInt(rowset.split(" ")[1])
+        }
+    }
+    assert (rowCount < 10 * replicaNum)
+    qt_sql2 """ select * from ${tableName} order by k1, k2, k3 """
+
+    for (int i = 0; i < 10; i++) {
+        sql """ insert into ${tableName} values (2, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+                'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }
+
+    // trigger compactions for all tablets in ${tableName}
+    trigger_and_wait_compaction(tableName, "cumulative")
+    rowCount = 0
+    for (def tablet in tablets) {
+        String tablet_id = tablet.TabletId
+        def (code, out, err) = curl("GET", tablet.CompactionStatus)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        for (String rowset in (List<String>) tabletJson.rowsets) {
+            rowCount += Integer.parseInt(rowset.split(" ")[1])
+        }
+    }
+    assert (rowCount < 20 * replicaNum)
+    qt_sql3 """ select * from ${tableName} order by k1, k2, k3 """
+}