diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index d46f6f5c9eb5f5..51bcf59cd04f86 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -249,6 +249,17 @@ void convert_tmp_rowsets( LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << tmp_rowset_pb.partition_id() << " version_pb:" << version_pb.ShortDebugString(); } + + if (version_pb.pending_txn_ids_size() == 0 || version_pb.pending_txn_ids(0) != txn_id) { + LOG(INFO) << "txn_id=" << txn_id << " partition_id=" << tmp_rowset_pb.partition_id() + << " tmp_rowset_key=" << hex(tmp_rowset_key) + << " version has already been converted." + << " version_pb:" << version_pb.ShortDebugString(); + TEST_SYNC_POINT_CALLBACK("convert_tmp_rowsets::already_been_converted", + &version_pb); + return; + } + partition_versions.emplace(tmp_rowset_pb.partition_id(), version_pb); DCHECK_EQ(partition_versions.size(), 1) << partition_versions.size(); } @@ -460,7 +471,8 @@ void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, i txn->put(recycle_key, recycle_val); LOG(INFO) << "put recycle_key=" << hex(recycle_key) << " txn_id=" << txn_id; } - + TEST_SYNC_POINT_RETURN_WITH_VOID("TxnLazyCommitTask::make_committed_txn_visible::commit", + &code); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index dca22a563833f8..2aaf6cf96eaf1b 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -68,11 +68,6 @@ int main(int argc, char** argv) { } config::enable_cloud_txn_lazy_commit = true; - config::txn_lazy_commit_rowsets_thresold = 2; - config::txn_lazy_max_rowsets_per_batch = 2; - config::txn_lazy_commit_num_threads = 2; - config::max_tablet_index_num_per_batch = 2; - config::enable_txn_store_retry = false; config::label_keep_max_second = 0; @@ -157,8 +152,8 @@ static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; } -static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int index_id, - int partition_id, int64_t version = -1, +static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int64_t index_id, + int64_t partition_id, int64_t version = -1, int num_rows = 100) { doris::RowsetMetaCloudPB rowset; rowset.set_rowset_id(0); // required @@ -179,9 +174,9 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, return rowset; } -static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, int index_id, - int partition_id, int64_t version = -1, - int num_rows = 100) { +static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, + int64_t index_id, int64_t partition_id, + int64_t version = -1, int num_rows = 100) { doris::RowsetMetaCloudPB rowset; rowset.set_rowset_id(0); // required rowset.set_rowset_id_v2(next_rowset_id()); @@ -410,14 +405,14 @@ TEST(TxnLazyCommitTest, CreateTabletWithDbIdTest) { // mock rowset and tablet int64_t tablet_id_base = 11414703; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); } @@ -432,14 +427,14 @@ TEST(TxnLazyCommitTest, CreateTabletWithoutDbIdTest) { // mock rowset and tablet int64_t tablet_id_base = 42411890; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i); } std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_without_db_id(txn, tablet_id); } @@ -479,7 +474,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) { // mock rowset and tablet int64_t tablet_id_base = 87134121; std::vector> tmp_rowsets_meta; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -492,7 +487,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_without_db_id(txn, tablet_id); } @@ -506,7 +501,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); } @@ -567,7 +562,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { // mock rowset and tablet int64_t tablet_id_base = 1103; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2999; ++i) { create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -579,7 +574,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tmp_rowset_exist(txn, tablet_id, txn_id); } @@ -593,7 +588,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { req.set_txn_id(txn_id); req.set_is_2pc(false); req.set_enable_txn_lazy_commit(true); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2999; ++i) { int64_t tablet_id = tablet_id_base + i; req.add_base_tablet_ids(tablet_id); } @@ -610,7 +605,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); std::string mock_instance = "test_instance"; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2999; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_not_exist(txn, tablet_id, txn_id); @@ -648,7 +643,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { // mock rowset and tablet int64_t tablet_id_base = 372323; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -660,7 +655,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tmp_rowset_exist(txn, tablet_id, txn_id); } @@ -687,7 +682,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { req.set_txn_id(txn_id); req.set_is_2pc(false); req.set_enable_txn_lazy_commit(true); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; req.add_base_tablet_ids(tablet_id); } @@ -700,7 +695,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_exist(txn, tablet_id, txn_id); @@ -757,7 +752,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) { // mock rowset and tablet int64_t tablet_id_base = 3131124; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2048; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -787,7 +782,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); std::string mock_instance = "test_instance"; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2048; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_not_exist(txn, tablet_id, txn_id); @@ -865,7 +860,7 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) { // mock rowset and tablet int64_t tablet_id_base = 3131124; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -895,7 +890,7 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); std::string mock_instance = "test_instance"; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_not_exist(txn, tablet_id, txn_id); @@ -976,7 +971,7 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) { // mock rowset and tablet int64_t tablet_id_base = 1103; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -988,7 +983,7 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tmp_rowset_exist(txn, tablet_id, txn_id); } @@ -1002,7 +997,7 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) { req.set_txn_id(txn_id); req.set_is_2pc(false); req.set_enable_txn_lazy_commit(true); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; req.add_base_tablet_ids(tablet_id); } @@ -1018,7 +1013,7 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_not_exist(txn, tablet_id, txn_id); @@ -1382,7 +1377,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 1908462; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -1411,7 +1406,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { ASSERT_GT(txn_id1, 0); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -1459,7 +1454,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { ASSERT_GT(txn_id2, 0); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -1502,7 +1497,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); std::string mock_instance = "test_instance"; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; std::string key = meta_tablet_idx_key({mock_instance, tablet_id}); std::string val; @@ -1622,7 +1617,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 1908462; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 1999; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -1651,7 +1646,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { ASSERT_GT(txn_id1, 0); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 1999; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -1699,7 +1694,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { ASSERT_GT(txn_id2, 0); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 1999; ++i) { auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -1742,7 +1737,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 1999; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); @@ -1785,7 +1780,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { // mock rowset and tablet int64_t tablet_id_base = 19201262; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 10001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -1810,7 +1805,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 10001; ++i) { auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -1919,7 +1914,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 10001; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -2046,7 +2041,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 213430076554; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -2071,7 +2066,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; commit_rowset(meta_service.get(), tmp_rowset, res); @@ -2219,7 +2214,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 1908562; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -2248,7 +2243,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { ASSERT_GT(txn_id1, 0); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -2296,7 +2291,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { ASSERT_GT(txn_id2, 0); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -2329,7 +2324,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(sub_txn_id2, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; @@ -2349,14 +2344,14 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { SubTxnInfo sub_txn_info1; sub_txn_info1.set_sub_txn_id(sub_txn_id1); sub_txn_info1.set_table_id(table_id); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + i); } SubTxnInfo sub_txn_info2; sub_txn_info2.set_sub_txn_id(sub_txn_id2); sub_txn_info2.set_table_id(table_id); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + i); } @@ -2388,7 +2383,7 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) { { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); @@ -2579,7 +2574,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 2313324; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -2604,7 +2599,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase1) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; commit_rowset(meta_service.get(), tmp_rowset, res); @@ -2698,7 +2693,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 2313324; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -2723,7 +2718,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase2) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; commit_rowset(meta_service.get(), tmp_rowset, res); @@ -2798,7 +2793,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 2313324; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -2823,7 +2818,7 @@ TEST(TxnLazyCommitTest, RecycleTmpRowsetsCase3) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; commit_rowset(meta_service.get(), tmp_rowset, res); @@ -2924,7 +2919,7 @@ TEST(TxnLazyCommitTest, RecyclePartitions) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 2313324; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -2949,7 +2944,7 @@ TEST(TxnLazyCommitTest, RecyclePartitions) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; commit_rowset(meta_service.get(), tmp_rowset, res); @@ -3070,7 +3065,7 @@ TEST(TxnLazyCommitTest, RecycleIndexes) { auto meta_service = get_meta_service(txn_kv, true); // mock rowset and tablet int64_t tablet_id_base = 2313324; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); } @@ -3095,7 +3090,7 @@ TEST(TxnLazyCommitTest, RecycleIndexes) { } { - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 2001; ++i) { auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); CreateRowsetResponse res; commit_rowset(meta_service.get(), tmp_rowset, res); @@ -3196,7 +3191,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) { // mock rowset and tablet int64_t tablet_id_base = 3131124; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 2001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -3205,7 +3200,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } - for (int i = 5; i < 10; ++i) { + for (int i = 2001; i < 4002; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2, tablet_id_base + i); auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2); @@ -3233,7 +3228,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); std::string mock_instance = "test_instance"; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 4002; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_not_exist(txn, tablet_id, txn_id); @@ -3247,7 +3242,6 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) { } TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { - config::txn_lazy_max_rowsets_per_batch = 1000; auto txn_kv = get_fdb_txn_kv(); int64_t db_id = 14135425; int64_t table_id = 31245456; @@ -3295,7 +3289,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { // mock rowset and tablet int64_t tablet_id_base = 3131124; - for (int i = 0; i < 1000; ++i) { + for (int i = 0; i < 1001; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, tablet_id_base + i); auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); @@ -3304,7 +3298,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } - for (int i = 1000; i < 2000; ++i) { + for (int i = 1001; i < 2002; ++i) { create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2, tablet_id_base + i); auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2); @@ -3332,7 +3326,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); std::string mock_instance = "test_instance"; - for (int i = 0; i < 2000; ++i) { + for (int i = 0; i < 2002; ++i) { int64_t tablet_id = tablet_id_base + i; check_tablet_idx_db_id(txn, db_id, tablet_id); check_tmp_rowset_not_exist(txn, tablet_id, txn_id); @@ -3343,7 +3337,172 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { sp->clear_all_call_backs(); sp->clear_trace(); sp->disable_processing(); - config::txn_lazy_max_rowsets_per_batch = 2; +} + +TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) { + // =========================================================================== + // threads concurrent execution flow: + // + // thread1 lazy thread1 thread3 + // | | | + // commit_txn_eventually begin | | + // | | | + // lazy commit wait | | + // | | | + // | make_committed_txn_visible | + // | | | + // | inject TXN_TOO_OLD fdb error | + // | | sc create new tablet tmp rowset + // | | | + // | | | + // retry commit_txn | | + // v v + auto txn_kv = get_mem_txn_kv(); + int64_t db_id = 4534445675; + int64_t table_id = 4365676543; + int64_t index_id = 665453237; + int64_t partition_id = 2136776543678; + + bool go = false; + std::mutex go_mutex; + std::condition_variable go_cv; + std::atomic make_committed_txn_visible_count = {0}; + std::atomic sc_create_tmp_rowset_count = {0}; + std::atomic sc_create_tmp_rowset_finish_count = {0}; + std::atomic tmp_rowsets_been_already_converted = {0}; + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("TxnLazyCommitTask::make_committed_txn_visible::commit", [&](auto&& args) { + LOG(INFO) << "zhangleiyyy"; + { + std::unique_lock _lock(go_mutex); + if (make_committed_txn_visible_count == 0) { + make_committed_txn_visible_count++; + if (sc_create_tmp_rowset_count == 0) { + go_cv.wait(_lock, [&] { return sc_create_tmp_rowset_count == 1; }); + } + MetaServiceCode* code = try_any_cast(args[0]); + *code = MetaServiceCode::KV_TXN_CONFLICT; + bool* pred = try_any_cast(args.back()); + *pred = true; + LOG(INFO) << "inject kv error KV_TXN_CONFLICT"; + go_cv.notify_all(); + } + } + }); + + sp->set_call_back("convert_tmp_rowsets::already_been_converted", [&](auto&& args) { + auto version_pb = *try_any_cast(args[0]); + LOG(INFO) << "version_pb:" << version_pb.ShortDebugString(); + std::unique_lock _lock(go_mutex); + tmp_rowsets_been_already_converted++; + go_cv.notify_all(); + }); + + sp->enable_processing(); + + auto meta_service = get_meta_service(txn_kv, true); + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_sc_with_commit_txn_label"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, + nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + int64_t txn_id = res.txn_id(); + + // mock rowset and tablet + int64_t tablet_id_base = 3131124; + for (int i = 0; i < 1001; ++i) { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, + tablet_id_base + i); + auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + std::thread thread1([&] { + { + { + std::unique_lock _lock(go_mutex); + go_cv.wait(_lock, [&] { return go; }); + } + + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(txn_id); + req.set_is_2pc(false); + req.set_enable_txn_lazy_commit(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + LOG(INFO) << "thread1 finish"; + }); + + std::thread thread2([&] { + { + { + std::unique_lock _lock(go_mutex); + go_cv.wait(_lock, [&] { return go; }); + } + + { + std::unique_lock _lock(go_mutex); + sc_create_tmp_rowset_count++; + if (make_committed_txn_visible_count == 0) { + go_cv.wait(_lock, [&] { return make_committed_txn_visible_count > 0; }); + } + for (int i = 0; i < 1001; ++i) { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, + partition_id, tablet_id_base + i); + auto tmp_rowset = + create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + LOG(INFO) << "sc_create_tmp_rowset_finish_count finish"; + sc_create_tmp_rowset_finish_count++; + go_cv.notify_all(); + } + LOG(INFO) << "thread2 finish"; + } + }); + + std::unique_lock go_lock(go_mutex); + go = true; + go_lock.unlock(); + go_cv.notify_all(); + + thread1.join(); + thread2.join(); + + ASSERT_GE(tmp_rowsets_been_already_converted, 1); + { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string mock_instance = "test_instance"; + for (int i = 0; i < 1001; ++i) { + int64_t tablet_id = tablet_id_base + i; + check_tablet_idx_db_id(txn, db_id, tablet_id); + check_tmp_rowset_exist(txn, tablet_id, txn_id); + check_rowset_meta_exist(txn, tablet_id, 2); + } + } + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); } } // namespace doris::cloud