diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
index 1da417b5d59..03b9219ed32 100644
--- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
+++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
@@ -431,6 +431,42 @@ void LearnerReadWorker::waitIndex(
         stats.wait_index_elapsed_ms,
         stats.num_regions,
         unavailable_regions.size());
+
+    auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String {
+        if (query_info.bypass_lock_ts == nullptr)
+            return "";
+        FmtBuffer buffer;
+        buffer.append("[");
+        buffer.joinStr(
+            query_info.bypass_lock_ts->begin(),
+            query_info.bypass_lock_ts->end(),
+            [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); },
+            "|");
+        buffer.append("]");
+        return buffer.toString();
+    };
+    auto region_info_formatter = [&]() -> String {
+        FmtBuffer buffer;
+        buffer.joinStr(
+            regions_info.begin(),
+            regions_info.end(),
+            [&](const auto & region_to_query, FmtBuffer & f) {
+                const auto & region = regions_snapshot.find(region_to_query.region_id)->second;
+                f.fmtAppend(
+                    "(id:{} applied_index:{} bypass_locks:{})",
+                    region_to_query.region_id,
+                    region->appliedIndex(),
+                    bypass_formatter(region_to_query));
+            },
+            ";");
+        return buffer.toString();
+    };
+
+    LOG_DEBUG(
+        log,
+        "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}",
+        region_info_formatter(),
+        unavailable_regions.toDebugString());
 }
 
 std::tuple //
diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h
index d9bf888b0e9..f546257bea7 100644
--- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h
+++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h
@@ -71,6 +71,26 @@ struct UnavailableRegions
 
     void addRegionWaitIndexTimeout(RegionID region_id, UInt64 index_to_wait, UInt64 current_applied_index);
 
+    String toDebugString() const
+    {
+        FmtBuffer buffer;
+        buffer.append("{ids=[");
+        buffer.joinStr(
+            ids.begin(),
+            ids.end(),
+            [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); },
+            "|");
+        buffer.append("] locks=");
+        buffer.append("[");
+        buffer.joinStr(
+            region_locks.begin(),
+            region_locks.end(),
+            [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}({})", v.first, v.second->DebugString()); },
+            "|");
+        buffer.append("]}");
+        return buffer.toString();
+    }
+
 private:
     const bool batch_cop;
     const bool is_wn_disagg_read;
diff --git a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
index 1160137161f..e988b1e40b9 100644
--- a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
+++ b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
@@ -28,17 +28,18 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
 
     auto log = DB::Logger::get();
     LOG_INFO(log, "Cancel and addTask");
     // Cancel and addTask
-    // 3 -> 1 -> 4 -> 2
     {
         auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
         auto m = std::make_shared<std::mutex>();
         int flag = 0;
         std::unique_lock cl(*m);
         std::atomic_bool finished_flag = false;
-        async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() {
+        std::atomic_bool running_flag = false;
+        async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag, &running_flag]() {
+            running_flag.store(true, std::memory_order_seq_cst);
             auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1);
-            std::scoped_lock rl(*m); // 2
-            SCOPE_EXIT({ finished_flag.store(true); });
+            std::scoped_lock rl(*m);
+            SCOPE_EXIT({ finished_flag.store(true, std::memory_order_seq_cst); }); // Run after `cl` is released.
             if (cancel_handle->isCanceled())
             {
@@ -46,17 +47,31 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
             }
             flag = 1;
         });
+        ASSERT_TRUE(async_tasks->isScheduled(1));
+        {
+            int cnt_wait_sche = 0;
+            while (!running_flag.load(std::memory_order_seq_cst))
+            {
+                cnt_wait_sche += 1;
+                ASSERT(cnt_wait_sche < 6);
+                std::this_thread::sleep_for(200ms);
+            }
+        }
+        // Make sure we don't cancel in queue.
         async_tasks->asyncCancelTask(1);
+        // The task is not registered anymore.
         ASSERT_FALSE(async_tasks->isScheduled(1));
         async_tasks->addTask(1, [&flag]() { flag = 2; });
-        cl.unlock(); // Now can task 1 run.
-        int count = 0;
-        using namespace std::chrono_literals;
-        while (!finished_flag.load())
+        cl.unlock();
         {
-            count += 1;
-            ASSERT(count < 6);
-            std::this_thread::sleep_for(200ms);
+            int cnt_wait_finish = 0;
+            using namespace std::chrono_literals;
+            while (!finished_flag.load(std::memory_order_seq_cst))
+            {
+                cnt_wait_finish += 1;
+                ASSERT(cnt_wait_finish < 6);
+                std::this_thread::sleep_for(200ms);
+            }
         }
         ASSERT_NO_THROW(async_tasks->fetchResult(1));
         ASSERT_EQ(flag, 2);
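
For reference, a rough standalone sketch (not part of this PR) of the string shape the new learner-read summary log is meant to produce. It uses the {fmt} library directly instead of TiFlash's `FmtBuffer`, the region ids, applied indexes and lock timestamps are made-up values, and unlike `bypass_formatter` above it always prints the bracket pair even when there are no bypass locks:

```cpp
// Sketch only: approximates the "[Learner Read] Learner Read Summary" line
// built by region_info_formatter(), using {fmt} instead of FmtBuffer.
#include <cstdint>
#include <string>
#include <vector>
#include <fmt/format.h>
#include <fmt/ranges.h>

int main()
{
    // Illustrative stand-in for (region_id, applied_index, bypass_lock_ts).
    struct RegionInfo
    {
        uint64_t id;
        uint64_t applied_index;
        std::vector<uint64_t> bypass_locks;
    };
    std::vector<RegionInfo> regions = {{2, 105, {449573124}}, {4, 88, {}}};

    // One "(id:... applied_index:... bypass_locks:[...])" entry per region,
    // lock timestamps joined with '|', entries joined with ';'.
    std::vector<std::string> parts;
    for (const auto & r : regions)
        parts.push_back(fmt::format(
            "(id:{} applied_index:{} bypass_locks:[{}])",
            r.id,
            r.applied_index,
            fmt::join(r.bypass_locks, "|")));

    fmt::print("[Learner Read] Learner Read Summary, regions_info={}\n", fmt::join(parts, ";"));
    // Prints something like:
    // [Learner Read] Learner Read Summary, regions_info=(id:2 applied_index:105 bypass_locks:[449573124]);(id:4 applied_index:88 bypass_locks:[])
}
```

The real log line additionally appends `unavailable_regions_info=` from `UnavailableRegions::toDebugString()`, which uses the same `joinStr` pattern over the unavailable region ids and their lock `DebugString()`s.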