Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/index-tools/index_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,16 @@ int main(int argc, char** argv) {
_CLLDELETE(analyzer);
_CLLDELETE(char_string_reader);

auto ret = index_file_writer->close();
auto ret = index_file_writer->begin_close();
if (!ret.ok()) {
std::cerr << "IndexFileWriter close error:" << ret.msg() << std::endl;
return -1;
}
ret = index_file_writer->finish_close();
if (!ret.ok()) {
std::cerr << "IndexFileWriter wait close error:" << ret.msg() << std::endl;
return -1;
}
} else if (FLAGS_operation == "show_nested_files_v2") {
if (FLAGS_idx_file_path == "") {
std::cout << "no file flag for show " << std::endl;
Expand Down
15 changes: 12 additions & 3 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,24 @@ Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_w
return Status::OK();
}

Status InvertedIndexFileCollection::close() {
Status InvertedIndexFileCollection::begin_close() {
std::lock_guard lock(_lock);
for (auto&& [id, writer] : _inverted_index_file_writers) {
RETURN_IF_ERROR(writer->close());
RETURN_IF_ERROR(writer->begin_close());
_total_size += writer->get_index_file_total_size();
}

return Status::OK();
}

Status InvertedIndexFileCollection::finish_close() {
std::lock_guard lock(_lock);
for (auto&& [id, writer] : _inverted_index_file_writers) {
RETURN_IF_ERROR(writer->finish_close());
}
return Status::OK();
}

Result<std::vector<const InvertedIndexFileInfo*>>
InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) {
std::lock_guard lock(_lock);
Expand Down Expand Up @@ -1097,7 +1105,8 @@ Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
_segcompaction_worker->get_file_writer().reset(file_writer.release());
if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
idx_file_writer != nullptr) {
RETURN_IF_ERROR(idx_file_writer->close());
RETURN_IF_ERROR(idx_file_writer->begin_close());
RETURN_IF_ERROR(idx_file_writer->finish_close());
}
_segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
return Status::OK();
Expand Down
9 changes: 7 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ class InvertedIndexFileCollection {

// Close all file writers
// If the inverted index file writer is not closed, an error will be thrown during destruction
Status close();
Status begin_close();

// Wait for all inverted index file writers to be closed
Status finish_close();

// Get inverted index file info in segment id order.
// `seg_id_offset` is the offset of the segment id relative to the subscript of `_inverted_index_file_writers`,
Expand Down Expand Up @@ -214,9 +217,11 @@ class BaseBetaRowsetWriter : public RowsetWriter {
// Some index files are written during normal compaction and some files are written during index compaction.
// After all index writes are completed, call this method to write the final compound index file.
Status _close_inverted_index_file_writers() {
RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.close(),
RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.begin_close(),
"failed to close index file when build new rowset");
this->_total_index_size += _idx_files.get_total_index_size();
RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.finish_close(),
"failed to wait close index file when build new rowset");
return Status::OK();
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block)
}

Status SegmentFlusher::close() {
return _seg_files.close();
RETURN_IF_ERROR(_seg_files.close());
RETURN_IF_ERROR(_idx_files.finish_close());
return Status::OK();
}

Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
Expand Down
23 changes: 20 additions & 3 deletions be/src/olap/rowset/segment_v2/index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ Result<std::unique_ptr<IndexSearcherBuilder>> IndexFileWriter::_construct_index_
return IndexSearcherBuilder::create_index_searcher_builder(reader_type);
}

Status IndexFileWriter::close() {
Status IndexFileWriter::begin_close() {
DCHECK(!_closed) << debug_string();
_closed = true;
if (_indices_dirs.empty()) {
// An empty file must still be created even if there are no indexes to write
if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
return _idx_v2_writer->close();
return _idx_v2_writer->close(true);
}
return Status::OK();
}
Expand Down Expand Up @@ -224,7 +224,24 @@ Status IndexFileWriter::close() {
err.what());
}
}
LOG_INFO("IndexFileWriter closing, enable_write_index_searcher_cache: {}",
return Status::OK();
}

Status IndexFileWriter::finish_close() {
DCHECK(_closed) << debug_string();
if (_indices_dirs.empty()) {
// An empty file must still be created even if there are no indexes to write
if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
return _idx_v2_writer->close(false);
}
return Status::OK();
}
if (_idx_v2_writer != nullptr && _idx_v2_writer->state() != io::FileWriter::State::CLOSED) {
RETURN_IF_ERROR(_idx_v2_writer->close(false));
}
LOG_INFO("IndexFileWriter finish_close, enable_write_index_searcher_cache: {}",
config::enable_write_index_searcher_cache);
Status st = Status::OK();
if (config::enable_write_index_searcher_cache) {
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/rowset/segment_v2/index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ class IndexFileWriter {
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
Status add_into_searcher_cache();
Status close();
// Begin the close process. This mainly triggers the asynchronous close operation of
// _idx_v2_writer by calling close(true), which starts the close process but returns
// immediately without waiting for completion.
Status begin_close();
// Finish the close process. This waits for the close operation to complete by calling
// _idx_v2_writer->close(false), which blocks until the close is fully done.
Status finish_close();
const InvertedIndexFileInfo* get_index_file_info() const {
DCHECK(_closed) << debug_string();
return &_file_info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ void DorisFSDirectory::FSIndexOutputV2::close() {
_index_v2_file_writer = nullptr;
})
if (_index_v2_file_writer) {
auto ret = _index_v2_file_writer->close();
auto ret = _index_v2_file_writer->close(true);
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
{ ret = Status::Error<INTERNAL_ERROR>("writer close status error"); })
if (!ret.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class SegmentWriter {
*inverted_index_file_size = 0;
return Status::OK();
}
RETURN_IF_ERROR(_index_file_writer->close());
RETURN_IF_ERROR(_index_file_writer->begin_close());
*inverted_index_file_size = _index_file_writer->get_index_file_total_size();
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class VerticalSegmentWriter {
*inverted_index_file_size = 0;
return Status::OK();
}
RETURN_IF_ERROR(_index_file_writer->close());
RETURN_IF_ERROR(_index_file_writer->begin_close());
*inverted_index_file_size = _index_file_writer->get_index_file_total_size();
return Status::OK();
}
Expand Down
18 changes: 16 additions & 2 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,20 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
_index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer));
}
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
auto st = index_file_writer->close();
auto st = index_file_writer->begin_close();
if (!st.ok()) {
LOG(ERROR) << "close index_file_writer error:" << st;
return st;
}
inverted_index_size += index_file_writer->get_index_file_total_size();
}
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
auto st = index_file_writer->finish_close();
if (!st.ok()) {
LOG(ERROR) << "wait close index_file_writer error:" << st;
return st;
}
}
_index_file_writers.clear();
output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() +
Expand Down Expand Up @@ -610,7 +617,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
_olap_data_convertor->reset();
}
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
auto st = index_file_writer->close();
auto st = index_file_writer->begin_close();
DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", {
st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: handle_single_rowset_file_writer_close_error");
Expand All @@ -621,6 +628,13 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
}
inverted_index_size += index_file_writer->get_index_file_total_size();
}
for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
auto st = index_file_writer->finish_close();
if (!st.ok()) {
LOG(ERROR) << "wait close index_file_writer error:" << st;
return st;
}
}
_index_column_writers.clear();
_index_file_writers.clear();
output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
Expand Down
3 changes: 2 additions & 1 deletion be/test/io/fs/s3_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,8 @@ TEST_F(S3FileWriterTest, test_empty_file) {
auto index_file_writer = std::make_unique<segment_v2::IndexFileWriter>(
fs, index_path, rowset_id, seg_id, InvertedIndexStorageFormatPB::V2,
std::move(file_writer), false);
EXPECT_TRUE(index_file_writer->close().ok());
EXPECT_TRUE(index_file_writer->begin_close().ok());
EXPECT_TRUE(index_file_writer->finish_close().ok());
}

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ TEST_F(EmptyIndexFileTest, test_empty_index_file) {
auto index_file_writer = std::make_unique<segment_v2::IndexFileWriter>(
fs, index_path, rowset_id, seg_id, InvertedIndexStorageFormatPB::V2,
std::move(file_writer), false);
EXPECT_TRUE(index_file_writer->close().ok());
EXPECT_TRUE(index_file_writer->begin_close().ok());
EXPECT_TRUE(index_file_writer->finish_close().ok());
}

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class PhraseEdgeQueryTest : public testing::Test {
// Finish and close
status = column_writer->finish();
EXPECT_TRUE(status.ok()) << status;
status = index_file_writer->close();
status = index_file_writer->begin_close();
EXPECT_TRUE(status.ok()) << status;
status = index_file_writer->finish_close();
EXPECT_TRUE(status.ok()) << status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ class PhrasePrefixQueryTest : public testing::Test {
// Finish and close
status = column_writer->finish();
EXPECT_TRUE(status.ok()) << status;
status = index_file_writer->close();
status = index_file_writer->begin_close();
EXPECT_TRUE(status.ok()) << status;
status = index_file_writer->finish_close();
EXPECT_TRUE(status.ok()) << status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class PhraseQueryTest : public testing::Test {
// Finish and close
status = column_writer->finish();
EXPECT_TRUE(status.ok()) << status;
status = index_file_writer->close();
status = index_file_writer->begin_close();
EXPECT_TRUE(status.ok()) << status;
status = index_file_writer->finish_close();
EXPECT_TRUE(status.ok()) << status;
}

Expand Down
21 changes: 14 additions & 7 deletions be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ class InvertedIndexArrayTest : public testing::Test {
EXPECT_EQ(st, Status::OK());

EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

ExpectedDocMap expected = {{"amory", {0, 1}}, {"doris", {0}}, {"commiter", {1}}};
check_terms_stats(index_path_prefix, &expected, {}, InvertedIndexStorageFormatPB::V1,
Expand Down Expand Up @@ -365,7 +366,8 @@ class InvertedIndexArrayTest : public testing::Test {
st = _inverted_index_builder->add_array_nulls(null_map, block.rows());
EXPECT_EQ(st, Status::OK());
EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

ExpectedDocMap expected = {{"amory", {0, 1}}, {"doris", {0}}, {"commiter", {1}}};
check_terms_stats(index_path_prefix, &expected, {}, InvertedIndexStorageFormatPB::V1,
Expand Down Expand Up @@ -477,7 +479,8 @@ class InvertedIndexArrayTest : public testing::Test {
st = _inverted_index_builder->add_array_nulls(null_map, block.rows());
EXPECT_EQ(st, Status::OK());
EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

// Expected inverted index result: only index non-null elements
// Row 1: non-null in a2 is "test"
Expand Down Expand Up @@ -591,7 +594,8 @@ class InvertedIndexArrayTest : public testing::Test {
st = _inverted_index_builder->add_array_nulls(null_map, block.rows());
EXPECT_EQ(st, Status::OK());
EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

// Expected inverted index result: only index non-null elements
// Row 1: non-null in a2 is "test"
Expand Down Expand Up @@ -793,7 +797,8 @@ class InvertedIndexArrayTest : public testing::Test {
}

EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

std::vector<int> expected_null_bitmap = {0, 3, 5, 7};
check_terms_stats(index_path_prefix, &merged_expected, expected_null_bitmap,
Expand Down Expand Up @@ -896,7 +901,8 @@ class InvertedIndexArrayTest : public testing::Test {
st = _inverted_index_builder->add_array_nulls(null_map, block.rows());
EXPECT_EQ(st, Status::OK());
EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

// expected inverted index: row0 contains "123" and "456" (doc id 0), row1 is null, row2 contains "789" and "101112" (doc id 2)
ExpectedDocMap expected = {{"123", {0}}, {"456", {0}}, {"789", {2}}, {"101112", {2}}};
Expand Down Expand Up @@ -1007,7 +1013,8 @@ class InvertedIndexArrayTest : public testing::Test {
EXPECT_EQ(st, Status::OK());

EXPECT_EQ(_inverted_index_builder->finish(), Status::OK());
EXPECT_EQ(index_file_writer->close(), Status::OK());
EXPECT_EQ(index_file_writer->begin_close(), Status::OK());
EXPECT_EQ(index_file_writer->finish_close(), Status::OK());

std::vector<int> expected_null_bitmap = {0, 1};
ExpectedDocMap expected {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ TEST_F(DorisCompoundReaderTest, IntegrationWithFileWriter) {
out->close();
_CLLDELETE(out);

auto st = index_file_writer->close();
auto st = index_file_writer->begin_close();
ASSERT_TRUE(st.ok()) << st;
st = index_file_writer->finish_close();
ASSERT_TRUE(st.ok()) << st;

auto file_reader = std::make_unique<IndexFileReader>(
Expand Down Expand Up @@ -422,7 +424,9 @@ TEST_F(DorisCompoundReaderTest, IntegrationWithFileWriter) {
out->close();
_CLLDELETE(out);

st = index_file_writer->close();
st = index_file_writer->begin_close();
ASSERT_TRUE(st.ok()) << st;
st = index_file_writer->finish_close();
ASSERT_TRUE(st.ok()) << st;

auto file_reader = std::make_unique<IndexFileReader>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ class InvertedIndexFileReaderTest : public testing::Test {

dir->close();

// Write and close the file - only call close(), not write()
st = writer->close();
// Write and close the file - only call begin_close() and finish_close(), not write()
st = writer->begin_close();
ASSERT_TRUE(st.ok()) << st.msg();
st = writer->finish_close();
ASSERT_TRUE(st.ok()) << st.msg();
}

Expand Down
Loading
Loading