Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Raft: Add identifier to logger when wait index happens(release-6.1) (#8473) #8476

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 385 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ColumnFileFlushTask
size_t flush_version;

// Running totals over every column file passed to `addColumnFile`.
// Sum of `getRows()` of the added column files.
size_t flush_rows = 0;
// Sum of `getBytes()` of the added column files.
size_t flush_bytes = 0;
// Sum of `getDeletes()` of the added column files.
size_t flush_deletes = 0;

public:
Expand All @@ -70,6 +71,7 @@ class ColumnFileFlushTask
/// Register `column_file` with this flush task.
/// The file's rows / bytes / deletes are added to the task-wide totals, then a
/// new entry is appended to `tasks` and returned by reference.
inline Task & addColumnFile(ColumnFilePtr column_file)
{
    // Account for the file first ...
    flush_rows += column_file->getRows();
    flush_bytes += column_file->getBytes();
    flush_deletes += column_file->getDeletes();

    // ... then queue it and hand the freshly created entry back to the caller.
    Task & queued_task = tasks.emplace_back(column_file);
    return queued_task;
}
Expand All @@ -78,6 +80,7 @@ class ColumnFileFlushTask

/// Number of entries currently held in `tasks`.
size_t getTaskNum() const
{
    return tasks.size();
}

/// Accumulated `flush_rows` counter.
size_t getFlushRows() const
{
    return flush_rows;
}

/// Accumulated `flush_bytes` counter.
size_t getFlushBytes() const
{
    return flush_bytes;
}

/// Accumulated `flush_deletes` counter.
size_t getFlushDeletes() const
{
    return flush_deletes;
}

// Persist data in ColumnFileInMemory
Expand Down
19 changes: 19 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

<<<<<<< HEAD
=======
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
>>>>>>> bb529e6836 (Raft: Add identifier to logger when wait index happens(release-7.1) (#8473))
#include <Functions/FunctionHelpers.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h>
Expand Down Expand Up @@ -180,6 +185,8 @@ bool DeltaValueSpace::flush(DMContext & context)
new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates);
LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo());
}
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_delta_flush).Increment(flush_task->getFlushBytes());
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_delta_flush).Increment(flush_task->getFlushRows());

{
/// If this instance is still valid, then commit.
Expand All @@ -203,7 +210,16 @@ bool DeltaValueSpace::flush(DMContext & context)
if (new_delta_index)
delta_index = new_delta_index;

<<<<<<< HEAD
LOG_FMT_DEBUG(log, "{} Flush end. Flushed {} column files, {} rows and {} deletes.", info(), flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes());
=======
// Indicate that the index with old epoch should not be used anymore.
// This is useful in disaggregated mode which will invalidate the delta index cache in RN.
delta_index_epoch += 1;
}

LOG_DEBUG(log, "Flush end, flush_tasks={} flush_rows={} flush_bytes={} flush_deletes={} delta={}", flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushBytes(), flush_task->getFlushDeletes(), info());
>>>>>>> bb529e6836 (Raft: Add identifier to logger when wait index happens(release-7.1) (#8473))
}
return true;
}
Expand Down Expand Up @@ -249,6 +265,9 @@ bool DeltaValueSpace::compact(DMContext & context)
log_storage_snap.reset(); // release the snapshot ASAP
}

GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_delta_compact).Increment(compaction_task->getTotalCompactBytes());
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_delta_compact).Increment(compaction_task->getTotalCompactRows());

{
std::scoped_lock lock(mutex);

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,18 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag
}
Block compact_block = schema.cloneWithColumns(std::move(compact_columns));
auto compact_rows = compact_block.rows();
<<<<<<< HEAD
auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs, task.to_compact.front()->tryToTinyFile()->getSchema());
=======
auto compact_bytes = compact_block.bytes();
auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs);
>>>>>>> bb529e6836 (Raft: Add identifier to logger when wait index happens(release-7.1) (#8473))
wbs.writeLogAndData();
task.result = compact_column_file;

total_compact_files += task.to_compact.size();
total_compact_rows += compact_rows;
total_compact_bytes += compact_bytes;
result_compact_files += 1;
}
}
Expand All @@ -72,7 +78,11 @@ bool MinorCompaction::commit(ColumnFilePersistedSetPtr & persisted_file_set, Wri

/// Build a one-line summary of this compaction, intended for log output.
/// Reports the number of input column files, the number of resulting column
/// files, and the total rows and bytes that were compacted.
String MinorCompaction::info() const
{
    // Merge conflict resolved in favour of the incoming message: it is the only
    // variant that reports `total_compact_bytes`, which this change starts tracking.
    return fmt::format(
        "Compact end, total_compact_files={} result_compact_files={} total_compact_rows={} total_compact_bytes={}",
        total_compact_files,
        result_compact_files,
        total_compact_rows,
        total_compact_bytes);
}
} // namespace DM
} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>

// Statistics of this compaction; they are reported by `info()`.
// Number of input column files that were compacted.
size_t total_compact_files = 0;
// Number of rows written into the compacted column files.
size_t total_compact_rows = 0;
// Number of bytes of the compacted blocks.
size_t total_compact_bytes = 0;
// Number of column files produced by the compaction.
size_t result_compact_files = 0;

public:
Expand Down Expand Up @@ -101,6 +102,10 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>

/// The value of `current_compaction_version` held by this task.
size_t getCompactionVersion() const
{
    return current_compaction_version;
}

// Compaction statistics. They are only meaningful once `prepare` has been called.
size_t getTotalCompactRows() const
{
    return total_compact_rows;
}

size_t getTotalCompactBytes() const
{
    return total_compact_bytes;
}

/// Create new column file by combining several small `ColumnFileTiny`s
void prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader);

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
{
if (segment->writeToCache(*dm_context, block, offset, limit))
{
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_write_to_cache).Increment(alloc_bytes);
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_write_to_cache).Increment(limit);
updated_segments.push_back(segment);
break;
}
Expand All @@ -677,6 +679,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
// Write could fail, because other threads could already updated the instance. Like split/merge, merge delta.
if (segment->writeToDisk(*dm_context, write_column_file))
{
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_write_to_disk).Increment(alloc_bytes);
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_write_to_disk).Increment(limit);
updated_segments.push_back(segment);
break;
}
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/Transaction/LearnerRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,12 @@ LearnerReadSnapshot doLearnerRead(
{
// The wait-index timeout is disabled, or it is enabled but has not been hit yet:
// wait for the index of this specific Region.
auto [wait_res, time_cost] = region->waitIndex(index_to_wait, tmt.waitIndexTimeout(), [&tmt]() { return tmt.checkRunning(); });
if (wait_res != WaitIndexResult::Finished)
auto [wait_res, time_cost] = region->waitIndex(
index_to_wait,
tmt.waitIndexTimeout(),
[&tmt]() { return tmt.checkRunning(); },
log);
if (wait_res != WaitIndexStatus::Finished)
{
handle_wait_timeout_region(region_to_query.region_id);
continue;
Expand Down
60 changes: 57 additions & 3 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,18 @@ bool Region::checkIndex(UInt64 index) const
return meta.checkIndex(index);
}

std::tuple<WaitIndexResult, double> Region::waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running)
std::tuple<WaitIndexStatus, double> Region::waitIndex(
UInt64 index,
const UInt64 timeout_ms,
std::function<bool(void)> && check_running,
const LoggerPtr & log)
{
if (proxy_helper != nullptr)
if (proxy_helper == nullptr) // just for debug
return {WaitIndexStatus::Finished, 0};

if (meta.checkIndex(index))
{
<<<<<<< HEAD
if (!meta.checkIndex(index))
{
Stopwatch wait_index_watch;
Expand Down Expand Up @@ -550,8 +558,54 @@ std::tuple<WaitIndexResult, double> Region::waitIndex(UInt64 index, const UInt64
}
}
}
=======
// already satisfied
return {WaitIndexStatus::Finished, 0};
}

Stopwatch wait_index_watch;
const auto wait_idx_res = meta.waitIndex(index, timeout_ms, std::move(check_running));
const auto elapsed_secs = wait_index_watch.elapsedSeconds();
const auto & status = wait_idx_res.status;
switch (status)
{
case WaitIndexStatus::Finished:
{
const auto log_lvl = elapsed_secs < 1.0 ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION;
LOG_IMPL(
log,
log_lvl,
"{} wait learner index done, prev_index={} curr_index={} to_wait={} elapsed_s={:.3f} timeout_s={:.3f}",
toString(false),
wait_idx_res.prev_index,
wait_idx_res.current_index,
index,
elapsed_secs,
timeout_ms / 1000.0);
return {status, elapsed_secs};
}
case WaitIndexStatus::Terminated:
{
return {status, elapsed_secs};
}
case WaitIndexStatus::Timeout:
{
ProfileEvents::increment(ProfileEvents::RaftWaitIndexTimeout);
LOG_WARNING(
log,
"{} wait learner index timeout, prev_index={} curr_index={} to_wait={} state={}"
" elapsed_s={:.3f} timeout_s={:.3f}",
toString(false),
wait_idx_res.prev_index,
wait_idx_res.current_index,
index,
static_cast<Int32>(peerState()),
elapsed_secs,
timeout_ms / 1000.0);
return {status, elapsed_secs};
}
>>>>>>> bb529e6836 (Raft: Add identifier to logger when wait index happens(release-7.1) (#8473))
}
return {WaitIndexResult::Finished, 0};
}

UInt64 Region::version() const
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ class Region : public std::enable_shared_from_this<Region>

bool checkIndex(UInt64 index) const;

// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
// `log` is the logger used to report the outcome of the wait.
std::tuple<WaitIndexStatus, double> waitIndex(UInt64 index, UInt64 timeout_ms, std::function<bool(void)> && check_running, const LoggerPtr & log);

UInt64 appliedIndex() const;

Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/Transaction/RegionMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,38 +157,41 @@ void RegionMeta::setPeerState(const raft_serverpb::PeerState peer_state_)
/// Block until the applied index reaches `index`, the caller asks to stop, or the timeout elapses.
///
/// - `timeout_ms` == 0: wait without a deadline.
/// - `timeout_ms` != 0: give up after `timeout_ms` milliseconds; the status becomes `Timeout`.
/// - `check_running` returning false ends the wait; the status becomes `Terminated`.
///
/// Besides the status, the result carries the applied index read before waiting (`prev_index`)
/// and the one read by the most recent evaluation of the wait condition (`current_index`).
WaitIndexResult RegionMeta::waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running) const
{
    std::unique_lock lock(mutex);
    WaitIndexResult res;
    res.prev_index = apply_state.applied_index();

    // Wait condition shared by the timed and the untimed wait.
    // Returns true when waiting should stop.
    const auto should_stop_waiting = [&] {
        res.current_index = apply_state.applied_index();
        if (!check_running())
        {
            res.status = WaitIndexStatus::Terminated;
            return true;
        }
        return doCheckIndex(index);
    };

    if (timeout_ms != 0)
    {
        // wait for applied index with a timeout
        auto timeout_timepoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
        if (!cv.wait_until(lock, timeout_timepoint, should_stop_waiting))
        {
            // not terminated && not reach the `index` => timeout
            res.status = WaitIndexStatus::Timeout;
        }
    }
    else
    {
        // wait infinitely
        cv.wait(lock, should_stop_waiting);
    }

    return res;
}

bool RegionMeta::checkIndex(UInt64 index) const
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/Transaction/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ struct RegionMergeResult;
class Region;
class MetaRaftCommandDelegate;
class RegionRaftCommandDelegate;
/// Outcome of waiting for a Region's applied index.
enum class WaitIndexStatus
{
    Finished,
    Terminated, // Read index is terminated due to upper layer.
    Timeout,
};
/// Result of a wait-index call: the final status plus the applied indexes seen around the wait.
struct WaitIndexResult
{
    // How the wait ended; defaults to `Finished`.
    WaitIndexStatus status = WaitIndexStatus::Finished;
    // Applied index read before the wait started.
    UInt64 prev_index{0};
    // Applied index read when the wait ended.
    UInt64 current_index{0};
};

struct RegionMetaSnapshot
{
Expand Down Expand Up @@ -101,7 +109,7 @@ class RegionMeta
// If `timeout_ms` == 0, it waits infinitely unless `check_running` returns false.
// If `timeout_ms` != 0 and `index` is not reached after waiting for `timeout_ms`, the returned status is WaitIndexStatus::Timeout.
// If `check_running` returns false, the returned status is WaitIndexStatus::Terminated.
WaitIndexResult waitIndex(UInt64 index, UInt64 timeout_ms, std::function<bool(void)> && check_running) const;
bool checkIndex(UInt64 index) const;

RegionMetaSnapshot dumpRegionMetaSnapshot() const;
Expand Down
Loading