Skip to content

Commit

Permalink
Storages: load RSResult only once (#9738)
Browse files Browse the repository at this point in the history
ref #6233

Before, TiFlash will load the `RSResult` three times for one query (MVCC/Build Bitmap/Query).

This PR introduces `DMFilePackFilterResult` which load `RSResult` only once, and only passes the `RSResult` to `DMFileReader`.

Signed-off-by: JaySon-Huang <tshent@qq.com>

Co-authored-by: JaySon <tshent@qq.com>
Co-authored-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
Lloyd-Pottiger and JaySon-Huang authored Dec 25, 2024
1 parent 29d4b4f commit 8369058
Show file tree
Hide file tree
Showing 27 changed files with 663 additions and 424 deletions.
20 changes: 5 additions & 15 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/DMContext.h>
Expand All @@ -38,22 +37,13 @@ ColumnFileBig::ColumnFileBig(const DMContext & dm_context, const DMFilePtr & fil

void ColumnFileBig::calculateStat(const DMContext & dm_context)
{
auto index_cache = dm_context.global_context.getMinMaxIndexCache();

auto pack_filter = DMFilePackFilter::loadFrom(
auto m = DMFilePackFilter::loadValidRowsAndBytes(
dm_context,
file,
index_cache,
/*set_cache_if_miss*/ false,
{segment_range},
EMPTY_RS_OPERATOR,
{},
dm_context.global_context.getFileProvider(),
dm_context.getReadLimiter(),
dm_context.scan_context,
/*tracing_id*/ dm_context.tracing_id,
ReadTag::Internal);

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
{segment_range});
valid_rows = m.match_rows;
valid_bytes = m.match_bytes;
}

void ColumnFileBig::removeData(WriteBatches & wbs) const
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <IO/FileProvider/ChecksumReadBufferBuilder.h>
#include <Storages/DeltaMerge/File/ColumnStream.h>
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/Page/PageUtil.h>
Expand Down Expand Up @@ -157,7 +158,7 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe

// Try to get the largest buffer size of reading continuous packs
size_t buffer_size = 0;
const auto & pack_res = reader.pack_filter.getPackResConst();
const auto & pack_res = reader.pack_filter->getPackRes();
for (size_t i = 0; i < n_packs; /*empty*/)
{
if (!pack_res[i].isUse())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ size_t DMFile::colIndexSize(ColId id) const
}
else
{
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Index of {} not exist", id);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Index is not exist, col_id={}", id);
}
}
else
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class DMFile : private boost::noncopyable

UInt32 metaVersion() const { return meta->metaVersion(); }

bool isColIndexExist(const ColId & col_id) const;

private:
DMFile(
UInt64 file_id_,
Expand Down Expand Up @@ -293,8 +295,6 @@ class DMFile : private boost::noncopyable
String colIndexCacheKey(const FileNameBase & file_name_base) const;
String colMarkCacheKey(const FileNameBase & file_name_base) const;

bool isColIndexExist(const ColId & col_id) const;

String encryptionBasePath() const;
EncryptionPath encryptionDataPath(const FileNameBase & file_name_base) const;
EncryptionPath encryptionIndexPath(const FileNameBase & file_name_base) const;
Expand Down
73 changes: 36 additions & 37 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h>
#include <Storages/DeltaMerge/Filter/WithANNQueryInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/ScanContext.h>


namespace DB::DM
{

Expand Down Expand Up @@ -58,19 +56,6 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(

bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
index_cache,
/*set_cache_if_miss*/ true,
rowkey_ranges,
rs_filter,
read_packs,
file_provider,
read_limiter,
scan_context,
tracing_id,
read_tag);

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();

if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0)
Expand All @@ -79,6 +64,22 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
max_sharing_column_bytes_for_all = 0;
}

// If pack_filter is not set, load from EMPTY_RS_OPERATOR.
if (!pack_filter)
{
pack_filter = DMFilePackFilter::loadFrom(
index_cache,
file_provider,
read_limiter,
scan_context,
dmfile,
true,
rowkey_ranges,
EMPTY_RS_OPERATOR,
read_packs,
tracing_id);
}

DMFileReader reader(
dmfile,
read_columns,
Expand All @@ -87,7 +88,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
enable_del_clean_read,
is_fast_scan,
max_data_version,
std::move(pack_filter),
pack_filter,
mark_cache,
enable_column_cache,
column_cache,
Expand Down Expand Up @@ -140,18 +141,13 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
return build(dmfile, read_columns, rowkey_ranges, scan_context);
};

if (!rs_filter)
return fallback();

auto filter_with_ann = std::dynamic_pointer_cast<WithANNQueryInfo>(rs_filter);
if (!filter_with_ann)
if (!ann_query_info)
return fallback();

if (!bitmap_filter.has_value())
return fallback();

Block header_layout = toEmptyBlock(read_columns);
auto ann_query_info = filter_with_ann->ann_query_info;

// Copy out the vector column for later use. Copy is intentionally performed after the
// fast check so that in fallback conditions we don't need unnecessary copies.
Expand Down Expand Up @@ -181,22 +177,25 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn

// All check passed. Let's read via vector index.

DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
index_cache,
/*set_cache_if_miss*/ true,
rowkey_ranges,
rs_filter,
read_packs,
file_provider,
read_limiter,
scan_context,
tracing_id,
ReadTag::Query);

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();
bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

// If pack_filter is not set, load from EMPTY_RS_OPERATOR.
if (!pack_filter)
{
pack_filter = DMFilePackFilter::loadFrom(
index_cache,
file_provider,
read_limiter,
scan_context,
dmfile,
true,
rowkey_ranges,
EMPTY_RS_OPERATOR,
read_packs,
tracing_id);
}

DMFileReader rest_columns_reader(
dmfile,
rest_columns,
Expand All @@ -205,7 +204,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
enable_del_clean_read,
is_fast_scan,
max_data_version,
std::move(pack_filter),
pack_filter,
mark_cache,
enable_column_cache,
column_cache,
Expand All @@ -217,7 +216,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
tracing_id,
enable_read_thread,
scan_context,
ReadTag::Query);
read_tag);

if (column_cache_long_term && pk_col_id)
// ColumnCacheLongTerm is only filled in Vector Search.
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder & setRSOperator(const RSOperatorPtr & filter_)
DMFileBlockInputStreamBuilder & setAnnQureyInfo(const ANNQueryInfoPtr & ann_query_info_)
{
rs_filter = filter_;
ann_query_info = ann_query_info_;
return *this;
}

Expand All @@ -162,6 +162,7 @@ class DMFileBlockInputStreamBuilder
read_one_pack_every_time = true;
return *this;
}

DMFileBlockInputStreamBuilder & setRowsThreshold(size_t rows_threshold_per_read_)
{
rows_threshold_per_read = rows_threshold_per_read_;
Expand All @@ -180,6 +181,12 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder & setDMFilePackFilterResult(const DMFilePackFilterResultPtr & pack_filter_)
{
pack_filter = pack_filter_;
return *this;
}

/**
* @note To really enable the long term cache, you also need to ensure
* ColumnCacheLongTerm is initialized in the global context.
Expand Down Expand Up @@ -217,8 +224,6 @@ class DMFileBlockInputStreamBuilder
bool is_fast_scan = false;
bool enable_del_clean_read = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
RSOperatorPtr rs_filter;
// packs filter (filter by pack index)
IdSetPtr read_packs;
MarkCachePtr mark_cache;
Expand All @@ -234,6 +239,10 @@ class DMFileBlockInputStreamBuilder
String tracing_id;
ReadTag read_tag = ReadTag::Internal;

DMFilePackFilterResultPtr pack_filter;

ANNQueryInfoPtr ann_query_info = nullptr;

VectorIndexCachePtr vector_index_cache;
// Note: Currently thie field is assigned only for Stable streams, not available for ColumnFileBig
std::optional<BitmapFilterView> bitmap_filter;
Expand Down
Loading

0 comments on commit 8369058

Please sign in to comment.