
vector: refine vector search #9709

Merged
merged 18 commits into from
Dec 23, 2024
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -41,7 +41,6 @@
#include <Operators/NullSourceOp.h>
#include <Operators/UnorderedSourceOp.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
#include <Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h>
#include <Storages/DeltaMerge/ScanContext.h>
1 change: 1 addition & 0 deletions dbms/src/Operators/UnorderedSourceOp.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h>
#include <Operators/UnorderedSourceOp.h>
1 change: 0 additions & 1 deletion dbms/src/Operators/UnorderedSourceOp.h
@@ -15,7 +15,6 @@
#pragma once

#include <Common/Logger.h>
#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Flash/Coprocessor/RuntimeFilterMgr.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
@@ -50,7 +50,7 @@ class BitmapFilter
private:
void set(std::span<const UInt32> row_ids, const FilterPtr & f);

std::vector<UInt8> filter;
IColumn::Filter filter;
bool all_match;
};

@@ -32,40 +32,18 @@ BitmapFilterBlockInputStream::BitmapFilterBlockInputStream(

Block BitmapFilterBlockInputStream::read()
{
FilterPtr block_filter = nullptr;
auto block = children.at(0)->read(block_filter, true);
auto block = children.at(0)->read();
if (!block)
return block;

filter.resize(block.rows());
bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows());
if (!block_filter)
{
if (all_match)
return block;
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
}
else
if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match)
return block;

size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
RUNTIME_CHECK(filter.size() == block_filter->size(), filter.size(), block_filter->size());
if (!all_match)
{
std::transform(
filter.begin(),
filter.end(),
block_filter->begin(),
block_filter->begin(),
[](UInt8 a, UInt8 b) { return a && b; });
}
size_t passed_count = countBytesInFilter(*block_filter);
for (auto & col : block)
{
col.column = col.column->filter(*block_filter, passed_count);
}
col.column = col.column->filter(filter, passed_count);
}
return block;
}
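As a side note on the simplified `BitmapFilterBlockInputStream::read()` above: it now always materializes the bitmap bits covering `[startOffset, startOffset + rows)` into the reused `filter`, returns the block untouched when every row matches, and otherwise filters each column down to `countBytesInFilter(filter)` rows. A minimal standalone sketch of that pattern, with `std::vector<uint8_t>` and plain vectors standing in for `IColumn::Filter` and the column types (all names here are illustrative, not the TiFlash API):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative stand-ins, not the TiFlash types.
using Filter = std::vector<uint8_t>;      // ~ IColumn::Filter
using Column = std::vector<int64_t>;      // ~ one materialized column
struct Block
{
    size_t start_offset = 0;
    std::vector<Column> columns;
};

// ~ BitmapFilter::get(): copy the bits covering this block into `filter`,
// return true when every row matches.
static bool fillFilter(const std::vector<uint8_t> & bitmap, size_t offset, size_t rows, Filter & filter)
{
    filter.assign(bitmap.begin() + offset, bitmap.begin() + offset + rows);
    return std::all_of(filter.begin(), filter.end(), [](uint8_t b) { return b != 0; });
}

// The read() pattern above: early-return on all-match, otherwise filter every column.
static Block readFiltered(Block block, const std::vector<uint8_t> & bitmap, Filter & reused_filter)
{
    const size_t rows = block.columns.empty() ? 0 : block.columns.front().size();
    if (rows == 0)
        return block;
    if (fillFilter(bitmap, block.start_offset, rows, reused_filter))
        return block; // all rows match, no copying needed

    for (auto & col : block.columns)
    {
        Column kept;
        for (size_t i = 0; i < rows; ++i)
            if (reused_filter[i])
                kept.push_back(col[i]);
        col = std::move(kept);
    }
    return block;
}
```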
@@ -41,7 +41,7 @@ class BitmapFilterBlockInputStream : public IBlockInputStream
private:
Block header;
BitmapFilterPtr bitmap_filter;
IColumn::Filter filter;
IColumn::Filter filter; // reuse the allocated memory across all calls to `read`
};

} // namespace DB::DM
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
@@ -52,6 +52,14 @@ class BitmapFilterView
return BitmapFilterView(filter, filter_offset + offset, size);
}

IColumn::Filter getRawSubFilter(UInt32 offset, UInt32 size) const
{
RUNTIME_CHECK(offset + size <= filter_size, offset, size, filter_size);
return IColumn::Filter{
filter->filter.data() + filter_offset + offset,
filter->filter.data() + filter_offset + offset + size};
}

// Caller should ensure n in [0, size).
inline bool get(UInt32 n) const { return filter->get(filter_offset + n); }

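`getRawSubFilter` above is just a bounds-checked copy of a contiguous byte range of the underlying bitmap, shifted by the view's own offset. A standalone sketch of the same idea, with `std::vector<uint8_t>` standing in for `IColumn::Filter` and a simplified view type (illustrative names, not the TiFlash classes):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Illustrative stand-in for BitmapFilterView: a window over a shared byte bitmap.
struct BitmapView
{
    const std::vector<uint8_t> * bits = nullptr;
    uint32_t view_offset = 0;
    uint32_t view_size = 0;

    // ~ getRawSubFilter(): copy the bytes for [offset, offset + size) of this view.
    std::vector<uint8_t> rawSubFilter(uint32_t offset, uint32_t size) const
    {
        assert(offset + size <= view_size); // mirrors the RUNTIME_CHECK above
        const uint8_t * begin = bits->data() + view_offset + offset;
        return std::vector<uint8_t>(begin, begin + size);
    }
};
```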
@@ -50,10 +50,23 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild(
rest_columns->emplace_back(cd);
}

// No vector index column is specified, just use the normal logic.
// No vector index column is specified, fall back.
if (!vec_cd.has_value())
return fallback();

bool has_vector_index = false;
for (const auto & file : delta_snap->getColumnFiles())
{
if (auto * tiny_file = file->tryToTinyFile(); tiny_file && tiny_file->hasIndex(ann_query_info->index_id()))
{
has_vector_index = true;
break;
}
}
// No file has a vector index, fall back.
if (!has_vector_index)
return fallback();

// All checks passed. Let's read via vector index.
return std::make_shared<ColumnFileSetWithVectorIndexInputStream>(
context,
@@ -68,25 +81,6 @@ SkippableBlockInputStreamPtr ColumnFileSetWithVectorIndexInputStream::tryBuild(
read_tag_);
}

Block ColumnFileSetWithVectorIndexInputStream::read(FilterPtr & res_filter, bool return_filter)
{
if (return_filter)
return readImpl(res_filter);

// If return_filter == false, we must filter by ourselves.

FilterPtr filter = nullptr;
auto res = readImpl(filter);
if (filter != nullptr)
{
auto passed_count = countBytesInFilter(*filter);
for (auto & col : res)
col.column = col.column->filter(*filter, passed_count);
}
// filter == nullptr means all rows are valid and no need to filter.
return res;
}

Block ColumnFileSetWithVectorIndexInputStream::readOtherColumns()
{
auto reset_column_file_reader = (*cur_column_file_reader)->createNewReader(rest_col_defs, ReadTag::Query);
@@ -102,7 +96,7 @@ void ColumnFileSetWithVectorIndexInputStream::toNextFile(size_t current_file_ind
tiny_readers[current_file_index].reset();
}

Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
Block ColumnFileSetWithVectorIndexInputStream::read()
{
load();

@@ -115,7 +109,7 @@ Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
continue;
}
auto current_file_index = std::distance(reader.column_file_readers.begin(), cur_column_file_reader);
// If has index, we can read the column by vector index.
// If the file has an index, we can read the column by vector index.
if (tiny_readers[current_file_index] != nullptr)
{
const auto file_rows = column_files[current_file_index]->getRows();
@@ -133,44 +127,43 @@ Block ColumnFileSetWithVectorIndexInputStream::readImpl(FilterPtr & res_filter)
auto tiny_reader = tiny_readers[current_file_index];
auto vec_column = vec_cd.type->createColumn();
const std::span file_selected_rows{selected_row_begin, selected_row_end};
tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows, file_rows);
assert(vec_column->size() == file_rows);
tiny_reader->read(vec_column, file_selected_rows, /* rowid_start_offset= */ read_rows);
assert(vec_column->size() == selected_rows);

// read other columns if needed
Block block;
if (!rest_col_defs->empty())
{
block = readOtherColumns();
assert(block.rows() == vec_column->size());
}

auto index = header.getPositionByName(vec_cd.name);
block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));

// Fill res_filter
if (selected_rows == file_rows)
{
res_filter = nullptr;
}
else
{
filter.clear();
filter.resize_fill(file_rows, 0);
for (const auto rowid : file_selected_rows)
filter[rowid - read_rows] = 1;
res_filter = &filter;
for (auto & col : block)
col.column = col.column->filter(filter, selected_rows);

assert(block.rows() == selected_rows);
}

auto index = header.getPositionByName(vec_cd.name);
block.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name));

// All rows in this ColumnFileTiny have been read.
block.setStartOffset(read_rows);
toNextFile(current_file_index, file_rows);
return block;
}
// If the file does not have an index, read by cur_column_file_reader.
auto block = (*cur_column_file_reader)->readNextBlock();
if (block)
{
block.setStartOffset(read_rows);
read_rows += block.rows();
res_filter = nullptr;
size_t rows = block.rows();
filter = valid_rows.getRawSubFilter(read_rows, rows);
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
col.column = col.column->filter(filter, passed_count);
read_rows += rows;
return block;
}
else
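The new guard in `tryBuild()` above scans the column files of the delta snapshot once and keeps the vector-index read path only when at least one tiny file actually carries the requested index; otherwise it falls back to the normal logic. A condensed sketch of that any-of check under simplified, hypothetical types:

```cpp
#include <algorithm>
#include <cstdint>
#include <memory>
#include <vector>

// Illustrative stand-in for a column file; only tiny files can carry a vector index.
struct ColumnFile
{
    bool is_tiny = false;
    std::vector<int64_t> index_ids; // indexes persisted with this tiny file

    bool hasIndex(int64_t index_id) const
    {
        return is_tiny && std::find(index_ids.begin(), index_ids.end(), index_id) != index_ids.end();
    }
};

// ~ the guard in tryBuild(): use the vector-index read path only if at least one
// file can serve the requested index; otherwise the caller falls back.
bool anyFileHasVectorIndex(const std::vector<std::shared_ptr<ColumnFile>> & files, int64_t index_id)
{
    for (const auto & file : files)
        if (file->hasIndex(index_id))
            return true;
    return false;
}
```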
@@ -92,24 +92,13 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre
String getName() const override { return "ColumnFileSetWithVectorIndex"; }
Block getHeader() const override { return header; }

Block read() override
{
FilterPtr filter = nullptr;
return read(filter, false);
}

// When all rows in block are not filtered out,
// `res_filter` will be set to null.
// The caller needs to do handle this situation.
Block read(FilterPtr & res_filter, bool return_filter) override;
Block read() override;

std::vector<VectorIndexViewer::SearchResult> load() override;

void setSelectedRows(const std::span<const UInt32> & selected_rows) override;

private:
Block readImpl(FilterPtr & res_filter);

Block readOtherColumns();

void toNextFile(size_t current_file_index, size_t current_file_rows);
@@ -27,32 +27,18 @@ namespace DB::DM
void ColumnFileTinyVectorIndexReader::read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & read_rowids,
size_t rowid_start_offset,
size_t read_rows)
size_t rowid_start_offset)
{
RUNTIME_CHECK(loaded);

Stopwatch watch;
vec_column->reserve(read_rows);
vec_column->reserve(read_rowids.size());
std::vector<Float32> value;
size_t current_rowid = rowid_start_offset;
for (const auto & rowid : read_rowids)
{
// Each ColumnFileTiny has its own vector index; rowid_start_offset is the offset of the ColumnFilePersistSet.
vec_index->get(rowid - rowid_start_offset, value);
if (rowid > current_rowid)
{
UInt32 nulls = rowid - current_rowid;
// Insert [] if column is Not Null, or NULL if column is Nullable
vec_column->insertManyDefaults(nulls);
}
vec_column->insertData(reinterpret_cast<const char *>(value.data()), value.size() * sizeof(Float32));
current_rowid = rowid + 1;
}
if (current_rowid < rowid_start_offset + read_rows)
{
UInt32 nulls = rowid_start_offset + read_rows - current_rowid;
vec_column->insertManyDefaults(nulls);
}

perf_stat.returned_rows = read_rowids.size();
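With the `read_rows` parameter gone, `ColumnFileTinyVectorIndexReader::read()` above no longer pads unselected rows with default values: the output column holds exactly one entry per selected rowid. A standalone sketch of the new compact shape, with a `std::function` standing in for the index lookup (illustrative names only):

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Illustrative stand-in for the index lookup: returns the stored vector for a
// row id local to this tiny file.
using GetVector = std::function<std::vector<float>(uint32_t local_rowid)>;

// New behavior: the output holds one entry per selected rowid, in selection order;
// rowid_start_offset converts global rowids into file-local rowids.
std::vector<std::vector<float>> readSelected(
    const std::vector<uint32_t> & read_rowids,
    size_t rowid_start_offset,
    const GetVector & get_vector)
{
    std::vector<std::vector<float>> vec_column;
    vec_column.reserve(read_rowids.size()); // was reserve(read_rows) before this change
    for (const auto rowid : read_rowids)
        vec_column.push_back(get_vector(static_cast<uint32_t>(rowid - rowid_start_offset)));
    return vec_column;
}
```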
@@ -74,15 +74,11 @@ class ColumnFileTinyVectorIndexReader

~ColumnFileTinyVectorIndexReader();

// Read vector column data and set filter.
// The column will be as same as as the rows of the tiny file,
// but only the rows in selected_rows will be filled,
// others will be filled with default values.
// Read vector column data with the specified rowids.
void read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & read_rowids,
size_t rowid_start_offset,
size_t read_rows);
size_t rowid_start_offset);

// Load vector index and search results.
// Return the rowids of the selected rows.
32 changes: 28 additions & 4 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/countBytesInFilter.h>
#include <Storages/DeltaMerge/ConcatSkippableBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

@@ -132,13 +133,13 @@ Block ConcatSkippableBlockInputStream<need_row_id>::readWithFilter(const IColumn
}

template <bool need_row_id>
Block ConcatSkippableBlockInputStream<need_row_id>::read(FilterPtr & res_filter, bool return_filter)
Block ConcatSkippableBlockInputStream<need_row_id>::read()
{
Block res;

while (current_stream != children.end())
{
res = (*current_stream)->read(res_filter, return_filter);
res = (*current_stream)->read();
if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
@@ -239,12 +240,34 @@ void ConcatVectorIndexBlockInputStream::load()
sr_it = end;
}

// Not used anymore, release memory.
index_streams.clear();
loaded = true;
}

Block ConcatVectorIndexBlockInputStream::read()
{
load();
auto block = stream->read();

// The block read from `VectorIndexBlockInputStream` contains only the selected rows. Return it directly.
// For streams that are not `VectorIndexBlockInputStream`, the block should be filtered by the bitmap.
if (auto index = std::distance(stream->children.begin(), stream->current_stream); !index_streams[index])
{
filter.resize(block.rows());
if (bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); all_match)
return block;

size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
}

return block;
}

SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
const BitmapFilterPtr & bitmap_filter,
std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
const ANNQueryInfoPtr & ann_query_info)
{
@@ -267,6 +290,7 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
return stream;

return std::make_shared<ConcatVectorIndexBlockInputStream>(
bitmap_filter,
stream,
std::move(index_streams),
ann_query_info->top_k());
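One more note on the branch in `ConcatVectorIndexBlockInputStream::read()` above: blocks produced by a vector-index child already contain only the selected rows, so the bitmap is applied only to blocks coming from ordinary children. A minimal sketch of that dispatch under simplified stand-in types (not the TiFlash classes):

```cpp
#include <cstddef>
#include <iterator>
#include <memory>
#include <vector>

// Illustrative stand-in for a child stream of the concat stream.
struct SubStream
{
    bool unused = false;
};
using SubStreamPtr = std::shared_ptr<SubStream>;

// ~ the branch in ConcatVectorIndexBlockInputStream::read(): locate which child
// produced the current block, then consult the parallel bookkeeping (one flag per
// child, ~ index_streams[i] != nullptr) to decide whether the bitmap must be applied.
bool needsBitmapFilter(
    const std::vector<SubStreamPtr> & children,
    std::vector<SubStreamPtr>::const_iterator current_stream,
    const std::vector<bool> & is_index_stream)
{
    const auto index = static_cast<size_t>(std::distance(children.begin(), current_stream));
    return !is_index_stream[index]; // vector-index children already emit only selected rows
}
```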