Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Refine logging about reading snap, place index, index to wait #8973

Merged
merged 3 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ bool ColumnFileSetReader::shouldPlace(
}
else
{
throw Exception("Unknown column file: " + column_file->toString(), ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column file: {}", column_file->toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h>
#include <Storages/DeltaMerge/DMContext.h>

namespace DB
{
namespace DM
namespace DB::DM
{
RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const
{
Expand All @@ -30,5 +28,4 @@ RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const
}
return squashed_delete_range;
}
} // namespace DM
} // namespace DB
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>

namespace DB
{
namespace DM
namespace DB::DM
{
class ColumnFileSetSnapshot;
using ColumnFileSetSnapshotPtr = std::shared_ptr<ColumnFileSetSnapshot>;
Expand Down Expand Up @@ -107,5 +105,4 @@ class ColumnFileSetSnapshot
const auto & getDataProvider() const { return data_provider; }
};

} // namespace DM
} // namespace DB
} // namespace DB::DM
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class DeltaValueSnapshot
#else
public:
#endif
bool is_update{false};
const bool is_update{false};

// The delta index of cached.
DeltaIndexPtr shared_delta_index;
Expand All @@ -371,8 +371,7 @@ class DeltaValueSnapshot
// We only allow one for_update snapshots to exist, so it cannot be cloned.
RUNTIME_CHECK(!is_update);

auto c = std::make_shared<DeltaValueSnapshot>(type);
c->is_update = is_update;
auto c = std::make_shared<DeltaValueSnapshot>(type, is_update);
c->shared_delta_index = shared_delta_index;
c->delta_index_epoch = delta_index_epoch;
c->mem_table_snap = mem_table_snap->clone();
Expand All @@ -383,8 +382,9 @@ class DeltaValueSnapshot
return c;
}

explicit DeltaValueSnapshot(CurrentMetrics::Metric type_)
: type(type_)
explicit DeltaValueSnapshot(CurrentMetrics::Metric type_, bool update_)
: is_update(update_)
, type(type_)
{
CurrentMetrics::add(type);
}
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(
if (abandoned.load(std::memory_order_relaxed))
return {};

auto snap = std::make_shared<DeltaValueSnapshot>(type);
snap->is_update = for_update;
auto snap = std::make_shared<DeltaValueSnapshot>(type, for_update);
snap->delta = this->shared_from_this();

auto storage_snap = std::make_shared<StorageSnapshot>(
Expand Down Expand Up @@ -220,16 +219,18 @@ bool DeltaValueReader::shouldPlace(
{
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

// Already placed.
// The placed_rows, placed_delete_range already contains the data in delta_snap
if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
return false;

if (relevant_range.all() || relevant_range == segment_range_ //
if (relevant_range.all() || relevant_range == segment_range_ // read all the data in this segment
|| delta_snap->getRows() - placed_rows > context.delta_cache_limit_rows //
|| placed_delete_ranges != delta_snap->getDeletes())
|| placed_delete_ranges != delta_snap->getDeletes() // new delete_range appended, must place it
)
return true;

size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset();
// otherwise check persisted_files and mem_tables
const size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset();
return persisted_files_reader->shouldPlace(context, relevant_range, start_ts, placed_rows)
|| mem_table_reader->shouldPlace(
context,
Expand Down
8 changes: 3 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/Remote/RNDeltaIndexCache.h>
#include <Storages/Page/PageDefinesBase.h>
namespace DB
{
namespace DM
namespace DB::DM
{
class DeltaIndex;
using DeltaIndexPtr = std::shared_ptr<DeltaIndex>;
Expand Down Expand Up @@ -171,6 +169,7 @@ class DeltaIndex
return delta_tree->getBytes();
}

// Return <placed_rows, placed_deletes>
std::pair<size_t, size_t> getPlacedStatus()
{
std::scoped_lock lock(mutex);
Expand Down Expand Up @@ -219,5 +218,4 @@ class DeltaIndex
const std::optional<Remote::RNDeltaIndexCache::CacheKey> & getRNCacheKey() const { return rn_cache_key; }
};

} // namespace DM
} // namespace DB
} // namespace DB::DM
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ SegmentSnapshotPtr Serializer::deserializeSegment(

auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;

auto delta_snap = std::make_shared<DeltaValueSnapshot>(CurrentMetrics::DT_SnapshotOfDisaggReadNodeRead);
delta_snap->is_update = false;
auto delta_snap = std::make_shared<DeltaValueSnapshot>(CurrentMetrics::DT_SnapshotOfDisaggReadNodeRead, false);
delta_snap->mem_table_snap
= deserializeColumnFileSet(dm_context, proto.column_files_memtable(), data_store, segment_range);
delta_snap->persisted_files_snap
Expand Down
72 changes: 52 additions & 20 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <Storages/DeltaMerge/Filter/FilterHelper.h>
#include <Storages/DeltaMerge/LateMaterializationBlockInputStream.h>
#include <Storages/DeltaMerge/PKSquashingBlockInputStream.h>
#include <Storages/DeltaMerge/Range.h>
#include <Storages/DeltaMerge/Remote/DataStore/DataStore.h>
#include <Storages/DeltaMerge/Remote/ObjectId.h>
#include <Storages/DeltaMerge/Remote/RNDeltaIndexCache.h>
Expand Down Expand Up @@ -121,6 +122,23 @@ extern const int UNKNOWN_FORMAT_VERSION;

namespace DM
{
String SegmentSnapshot::detailInfo() const
{
return fmt::format(
"{{"
"stable_rows={} "
"persisted_rows={} persisted_dels={} persisted_cfs={} "
"mem_rows={} mem_dels={} mem_cfs={}"
"}}",
stable->getRows(),
delta->getPersistedFileSetSnapshot()->getRows(),
delta->getPersistedFileSetSnapshot()->getDeletes(),
delta->getPersistedFileSetSnapshot()->getColumnFileCount(),
delta->getMemTableSetSnapshot()->getRows(),
delta->getMemTableSetSnapshot()->getDeletes(),
delta->getMemTableSetSnapshot()->getColumnFileCount());
}

const static size_t SEGMENT_BUFFER_SIZE = 128; // More than enough.

DMFilePtr writeIntoNewDMFile(
Expand Down Expand Up @@ -2401,7 +2419,7 @@ Segment::ReadInfo Segment::getReadInfo(
ReadTag read_tag,
UInt64 start_ts) const
{
LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo");
LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo {}", simpleInfo());

auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns);
auto pk_ver_col_defs = std::make_shared<ColumnDefines>(
Expand All @@ -2417,20 +2435,30 @@ Segment::ReadInfo Segment::getReadInfo(

auto [my_delta_index, fully_indexed] = ensurePlace(dm_context, segment_snap, delta_reader, read_ranges, start_ts);
auto compacted_index = my_delta_index->getDeltaTree()->getCompactedEntries();


// Hold compacted_index reference, to prevent it from deallocated.
delta_reader->setDeltaIndex(compacted_index);

LOG_DEBUG(segment_snap->log, "Finish segment getReadInfo");
LOG_DEBUG(
segment_snap->log,
"Finish segment getReadInfo, my_delta_index={} fully_indexed={} read_ranges={} "
"snap={} {}",
my_delta_index->toString(),
fully_indexed,
DB::DM::toDebugString(read_ranges),
segment_snap->detailInfo(),
simpleInfo());

if (fully_indexed)
{
// Try update shared index, if my_delta_index is more advanced.
bool ok = segment_snap->delta->getSharedDeltaIndex()->updateIfAdvanced(*my_delta_index);
if (ok)
{
LOG_DEBUG(segment_snap->log, "Segment updated delta index");
LOG_DEBUG(
segment_snap->log,
"Segment updated delta index, my_delta_index={} {}",
my_delta_index->toString(),
simpleInfo());
// Update cache size.
if (auto cache = dm_context.global_context.getSharedContextDisagg()->rn_delta_index_cache; cache)
cache->setDeltaIndex(segment_snap->delta->getSharedDeltaIndex());
Expand Down Expand Up @@ -2539,8 +2567,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

bool relevant_place = dm_context.enable_relevant_place;
bool skippable_place = dm_context.enable_skippable_place;
const bool relevant_place = dm_context.enable_relevant_place;
const bool skippable_place = dm_context.enable_skippable_place;

// Note that, when enable_relevant_place is false , we cannot use the range of this segment.
// Because some block / delete ranges could contain some data / range that are not belong to current segment.
Expand All @@ -2552,7 +2580,9 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(

// Let's do a fast check, determine whether we need to do place or not.
if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, start_ts))
{
return {my_delta_index, false};
}

CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_PlaceIndexUpdate};
GET_METRIC(tiflash_storage_subtask_count, type_place_index_update).Increment();
Expand All @@ -2578,8 +2608,11 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
auto offset = v.getBlockOffset();
auto rows = block.rows();

if (unlikely(my_placed_rows != offset))
throw Exception("Place block offset not match", ErrorCodes::LOGICAL_ERROR);
RUNTIME_CHECK_MSG(
my_placed_rows == offset,
"Place block offset not match, my_placed_rows={} offset={}",
my_placed_rows,
offset);

if (skippable_place)
fully_indexed &= placeUpsert<true>(
Expand Down Expand Up @@ -2629,25 +2662,24 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
}
}

if (unlikely(my_placed_rows != delta_snap->getRows() || my_placed_deletes != delta_snap->getDeletes()))
{
throw Exception(fmt::format(
"Placed status not match! Expected place rows:{}, deletes:{}, but actually placed rows:{}, deletes:{}",
delta_snap->getRows(),
delta_snap->getDeletes(),
my_placed_rows,
my_placed_deletes));
}
RUNTIME_CHECK_MSG(
my_placed_rows == delta_snap->getRows() && my_placed_deletes == delta_snap->getDeletes(),
"Placed status not match! Expected place rows:{}, deletes:{}, but actually placed rows:{}, deletes:{}",
delta_snap->getRows(),
delta_snap->getDeletes(),
my_placed_rows,
my_placed_deletes);

my_delta_index->update(my_delta_tree, my_placed_rows, my_placed_deletes);

LOG_DEBUG(
segment_snap->log,
"Finish segment ensurePlace, read_ranges={} placed_items={} shared_delta_index={} my_delta_index={}",
"Finish segment ensurePlace, read_ranges={} placed_items={} shared_delta_index={} my_delta_index={} {}",
DB::DM::toDebugString(read_ranges),
items.size(),
delta_snap->getSharedDeltaIndex()->toString(),
my_delta_index->toString());
my_delta_index->toString(),
simpleInfo());

return {my_delta_index, fully_indexed};
}
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct SegmentSnapshot : private boost::noncopyable
// handle + version + flag
return (sizeof(Int64) + sizeof(UInt64) + sizeof(UInt8)) * getRows();
}

String detailInfo() const;
};

/// A segment contains many rows of a table. A table is split into segments by consecutive ranges.
Expand Down Expand Up @@ -570,7 +572,13 @@ class Segment
/// Returns whether this segment has been marked as abandoned.
/// Note: Segment member functions never abandon the segment itself.
/// The abandon state is usually triggered by the DeltaMergeStore.
bool hasAbandoned() const { return delta->hasAbandoned(); }
bool hasAbandoned() const
{
// `delta` at disagg read-node is empty
if (unlikely(!delta))
return false;
return delta->hasAbandoned();
}

bool isSplitForbidden() const { return split_forbidden; }
void forbidSplit() { split_forbidden = true; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ try
}
CATCH

TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_NoTiny_NoInMem)
TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesNoTinyNoInMem)
try
{
auto fp_guard = disableFlushCache();
Expand Down Expand Up @@ -704,7 +704,7 @@ try
}
CATCH

TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_Tiny_NoInMem)
TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesTinyNoInMem)
try
{
auto fp_guard = disableFlushCache();
Expand Down Expand Up @@ -747,7 +747,7 @@ try
}
CATCH

TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_NoTiny_InMem)
TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesNoTinyInMem)
try
{
auto fp_guard = disableFlushCache();
Expand Down Expand Up @@ -798,7 +798,7 @@ try
}
CATCH

TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_Tiny_InMem)
TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesTinyInMem)
try
{
auto fp_guard = disableFlushCache();
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ RegionsReadIndexResult LearnerReadWorker::readIndex(

void LearnerReadWorker::waitIndex(
const LearnerReadSnapshot & regions_snapshot,
RegionsReadIndexResult & batch_read_index_result,
const RegionsReadIndexResult & batch_read_index_result,
const UInt64 timeout_ms,
Stopwatch & watch)
{
Expand Down Expand Up @@ -436,7 +436,7 @@ void LearnerReadWorker::waitIndex(
unavailable_regions.size(),
mvcc_query_info.start_ts);

auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String {
auto bypass_formatter = [](const RegionQueryInfo & query_info) -> String {
if (query_info.bypass_lock_ts == nullptr)
return "";
FmtBuffer buffer;
Expand All @@ -456,9 +456,11 @@ void LearnerReadWorker::waitIndex(
regions_info.end(),
[&](const auto & region_to_query, FmtBuffer & f) {
const auto & region = regions_snapshot.find(region_to_query.region_id)->second;
const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index();
f.fmtAppend(
"(id:{} applied_index:{} bypass_locks:{})",
"(region_id={} to_wait={} applied_index={} bypass_locks={})",
region_to_query.region_id,
index_to_wait,
region->appliedIndex(),
bypass_formatter(region_to_query));
},
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Read/LearnerReadWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class LearnerReadWorker
/// wait index relate methods
void waitIndex(
const LearnerReadSnapshot & regions_snapshot,
RegionsReadIndexResult & batch_read_index_result,
const RegionsReadIndexResult & batch_read_index_result,
UInt64 timeout_ms,
Stopwatch & watch);

Expand Down