Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,11 @@
path = contrib/apache-orc
url = https://github.com/apache/doris-thirdparty.git
branch = orc
[submodule "doris-faiss"]
path = contrib/faiss
url = https://github.com/apache/doris-thirdparty.git
branch = faiss
[submodule "doris-openblas"]
path = contrib/openblas
url = https://github.com/apache/doris-thirdparty.git
branch = openblas
4 changes: 3 additions & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ endif ()
# use this to avoid some runtime tracker. reuse BE_TEST symbol, no need another.
if (BUILD_BENCHMARK)
add_definitions(-DBE_TEST)
# The separate BENCHMARK marker is introduced here because
# The separate BENCHMARK marker is introduced here because
# some BE UTs mock certain functions, and BENCHMARK cannot find their definitions.
add_definitions(-DBE_BENCHMARK)
endif()
Expand Down Expand Up @@ -767,6 +767,7 @@ function(pch_reuse target)
endif()
endfunction(pch_reuse target)


add_subdirectory(${SRC_DIR}/agent)
add_subdirectory(${SRC_DIR}/common)
add_subdirectory(${SRC_DIR}/exec)
Expand All @@ -775,6 +776,7 @@ add_subdirectory(${SRC_DIR}/gen_cpp)
add_subdirectory(${SRC_DIR}/geo)
add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/io)
add_subdirectory(${SRC_DIR}/olap/rowset/segment_v2/ann_index)
add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/runtime_filter)
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
// inverted index
auto schema_ptr = rs_meta.tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
bool has_inverted_index = schema_ptr->has_inverted_index();

if (has_inverted_index) {
if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If has_inverted_index always occurs with has_ann_index, we need another function to represent we need to use a index management.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If has_inverted_index always occurs with has_ann_index, we need another function to represent we need to use a index management.

maybe in the future refactor pr

if (idx_version == InvertedIndexStorageFormatPB::V1) {
auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id);
std::unordered_map<int64_t, int64_t> index_size_map;
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
} else {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}
if (_context.tablet_schema->has_inverted_index()) {
if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_snapshot_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ Status CloudSnapshotMgr::_create_rowset_meta(
file_mapping[src_index_file] = dst_index_file;
}
} else {
if (context.tablet_schema->has_inverted_index()) {
if (context.tablet_schema->has_inverted_index() ||
context.tablet_schema->has_ann_index()) {
std::string src_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(src_segment_file));
std::string dst_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
download_idx_file(idx_path, index_size_map[index->index_id()]);
}
} else {
if (schema_ptr->has_inverted_index()) {
if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
auto&& inverted_index_info =
rowset_meta->inverted_index_file_info(seg_id);
int64_t idx_size = 0;
Expand Down
5 changes: 2 additions & 3 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void CloudWarmUpManager::handle_jobs() {
expiration_time, wait, true);
}
} else {
if (schema_ptr->has_inverted_index()) {
if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
file_size = idx_file_info.has_index_size() ? idx_file_info.index_size()
Expand Down Expand Up @@ -556,14 +556,13 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_t

// update metrics
auto schema_ptr = rs_meta.tablet_schema();
bool has_inverted_index = schema_ptr->has_inverted_index();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
g_file_cache_event_driven_warm_up_requested_segment_num << 1;
g_file_cache_event_driven_warm_up_requested_segment_size
<< rs_meta.segment_file_size(segment_id);

if (has_inverted_index) {
if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
if (idx_version == InvertedIndexStorageFormatPB::V1) {
auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id);
if (inverted_index_info.index_info().empty()) {
Expand Down
21 changes: 9 additions & 12 deletions be/src/common/cast_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,21 @@ template <typename T, typename U>
void check_cast_value(U b) {
if constexpr (IsUnsignedV<U>) {
if (b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
throw doris::Exception(
ErrorCode::INTERNAL_ERROR, "value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(), std::numeric_limits<T>::max());
}
} else if constexpr (IsUnsignedV<T>) {
if (b < 0 || b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
throw doris::Exception(
ErrorCode::INTERNAL_ERROR, "value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(), std::numeric_limits<T>::max());
}
} else {
if (b < std::numeric_limits<T>::min() || b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
throw doris::Exception(
ErrorCode::INTERNAL_ERROR, "value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(), std::numeric_limits<T>::max());
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,12 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
// The maximum number of threads supported when executing LLMFunction
DEFINE_mInt32(llm_max_concurrent_requests, "1");

// Maximum number of OpenMP threads that can be used by each Doris thread.
// This configuration controls the parallelism level for OpenMP operations within Doris,
// helping to prevent resource contention and ensure stable performance when multiple
// Doris threads are executing OpenMP-accelerated operations simultaneously.
DEFINE_mInt32(omp_threads_limit, "8");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,9 @@ DECLARE_String(fuzzy_test_type);
// The maximum number of threads supported when executing LLMFunction
DECLARE_mInt32(llm_max_concurrent_requests);

// Maximum number of OpenMP threads that can be used by each Doris thread.
// Declared with the mutable (mInt32) form so the declaration matches the
// DEFINE_mInt32(omp_threads_limit, ...) definition in config.cpp.
DECLARE_mInt32(omp_threads_limit);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap")

file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS *.cpp)

# Files in the ann_index directory use faiss header files
# Some of Doris's compilation check options fail on these header files, so exclude files in the ann_index directory
# They are compiled separately as a .a library and linked by Olap
list(FILTER SRC_FILES EXCLUDE REGEX ".*/olap/rowset/segment_v2/ann_index/.*\\.cpp$")

add_library(Olap STATIC ${SRC_FILES})
target_link_libraries(Olap PRIVATE ann_index)

if (OS_MACOSX)
target_compile_options(Olap PRIVATE -Wno-unused-lambda-capture)
Expand Down
25 changes: 16 additions & 9 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
#include "util/time.h"
#include "util/trace.h"
#include "vec/common/schema_util.h"
Expand Down Expand Up @@ -556,13 +557,19 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << _is_vertical
<< ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version
<< ", current_max_version=" << tablet()->max_version().second
<< ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments
<< ", input_rowsets_data_size=" << _input_rowsets_data_size
<< ", input_rowsets_index_size=" << _input_rowsets_index_size
<< ", input_rowsets_total_size=" << _input_rowsets_total_size
<< ", output_rowset_data_size=" << _output_rowset->data_disk_size()
<< ", output_rowset_index_size=" << _output_rowset->index_disk_size()
<< ", output_rowset_total_size=" << _output_rowset->total_disk_size()
<< ", disk=" << tablet()->data_dir()->path()
<< ", input_segments=" << _input_num_segments << ", input_rowsets_data_size="
<< PrettyPrinter::print_bytes(_input_rowsets_data_size)
<< ", input_rowsets_index_size="
<< PrettyPrinter::print_bytes(_input_rowsets_index_size)
<< ", input_rowsets_total_size="
<< PrettyPrinter::print_bytes(_input_rowsets_total_size)
<< ", output_rowset_data_size="
<< PrettyPrinter::print_bytes(_output_rowset->data_disk_size())
<< ", output_rowset_index_size="
<< PrettyPrinter::print_bytes(_output_rowset->index_disk_size())
<< ", output_rowset_total_size="
<< PrettyPrinter::print_bytes(_output_rowset->total_disk_size())
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ", filtered_row_num=" << _stats.filtered_rows
Expand Down Expand Up @@ -769,8 +776,8 @@ Status Compaction::do_inverted_index_compaction() {

// dest index files
// format: rowsetId_segmentId
auto& inverted_index_file_writers = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())
->inverted_index_file_writers();
auto& inverted_index_file_writers =
dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())->index_file_writers();
DBUG_EXECUTE_IF(
"Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error",
{ inverted_index_file_writers.clear(); })
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/tablet_schema.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -122,6 +123,7 @@ class StorageReadOptions {
size_t topn_limit = 0;

std::map<ColumnId, vectorized::VExprContextSPtr> virtual_column_exprs;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, vectorized::DataTypePtr> vir_col_idx_to_type;

Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,28 @@ struct OlapReaderStatistics {
int64_t inverted_index_lookup_timer = 0;
InvertedIndexStatistics inverted_index_stats;

int64_t ann_index_load_ns = 0;
int64_t ann_topn_search_ns = 0;
int64_t ann_index_topn_search_cnt = 0;

// Detailed timing for ANN TopN search
int64_t ann_index_topn_engine_search_ns = 0; // time spent in engine for TopN search
int64_t ann_index_topn_result_process_ns = 0; // time spent processing TopN results
int64_t ann_index_topn_engine_convert_ns = 0; // time spent on FAISS-side conversions (TopN)
int64_t ann_index_topn_engine_prepare_ns =
0; // time spent preparing before engine search (TopN)
int64_t rows_ann_index_topn_filtered = 0;

int64_t ann_index_range_search_ns = 0;
int64_t ann_index_range_search_cnt = 0;
// Detailed timing for ANN Range search
int64_t ann_range_engine_search_ns = 0; // time spent in engine for range search
int64_t ann_range_pre_process_ns = 0; // time spent preparing before engine search

int64_t ann_range_result_convert_ns = 0; // time spent processing range results
int64_t ann_range_engine_convert_ns = 0; // time spent on FAISS-side conversions (Range)
int64_t rows_ann_index_range_filtered = 0;

int64_t output_index_result_column_timer = 0;
// number of segment filtered by column stat when creating seg iterator
int64_t filtered_segment_number = 0;
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ Status BetaRowset::remove() {
}
}
} else {
if (_schema->has_inverted_index()) {
if (_schema->has_inverted_index() || _schema->has_ann_index()) {
std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(seg_path));
st = fs->delete_file(inverted_index_file);
Expand Down Expand Up @@ -367,7 +367,7 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id,
}
}
} else {
if (_schema->has_inverted_index() &&
if ((_schema->has_inverted_index() || _schema->has_ann_index()) &&
(without_index_uids == nullptr || without_index_uids->empty())) {
std::string inverted_index_file_src =
InvertedIndexDescriptor::get_index_file_path_v2(
Expand Down Expand Up @@ -434,7 +434,7 @@ Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_row
}
}
} else {
if (_schema->has_inverted_index()) {
if (_schema->has_inverted_index() || _schema->has_ann_index()) {
std::string inverted_index_src_file =
InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(src_path));
Expand Down Expand Up @@ -492,7 +492,7 @@ Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new
}
}
} else {
if (_schema->has_inverted_index()) {
if (_schema->has_inverted_index() || _schema->has_ann_index()) {
std::string remote_inverted_index_file =
InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(
Expand Down Expand Up @@ -646,7 +646,7 @@ Status BetaRowset::add_to_binlog() {
linked_success_files.push_back(binlog_index_file);
}
} else {
if (_schema->has_inverted_index()) {
if (_schema->has_inverted_index() || _schema->has_ann_index()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(seg_file));
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
Expand Down Expand Up @@ -693,7 +693,7 @@ Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) {
}
}
} else {
if (_schema->has_inverted_index()) {
if (_schema->has_inverted_index() || _schema->has_ann_index()) {
std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(seg_path));
file_paths.emplace_back(std::move(inverted_index_file));
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "olap/schema_cache.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/olap/vgeneric_iterators.h"
Expand Down Expand Up @@ -102,6 +103,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.remaining_conjunct_roots = _read_context->remaining_conjunct_roots;
_read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down;
_read_options.virtual_column_exprs = _read_context->virtual_column_exprs;
_read_options.ann_topn_runtime = _read_context->ann_topn_runtime;
_read_options.vir_cid_to_idx_in_block = _read_context->vir_cid_to_idx_in_block;
_read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
_read_options.score_runtime = _read_context->score_runtime;
Expand Down
Loading
Loading