Skip to content

Commit

Permalink
[fix](segcompaction) fix coredump for inverted index file writer clos…
Browse files Browse the repository at this point in the history
…e in segment compaction (#43477)

Related PR: #41625

Problem Summary:
Fix coredump when doing segcompaction after refactor of inverted index
file writer.

Co-authored-by: airborne12 <jiangkai@selectdb.com>
  • Loading branch information
airborne12 and airborne12 authored Nov 11, 2024
1 parent 37f489a commit 14084ee
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 1 deletion.
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
_writer->_num_segcompacted);
}
RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));

if (_inverted_index_file_writer != nullptr) {
_inverted_index_file_writer.reset();
}
if (VLOG_DEBUG_IS_ON) {
_writer->vlog_buffer.clear();
for (const auto& entry : std::filesystem::directory_iterator(ctx.tablet_path)) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ ExecEnv::~ExecEnv() {
}

#ifdef BE_TEST
void ExecEnv::set_inverted_index_searcher_cache(
segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) {
_inverted_index_searcher_cache = inverted_index_searcher_cache;
}
void ExecEnv::set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine) {
_storage_engine = std::move(engine);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ class ExecEnv {
}

void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_inverted_index_searcher_cache(
segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache);
void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; }
void set_process_profile(ProcessProfile* pp) { this->_process_profile = pp; }
void set_tablet_schema_cache(TabletSchemaCache* c) { this->_tablet_schema_cache = c; }
Expand Down
59 changes: 59 additions & 0 deletions be/test/olap/segcompaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ using namespace ErrorCode;
static const uint32_t MAX_PATH_LEN = 1024;
static StorageEngine* l_engine = nullptr;
static const std::string lTestDir = "./data_test/data/segcompaction_test";
constexpr static std::string_view tmp_dir = "./data_test/tmp";

class SegCompactionTest : public testing::Test {
public:
Expand All @@ -59,6 +60,7 @@ class SegCompactionTest : public testing::Test {
config::tablet_map_shard_size = 1;
config::txn_map_shard_size = 1;
config::txn_shard_size = 1;
config::inverted_index_fd_number_limit_percent = 0;

char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
Expand All @@ -72,6 +74,23 @@ class SegCompactionTest : public testing::Test {
std::vector<StorePath> paths;
paths.emplace_back(config::storage_root_path, -1);

// tmp dir
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
EXPECT_TRUE(tmp_file_dirs->init().ok());
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));

// use memory limit
int64_t inverted_index_cache_limit = 0;
_inverted_index_searcher_cache = std::unique_ptr<segment_v2::InvertedIndexSearcherCache>(
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit,
256));

ExecEnv::GetInstance()->set_inverted_index_searcher_cache(
_inverted_index_searcher_cache.get());

doris::EngineOptions options;
options.store_paths = paths;

Expand Down Expand Up @@ -99,6 +118,7 @@ class SegCompactionTest : public testing::Test {
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
l_engine = nullptr;
exec_env->set_storage_engine(nullptr);
exec_env->set_inverted_index_searcher_cache(nullptr);
}

protected:
Expand Down Expand Up @@ -140,6 +160,7 @@ class SegCompactionTest : public testing::Test {
tablet_schema_pb.set_num_rows_per_row_block(1024);
tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
tablet_schema_pb.set_next_column_unique_id(4);
tablet_schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);

ColumnPB* column_1 = tablet_schema_pb.add_column();
column_1->set_unique_id(1);
Expand All @@ -150,6 +171,11 @@ class SegCompactionTest : public testing::Test {
column_1->set_index_length(4);
column_1->set_is_nullable(true);
column_1->set_is_bf_column(false);
auto tablet_index_1 = tablet_schema_pb.add_index();
tablet_index_1->set_index_id(1);
tablet_index_1->set_index_name("column_1");
tablet_index_1->set_index_type(IndexType::INVERTED);
tablet_index_1->add_col_unique_id(1);

ColumnPB* column_2 = tablet_schema_pb.add_column();
column_2->set_unique_id(2);
Expand All @@ -162,6 +188,11 @@ class SegCompactionTest : public testing::Test {
column_2->set_is_key(true);
column_2->set_is_nullable(true);
column_2->set_is_bf_column(false);
auto tablet_index_2 = tablet_schema_pb.add_index();
tablet_index_2->set_index_id(2);
tablet_index_2->set_index_name("column_2");
tablet_index_2->set_index_type(IndexType::INVERTED);
tablet_index_2->add_col_unique_id(2);

for (int i = 1; i <= num_value_col; i++) {
ColumnPB* v_column = tablet_schema_pb.add_column();
Expand Down Expand Up @@ -245,6 +276,7 @@ class SegCompactionTest : public testing::Test {

private:
std::unique_ptr<DataDir> _data_dir;
std::unique_ptr<InvertedIndexSearcherCache> _inverted_index_searcher_cache;
};

TEST_F(SegCompactionTest, SegCompactionThenRead) {
Expand Down Expand Up @@ -300,6 +332,13 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
ls.push_back("10047_4.dat");
ls.push_back("10047_5.dat");
ls.push_back("10047_6.dat");
ls.push_back("10047_0.idx");
ls.push_back("10047_1.idx");
ls.push_back("10047_2.idx");
ls.push_back("10047_3.idx");
ls.push_back("10047_4.idx");
ls.push_back("10047_5.idx");
ls.push_back("10047_6.idx");
EXPECT_TRUE(check_dir(ls));
}

Expand Down Expand Up @@ -502,6 +541,13 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
ls.push_back("10048_4.dat"); // O
ls.push_back("10048_5.dat"); // oooooooo
ls.push_back("10048_6.dat"); // O
ls.push_back("10048_0.idx"); // oooo
ls.push_back("10048_1.idx"); // O
ls.push_back("10048_2.idx"); // O
ls.push_back("10048_3.idx"); // o
ls.push_back("10048_4.idx"); // O
ls.push_back("10048_5.idx"); // oooooooo
ls.push_back("10048_6.idx"); // O
EXPECT_TRUE(check_dir(ls));
}
}
Expand Down Expand Up @@ -628,6 +674,11 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) {
ls.push_back("10049_2.dat"); // O
ls.push_back("10049_3.dat"); // o
ls.push_back("10049_4.dat"); // O
ls.push_back("10049_0.idx"); // O
ls.push_back("10049_1.idx"); // o
ls.push_back("10049_2.idx"); // O
ls.push_back("10049_3.idx"); // o
ls.push_back("10049_4.idx"); // O
EXPECT_TRUE(check_dir(ls));
}
}
Expand Down Expand Up @@ -789,6 +840,10 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
ls.push_back("10051_1.dat");
ls.push_back("10051_2.dat");
ls.push_back("10051_3.dat");
ls.push_back("10051_0.idx");
ls.push_back("10051_1.idx");
ls.push_back("10051_2.idx");
ls.push_back("10051_3.idx");
EXPECT_TRUE(check_dir(ls));
}

Expand Down Expand Up @@ -1049,6 +1104,10 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
ls.push_back("10052_1.dat");
ls.push_back("10052_2.dat");
ls.push_back("10052_3.dat");
ls.push_back("10052_0.idx");
ls.push_back("10052_1.idx");
ls.push_back("10052_2.idx");
ls.push_back("10052_3.idx");
EXPECT_TRUE(check_dir(ls));
}

Expand Down

0 comments on commit 14084ee

Please sign in to comment.