Skip to content

Commit

Permalink
KVStore: More log for Parallel Prehandle (pingcap#9160) (pingcap#223)
Browse files Browse the repository at this point in the history
Co-authored-by: JaySon <tshent@qq.com>
  • Loading branch information
CalvinNeo and JaySon-Huang authored Jun 25, 2024
1 parent 7416418 commit d20015a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 23 deletions.
25 changes: 18 additions & 7 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
auto make_inner_func = [&](const TiFlashRaftProxyHelper * proxy_helper,
SSTView snap,
SSTReader::RegionRangeFilter range,
size_t split_id) {
return std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id);
size_t split_id,
size_t region_id) {
return std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id, region_id);
};
for (UInt64 i = 0; i < snaps.len; ++i)
{
Expand Down Expand Up @@ -95,7 +96,8 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
ssts_default,
log,
region->getRange(),
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT);
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region->id());
}
if (!ssts_write.empty())
{
Expand All @@ -106,7 +108,8 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
ssts_write,
log,
region->getRange(),
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT);
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region->id());
}
if (!ssts_lock.empty())
{
Expand All @@ -117,7 +120,8 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
ssts_lock,
log,
region->getRange(),
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT);
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region->id());
}
LOG_INFO(
log,
Expand Down Expand Up @@ -451,6 +455,7 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe

// Skip other versions of the same PK.
// TODO(split) use seek to optimize if failed several iterations.
size_t skipped_times = 0;
while (reader && reader->remained())
{
// Read until find the next pk.
Expand All @@ -463,17 +468,22 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
{
RUNTIME_CHECK_MSG(
current_truncated_ts > start_limit,
"current pk decreases as reader advances, start_raw {} start_pk {} current {}, cf={}, split_id={}, "
"current pk decreases as reader advances, skipped_times={} start_raw={} start_pk={} current_pk={} "
"current_raw={} cf={} split_id={}, "
"region_id={}",
skipped_times,
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
current_truncated_ts.toDebugString(),
tikv_key.toDebugString(),
magic_enum::enum_name(cf),
soft_limit.value().split_id,
region->id());
LOG_INFO(
log,
"Re-Seek after start_raw {} start_pk {} to {}, current_pk = {}, cf={}, split_id={}, region_id={}",
"Re-Seek after skipped_times={} start_raw={} start_pk={} current_raw={} current_pk={} cf={} "
"split_id={} region_id={}",
skipped_times,
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
tikv_key.toDebugString(),
Expand All @@ -483,6 +493,7 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
region->id());
return true;
}
skipped_times++;
reader->next();
}
// `start_limit` is the last pk of the sst file.
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/KVStore/FFI/SSTReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ MonoSSTReader::MonoSSTReader(
const TiFlashRaftProxyHelper * proxy_helper_,
SSTView view,
RegionRangeFilter range_,
size_t split_id_)
size_t split_id_,
size_t region_id_)
: proxy_helper(proxy_helper_)
, inner(proxy_helper->sst_reader_interfaces.fn_get_sst_reader(view, proxy_helper->proxy_ptr))
, type(view.type)
, range(range_)
, tail_checked(false)
, split_id(split_id_)
, region_id(region_id_)
{
log = &Poco::Logger::get("MonoSSTReader");
kind = proxy_helper->sst_reader_interfaces.fn_kind(inner, view.type);
Expand All @@ -126,10 +128,11 @@ MonoSSTReader::MonoSSTReader(
// 'z' will be added in proxy.
LOG_INFO(
log,
"Seek cf {} to {}, split_id={}",
"Seek cf {} to {}, split_id={} region_id={}",
magic_enum::enum_name(type),
Redact::keyToDebugString(start.data(), start.size()),
split_id);
split_id,
region_id);
if (!start.empty())
{
proxy_helper->sst_reader_interfaces
Expand Down
26 changes: 17 additions & 9 deletions dbms/src/Storages/KVStore/FFI/SSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class MonoSSTReader : public SSTReader
const TiFlashRaftProxyHelper * proxy_helper_,
SSTView view,
RegionRangeFilter range_,
size_t split_id_);
size_t split_id_,
size_t region_id_);
~MonoSSTReader() override;

private:
Expand All @@ -72,6 +73,7 @@ class MonoSSTReader : public SSTReader
SSTFormatKind kind;
mutable bool tail_checked;
size_t split_id;
size_t region_id;
Poco::Logger * log;
};

Expand All @@ -86,7 +88,8 @@ template <typename R, typename E>
class MultiSSTReader : public SSTReader
{
public:
using Initer = std::function<std::unique_ptr<R>(const TiFlashRaftProxyHelper *, E, RegionRangeFilter, size_t)>;
using Initer
= std::function<std::unique_ptr<R>(const TiFlashRaftProxyHelper *, E, RegionRangeFilter, size_t, size_t)>;

DISALLOW_COPY_AND_MOVE(MultiSSTReader);

Expand Down Expand Up @@ -162,12 +165,13 @@ class MultiSSTReader : public SSTReader
// The `path` is serialized changeset under cse-proxy, don't print it to logging
LOG_INFO(
log,
"Open sst file, range={} sst_idx={} sst_tot={} split_id={}",
"Open sst file range={} sst_idx={} sst_tot={} split_id={} region_id={}",
range->toDebugString(),
sst_idx,
args.size(),
split_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id);
split_id,
region_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id, region_id);
}
}

Expand All @@ -178,7 +182,8 @@ class MultiSSTReader : public SSTReader
std::vector<E> args_,
LoggerPtr log_,
RegionRangeFilter range_,
size_t split_id_)
size_t split_id_,
size_t region_id_)
: log(log_)
, proxy_helper(proxy_helper_)
, type(type_)
Expand All @@ -187,15 +192,17 @@ class MultiSSTReader : public SSTReader
, sst_idx(0)
, range(range_)
, split_id(split_id_)
, region_id(region_id_)
{
assert(args.size() > 0);
LOG_INFO(
log,
"Open sst file first, range={} sst_tot={} split_id={}",
"Open sst file first, range={} sst_tot={} split_id={} region_id={}",
range->toDebugString(),
args.size(),
split_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id);
split_id,
region_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id, region_id);
}

~MultiSSTReader() override
Expand All @@ -215,6 +222,7 @@ class MultiSSTReader : public SSTReader
size_t sst_idx;
RegionRangeFilter range;
const size_t split_id;
const size_t region_id;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ static inline std::tuple<ReadFromStreamResult, PrehandleResult> executeTransform
res = ReadFromStreamResult{.error = ReadFromStreamError::Aborted, .extra_msg = "", .region = new_region};
}
auto keys_per_second = (sst_stream->getProcessKeys().write_cf + sst_stream->getProcessKeys().lock_cf
+ sst_stream->getProcessKeys().write_cf)
+ sst_stream->getProcessKeys().default_cf)
* 1.0 / sw.elapsedSeconds();
GET_METRIC(tiflash_raft_command_throughput, type_prehandle_snapshot).Observe(keys_per_second);
return std::make_pair(
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/KVStore/tests/region_kvstore_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ static inline void validateSSTGeneration(
auto make_inner_func = [](const TiFlashRaftProxyHelper * proxy_helper,
SSTView snap,
SSTReader::RegionRangeFilter range,
size_t split_id) -> std::unique_ptr<MonoSSTReader> {
size_t split_id,
size_t region_id) -> std::unique_ptr<MonoSSTReader> {
auto parsed_kind = MockSSTGenerator::parseSSTViewKind(buffToStrView(snap.path));
auto reader = std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id);
auto reader = std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id, region_id);
assert(reader->sstFormatKind() == parsed_kind);
return reader;
};
Expand All @@ -75,7 +76,8 @@ static inline void validateSSTGeneration(
ssts,
Logger::get(),
kvr1->getRange(),
DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT};
DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region_id};

size_t counter = 0;
while (reader.remained())
Expand Down

0 comments on commit d20015a

Please sign in to comment.