diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 26c5c2d86157c1..64e2ee41938163 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -280,7 +280,7 @@ CONF_mInt64(max_txn_commit_byte, "7340032"); CONF_Bool(enable_cloud_txn_lazy_commit, "true"); CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000"); CONF_Int32(txn_lazy_commit_num_threads, "8"); -CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000"); +CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000"); // max TabletIndexPB num for batch get CONF_Int32(max_tablet_index_num_per_batch, "1000"); CONF_Int32(max_restore_job_rowsets_per_batch, "1000"); diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index 9ef38ec9951a34..c32358af651f2c 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -531,7 +531,6 @@ void TxnLazyCommitTask::commit() { LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_ << " code=" << code_; break; } - VLOG_DEBUG << "txn_id=" << txn_id_ << " tmp_rowset_metas.size()=" << all_tmp_rowset_metas.size(); if (all_tmp_rowset_metas.size() == 0) { @@ -541,12 +540,23 @@ void TxnLazyCommitTask::commit() { // std::map>> partition_to_tmp_rowset_metas; + size_t max_rowset_meta_size = 0; for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) { partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back(); partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first = tmp_rowset_key; partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second = tmp_rowset_pb; + max_rowset_meta_size = std::max(max_rowset_meta_size, tmp_rowset_pb.ByteSizeLong()); + } + + // fdb txn limit 10MB, we use 4MB as the max size for each batch. + size_t max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch; + if (max_rowset_meta_size > 0) { + max_rowsets_per_batch = std::min((4UL << 20) / max_rowset_meta_size, + size_t(config::txn_lazy_max_rowsets_per_batch)); + TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch", + &max_rowsets_per_batch, &max_rowset_meta_size); } for (auto& [partition_id, tmp_rowset_metas] : partition_to_tmp_rowset_metas) { @@ -573,12 +583,10 @@ void TxnLazyCommitTask::commit() { } } - for (size_t i = 0; i < tmp_rowset_metas.size(); - i += config::txn_lazy_max_rowsets_per_batch) { - size_t end = - (i + config::txn_lazy_max_rowsets_per_batch) > tmp_rowset_metas.size() - ? tmp_rowset_metas.size() - : i + config::txn_lazy_max_rowsets_per_batch; + for (size_t i = 0; i < tmp_rowset_metas.size(); i += max_rowsets_per_batch) { + size_t end = (i + max_rowsets_per_batch) > tmp_rowset_metas.size() + ? tmp_rowset_metas.size() + : i + max_rowsets_per_batch; std::vector> sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i, tmp_rowset_metas.begin() + end); diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index cc4da09d575799..08f98da0531af0 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -179,6 +179,33 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, return rowset; } +static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, int index_id, + int partition_id, int64_t version = -1, + int num_rows = 100) { + doris::RowsetMetaCloudPB rowset; + rowset.set_rowset_id(0); // required + rowset.set_rowset_id_v2(next_rowset_id()); + rowset.set_tablet_id(tablet_id); + rowset.set_partition_id(partition_id); + rowset.set_index_id(index_id); + rowset.set_txn_id(txn_id); + if (version > 0) { + rowset.set_start_version(version); + rowset.set_end_version(version); + } + rowset.set_num_segments(600); + for (int i = 0; i < 600; i++) { + auto ptr = rowset.add_segments_key_bounds(); + ptr->set_min_key("xxsqewqeqweeqwewqeqeq"); + ptr->set_max_key("dase23452rr234ewdw534523"); + } + rowset.set_num_rows(0); + rowset.set_data_disk_size(0); + rowset.mutable_tablet_schema()->set_schema_version(0); + rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK + return rowset; +} + static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, CreateRowsetResponse& res) { brpc::Controller cntl; @@ -203,6 +230,18 @@ static std::shared_ptr get_mem_txn_kv() { return txn_kv; } +static std::shared_ptr get_fdb_txn_kv() { + int ret = 0; + cloud::config::fdb_cluster_file_path = "fdb.cluster"; + auto fdb_txn_kv = std::dynamic_pointer_cast(std::make_shared()); + if (fdb_txn_kv != nullptr) { + ret = fdb_txn_kv->init(); + [&] { ASSERT_EQ(ret, 0); }(); + } + [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }(); + return fdb_txn_kv; +} + static void check_tablet_idx_db_id(std::unique_ptr& txn, int64_t db_id, int64_t tablet_id) { std::string mock_instance = "test_instance"; @@ -2928,4 +2967,104 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) { sp->disable_processing(); } +TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) { + config::txn_lazy_max_rowsets_per_batch = 1000; + auto txn_kv = get_fdb_txn_kv(); + int64_t db_id = 14135425; + int64_t table_id = 31245456; + int64_t index_id = 434324; + int64_t partition_id = 3215764; + + int64_t table_id2 = 213476; + int64_t index_id2 = 126765; + int64_t partition_id2 = 214567; + bool commit_txn_eventually_finish_hit = false; + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) { + auto [code, msg] = *try_any_cast*>(args[0]); + ASSERT_EQ(code, MetaServiceCode::OK); + commit_txn_eventually_finish_hit = true; + }); + + sp->set_call_back("TxnLazyCommitTask::commit::max_rowsets_per_batch", [&](auto&& args) { + size_t max_rowsets_per_batch = *try_any_cast(args[0]); + size_t max_rowset_meta_size = *try_any_cast(args[1]); + LOG(INFO) << "max_rowsets_per_batch:" << max_rowsets_per_batch + << " max_rowset_meta_size:" << max_rowset_meta_size; + ASSERT_EQ(max_rowsets_per_batch, 134); + }); + + sp->enable_processing(); + + auto meta_service = get_meta_service(txn_kv, true); + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label_multi_table_commit_txn"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.add_table_ids(table_id2); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, + nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + int64_t txn_id = res.txn_id(); + + // mock rowset and tablet + int64_t tablet_id_base = 3131124; + for (int i = 0; i < 1000; ++i) { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, + tablet_id_base + i); + auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + for (int i = 1000; i < 2000; ++i) { + create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2, + tablet_id_base + i); + auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + { + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(txn_id); + req.set_is_2pc(false); + req.set_enable_txn_lazy_commit(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + ASSERT_TRUE(commit_txn_eventually_finish_hit); + } + + { + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string mock_instance = "test_instance"; + for (int i = 0; i < 2000; ++i) { + int64_t tablet_id = tablet_id_base + i; + check_tablet_idx_db_id(txn, db_id, tablet_id); + check_tmp_rowset_not_exist(txn, tablet_id, txn_id); + check_rowset_meta_exist(txn, tablet_id, 2); + } + } + + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); + config::txn_lazy_max_rowsets_per_batch = 2; +} + } // namespace doris::cloud