diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 0cee75994e65fa..307beab63d40b0 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2635,6 +2635,11 @@ int InstanceRecycler::recycle_expired_txn_label() { }; auto loop_done = [&]() -> int { + std::unique_ptr> defer( + (int*)0x01, [&](int*) { recycle_txn_info_keys.clear(); }); + TEST_SYNC_POINT_CALLBACK( + "InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys", + &recycle_txn_info_keys); for (const auto& k : recycle_txn_info_keys) { concurrent_delete_executor.add([&]() { if (delete_recycle_txn_kv(k) != 0) { @@ -2656,6 +2661,8 @@ int InstanceRecycler::recycle_expired_txn_label() { ret = finished ? ret : -1; + TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.failure", &ret); + if (ret != 0) { LOG_WARNING("recycle txn kv ret!=0") .tag("finished", finished) @@ -2663,7 +2670,6 @@ int InstanceRecycler::recycle_expired_txn_label() { .tag("instance_id", instance_id_); return ret; } - recycle_txn_info_keys.clear(); return ret; }; diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 6ef782b9c5fb5e..bcd7dd39160651 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4285,4 +4285,59 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) { << "ms" << std::endl; check_multiple_txn_info_kvs(txn_kv, 2000); } + +TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) { + config::label_keep_max_second = 259200; + doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group; + config::recycle_pool_parallelism = 40; + auto recycle_txn_label_s3_producer_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_s3_producer_pool->start(); + auto recycle_txn_label_recycle_tablet_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_recycle_tablet_pool->start(); + auto recycle_txn_label_group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + recycle_txn_label_group_recycle_function_pool->start(); + recycle_txn_label_thread_group = + RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool), + std::move(recycle_txn_label_recycle_tablet_pool), + std::move(recycle_txn_label_group_recycle_function_pool)); + + auto mem_txn_kv = std::make_shared(); + + auto txn_kv = mem_txn_kv; + ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl; + make_multiple_txn_info_kvs(txn_kv, 20000, 15000); + check_multiple_txn_info_kvs(txn_kv, 20000); + + auto* sp = SyncPoint::get_instance(); + std::unique_ptr> defer( + (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); }); + sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys", + [](auto&& args) { + auto* recycle_txn_info_keys = + try_any_cast*>(args[0]); + + ASSERT_LE(recycle_txn_info_keys->size(), 10000); + }); + sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.failure", [](auto&& args) { + auto* ret = try_any_cast(args[0]); + *ret = -1; + }); + sp->enable_processing(); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto start = std::chrono::steady_clock::now(); + ASSERT_EQ(recycler.recycle_expired_txn_label(), -1); + auto finish = std::chrono::steady_clock::now(); + std::cout << "recycle expired txn label cost=" + << std::chrono::duration_cast(finish - start).count() + << "ms" << std::endl; + check_multiple_txn_info_kvs(txn_kv, 5000); +} } // namespace doris::cloud