Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ CONF_mInt64(max_txn_commit_byte, "7340032");
CONF_Bool(enable_cloud_txn_lazy_commit, "true");
CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000");
CONF_Int32(txn_lazy_commit_num_threads, "8");
CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000");
CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000");
// max TabletIndexPB num for batch get
CONF_Int32(max_tablet_index_num_per_batch, "1000");
CONF_Int32(max_restore_job_rowsets_per_batch, "1000");
Expand Down
22 changes: 15 additions & 7 deletions cloud/src/meta-service/txn_lazy_committer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ void TxnLazyCommitTask::commit() {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_ << " code=" << code_;
break;
}

VLOG_DEBUG << "txn_id=" << txn_id_
<< " tmp_rowset_metas.size()=" << all_tmp_rowset_metas.size();
if (all_tmp_rowset_metas.size() == 0) {
Expand All @@ -541,12 +540,23 @@ void TxnLazyCommitTask::commit() {
// <partition_id, tmp_rowsets>
std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
size_t max_rowset_meta_size = 0;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) {
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
tmp_rowset_pb;
max_rowset_meta_size = std::max(max_rowset_meta_size, tmp_rowset_pb.ByteSizeLong());
}

// fdb txn limit 10MB, we use 4MB as the max size for each batch.
size_t max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch;
if (max_rowset_meta_size > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4L << 20 / max_rowset_meta_size 这个表达是是错的 应该会先算 除法 再移位 .
这里最好加个syncpoint 用来观察 实际 max_rowset_meta_size 是多少, 在单测里通通过 syncpoint来 check 它的值

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4L << 20 / max_rowset_meta_size 这个表达是是错的 应该会先算 除法 再移位 . 这里最好加个syncpoint 用来观察 实际 max_rowset_meta_size 是多少, 在单测里通通过 syncpoint来 check 它的值

done

max_rowsets_per_batch = std::min((4UL << 20) / max_rowset_meta_size,
size_t(config::txn_lazy_max_rowsets_per_batch));
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
&max_rowsets_per_batch, &max_rowset_meta_size);
}

for (auto& [partition_id, tmp_rowset_metas] : partition_to_tmp_rowset_metas) {
Expand All @@ -573,12 +583,10 @@ void TxnLazyCommitTask::commit() {
}
}

for (size_t i = 0; i < tmp_rowset_metas.size();
i += config::txn_lazy_max_rowsets_per_batch) {
size_t end =
(i + config::txn_lazy_max_rowsets_per_batch) > tmp_rowset_metas.size()
? tmp_rowset_metas.size()
: i + config::txn_lazy_max_rowsets_per_batch;
for (size_t i = 0; i < tmp_rowset_metas.size(); i += max_rowsets_per_batch) {
size_t end = (i + max_rowsets_per_batch) > tmp_rowset_metas.size()
? tmp_rowset_metas.size()
: i + max_rowsets_per_batch;
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
tmp_rowset_metas.begin() + end);
Expand Down
139 changes: 139 additions & 0 deletions cloud/test/txn_lazy_commit_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,33 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
return rowset;
}

static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, int index_id,
int partition_id, int64_t version = -1,
int num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
rowset.set_rowset_id_v2(next_rowset_id());
rowset.set_tablet_id(tablet_id);
rowset.set_partition_id(partition_id);
rowset.set_index_id(index_id);
rowset.set_txn_id(txn_id);
if (version > 0) {
rowset.set_start_version(version);
rowset.set_end_version(version);
}
rowset.set_num_segments(600);
for (int i = 0; i < 600; i++) {
auto ptr = rowset.add_segments_key_bounds();
ptr->set_min_key("xxsqewqeqweeqwewqeqeq");
ptr->set_max_key("dase23452rr234ewdw534523");
}
rowset.set_num_rows(0);
rowset.set_data_disk_size(0);
rowset.mutable_tablet_schema()->set_schema_version(0);
rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
return rowset;
}

static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res) {
brpc::Controller cntl;
Expand All @@ -203,6 +230,18 @@ static std::shared_ptr<TxnKv> get_mem_txn_kv() {
return txn_kv;
}

static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
int ret = 0;
cloud::config::fdb_cluster_file_path = "fdb.cluster";
auto fdb_txn_kv = std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
if (fdb_txn_kv != nullptr) {
ret = fdb_txn_kv->init();
[&] { ASSERT_EQ(ret, 0); }();
}
[&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }();
return fdb_txn_kv;
}

static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t db_id,
int64_t tablet_id) {
std::string mock_instance = "test_instance";
Expand Down Expand Up @@ -2928,4 +2967,104 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) {
sp->disable_processing();
}

TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
config::txn_lazy_max_rowsets_per_batch = 1000;
auto txn_kv = get_fdb_txn_kv();
int64_t db_id = 14135425;
int64_t table_id = 31245456;
int64_t index_id = 434324;
int64_t partition_id = 3215764;

int64_t table_id2 = 213476;
int64_t index_id2 = 126765;
int64_t partition_id2 = 214567;
bool commit_txn_eventually_finish_hit = false;

auto sp = SyncPoint::get_instance();
sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) {
auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode, std::string>*>(args[0]);
ASSERT_EQ(code, MetaServiceCode::OK);
commit_txn_eventually_finish_hit = true;
});

sp->set_call_back("TxnLazyCommitTask::commit::max_rowsets_per_batch", [&](auto&& args) {
size_t max_rowsets_per_batch = *try_any_cast<size_t*>(args[0]);
size_t max_rowset_meta_size = *try_any_cast<size_t*>(args[1]);
LOG(INFO) << "max_rowsets_per_batch:" << max_rowsets_per_batch
<< " max_rowset_meta_size:" << max_rowset_meta_size;
ASSERT_EQ(max_rowsets_per_batch, 134);
});

sp->enable_processing();

auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label("test_label_multi_table_commit_txn");
txn_info_pb.add_table_ids(table_id);
txn_info_pb.add_table_ids(table_id2);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
int64_t txn_id = res.txn_id();

// mock rowset and tablet
int64_t tablet_id_base = 3131124;
for (int i = 0; i < 1000; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
tablet_id_base + i);
auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

for (int i = 1000; i < 2000; ++i) {
create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2,
tablet_id_base + i);
auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

{
brpc::Controller cntl;
CommitTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(txn_id);
req.set_is_2pc(false);
req.set_enable_txn_lazy_commit(true);
CommitTxnResponse res;
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
&res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
ASSERT_TRUE(commit_txn_eventually_finish_hit);
}

{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string mock_instance = "test_instance";
for (int i = 0; i < 2000; ++i) {
int64_t tablet_id = tablet_id_base + i;
check_tablet_idx_db_id(txn, db_id, tablet_id);
check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
check_rowset_meta_exist(txn, tablet_id, 2);
}
}

sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
config::txn_lazy_max_rowsets_per_batch = 2;
}

} // namespace doris::cloud
Loading