Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM:test #9809

Open
wants to merge 48 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
08bd317
ci
JinheLin Dec 4, 2024
9a74ded
ci
JinheLin Jan 8, 2025
d83d6c2
ci
JinheLin Jan 10, 2025
1064e29
ci
JinheLin Jan 10, 2025
0e01334
ci
JinheLin Jan 10, 2025
659c93a
ci
JinheLin Jan 10, 2025
2976245
ci
JinheLin Jan 10, 2025
1ead298
ci
JinheLin Jan 10, 2025
d4673fc
ci
JinheLin Jan 10, 2025
a2df5c9
ci
JinheLin Jan 10, 2025
67652e8
ci
JinheLin Jan 10, 2025
2d6f7d8
ci
JinheLin Jan 10, 2025
f03e18a
ci
JinheLin Jan 10, 2025
cf10dec
ci
JinheLin Jan 10, 2025
6be97b1
ci
JinheLin Jan 10, 2025
c6f982f
ci
JinheLin Jan 13, 2025
4a58a50
ci
JinheLin Jan 13, 2025
2bd5e64
ci
JinheLin Jan 13, 2025
138e328
ci
JinheLin Jan 13, 2025
7e353c1
ci
JinheLin Jan 13, 2025
8e16688
ci
JinheLin Jan 13, 2025
8d5df1f
ci
JinheLin Jan 14, 2025
29f7bb4
ci
JinheLin Jan 14, 2025
89f3645
ci
JinheLin Jan 15, 2025
267dc00
ci
JinheLin Jan 15, 2025
3ad5159
ci
JinheLin Jan 16, 2025
96a0968
ci
JinheLin Jan 16, 2025
0656616
ci
JinheLin Jan 16, 2025
9b1fa3a
ci
JinheLin Jan 16, 2025
db6824c
ci
JinheLin Jan 16, 2025
1ca81ad
ci
JinheLin Jan 16, 2025
9e44b71
ci
JinheLin Jan 16, 2025
a8b8ae4
ci
JinheLin Jan 16, 2025
9ba7936
ci
JinheLin Jan 17, 2025
cece602
ci
JinheLin Jan 17, 2025
9524948
ci
JinheLin Jan 17, 2025
ce48594
ci
JinheLin Jan 20, 2025
90528a9
ci
JinheLin Jan 20, 2025
a819c81
ci
JinheLin Jan 22, 2025
3dc5d1a
ci
JinheLin Jan 22, 2025
2c1e344
ci
JinheLin Jan 22, 2025
6c5c380
ci
JinheLin Jan 22, 2025
80fd240
ci
JinheLin Jan 22, 2025
0eceafb
ci
JinheLin Jan 22, 2025
621f4b0
ci
JinheLin Jan 22, 2025
a2fa2c7
ci
JinheLin Jan 23, 2025
66cefb0
ci
JinheLin Jan 23, 2025
bf1e6c0
ci
JinheLin Jan 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ struct Settings
M(SettingBool, dt_flush_after_write, false, "[testing] Flush cache or not after write in DeltaTree Engine.") \
M(SettingBool, dt_log_record_version, false, "[testing] Whether log the version of records when write them to storage") \
\
M(SettingBool, dt_enable_version_chain, true, "") \
/* These PageStorage V2 settings are deprecated since v6.5 */ \
M(SettingUInt64, dt_open_file_max_idle_seconds, 15, "Deprecated.") \
M(SettingUInt64, dt_page_num_max_expect_legacy_files, 100, "Deprecated.") \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ class BitmapFilter
void rangeAnd(IColumn::Filter & f, UInt32 start, UInt32 limit) const;

void runOptimize();
void setAllMatch(bool all_match_) { all_match = all_match_; }

String toDebugString() const;
size_t count() const;
inline size_t size() const { return filter.size(); }

IColumn::Filter & getFilter() { return filter; }

friend class BitmapFilterView;

private:
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ add_headers_and_sources(delta_merge ./Remote)
add_headers_and_sources(delta_merge ./Remote/DataStore)
add_headers_and_sources(delta_merge ./Decode)
add_headers_and_sources(delta_merge ./StoragePool)
add_headers_and_sources(delta_merge ./VersionChain)

add_library(delta_merge ${delta_merge_headers} ${delta_merge_sources})
target_link_libraries(delta_merge PRIVATE dbms page)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class ColumnFileBig : public ColumnFilePersisted

auto getFile() const { return file; }

RowKeyRange getRange() const { return segment_range; }

PageIdU64 getDataPageId() { return file->pageId(); }

size_t getRows() const override { return valid_rows; }
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ DeltaIndex::Updates ColumnFileFlushTask::prepare(WriteBatches & wbs)
{
if (!task.block_data)
continue;

/*
IColumn::Permutation perm;
task.sorted = sortBlockByPk(getExtraHandleColumnDefine(context.is_common_handle), task.block_data, perm);
if (task.sorted)
delta_index_updates.emplace_back(task.deletes_offset, task.rows_offset, perm);

*/
task.data_page = ColumnFileTiny::writeColumnFileData(context, task.block_data, 0, task.block_data.rows(), wbs);
}

Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,18 @@ class DeltaValueSnapshot : private boost::noncopyable
ColumnFileSetSnapshotPtr getMemTableSetSnapshot() const { return mem_table_snap; }
ColumnFileSetSnapshotPtr getPersistedFileSetSnapshot() const { return persisted_files_snap; }

ColumnFiles getColumnFiles() const
{
auto cfs = persisted_files_snap->getColumnFiles();
const auto & memory_cfs = mem_table_snap->getColumnFiles();
cfs.insert(cfs.end(), memory_cfs.begin(), memory_cfs.end());
return cfs;
}
const auto & getDataProvider() const
{
RUNTIME_CHECK(persisted_files_snap->getDataProvider() == mem_table_snap->getDataProvider());
return persisted_files_snap->getDataProvider();
}
size_t getColumnFileCount() const
{
return mem_table_snap->getColumnFileCount() + persisted_files_snap->getColumnFileCount();
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ class DMFile : private boost::noncopyable
UInt32 metaVersion() const { return meta->metaVersion(); }

bool isColIndexExist(const ColId & col_id) const;
static FileNameBase getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {})
{
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

private:
DMFile(
Expand Down Expand Up @@ -301,11 +305,6 @@ class DMFile : private boost::noncopyable
EncryptionPath encryptionIndexPath(const FileNameBase & file_name_base) const;
EncryptionPath encryptionMarkPath(const FileNameBase & file_name_base) const;

static FileNameBase getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {})
{
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

static String localIndexFileName(IndexID index_id, TiDB::ColumnarIndexKind kind)
{
// Note: Keep sync with FileCache::getFileType()
Expand All @@ -322,6 +321,8 @@ class DMFile : private boost::noncopyable
{
return subFilePath(localIndexFileName(index_id, kind));
}
static String vectorIndexFileName(IndexID index_id) { return fmt::format("idx_{}.vector", index_id); }
String vectorIndexPath(IndexID index_id) const { return subFilePath(vectorIndexFileName(index_id)); }

void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); }

Expand Down
66 changes: 40 additions & 26 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,41 +193,55 @@ void DMFilePackFilter::loadIndex(
const ReadLimiterPtr & read_limiter,
const ScanContextPtr & scan_context)
{
const auto & type = dmfile->getColumnStat(col_id).type;
auto [type, minmax_index]
= loadIndex(*dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context);
indexes.emplace(col_id, RSIndex(type, minmax_index));
}

std::pair<DataTypePtr, MinMaxIndexPtr> DMFilePackFilter::loadIndex(
const DMFile & dmfile,
const FileProviderPtr & file_provider,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
ColId col_id,
const ReadLimiterPtr & read_limiter,
const ScanContextPtr & scan_context)
{
const auto & type = dmfile.getColumnStat(col_id).type;
const auto file_name_base = DMFile::getFileNameBase(col_id);

auto load = [&]() {
auto index_file_size = dmfile->colIndexSize(col_id);
auto index_file_size = dmfile.colIndexSize(col_id);
if (index_file_size == 0)
return std::make_shared<MinMaxIndex>(*type);
auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({
.size = dmfile->getReadFileSize(col_id, colIndexFileName(file_name_base)),
.size = dmfile.getReadFileSize(col_id, colIndexFileName(file_name_base)),
.scan_context = scan_context,
});
if (!dmfile->getConfiguration()) // v1
if (!dmfile.getConfiguration()) // v1
{
auto index_buf = ReadBufferFromRandomAccessFileBuilder::build(
file_provider,
dmfile->colIndexPath(file_name_base),
dmfile->encryptionIndexPath(file_name_base),
dmfile.colIndexPath(file_name_base),
dmfile.encryptionIndexPath(file_name_base),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
read_limiter);
return MinMaxIndex::read(*type, index_buf, index_file_size);
}
else if (dmfile->useMetaV2()) // v3
else if (dmfile.useMetaV2()) // v3
{
const auto * dmfile_meta = typeid_cast<const DMFileMetaV2 *>(dmfile->meta.get());
const auto * dmfile_meta = typeid_cast<const DMFileMetaV2 *>(dmfile.meta.get());
assert(dmfile_meta != nullptr);
auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base));
if (info == dmfile_meta->merged_sub_file_infos.end())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unknown index file {}",
dmfile->colIndexPath(file_name_base));
dmfile.colIndexPath(file_name_base));
}

auto file_path = dmfile->meta->mergedPath(info->second.number);
auto file_path = dmfile.meta->mergedPath(info->second.number);
auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number);
auto offset = info->second.offset;
auto data_size = info->second.size;
Expand All @@ -236,7 +250,7 @@ void DMFilePackFilter::loadIndex(
file_provider,
file_path,
encryp_path,
dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile.getConfiguration()->getChecksumFrameLength(),
read_limiter);
buffer.seek(offset);

Expand All @@ -247,13 +261,13 @@ void DMFilePackFilter::loadIndex(

auto buf = ChecksumReadBufferBuilder::build(
std::move(raw_data),
dmfile->colIndexPath(file_name_base), // just for debug
dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile->getConfiguration()->getChecksumAlgorithm(),
dmfile->getConfiguration()->getChecksumFrameLength());
dmfile.colIndexPath(file_name_base), // just for debug
dmfile.getConfiguration()->getChecksumFrameLength(),
dmfile.getConfiguration()->getChecksumAlgorithm(),
dmfile.getConfiguration()->getChecksumFrameLength());

auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength();
auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size;
auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength();
auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size;
auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);

return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count);
Expand All @@ -262,32 +276,32 @@ void DMFilePackFilter::loadIndex(
{ // v2
auto index_buf = ChecksumReadBufferBuilder::build(
file_provider,
dmfile->colIndexPath(file_name_base),
dmfile->encryptionIndexPath(file_name_base),
dmfile.colIndexPath(file_name_base),
dmfile.encryptionIndexPath(file_name_base),
index_file_size,
read_limiter,
dmfile->getConfiguration()->getChecksumAlgorithm(),
dmfile->getConfiguration()->getChecksumFrameLength());
auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength();
auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size;
dmfile.getConfiguration()->getChecksumAlgorithm(),
dmfile.getConfiguration()->getChecksumFrameLength());
auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength();
auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size;
auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
}
};
MinMaxIndexPtr minmax_index;
if (index_cache && set_cache_if_miss)
{
minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load);
minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), load);
}
else
{
// try load from the cache first
if (index_cache)
minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base));
minmax_index = index_cache->get(dmfile.colIndexCacheKey(file_name_base));
if (minmax_index == nullptr)
minmax_index = load();
}
indexes.emplace(col_id, RSIndex(type, minmax_index));
return {type, minmax_index};
}

void DMFilePackFilter::tryLoadIndex(RSCheckParam & param, ColId col_id)
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ class DMFilePackFilter
const DMFiles & dmfiles,
const DMFilePackFilterResults & pack_filter_results,
UInt64 start_ts);
static std::pair<DataTypePtr, MinMaxIndexPtr> loadIndex(
const DMFile & dmfile,
const FileProviderPtr & file_provider,
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
ColId col_id,
const ReadLimiterPtr & read_limiter,
const ScanContextPtr & scan_context);

private:
DMFilePackFilter(
Expand Down
27 changes: 23 additions & 4 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ Segment::Segment( //
, stable(stable_)
, parent_log(parent_log_)
, log(parent_log_->getChild(fmt::format("segment_id={} epoch={}", segment_id, epoch)))
, version_chain(createVersionChain(is_common_handle))
{
if (delta != nullptr)
delta->resetLogger(log);
Expand Down Expand Up @@ -3078,10 +3079,22 @@ BitmapFilterPtr Segment::buildBitmapFilter(
const RowKeyRanges & read_ranges,
const DMFilePackFilterResults & pack_filter_results,
UInt64 start_ts,
size_t expected_block_size)
size_t expected_block_size,
bool use_version_chain)
{
RUNTIME_CHECK_MSG(!dm_context.read_delta_only, "Read delta only is unsupported");
sanitizeCheckReadRanges(__FUNCTION__, read_ranges, rowkey_range, log);
if (use_version_chain)
{
return ::DB::DM::buildBitmapFilter(
dm_context,
*segment_snap,
read_ranges,
pack_filter_results,
start_ts,
version_chain);
}

if (dm_context.read_stable_only || (segment_snap->delta->getRows() == 0 && segment_snap->delta->getDeletes() == 0))
{
return buildBitmapFilterStableOnly(
Expand Down Expand Up @@ -3452,18 +3465,23 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(
dm_context.tracing_id);
}

RowKeyRanges Segment::shrinkRowKeyRanges(const RowKeyRanges & read_ranges) const
RowKeyRanges Segment::shrinkRowKeyRanges(const RowKeyRange & target_range, const RowKeyRanges & read_ranges)
{
RowKeyRanges real_ranges;
for (const auto & read_range : read_ranges)
{
auto real_range = rowkey_range.shrink(read_range);
auto real_range = target_range.shrink(read_range);
if (!real_range.none())
real_ranges.emplace_back(std::move(real_range));
}
return real_ranges;
}

RowKeyRanges Segment::shrinkRowKeyRanges(const RowKeyRanges & read_ranges) const
{
return shrinkRowKeyRanges(rowkey_range, read_ranges);
}

static bool hasCacheableColumn(const ColumnDefines & columns)
{
return std::find_if(columns.begin(), columns.end(), DMFileReader::isCacheableColumn) != columns.end();
Expand All @@ -3488,7 +3506,8 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream(
read_ranges,
pack_filter_results,
start_ts,
build_bitmap_filter_block_rows);
build_bitmap_filter_block_rows,
dm_context.global_context.getSettingsRef().dt_enable_version_chain);

// If we don't need to read the cacheable columns, release column cache as soon as possible.
if (!hasCacheableColumn(columns_to_read))
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
#include <Storages/DeltaMerge/Segment_fwd.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>
#include <Storages/DeltaMerge/StableValueSpace.h>
#include <Storages/DeltaMerge/VersionChain/BuildBitmapFilter.h>
#include <Storages/DeltaMerge/VersionChain/VersionChain.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/fast_add_peer.pb.h>

namespace DB
{
struct GeneralCancelHandle;
Expand Down Expand Up @@ -663,6 +664,8 @@ class Segment
}
}

static RowKeyRanges shrinkRowKeyRanges(const RowKeyRange & target_range, const RowKeyRanges & read_ranges);

#ifndef DBMS_PUBLIC_GTEST
private:
#else
Expand Down Expand Up @@ -736,7 +739,8 @@ class Segment
const RowKeyRanges & read_ranges,
const DMFilePackFilterResults & pack_filter_results,
UInt64 start_ts,
size_t expected_block_size);
size_t expected_block_size,
bool use_version_chain);
BitmapFilterPtr buildBitmapFilterNormal(
const DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
Expand Down Expand Up @@ -838,6 +842,8 @@ class Segment

const LoggerPtr parent_log; // Used when constructing new segments in split
const LoggerPtr log;

std::variant<VersionChain<Int64>, VersionChain<String>> version_chain;
};

void readSegmentMetaInfo(ReadBuffer & buf, Segment::SegmentMetaInfo & segment_info);
Expand Down
Loading