Skip to content

Commit

Permalink
Cherry-pick: wait parallel prehandle resource (pingcap#208)
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo authored Jun 5, 2024
1 parent 29cf2b0 commit 0628566
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 53 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
M(DT_DeltaIndexCacheSize) \
M(RaftNumSnapshotsPendingApply) \
M(RaftNumPrehandlingSubTasks) \
M(RaftNumParallelPrehandlingTasks) \
M(RateLimiterPendingWriteRequest) \
M(DT_SegmentReadTasks) \
M(DT_SnapshotOfRead) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ namespace DB
M(pause_after_copr_streams_acquired) \
M(pause_query_init) \
M(pause_before_prehandle_snapshot) \
M(pause_before_prehandle_subtask) \
M(pause_when_persist_region) \
M(pause_before_wn_establish_task) \
M(pause_passive_flush_before_persist_region)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ namespace DB
F(type_apply_snapshot_predecode, {{"type", "snapshot_predecode"}}, ExpBuckets{0.05, 2, 15}), \
F(type_apply_snapshot_total, {{"type", "snapshot_total"}}, ExpBucketsWithRange{0.1, 2, 600}), \
F(type_apply_snapshot_predecode_sst2dt, {{"type", "snapshot_predecode_sst2dt"}}, ExpBuckets{0.05, 2, 15}), \
F(type_apply_snapshot_predecode_parallel_wait, \
{{"type", "snapshot_predecode_parallel_wait"}}, \
ExpBuckets{0.1, 2, 10}), \
F(type_apply_snapshot_predecode_upload, {{"type", "snapshot_predecode_upload"}}, ExpBuckets{0.05, 2, 10}), \
F(type_apply_snapshot_flush, {{"type", "snapshot_flush"}}, ExpBuckets{0.05, 2, 10})) \
M(tiflash_raft_process_keys, \
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const Ti
extern void setupDelRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &);
} // namespace RegionBench

void fn_notify_compact_log(
RaftStoreProxyPtr ptr,
uint64_t region_id,
uint64_t compact_index,
uint64_t compact_term,
uint64_t applied_index)
{
UNUSED(applied_index);
// Update flushed applied_index and truncated state.
auto & x = as_ref(ptr);
auto region = x.getRegion(region_id);
ASSERT(region);
// `applied_index` in proxy's disk can still be less than the `applied_index` here when fg flush.
if (region && region->getApply().truncated_state().index() < compact_index)
{
region->tryUpdateTruncatedState(compact_index, compact_term);
}
}

TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr proxy_ptr)
{
TiFlashRaftProxyHelper res{};
Expand Down Expand Up @@ -603,7 +622,7 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
region_id,
std::move(cfs),
old_kv_region->cloneMetaRegion(),
old_kv_region->mutMeta().peerId(),
old_kv_region->getMeta().peerId(),
index,
term,
deadline_index,
Expand Down Expand Up @@ -671,6 +690,7 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
exit(-1);
}


TableID MockRaftStoreProxy::bootstrapTable(Context & ctx, KVStore & kvs, TMTContext & tmt, bool drop_at_first)
{
UNUSED(kvs);
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

namespace DB
{
kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t start_ts);

struct MockProxyRegion : MutexLockWrap
{
raft_serverpb::RegionLocalState getState();
Expand Down Expand Up @@ -278,6 +276,9 @@ struct MockRaftStoreProxy : MutexLockWrap
, table_id(1)
, cluster_ver(RaftstoreVer::V1)
{
log = Logger::get("MockRaftStoreProxy");
table_id = 1;
cluster_ver = RaftstoreVer::V1;
proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4}})";
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockKVStore/MockReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void MockFFIImpls::fn_handle_batch_read_index(
throw Exception("`fn_handle_batch_read_index` is deprecated");
}

kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t start_ts)
kvrpcpb::ReadIndexRequest MockFFIImpls::make_read_index_reqs(uint64_t region_id, uint64_t start_ts)
{
kvrpcpb::ReadIndexRequest req;
req.set_start_ts(start_ts);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/MockKVStore/MockSSTReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace DB
SSTReaderPtr fn_get_sst_reader(SSTView v, RaftStoreProxyPtr)
{
std::string s(v.path.data, v.path.len);
std::scoped_lock<std::mutex> lock(MockSSTReader::mut);
auto iter = MockSSTReader::getMockSSTData().find({s, v.type});
if (iter == MockSSTReader::getMockSSTData().end())
throw Exception("Can not find data in MockSSTData, [key=" + s + "] [type=" + CFToName(v.type) + "]");
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy
}
}

fetchProxyConfig(proxy_helper);
}

void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper)
{
// Try fetch proxy's config as a json string
UNUSED(proxy_helper); // serverless proxy not support
// if (proxy_helper && proxy_helper->fn_get_config_json)
// {
// RustStrWithView rust_string
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>
#include <Storages/KVStore/StorageEngineType.h>

#include <condition_variable>
#include <magic_enum.hpp>

namespace TiDB
Expand Down Expand Up @@ -262,6 +263,8 @@ class KVStore final : private boost::noncopyable

RaftLogEagerGcTasks::Hints getRaftLogGcHints();
void applyRaftLogGcTaskRes(const RaftLogGcTasksRes & res) const;
RegionTaskLock genRegionTaskLock(UInt64 region_id) const;
size_t getMaxParallelPrehandleSize() const;

#ifndef DBMS_PUBLIC_GTEST
private:
Expand Down Expand Up @@ -375,7 +378,6 @@ class KVStore final : private boost::noncopyable
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);
void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper);
RegionTaskLock genRegionTaskLock(UInt64 region_id) const;

#ifndef DBMS_PUBLIC_GTEST
private:
Expand Down
20 changes: 20 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ struct PreHandlingTrace : MutexLockWrap

// Prehandle use thread pool from Proxy's side, so it can't benefit from AsyncTasks.
std::unordered_map<uint64_t, std::shared_ptr<Item>> tasks;
std::atomic<uint64_t> ongoing_prehandle_subtask_count{0};
std::mutex cpu_resource_mut;
std::condition_variable cpu_resource_cv;
LoggerPtr log;

PreHandlingTrace()
: log(Logger::get("PreHandlingTrace"))
{}
std::shared_ptr<Item> registerTask(uint64_t region_id)
{
// Automaticlly override the old one.
Expand All @@ -60,5 +67,18 @@ struct PreHandlingTrace : MutexLockWrap
auto _ = genLockGuard();
return tasks.find(region_id) != tasks.end();
}
void waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit);
void releaseSubtaskResources(uint64_t region_id, size_t split_id)
{
std::unique_lock<std::mutex> cpu_resource_lock(cpu_resource_mut);
// TODO(split) refine this to avoid notify_all
auto prev = ongoing_prehandle_subtask_count.fetch_sub(1);
RUNTIME_CHECK_MSG(
prev > 0,
"Try to decrease prehandle subtask count to below 0, region_id={}, split_id={}",
region_id,
split_id);
cpu_resource_cv.notify_all();
}
};
} // namespace DB
Loading

0 comments on commit 0628566

Please sign in to comment.