Skip to content

Commit

Permalink
KVStore: Fix test of async tasks (#8928)
Browse files Browse the repository at this point in the history
close #8926
  • Loading branch information
CalvinNeo authored Apr 10, 2024
1 parent adc57e2 commit 8cec573
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 11 deletions.
36 changes: 36 additions & 0 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,42 @@ void LearnerReadWorker::waitIndex(
stats.wait_index_elapsed_ms,
stats.num_regions,
unavailable_regions.size());

auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String {
if (query_info.bypass_lock_ts == nullptr)
return "";
FmtBuffer buffer;
buffer.append("[");
buffer.joinStr(
query_info.bypass_lock_ts->begin(),
query_info.bypass_lock_ts->end(),
[](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); },
"|");
buffer.append("]");
return buffer.toString();
};
auto region_info_formatter = [&]() -> String {
FmtBuffer buffer;
buffer.joinStr(
regions_info.begin(),
regions_info.end(),
[&](const auto & region_to_query, FmtBuffer & f) {
const auto & region = regions_snapshot.find(region_to_query.region_id)->second;
f.fmtAppend(
"(id:{} applied_index:{} bypass_locks:{})",
region_to_query.region_id,
region->appliedIndex(),
bypass_formatter(region_to_query));
},
";");
return buffer.toString();
};

LOG_DEBUG(
log,
"[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}",
region_info_formatter(),
unavailable_regions.toDebugString());
}

std::tuple<Clock::time_point, Clock::time_point> //
Expand Down
20 changes: 20 additions & 0 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ struct UnavailableRegions

void addRegionWaitIndexTimeout(RegionID region_id, UInt64 index_to_wait, UInt64 current_applied_index);

String toDebugString() const
{
FmtBuffer buffer;
buffer.append("{ids=[");
buffer.joinStr(
ids.begin(),
ids.end(),
[](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); },
"|");
buffer.append("] locks=");
buffer.append("[");
buffer.joinStr(
region_locks.begin(),
region_locks.end(),
[](const auto & v, FmtBuffer & f) { f.fmtAppend("{}({})", v.first, v.second->DebugString()); },
"|");
buffer.append("]}");
return buffer.toString();
}

private:
const bool batch_cop;
const bool is_wn_disagg_read;
Expand Down
37 changes: 26 additions & 11 deletions dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,50 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
auto log = DB::Logger::get();
LOG_INFO(log, "Cancel and addTask");
// Cancel and addTask
// 3 -> 1 -> 4 -> 2
{
auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
auto m = std::make_shared<std::mutex>();
int flag = 0;
std::unique_lock cl(*m);
std::atomic_bool finished_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() {
std::atomic_bool running_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag, &running_flag]() {
running_flag.store(true, std::memory_order_seq_cst);
auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1);
std::scoped_lock rl(*m); // 2
SCOPE_EXIT({ finished_flag.store(true); });
std::scoped_lock rl(*m);
SCOPE_EXIT({ finished_flag.store(true, std::memory_order_seq_cst); });
// Run after `cl` is released.
if (cancel_handle->isCanceled())
{
return;
}
flag = 1;
});
ASSERT_TRUE(async_tasks->isScheduled(1));
{
int cnt_wait_sche = 0;
while (!running_flag.load(std::memory_order_seq_cst))
{
cnt_wait_sche += 1;
ASSERT(cnt_wait_sche < 6);
std::this_thread::sleep_for(200ms);
}
}
// Make sure we don't cancel in queue.
async_tasks->asyncCancelTask(1);
// The task is not registered anymore.
ASSERT_FALSE(async_tasks->isScheduled(1));
async_tasks->addTask(1, [&flag]() { flag = 2; });
cl.unlock(); // Now can task 1 run.
int count = 0;
using namespace std::chrono_literals;
while (!finished_flag.load())
cl.unlock();
{
count += 1;
ASSERT(count < 6);
std::this_thread::sleep_for(200ms);
int cnt_wait_finish = 0;
using namespace std::chrono_literals;
while (!finished_flag.load(std::memory_order_seq_cst))
{
cnt_wait_finish += 1;
ASSERT(cnt_wait_finish < 6);
std::this_thread::sleep_for(200ms);
}
}
ASSERT_NO_THROW(async_tasks->fetchResult(1));
ASSERT_EQ(flag, 2);
Expand Down

0 comments on commit 8cec573

Please sign in to comment.