Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2648,6 +2648,11 @@ int InstanceRecycler::recycle_expired_txn_label() {
};

auto loop_done = [&]() -> int {
std::unique_ptr<int, std::function<void(int*)>> defer(
(int*)0x01, [&](int*) { recycle_txn_info_keys.clear(); });
TEST_SYNC_POINT_CALLBACK(
"InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
&recycle_txn_info_keys);
for (const auto& k : recycle_txn_info_keys) {
concurrent_delete_executor.add([&]() {
if (delete_recycle_txn_kv(k) != 0) {
Expand All @@ -2669,14 +2674,15 @@ int InstanceRecycler::recycle_expired_txn_label() {

ret = finished ? ret : -1;

TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.failure", &ret);

if (ret != 0) {
LOG_WARNING("recycle txn kv ret!=0")
.tag("finished", finished)
.tag("ret", ret)
.tag("instance_id", instance_id_);
return ret;
}
recycle_txn_info_keys.clear();
return ret;
};

Expand Down
55 changes: 55 additions & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4329,4 +4329,59 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) {
<< "ms" << std::endl;
check_multiple_txn_info_kvs(txn_kv, 2000);
}

TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) {
config::label_keep_max_second = 259200;
doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
config::recycle_pool_parallelism = 40;
auto recycle_txn_label_s3_producer_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
recycle_txn_label_s3_producer_pool->start();
auto recycle_txn_label_recycle_tablet_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
recycle_txn_label_recycle_tablet_pool->start();
auto recycle_txn_label_group_recycle_function_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
recycle_txn_label_group_recycle_function_pool->start();
recycle_txn_label_thread_group =
RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
std::move(recycle_txn_label_recycle_tablet_pool),
std::move(recycle_txn_label_group_recycle_function_pool));

auto mem_txn_kv = std::make_shared<MemTxnKv>();

auto txn_kv = mem_txn_kv;
ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl;
make_multiple_txn_info_kvs(txn_kv, 20000, 15000);
check_multiple_txn_info_kvs(txn_kv, 20000);

auto* sp = SyncPoint::get_instance();
std::unique_ptr<int, std::function<void(int*)>> defer(
(int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
[](auto&& args) {
auto* recycle_txn_info_keys =
try_any_cast<std::vector<std::string>*>(args[0]);

ASSERT_LE(recycle_txn_info_keys->size(), 10000);
});
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.failure", [](auto&& args) {
auto* ret = try_any_cast<int*>(args[0]);
*ret = -1;
});
sp->enable_processing();

InstanceInfoPB instance;
instance.set_instance_id(instance_id);
InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group,
std::make_shared<TxnLazyCommitter>(txn_kv));
ASSERT_EQ(recycler.init(), 0);
auto start = std::chrono::steady_clock::now();
ASSERT_EQ(recycler.recycle_expired_txn_label(), -1);
auto finish = std::chrono::steady_clock::now();
std::cout << "recycle expired txn label cost="
<< std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
<< "ms" << std::endl;
check_multiple_txn_info_kvs(txn_kv, 5000);
}
} // namespace doris::cloud
Loading