diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index e0f9d203750d0f..0b3818190da710 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -26,6 +26,9 @@ namespace doris { +bvar::Adder g_cloud_commit_rowset_count("cloud_commit_rowset_count"); +bvar::Adder g_cloud_commit_empty_rowset_count("cloud_commit_empty_rowset_count"); + CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteRequest& req, RuntimeProfile* profile, const UniqueId& load_id) : BaseDeltaWriter(req, profile, load_id), _engine(engine) { @@ -108,10 +111,12 @@ const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() { } Status CloudDeltaWriter::commit_rowset() { + g_cloud_commit_rowset_count << 1; std::lock_guard lock(_mtx); // Handle empty rowset (no data written) if (!_is_init) { + g_cloud_commit_empty_rowset_count << 1; return _commit_empty_rowset(); } diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 1364cab47fb7f2..5be30dcac792c3 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -1706,28 +1706,61 @@ Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version return a.first < b.first; }); + // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which calls + // fill_version_holes. For schema change tablets (TABLET_NOTREADY state), we selectively skip hole + // filling for versions <= alter_version to prevent: + // 1. Abnormal compaction score calculations for schema change tablets + // 2. Unexpected -235 errors during load operations + // This allows schema change to proceed normally while still permitting hole filling for versions + // beyond the alter_version threshold. + bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY; + if (is_schema_change_tablet && tablet->alter_version() <= 1) { + LOG(INFO) << "Skip version hole filling for new schema change tablet " + << tablet->tablet_id() << " with alter_version " << tablet->alter_version(); + return Status::OK(); + } + int64_t last_version = -1; for (const Version& version : existing_versions) { + VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": [" + << version.first << ", " << version.second << "]"; // missing versions are those that are not in the existing_versions if (version.first > last_version + 1) { // there is a hole between versions auto prev_non_hole_rowset = tablet->get_rowset_by_version(version); for (int64_t ver = last_version + 1; ver < version.first; ++ver) { + // Skip hole filling for versions <= alter_version during schema change + if (is_schema_change_tablet && ver <= tablet->alter_version()) { + continue; + } RowsetSharedPtr hole_rowset; RETURN_IF_ERROR(create_empty_rowset_for_hole( tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); hole_rowsets.push_back(hole_rowset); } LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 - << " to " << version.first - 1 << " for tablet " << tablet->tablet_id(); + << " to " << version.first - 1 << " for tablet " << tablet->tablet_id() + << (is_schema_change_tablet + ? (", schema change tablet skipped filling versions <= " + + std::to_string(tablet->alter_version())) + : ""); } last_version = version.second; } if (last_version + 1 <= max_version) { LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to " - << max_version << " for tablet " << tablet->tablet_id(); + << max_version << " for tablet " << tablet->tablet_id() + << (is_schema_change_tablet + ? (", schema change tablet skipped filling versions <= " + + std::to_string(tablet->alter_version())) + : ""); + // there is a hole after the last existing version for (; last_version + 1 <= max_version; ++last_version) { + // Skip hole filling for versions <= alter_version during schema change + if (is_schema_change_tablet && last_version + 1 <= tablet->alter_version()) { + continue; + } RowsetSharedPtr hole_rowset; auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back()); RETURN_IF_ERROR(create_empty_rowset_for_hole( diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index ec4c8224050b2e..9b629c0b038a35 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -459,12 +460,22 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator)); TabletMetaSharedPtr tmp_meta = std::make_shared(*(_new_tablet->tablet_meta())); tmp_meta->delete_bitmap()->delete_bitmap.clear(); - tmp_meta->clear_rowsets(); + // Keep only version [0-1] rowset, other rowsets will be added in _output_rowsets + auto& rs_metas = tmp_meta->all_mutable_rs_metas(); + rs_metas.erase(std::remove_if(rs_metas.begin(), rs_metas.end(), + [](const RowsetMetaSharedPtr& rs_meta) { + return !(rs_meta->version().first == 0 && + rs_meta->version().second == 1); + }), + rs_metas.end()); + std::shared_ptr tmp_tablet = std::make_shared(_cloud_storage_engine, tmp_meta); { std::unique_lock wlock(tmp_tablet->get_header_lock()); tmp_tablet->add_rowsets(_output_rowsets, true, wlock); + // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version + tmp_tablet->set_alter_version(alter_version); } // step 1, process incremental rowset without delete bitmap update lock diff --git a/regression-test/data/schema_change_p0/test_schema_change_with_empty_rowset.out b/regression-test/data/schema_change_p0/test_schema_change_with_empty_rowset.out new file mode 100644 index 00000000000000..e9555c3d5d7b14 --- /dev/null +++ b/regression-test/data/schema_change_p0/test_schema_change_with_empty_rowset.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +7140 240 + diff --git a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy index a0fe1d5832113b..d7bcf69b3d5652 100644 --- a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy +++ b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy @@ -62,6 +62,7 @@ suite("test_schema_change_mow_with_empty_rowset", "p0") { for (int i = 0; i < 20; i++) { sql """ insert into ${tableName} values (100, 2, 3, 4, 5, 6.6, 1.7, 8.8, 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """ + sleep(20) } Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( diff --git a/regression-test/suites/schema_change_p0/test_schema_change_with_empty_rowset.groovy b/regression-test/suites/schema_change_p0/test_schema_change_with_empty_rowset.groovy new file mode 100644 index 00000000000000..4f1eaebde6cc2b --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_schema_change_with_empty_rowset.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_schema_change_with_empty_rowset", "p0,nonConcurrent") { + def custoBeConfig = [ + max_tablet_version_num : 100 + ] + + setBeConfigTemporary(custoBeConfig) { + def tableName = "test_sc_with_empty_rowset" + + def getJobState = { tbl -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(11) NULL, + `k2` tinyint(4) NULL, + `k3` smallint(6) NULL, + `k4` int(30) NULL, + `k5` largeint(40) NULL, + `k6` float NULL, + `k7` double NULL, + `k8` decimal(9, 0) NULL, + `k9` char(10) NULL, + `k10` varchar(1024) NULL, + `k11` text NULL, + `k12` date NULL, + `k13` datetime NULL + ) ENGINE=OLAP + UNIQUE KEY(k1, k2, k3) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + for (int i = 0; i < 100; i++) { + sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8, + 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """ + } + + + // trigger compactions for all tablets in ${tableName} + trigger_and_wait_compaction(tableName, "cumulative") + + sql """ alter table ${tableName} modify column k4 string NULL""" + + for (int i = 100; i < 120; i++) { + sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8, + 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """ + sleep(20) + } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( + { + String res = getJobState(tableName) + if (res == "FINISHED" || res == "CANCELLED") { + assertEquals("FINISHED", res) + return true + } + return false + } + ) + + qt_sql """ select sum(k1), sum(k2) from ${tableName} """ + } +} +