Skip to content

Commit

Permalink
Raft: Add identifier to logger when wait index happens (#8446)
Browse files Browse the repository at this point in the history
close #8076, close #8448
  • Loading branch information
JaySon-Huang authored Dec 6, 2023
1 parent 88f9912 commit 713bca0
Show file tree
Hide file tree
Showing 13 changed files with 560 additions and 106 deletions.
14 changes: 14 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ namespace DB
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_subtask_throughput_bytes, \
"Calculate the throughput of (maybe foreground) tasks of storage in bytes", \
Counter, /**/ \
F(type_delta_flush, {"type", "delta_flush"}), /**/ \
F(type_delta_compact, {"type", "delta_compact"}), /**/ \
F(type_write_to_cache, {"type", "write_to_cache"}), /**/ \
F(type_write_to_disk, {"type", "write_to_disk"})) /**/ \
M(tiflash_storage_subtask_throughput_rows, \
"Calculate the throughput of (maybe foreground) tasks of storage in rows", \
Counter, /**/ \
F(type_delta_flush, {"type", "delta_flush"}), /**/ \
F(type_delta_compact, {"type", "delta_compact"}), /**/ \
F(type_write_to_cache, {"type", "write_to_cache"}), /**/ \
F(type_write_to_disk, {"type", "write_to_disk"})) /**/ \
M(tiflash_storage_throughput_bytes, \
"Calculate the throughput of tasks of storage in bytes", \
Gauge, /**/ \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ColumnFileFlushTask
size_t flush_version;

size_t flush_rows = 0;
size_t flush_bytes = 0;
size_t flush_deletes = 0;

public:
Expand All @@ -70,6 +71,7 @@ class ColumnFileFlushTask
inline Task & addColumnFile(ColumnFilePtr column_file)
{
flush_rows += column_file->getRows();
flush_bytes += column_file->getBytes();
flush_deletes += column_file->getDeletes();
return tasks.emplace_back(column_file);
}
Expand All @@ -78,6 +80,7 @@ class ColumnFileFlushTask

size_t getTaskNum() const { return tasks.size(); }
size_t getFlushRows() const { return flush_rows; }
size_t getFlushBytes() const { return flush_bytes; }
size_t getFlushDeletes() const { return flush_deletes; }

// Persist data in ColumnFileInMemory
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Functions/FunctionHelpers.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h>
Expand Down Expand Up @@ -361,6 +362,8 @@ bool DeltaValueSpace::flush(DMContext & context)
new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates);
LOG_DEBUG(log, "Update index done, delta={}", simpleInfo());
}
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_delta_flush).Increment(flush_task->getFlushBytes());
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_delta_flush).Increment(flush_task->getFlushRows());

SYNC_FOR("after_DeltaValueSpace::flush|prepare_flush");

Expand Down Expand Up @@ -394,9 +397,10 @@ bool DeltaValueSpace::flush(DMContext & context)

LOG_DEBUG(
log,
"Flush end, flush_tasks={} flush_rows={} flush_deletes={} delta={}",
"Flush end, flush_tasks={} flush_rows={} flush_bytes={} flush_deletes={} delta={}",
flush_task->getTaskNum(),
flush_task->getFlushRows(),
flush_task->getFlushBytes(),
flush_task->getFlushDeletes(),
info());
}
Expand Down Expand Up @@ -441,6 +445,11 @@ bool DeltaValueSpace::compact(DMContext & context)
log_storage_snap.reset(); // release the snapshot ASAP
}

GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_delta_compact)
.Increment(compaction_task->getTotalCompactBytes());
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_delta_compact)
.Increment(compaction_task->getTotalCompactRows());

{
std::scoped_lock lock(mutex);

Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag
}
Block compact_block = schema.cloneWithColumns(std::move(compact_columns));
auto compact_rows = compact_block.rows();
auto compact_bytes = compact_block.bytes();
auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs);
wbs.writeLogAndData();
task.result = compact_column_file;

total_compact_files += task.to_compact.size();
total_compact_rows += compact_rows;
total_compact_bytes += compact_bytes;
result_compact_files += 1;
}
}
Expand All @@ -73,10 +75,11 @@ bool MinorCompaction::commit(ColumnFilePersistedSetPtr & persisted_file_set, Wri
String MinorCompaction::info() const
{
return fmt::format(
"Compact end, total_compact_files={} result_compact_files={} total_compact_rows={}",
"Compact end, total_compact_files={} result_compact_files={} total_compact_rows={} total_compact_bytes={}",
total_compact_files,
result_compact_files,
total_compact_rows);
total_compact_rows,
total_compact_bytes);
}
} // namespace DM
} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>

size_t total_compact_files = 0;
size_t total_compact_rows = 0;
size_t total_compact_bytes = 0;
size_t result_compact_files = 0;

public:
Expand Down Expand Up @@ -101,6 +102,10 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>

size_t getCompactionVersion() const { return current_compaction_version; }

// The stats about compaction. Only effective after `prepare` is called.
size_t getTotalCompactRows() const { return total_compact_rows; }
size_t getTotalCompactBytes() const { return total_compact_bytes; }

/// Create new column file by combining several small `ColumnFileTiny`s
void prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader);

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set
{
if (segment->writeToCache(*dm_context, block, offset, limit))
{
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_write_to_cache).Increment(alloc_bytes);
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_write_to_cache).Increment(limit);
updated_segments.push_back(segment);
break;
}
Expand All @@ -632,6 +634,8 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set
// Write could fail, because other threads could have already updated the instance, e.g. by split/merge or merge delta.
if (segment->writeToDisk(*dm_context, write_column_file))
{
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_write_to_disk).Increment(alloc_bytes);
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_write_to_disk).Increment(limit);
updated_segments.push_back(segment);
break;
}
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,38 +188,41 @@ WaitIndexResult RegionMeta::waitIndex( //
std::function<bool(void)> && check_running) const
{
std::unique_lock lock(mutex);
WaitIndexResult status = WaitIndexResult::Finished;
WaitIndexResult res;
res.prev_index = apply_state.applied_index();
if (timeout_ms != 0)
{
// wait for applied index with a timeout
auto timeout_timepoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
if (!cv.wait_until(lock, timeout_timepoint, [&] {
res.current_index = apply_state.applied_index();
if (!check_running())
{
status = WaitIndexResult::Terminated;
res.status = WaitIndexStatus::Terminated;
return true;
}
return doCheckIndex(index);
}))
{
// not terminated && not reach the `index` => timeout
status = WaitIndexResult::Timeout;
res.status = WaitIndexStatus::Timeout;
}
}
else
{
// wait infinitely
cv.wait(lock, [&] {
res.current_index = apply_state.applied_index();
if (!check_running())
{
status = WaitIndexResult::Terminated;
res.status = WaitIndexStatus::Terminated;
return true;
}
return doCheckIndex(index);
});
}

return status;
return res;
}

bool RegionMeta::checkIndex(UInt64 index) const
Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,21 @@ struct RegionMergeResult;
class Region;
class MetaRaftCommandDelegate;
class RegionRaftCommandDelegate;
enum class WaitIndexResult

// Outcome of waiting for a Region's applied index to reach a target index.
enum class WaitIndexStatus
{
// The wait ended without being terminated or timing out.
Finished,
Terminated, // Waiting was stopped by the upper layer (`check_running` returned false).
Timeout, // The target index was not reached within the given timeout.
};
// Result of `RegionMeta::waitIndex`: the final status together with the
// applied index observed before and after waiting, so callers can log
// how far the Region advanced during the wait.
struct WaitIndexResult
{
// Defaults to `Finished`; only overwritten when the wait is terminated or times out.
WaitIndexStatus status{WaitIndexStatus::Finished};
// The applied index read before starting to wait.
UInt64 prev_index = 0;
// The applied index read the last time the wait condition was evaluated,
// i.e. its value when the wait finished.
UInt64 current_index = 0;
};

struct RegionMetaSnapshot
{
Expand Down Expand Up @@ -102,7 +111,7 @@ class RegionMeta
// If `timeout_ms` == 0, it waits indefinitely unless `check_running` returns false.
// If `timeout_ms` != 0 and `index` is not reached after waiting for `timeout_ms`, the returned status is WaitIndexStatus::Timeout.
// If `check_running` returns false, the returned status is WaitIndexStatus::Terminated.
WaitIndexResult waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running) const;
WaitIndexResult waitIndex(UInt64 index, UInt64 timeout_ms, std::function<bool(void)> && check_running) const;
bool checkIndex(UInt64 index) const;

RegionMetaSnapshot dumpRegionMetaSnapshot() const;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,12 @@ void LearnerReadWorker::waitIndex(

// Wait index timeout is disabled, or timeout is enabled but has not happened yet: wait index for
// a specific Region.
const auto [wait_res, time_cost]
= region->waitIndex(index_to_wait, timeout_ms, [this]() { return tmt.checkRunning(); });
if (wait_res != WaitIndexResult::Finished)
const auto [wait_res, time_cost] = region->waitIndex(
index_to_wait,
timeout_ms,
[this]() { return tmt.checkRunning(); },
log);
if (wait_res != WaitIndexStatus::Finished)
{
auto current = region->appliedIndex();
unavailable_regions.addRegionWaitIndexTimeout(region_to_query.region_id, index_to_wait, current);
Expand Down
85 changes: 51 additions & 34 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,47 +63,64 @@ bool Region::checkIndex(UInt64 index) const
return meta.checkIndex(index);
}

std::tuple<WaitIndexResult, double> Region::waitIndex(
std::tuple<WaitIndexStatus, double> Region::waitIndex(
UInt64 index,
const UInt64 timeout_ms,
std::function<bool(void)> && check_running)
std::function<bool(void)> && check_running,
const LoggerPtr & log)
{
fiu_return_on(FailPoints::force_wait_index_timeout, std::make_tuple(WaitIndexResult::Timeout, 1.0));
fiu_return_on(FailPoints::force_wait_index_timeout, std::make_tuple(WaitIndexStatus::Timeout, 1.0));
if (proxy_helper == nullptr) // just for debug
return {WaitIndexResult::Finished, 0};
return {WaitIndexStatus::Finished, 0};

if (!meta.checkIndex(index))
if (meta.checkIndex(index))
{
Stopwatch wait_index_watch;
LOG_DEBUG(log, "{} need to wait learner index {} timeout {}", toString(), index, timeout_ms);
auto wait_idx_res = meta.waitIndex(index, timeout_ms, std::move(check_running));
auto elapsed_secs = wait_index_watch.elapsedSeconds();
switch (wait_idx_res)
{
case WaitIndexResult::Finished:
{
LOG_DEBUG(log, "{} wait learner index {} done", toString(false), index);
return {wait_idx_res, elapsed_secs};
}
case WaitIndexResult::Terminated:
{
return {wait_idx_res, elapsed_secs};
}
case WaitIndexResult::Timeout:
{
ProfileEvents::increment(ProfileEvents::RaftWaitIndexTimeout);
LOG_WARNING(
log,
"{} wait learner index {} timeout current {} state {}",
toString(false),
index,
appliedIndex(),
fmt::underlying(peerState()));
return {wait_idx_res, elapsed_secs};
}
}
// already satisfied
return {WaitIndexStatus::Finished, 0};
}

Stopwatch wait_index_watch;
const auto wait_idx_res = meta.waitIndex(index, timeout_ms, std::move(check_running));
const auto elapsed_secs = wait_index_watch.elapsedSeconds();
const auto & status = wait_idx_res.status;
switch (status)
{
case WaitIndexStatus::Finished:
{
const auto log_lvl = elapsed_secs < 1.0 ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION;
LOG_IMPL(
log,
log_lvl,
"{} wait learner index done, prev_index={} curr_index={} to_wait={} elapsed_s={:.3f} timeout_s={:.3f}",
toString(false),
wait_idx_res.prev_index,
wait_idx_res.current_index,
index,
elapsed_secs,
timeout_ms / 1000.0);
return {status, elapsed_secs};
}
case WaitIndexStatus::Terminated:
{
return {status, elapsed_secs};
}
case WaitIndexStatus::Timeout:
{
ProfileEvents::increment(ProfileEvents::RaftWaitIndexTimeout);
LOG_WARNING(
log,
"{} wait learner index timeout, prev_index={} curr_index={} to_wait={} state={}"
" elapsed_s={:.3f} timeout_s={:.3f}",
toString(false),
wait_idx_res.prev_index,
wait_idx_res.current_index,
index,
fmt::underlying(peerState()),
elapsed_secs,
timeout_ms / 1000.0);
return {status, elapsed_secs};
}
}
return {WaitIndexResult::Finished, 0};
}

void WaitCheckRegionReady(
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@ class Region : public std::enable_shared_from_this<Region>
// Check if we can read by this index.
bool checkIndex(UInt64 index) const;

// Return <WaitIndexResult, time cost(seconds)> for wait-index.
std::tuple<WaitIndexResult, double> waitIndex(
// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
std::tuple<WaitIndexStatus, double> waitIndex(
UInt64 index,
UInt64 timeout_ms,
std::function<bool(void)> && check_running);
std::function<bool(void)> && check_running,
const LoggerPtr & log);

// Requires RegionMeta's lock
UInt64 appliedIndex() const;
Expand Down
Loading

0 comments on commit 713bca0

Please sign in to comment.