Skip to content

Commit

Permalink
KVStore: Improve codes for read index worker (#8873)
Browse files Browse the repository at this point in the history
ref #8864
  • Loading branch information
CalvinNeo authored Mar 27, 2024
1 parent ce8ae39 commit 38e4272
Show file tree
Hide file tree
Showing 15 changed files with 1,283 additions and 933 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,16 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Bucketed snapshot total size", \
Histogram, \
F(type_approx_raft_snapshot, {{"type", "approx_raft_snapshot"}}, ExpBuckets{1024, 2, 24})) /* 16G */ \
M(tiflash_raft_read_index_events_count, \
"Raft read index events counter", \
Counter, \
F(type_use_histroy, {{"type", "use_histroy"}}), \
F(type_use_cache, {{"type", "use_cache"}})) \
M(tiflash_raft_learner_read_failures_count, \
"Raft learner read failure reason counter", \
Counter, \
F(type_request_error, {{"type", "request_error"}}), \
F(type_read_index_timeout, {{"type", "read_index_timeout"}}), \
F(type_not_found_tiflash, {{"type", "not_found_tiflash"}}), \
F(type_epoch_not_match, {{"type", "epoch_not_match"}}), \
F(type_not_leader, {{"type", "not_leader"}}), \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
case RegionException::RegionReadStatus::FLASHBACK:
case RegionException::RegionReadStatus::KEY_NOT_IN_REGION:
case RegionException::RegionReadStatus::TIKV_SERVER_ISSUE:
case RegionException::RegionReadStatus::READ_INDEX_TIMEOUT:
case RegionException::RegionReadStatus::NOT_LEADER:
case RegionException::RegionReadStatus::NOT_FOUND_TIKV:
case RegionException::RegionReadStatus::NOT_FOUND:
Expand Down
77 changes: 77 additions & 0 deletions dbms/src/Storages/KVStore/Read/AsyncNotifier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/KVStore/Read/ReadIndexWorkerImpl.h>

namespace DB
{
/// Block the calling thread until the notifier has been woken or `time_point` is reached.
///
/// Returns `Status::Timeout` only when the condition-variable wait itself reports a timeout;
/// every other path returns `Status::Normal`. The awake flag is left cleared on return.
AsyncNotifier::Status AsyncWaker::Notifier::blockedWaitUtil(const SteadyClock::time_point & time_point)
{
    // Flag true -> false: a wake-up is already pending, consume it without blocking.
    if (is_awake->exchange(false, std::memory_order_acq_rel))
        return AsyncNotifier::Status::Normal;

    // Flag false -> false: nothing pending, so wait for a notification.
    auto status = AsyncNotifier::Status::Normal;
    {
        auto guard = genUniqueLock();
        // Re-check under the lock: the flag may have been set between the exchange above
        // and the moment the lock was acquired, in which case there is nothing to wait for.
        const bool still_asleep = !is_awake->load(std::memory_order_acquire);
        if (still_asleep && condVar().wait_until(guard, time_point) == std::cv_status::timeout)
            status = AsyncNotifier::Status::Timeout;
    }
    // Reset the flag after the lock has been released so the next wait starts from "not awake".
    is_awake->store(false, std::memory_order_release);
    return status;
}

/// Mark the notifier as awake and, if it was not awake before, notify one waiting thread.
void AsyncWaker::Notifier::wake() NO_THREAD_SAFETY_ANALYSIS
{
    // Flag true -> true: do nothing.
    // The plain load is tried first; the exchange only runs when the load observed `false`.
    const bool was_awake
        = is_awake->load(std::memory_order_acquire) || is_awake->exchange(true, std::memory_order_acq_rel);
    if (was_awake)
        return;

    // Flag false -> true: wake up the notifier while holding the lock.
    auto _ = genLockGuard();
    condVar().notify_one();
}

/// Static wake callback: interprets `notifier_` as an `AsyncNotifier` and wakes it.
/// This is the function passed to `makeAsyncWaker` by the `AsyncWaker` constructor.
void AsyncWaker::wake(RawVoidPtr notifier_)
{
    auto * target = reinterpret_cast<AsyncNotifier *>(notifier_);
    target->wake();
}

/// Construct a waker backed by a newly heap-allocated `AsyncWaker::Notifier`.
/// Delegates to the two-argument constructor, which wraps the pointer in a `RawCppPtr`.
/// NOTE(review): the allocation is not freed in this file; presumably the holder of the
/// `RawCppPtr` is responsible for releasing it -- confirm.
AsyncWaker::AsyncWaker(const TiFlashRaftProxyHelper & helper_)
: AsyncWaker(helper_, new AsyncWaker::Notifier{})
{}

/// Construct a waker around an existing notifier.
///
/// `notifier_` is wrapped with `GenRawCppPtr` (type tag `WakerNotifier`) and passed, together
/// with the static `AsyncWaker::wake` callback, to `helper_.makeAsyncWaker`; the result is
/// stored in `inner`. A reference to `*notifier_` is kept in `notifier`, so `notifier_` must
/// not be null.
AsyncWaker::AsyncWaker(const TiFlashRaftProxyHelper & helper_, AsyncNotifier * notifier_)
: inner(helper_.makeAsyncWaker(AsyncWaker::wake, GenRawCppPtr(notifier_, RawCppPtrTypeImpl::WakerNotifier)))
, notifier(*notifier_)
{}

/// Block on the underlying notifier until it is woken or `time_point` is reached.
/// Returns whatever `notifier.blockedWaitUtil(time_point)` returns.
AsyncNotifier::Status AsyncWaker::waitUtil(SteadyClock::time_point time_point)
{
return notifier.blockedWaitUtil(time_point);
}

/// Return the raw pointer held by `inner`, i.e. the handle produced by `makeAsyncWaker`.
/// In this file it is passed as the `waker` argument when polling read-index tasks.
RawVoidPtr AsyncWaker::getRaw() const
{
return inner.ptr;
}
} // namespace DB
22 changes: 17 additions & 5 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ void UnavailableRegions::addRegionWaitIndexTimeout(
current_applied_index);
}

/// LearnerReadWorker ///

LearnerReadWorker::LearnerReadWorker(
MvccQueryInfo & mvcc_query_info_,
TMTContext & tmt_,
Expand Down Expand Up @@ -134,6 +132,7 @@ std::vector<kvrpcpb::ReadIndexRequest> LearnerReadWorker::buildBatchReadIndexReq

if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index)
{
GET_METRIC(tiflash_raft_read_index_events_count, type_use_cache).Increment();
// the read index result from cache
auto resp = kvrpcpb::ReadIndexResponse();
resp.set_read_index(ori_read_index);
Expand Down Expand Up @@ -204,6 +203,8 @@ void LearnerReadWorker::recordReadIndexError(
auto region_status = RegionException::RegionReadStatus::OTHER;
if (region_error.has_epoch_not_match())
{
// 1. From TiKV
// 2. Find a TiKV mem lock of start_ts, and retry all other ts in the batch
auto snapshot_region_iter = regions_snapshot.find(region_id);
if (snapshot_region_iter != regions_snapshot.end())
{
Expand All @@ -227,6 +228,9 @@ void LearnerReadWorker::recordReadIndexError(
}
else if (region_error.has_region_not_found())
{
// 1. From TiKV
// 2. Can't send read index request
// 3. Read index timeout
GET_METRIC(tiflash_raft_learner_read_failures_count, type_not_found_tikv).Increment();
region_status = RegionException::RegionReadStatus::NOT_FOUND_TIKV;
}
Expand Down Expand Up @@ -256,10 +260,18 @@ void LearnerReadWorker::recordReadIndexError(
region_id);
region_status = RegionException::RegionReadStatus::KEY_NOT_IN_REGION;
}
else if (region_error.has_server_is_busy())
{
// 1. From TiKV
// 2. Read index request timeout
GET_METRIC(tiflash_raft_learner_read_failures_count, type_read_index_timeout).Increment();
LOG_DEBUG(log, "meet abnormal region error {}, [region_id={}]", resp.ShortDebugString(), region_id);
region_status = RegionException::RegionReadStatus::READ_INDEX_TIMEOUT;
}
else if (
region_error.has_server_is_busy() || region_error.has_raft_entry_too_large()
|| region_error.has_region_not_initialized() || region_error.has_disk_full()
|| region_error.has_read_index_not_ready() || region_error.has_proposal_in_merging_mode())
region_error.has_raft_entry_too_large() || region_error.has_region_not_initialized()
|| region_error.has_disk_full() || region_error.has_read_index_not_ready()
|| region_error.has_proposal_in_merging_mode())
{
GET_METRIC(tiflash_raft_learner_read_failures_count, type_tikv_server_issue).Increment();
LOG_DEBUG(log, "meet abnormal region error {}, [region_id={}]", resp.ShortDebugString(), region_id);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ struct UnavailableRegions
};

using RegionsReadIndexResult = std::unordered_map<RegionID, kvrpcpb::ReadIndexResponse>;

/// LearnerReadWorker serves all read index requests in a query.
class LearnerReadWorker
{
public:
Expand Down
131 changes: 131 additions & 0 deletions dbms/src/Storages/KVStore/Read/Proxy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/KVStore/Read/ReadIndexWorkerImpl.h>

namespace DB
{
/// Send a batch of read-index requests and block for the responses.
/// Simply forwards to `batchReadIndex_v2` with the same arguments.
BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(
const std::vector<kvrpcpb::ReadIndexRequest> & req,
uint64_t timeout_ms) const
{
return batchReadIndex_v2(req, timeout_ms);
}

/// Send a batch of read-index requests through the proxy and block until every request has a
/// response or the wait times out.
///
/// Each request produces exactly one entry in the result, paired with its region id:
/// - a request whose task could not be created gets a response carrying an empty region error;
/// - a task that is still not ready after the timeout gets a `region_not_found` region error;
/// - otherwise the response filled in by the proxy is returned.
///
/// @param req         requests to send, one per region.
/// @param timeout_ms  timeout handed to `BlockedReadIndexHelper`.
BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex_v2(
    const std::vector<kvrpcpb::ReadIndexRequest> & req,
    uint64_t timeout_ms) const
{
    AsyncWaker waker(*this);
    BlockedReadIndexHelper helper{timeout_ms, waker};

    std::queue<std::pair<RegionID, ReadIndexTask>> tasks;
    BatchReadIndexRes resps;
    resps.reserve(req.size());

    // Phase 1: create one task per request.
    for (const auto & request : req)
    {
        auto task = makeReadIndexTask(request);
        if (task)
        {
            tasks.emplace(request.context().region_id(), std::move(*task));
            continue;
        }

        // The read index request is not sent successfully.
        GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error).Increment();
        kvrpcpb::ReadIndexResponse failed;
        failed.mutable_region_error();
        resps.emplace_back(std::move(failed), request.context().region_id());
    }

    // Phase 2: block until all tasks are ready, or stop at the first timeout.
    {
        kvrpcpb::ReadIndexResponse ready;
        while (!tasks.empty())
        {
            auto & [region_id, task] = tasks.front();
            if (!pollReadIndexTask(task, ready, helper.getWaker().getRaw()))
            {
                // Not ready yet: sleep until woken, then poll the same task again.
                if (helper.blockedWait() == AsyncNotifier::Status::Timeout)
                    break;
                continue;
            }
            resps.emplace_back(std::move(ready), region_id);
            ready.Clear();
            tasks.pop();
        }
    }

    // Phase 3: only reached with pending tasks after a timeout, which means some regions got
    // no response from the leader. Poll each remaining task once more (without a waker) and
    // report the ones that are still not ready as `region_not_found`.
    for (; !tasks.empty(); tasks.pop())
    {
        auto & [region_id, task] = tasks.front();
        kvrpcpb::ReadIndexResponse last;
        if (!pollReadIndexTask(task, last))
            last.mutable_region_error()->mutable_region_not_found();
        resps.emplace_back(std::move(last), region_id);
    }

    return resps;
}

/// Create an async waker handle by forwarding to `fn_make_async_waker`.
/// `wake_fn` is the callback and `data` the pointer wrapper handed to it unchanged.
RawRustPtr TiFlashRaftProxyHelper::makeAsyncWaker(void (*wake_fn)(RawVoidPtr), RawCppPtr data) const
{
return fn_make_async_waker(wake_fn, data);
}

/// Serialize `req` and ask the proxy to create a read-index task for it.
///
/// @return the task wrapping the pointer returned by `fn_make_read_index_task`, or an empty
///         optional when that pointer is null.
std::optional<ReadIndexTask> TiFlashRaftProxyHelper::makeReadIndexTask(const kvrpcpb::ReadIndexRequest & req) const
{
    // One serialization buffer per thread, reused across calls.
    thread_local std::string buff_cache;
    req.SerializeToString(&buff_cache);

    RawRustPtr task_ptr = fn_make_read_index_task(proxy_ptr, strIntoView(&buff_cache));
    if (!task_ptr.ptr)
        return std::nullopt;
    return ReadIndexTask{task_ptr};
}

/// Poll a read-index task by forwarding to `fn_poll_read_index_task`.
///
/// `resp` is passed by address so it can be filled in; `waker` is forwarded as-is.
/// Callers in this file treat a `true` return as "task finished, `resp` holds the result"
/// and a `false` return as "not ready yet".
bool TiFlashRaftProxyHelper::pollReadIndexTask(
ReadIndexTask & task,
kvrpcpb::ReadIndexResponse & resp,
RawVoidPtr waker) const
{
return fn_poll_read_index_task(proxy_ptr, task.ptr, &resp, waker);
}

/// Create a timer task by wrapping the result of `fn_make_timer_task(time_ms)`.
/// NOTE(review): `time_ms` is presumably a duration in milliseconds -- confirm against the proxy API.
TimerTask TiFlashRaftProxyHelper::makeTimerTask(uint64_t time_ms) const
{
return TimerTask{fn_make_timer_task(time_ms)};
}

/// Poll a timer task by forwarding its raw pointer and `waker` to `fn_poll_timer_task`.
/// Returns that function's result unchanged.
bool TiFlashRaftProxyHelper::pollTimerTask(TimerTask & task, RawVoidPtr waker) const
{
return fn_poll_timer_task(task.ptr, waker);
}
} // namespace DB
66 changes: 66 additions & 0 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Interpreters/Context.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/Read/ReadIndexWorkerImpl.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <common/logger_useful.h>
Expand Down Expand Up @@ -309,4 +310,69 @@ void WaitCheckRegionReady(
wait_region_ready_timeout_sec);
}


/// Send a batch of read-index requests.
/// Uses the read-index worker manager when one has been initialized; otherwise falls back to
/// the proxy helper's `batchReadIndex_v1`.
BatchReadIndexRes KVStore::batchReadIndex(const std::vector<kvrpcpb::ReadIndexRequest> & reqs, uint64_t timeout_ms)
    const
{
    assert(this->proxy_helper);
    // No workers configured: go through the proxy helper directly.
    if (!read_index_worker_manager)
        return proxy_helper->batchReadIndex_v1(reqs, timeout_ms);
    return this->read_index_worker_manager->batchReadIndex(reqs, timeout_ms);
}

/// Create the read-index worker manager and publish it in `read_index_worker_manager`.
///
/// Does nothing except log a warning when `runner_cnt` is zero. Otherwise the number of
/// workers is `worker_coefficient * runner_cnt`.
void KVStore::initReadIndexWorkers(
    ReadIndexWorkerManager::FnGetTickTime && fn_min_dur_handle_region,
    size_t runner_cnt,
    size_t worker_coefficient)
{
    if (runner_cnt == 0)
    {
        LOG_WARNING(log, "Run without read-index workers");
        return;
    }

    auto worker_cnt = worker_coefficient * runner_cnt;
    LOG_INFO(log, "Start to initialize read-index workers: worker count {}, runner count {}", worker_cnt, runner_cnt);

    auto manager = ReadIndexWorkerManager::newReadIndexWorkerManager(
        *proxy_helper,
        *this,
        worker_cnt,
        std::move(fn_min_dur_handle_region),
        runner_cnt);
    // Ownership moves to the raw member pointer; it is freed in releaseReadIndexWorkers().
    auto * raw_manager = manager.release();
    // The fence is issued before the pointer is stored.
    // NOTE(review): presumably so that readers seeing a non-null pointer observe a fully
    // constructed manager -- confirm.
    std::atomic_thread_fence(std::memory_order_seq_cst);
    read_index_worker_manager = raw_manager;
}

/// Start the read-index workers via the manager's `asyncRun()`.
/// No-op when no manager has been initialized.
void KVStore::asyncRunReadIndexWorkers() const
{
    if (read_index_worker_manager)
    {
        assert(this->proxy_helper);
        read_index_worker_manager->asyncRun();
    }
}

/// Stop the read-index workers via the manager's `stop()`.
/// No-op when no manager has been initialized.
void KVStore::stopReadIndexWorkers() const
{
    if (read_index_worker_manager)
    {
        assert(this->proxy_helper);
        read_index_worker_manager->stop();
    }
}

/// Destroy the read-index worker manager, if any, and reset the member pointer to null.
void KVStore::releaseReadIndexWorkers()
{
    if (!read_index_worker_manager)
        return;

    delete read_index_worker_manager;
    read_index_worker_manager = nullptr;
}

} // namespace DB
Loading

0 comments on commit 38e4272

Please sign in to comment.