diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index e85b160cf2ffaa..c4ae5513001dc9 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_tablet.h" @@ -75,6 +76,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() { bool has_compaction_stats = partition.__isset.base_compaction_cnts && partition.__isset.cumulative_compaction_cnts && partition.__isset.cumulative_points; + bool has_tablet_states = partition.__isset.tablet_states; for (size_t i = 0; i < partition.tablet_ids.size(); i++) { auto tablet_id = partition.tablet_ids[i]; auto tablet_calc_delete_bitmap_ptr = std::make_shared( @@ -84,6 +86,9 @@ Status CloudEngineCalcDeleteBitmapTask::execute() { partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i], partition.cumulative_points[i]); } + if (has_tablet_states) { + tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]); + } auto submit_st = token->submit_func([=]() { auto st = tablet_calc_delete_bitmap_ptr->handle(); if (!st.ok()) { @@ -128,6 +133,9 @@ void CloudTabletCalcDeleteBitmapTask::set_compaction_stats(int64_t ms_base_compa _ms_cumulative_compaction_cnt = ms_cumulative_compaction_cnt; _ms_cumulative_point = ms_cumulative_point; } +void CloudTabletCalcDeleteBitmapTask::set_tablet_state(int64_t tablet_state) { + _ms_tablet_state = tablet_state; +} Status CloudTabletCalcDeleteBitmapTask::handle() const { VLOG_DEBUG << "start calculate delete bitmap on tablet " << _tablet_id; @@ -146,7 +154,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { int64_t max_version = tablet->max_version_unlocked(); int64_t t2 = MonotonicMicros(); - auto should_sync_rowsets_produced_by_compaction = [&]() { + auto should_sync_rowsets = [&]() { + if (_version != max_version + 1) { + return true; + } if (_ms_base_compaction_cnt == -1) { return true; } @@ -156,9 +167,12 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { std::shared_lock rlock(tablet->get_header_lock()); return _ms_base_compaction_cnt > tablet->base_compaction_cnt() || _ms_cumulative_compaction_cnt > tablet->cumulative_compaction_cnt() || - _ms_cumulative_point > tablet->cumulative_layer_point(); + _ms_cumulative_point > tablet->cumulative_layer_point() || + (_ms_tablet_state.has_value() && + _ms_tablet_state.value() != // an SC job finished on other BEs during this load job + static_cast>(tablet->tablet_state())); }; - if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) { + if (should_sync_rowsets()) { auto sync_st = tablet->sync_rowsets(); if (!sync_st.ok()) { LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h index e3733d3e696ff8..62bd91b0a8ab3c 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" @@ -39,6 +40,7 @@ class CloudTabletCalcDeleteBitmapTask { void set_compaction_stats(int64_t ms_base_compaction_cnt, int64_t ms_cumulative_compaction_cnt, int64_t ms_cumulative_point); + void set_tablet_state(int64_t tablet_state); Status handle() const; @@ -53,6 +55,7 @@ class CloudTabletCalcDeleteBitmapTask { int64_t _ms_base_compaction_cnt {-1}; int64_t _ms_cumulative_compaction_cnt {-1}; int64_t _ms_cumulative_point {-1}; + std::optional _ms_tablet_state; std::shared_ptr _mem_tracker; }; diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index bc8af94496a0cf..cc74384decfda3 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -2310,6 +2310,7 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl } for (const auto& tablet_idx : request->tablet_indexes()) { + // 1. get compaction cnts TabletStatsPB tablet_stat; std::string stats_key = stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), @@ -2343,16 +2344,43 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt()); response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt()); response->add_cumulative_points(tablet_stat.cumulative_point()); + + // 2. get tablet states + std::string tablet_meta_key = + meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), + tablet_idx.partition_id(), tablet_idx.tablet_id()}); + std::string tablet_meta_val; + err = txn->get(tablet_meta_key, &tablet_meta_val); + if (err != TxnErrorCode::TXN_OK) { + ss << "failed to get tablet meta" + << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "") + << " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id() + << " key=" << hex(tablet_meta_key) << " err=" << err; + msg = ss.str(); + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND + : cast_as(err); + return; + } + doris::TabletMetaCloudPB tablet_meta; + if (!tablet_meta.ParseFromString(tablet_meta_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "malformed tablet meta"; + return; + } + response->add_tablet_states( + static_cast>(tablet_meta.tablet_state())); } read_stats_sw.pause(); - LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms", - request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000); + LOG(INFO) << fmt::format( + "tablet_idxes.size()={}, read tablet compaction cnts and tablet states cost={} ms", + request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000); DeleteBitmapUpdateLockPB lock_info_tmp; if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(), request->initiator(), lock_key, lock_info_tmp)) { - LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id=" + LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats and tablet " + "states, table_id=" << table_id << " request lock_id=" << request->lock_id() << " request initiator=" << request->initiator() << " code=" << code << " msg=" << msg; diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 9eed77271f2efa..10a5b3c6f18556 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -277,7 +277,7 @@ static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const s commit_txn(meta_service, db_id, txn_id, label); } -static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instance_id, +static void add_tablet_metas(MetaServiceProxy* meta_service, std::string instance_id, int64_t table_id, int64_t index_id, const std::vector>& tablet_idxes) { std::unique_ptr txn; @@ -293,6 +293,17 @@ static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instanc stats.set_cumulative_compaction_cnt(20); stats.set_cumulative_point(30); txn->put(stats_key, stats.SerializeAsString()); + + doris::TabletMetaCloudPB tablet_pb; + tablet_pb.set_table_id(table_id); + tablet_pb.set_index_id(index_id); + tablet_pb.set_partition_id(partition_id); + tablet_pb.set_tablet_id(tablet_id); + tablet_pb.set_tablet_state(doris::TabletStatePB::PB_RUNNING); + auto tablet_meta_key = + meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + auto tablet_meta_val = tablet_pb.SerializeAsString(); + txn->put(tablet_meta_key, tablet_meta_val); } ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } @@ -4659,7 +4670,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsNormal) { // [(partition_id, tablet_id)] std::vector> tablet_idxes {{70001, 12345}, {80001, 3456}, {90001, 6789}}; - add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); GetDeleteBitmapUpdateLockResponse res; get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, tablet_idxes, @@ -4713,7 +4724,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) { std::vector> tablet_idxes { {70001, 12345}, {80001, 3456}, {90001, 6789}}; - add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); GetDeleteBitmapUpdateLockResponse res; get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, @@ -4754,7 +4765,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) { std::vector> tablet_idxes { {70001, 12345}, {80001, 3456}, {90001, 6789}}; - add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); GetDeleteBitmapUpdateLockResponse res; get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, @@ -4796,7 +4807,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) { std::vector> tablet_idxes { {70001, 12345}, {80001, 3456}, {90001, 6789}}; - add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); GetDeleteBitmapUpdateLockResponse res; get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, @@ -4841,7 +4852,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) { tablet_idxes.push_back({partition_id, tablet_id}); } - add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); + add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes); GetDeleteBitmapUpdateLockResponse res; get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index bee81365628984..188173b32a9d44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -780,17 +780,26 @@ private Map> getCalcDeleteBitmapInfo( if (!lockContext.getBaseCompactionCnts().isEmpty() && !lockContext.getCumulativeCompactionCnts().isEmpty() && !lockContext.getCumulativePoints().isEmpty()) { + boolean hasTabletStats = !lockContext.getTabletStates().isEmpty(); + List reqBaseCompactionCnts = Lists.newArrayList(); List reqCumulativeCompactionCnts = Lists.newArrayList(); List reqCumulativePoints = Lists.newArrayList(); + List reqTabletStates = Lists.newArrayList(); for (long tabletId : tabletList) { reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId)); reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId)); reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId)); + if (hasTabletStats) { + reqTabletStates.add(lockContext.getTabletStates().get(tabletId)); + } } partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts); partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts); partitionInfo.setCumulativePoints(reqCumulativePoints); + if (hasTabletStats) { + partitionInfo.setTabletStates(reqTabletStates); + } } partitionInfos.add(partitionInfo); } @@ -917,19 +926,24 @@ private void getDeleteBitmapUpdateLock(long transactionId, List mowTa List respBaseCompactionCnts = response.getBaseCompactionCntsList(); List respCumulativeCompactionCnts = response.getCumulativeCompactionCntsList(); List respCumulativePoints = response.getCumulativePointsList(); + List respTabletStates = response.getTabletStatesList(); int size1 = respBaseCompactionCnts.size(); int size2 = respCumulativeCompactionCnts.size(); int size3 = respCumulativePoints.size(); - if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) { + int size4 = respTabletStates.size(); + if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size() + || (size4 > 0 && size4 != tabletList.size())) { throw new UserException("The size of returned compaction cnts can't match the size of tabletList, " + "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1 - + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3); + + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3 + + ", respTabletStates.size()=" + size4); } for (int i = 0; i < tabletList.size(); i++) { long tabletId = tabletList.get(i); lockContext.getBaseCompactionCnts().put(tabletId, respBaseCompactionCnts.get(i)); lockContext.getCumulativeCompactionCnts().put(tabletId, respCumulativeCompactionCnts.get(i)); lockContext.getCumulativePoints().put(tabletId, respCumulativePoints.get(i)); + lockContext.getTabletStates().put(tabletId, respTabletStates.get(i)); } totalRetryTime += retryTime; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java index 02886f63427f18..120715d627610b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java @@ -30,6 +30,7 @@ public class DeleteBitmapUpdateLockContext { private Map baseCompactionCnts; private Map cumulativeCompactionCnts; private Map cumulativePoints; + private Map tabletStates; private Map> tableToPartitions; private Map partitions; private Map>> backendToPartitionTablets; @@ -40,6 +41,7 @@ public DeleteBitmapUpdateLockContext() { baseCompactionCnts = Maps.newHashMap(); cumulativeCompactionCnts = Maps.newHashMap(); cumulativePoints = Maps.newHashMap(); + tabletStates = Maps.newHashMap(); tableToPartitions = Maps.newHashMap(); partitions = Maps.newHashMap(); backendToPartitionTablets = Maps.newHashMap(); @@ -63,6 +65,10 @@ public Map getCumulativePoints() { return cumulativePoints; } + public Map getTabletStates() { + return tabletStates; + } + public Map>> getBackendToPartitionTablets() { return backendToPartitionTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 06047e2cf16682..e104ed288b3c27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -44,6 +44,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.cooldown.CooldownConf; @@ -593,7 +594,22 @@ public void tabletReport(long backendId, Map backendTablets, LOG.info("finished to handle tablet report from backend[{}] cost: {} ms", backendId, (end - start)); } + private static void debugBlock() { + if (DebugPointUtil.isEnable("ReportHandler.block")) { + LOG.info("debug point: block at ReportHandler.block"); + while (DebugPointUtil.isEnable("ReportHandler.block")) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info("error ", e); + } + } + LOG.info("debug point: leave ReportHandler.block"); + } + } + private static void taskReport(long backendId, Map> runningTasks) { + debugBlock(); if (LOG.isDebugEnabled()) { LOG.debug("begin to handle task report from backend {}", backendId); } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index dd85f6ec2a21a6..ff0279990ee10d 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1470,6 +1470,7 @@ message GetDeleteBitmapUpdateLockResponse { repeated int64 base_compaction_cnts = 2; repeated int64 cumulative_compaction_cnts = 3; repeated int64 cumulative_points = 4; + repeated int64 tablet_states = 5; } message RemoveDeleteBitmapUpdateLockRequest { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index abffd176ef8ea7..aa1ee2f5b9fe01 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -440,6 +440,8 @@ struct TCalcDeleteBitmapPartitionInfo { 4: optional list base_compaction_cnts 5: optional list cumulative_compaction_cnts 6: optional list cumulative_points + 7: optional list sub_txn_ids + 8: optional list tablet_states } struct TCalcDeleteBitmapRequest { diff --git a/regression-test/data/fault_injection_p0/cloud/test_tablet_state_change_in_publish_phase.out b/regression-test/data/fault_injection_p0/cloud/test_tablet_state_change_in_publish_phase.out new file mode 100644 index 00000000000000..0ece86d0fb4e4a --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_tablet_state_change_in_publish_phase.out @@ -0,0 +1,20 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +10 88 88 + +-- !dup_key_count -- + +-- !sql -- +1 \N 99 +2 2 2 +3 3 3 +10 \N 88 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_tablet_state_change_in_publish_phase.groovy b/regression-test/suites/fault_injection_p0/cloud/test_tablet_state_change_in_publish_phase.groovy new file mode 100644 index 00000000000000..6b7102ed243d76 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_tablet_state_change_in_publish_phase.groovy @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_tablet_state_change_in_publish_phase", "docker") { + def options = new ClusterOptions() + options.setFeNum(1) + options.setBeNum(2) + options.cloudMode = true + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.enableDebugPoints() + + docker(options) { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def table1 = "test_tablet_state_change_in_publish_phase" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + qt_sql "select * from ${table1} order by k1;" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def be1 + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + be1 = be + } + } + logger.info("tablet ${tabletId} on backend ${be1.Host} with backendId=${be1.BackendId}"); + logger.info("backends: ${cluster.getBackends()}") + int beIndex = 1 + for (def backend : cluster.getBackends()) { + if (backend.host == be1.Host) { + beIndex = backend.index + break + } + } + assert cluster.getBeByIndex(beIndex).backendId as String == tabletBackendId + + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + sql "alter table ${table1} modify column c1 varchar(100);" + Thread.sleep(1000) + + cluster.stopBackends(beIndex) + + Thread.sleep(1000) + + // let tablet be on another BE + sql "insert into ${table1} values(10,88,88);" + qt_sql "select * from ${table1} order by k1;" + assert sql_return_maparray("show tablets from ${table1};").get(0).BackendId as String != tabletBackendId + + // block FE's task report handler to avoid alter task re-sended to BE before we enable debug points for SC + GetDebugPoint().enableDebugPointForAllFEs("ReportHandler.block") + cluster.startBackends(beIndex) + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob.process_alter_tablet.sleep") + GetDebugPoint().disableDebugPointForAllFEs("ReportHandler.block") + + def newThreadInDocker = { Closure actionSupplier -> + def connInfo = context.threadLocalConn.get() + return Thread.start { + connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier) + } + } + + // let load 1 block before publish + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + def t1 = newThreadInDocker { + // load 1 will not see any historical data when flush + // and will skip to calculate delete bitmaps in later phase becase the tablet's state is NOT_READY + sql "insert into ${table1} values(1,88,88);" + } + Thread.sleep(800) + + // let sc finish converting historical data + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + Thread.sleep(1000) + + // let load 1 publish + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + t1.join() + + // load 2 + sql "insert into ${table1} values(1,77,77);" + + + // let load 3 block before publish + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + def t2 = newThreadInDocker { + sql "insert into ${table1} values(1,99,99);" + } + Thread.sleep(1000) + + // let sc finish + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob.process_alter_tablet.sleep") + + dockerAwaitUntil(30) { + def res = sql_return_maparray """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + logger.info("alter status: ${res}") + res[0].State as String == "FINISHED" + } + // tablet state has changed to NORMAL in MS + + // let load 3 publish + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + t2.join() + + qt_dup_key_count "select k1,count() as cnt from ${table1} group by k1 having cnt>1;" + qt_sql "select * from ${table1} order by k1;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +}