diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index b5ee1585400132..015bdaae501435 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -441,6 +441,15 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data, bool sync_delete_bitmap, bool full_sync, SyncRowsetStats* sync_stats) { + std::unique_lock lock {tablet->get_sync_meta_lock()}; + return sync_tablet_rowsets_unlocked(tablet, lock, warmup_delta_data, sync_delete_bitmap, + full_sync, sync_stats); +} + +Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, + std::unique_lock& lock, + bool warmup_delta_data, bool sync_delete_bitmap, + bool full_sync, SyncRowsetStats* sync_stats) { using namespace std::chrono; TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); @@ -554,6 +563,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ } tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); } + DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", { + auto target_tablet_id = dp->param("tablet_id", -1); + if (target_tablet_id == tablet->tablet_id()) { + DBUG_BLOCK + } + }); { const auto& stats = resp.stats(); std::unique_lock wlock(tablet->get_header_lock()); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 3ff817a8710168..cf86f09929dfce 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -65,6 +65,10 @@ class CloudMetaMgr { Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false, bool sync_delete_bitmap = true, bool full_sync = false, SyncRowsetStats* sync_stats = nullptr); + Status sync_tablet_rowsets_unlocked( + CloudTablet* tablet, std::unique_lock& lock /* _sync_meta_lock */, + bool warmup_delta_data = false, bool sync_delete_bitmap = true, bool full_sync = false, + SyncRowsetStats* sync_stats = nullptr); Status prepare_rowset(const RowsetMeta& rs_meta, std::shared_ptr* existed_rs_meta = nullptr); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index db2dacd7663ef6..60d2fb42a314cc 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -63,6 +63,7 @@ CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_eng CloudSchemaChangeJob::~CloudSchemaChangeJob() = default; Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { + DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.block", DBUG_BLOCK); // new tablet has to exist _new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id)); if (_new_tablet->tablet_state() == TABLET_RUNNING) { @@ -399,6 +400,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam _new_tablet->tablet_id()); return Status::Error("injected retryable error"); }); + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job", + DBUG_BLOCK); auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); if (!st.ok()) { if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { @@ -416,6 +419,9 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } const auto& stats = finish_resp.stats(); { + // to prevent the converted historical rowsets be replaced by rowsets written on new tablet + // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread + std::unique_lock lock {_new_tablet->get_sync_meta_lock()}; std::unique_lock wlock(_new_tablet->get_header_lock()); _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock); _new_tablet->set_cumulative_layer_point(_output_cumulative_point); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 61d6fa59bda954..5f9490ecab03df 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -156,7 +156,7 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data, } // serially execute sync to reduce unnecessary network overhead - std::lock_guard lock(_sync_meta_lock); + std::unique_lock lock(_sync_meta_lock); if (query_version > 0) { std::shared_lock rlock(_meta_lock); if (_max_version >= query_version) { @@ -164,7 +164,8 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data, } } - auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data, true, false, stats); + auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, warmup_delta_data, true, + false, stats); if (st.is()) { clear_cache(); } @@ -181,7 +182,7 @@ Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) { } // Serially execute sync to reduce unnecessary network overhead - std::lock_guard lock(_sync_meta_lock); + std::unique_lock lock(_sync_meta_lock); { std::shared_lock rlock(_meta_lock); @@ -216,7 +217,7 @@ Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) { _max_version = -1; } - st = _engine.meta_mgr().sync_tablet_rowsets(this, false, true, false, stats); + st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, false, true, false, stats); if (st.is()) { clear_cache(); } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index baa6fdd8e1b78d..ab1ed896a7ee0b 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -205,6 +205,8 @@ class CloudTablet final : public BaseTablet { std::mutex& get_rowset_update_lock() { return _rowset_update_lock; } + bthread::Mutex& get_sync_meta_lock() { return _sync_meta_lock; } + const auto& rowset_map() const { return _rs_version_map; } // Merge all rowset schemas within a CloudTablet @@ -237,6 +239,7 @@ class CloudTablet final : public BaseTablet { // this mutex MUST ONLY be used when sync meta bthread::Mutex _sync_meta_lock; + // ATTENTION: lock order should be: _sync_meta_lock -> _meta_lock std::atomic _cumulative_point {-1}; std::atomic _approximate_num_rowsets {-1}; diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_convert_data_replaced_on_new_tablet.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_convert_data_replaced_on_new_tablet.out new file mode 100644 index 00000000000000..6b45b630b97656 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_convert_data_replaced_on_new_tablet.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 10 +2 2 2 20 +3 3 3 30 + +-- !dup_key_count -- + +-- !sql -- +1 1 1 10 +100 100 100 1 +2 2 2 20 +3 3 3 30 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_convert_data_replaced_on_new_tablet.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_convert_data_replaced_on_new_tablet.groovy new file mode 100644 index 00000000000000..3de7d22491c35e --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_convert_data_replaced_on_new_tablet.groovy @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_sc_convert_data_replaced_on_new_tablet", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def table1 = "test_cloud_sc_convert_data_replaced_on_new_tablet" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "function_column.sequence_col" = "c3" + ); """ + + sql "insert into ${table1} values(1,1,1,10);" // 2 + sql "insert into ${table1} values(2,2,2,20);" // 3 + sql "insert into ${table1} values(3,3,3,30);" // 4 + sql "sync;" + qt_sql "select * from ${table1} order by k1;" + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${table1};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::process_alter_tablet.block") + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job") + // GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.before.modify_tablet_meta") + sql "alter table ${table1} modify column c2 varchar(100);" + + Thread.sleep(1000) + + tabletStats = sql_return_maparray("show tablets from ${table1};") + def newTabletId = "-1" + for (def stat : tabletStats) { + if (stat.TabletId != tabletId) { + newTabletId = stat.TabletId + break + } + } + logger.info("new_tablet_id: ${newTabletId}") + + sql "insert into ${table1} values(1,99,99,1);" // 5 + + // let query trigger sync_rowsets to make base tablet's max version be 5 + sql "select * from ${table1} order by k1;" + + Thread.sleep(1000) + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::process_alter_tablet.block") + Thread.sleep(1000) + + // to trigger sync_rowsets on new tablet and block before modify be's tablet meta + GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", [tablet_id: newTabletId]) + def t1 = Thread.start { + sql "insert into ${table1} values(100,100,100,1);" + } + + Thread.sleep(1000) + + // let sc modify tablet meta first + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job") + Thread.sleep(1000) + // and load's sync_rowsets() on new tablet modify be's tablet meta + GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta") + + t1.join() + + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + qt_dup_key_count "select k1,count() as cnt from ${table1} group by k1 having cnt>1;" + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } +}