Skip to content

Commit

Permalink
[fix](merge-clod) fix file not found when load for mow table
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Mar 12, 2024
1 parent f7776bb commit 9300360
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati
update_rowset_schema(flush_schema);
}
if (_context.mow_context != nullptr) {
// ensure that the segment file writing is complete
RETURN_IF_ERROR(_segment_creator.get_file_writer(segment_id)->close());
RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
}
return Status::OK();
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,

Status SegmentFlusher::close() {
std::lock_guard<SpinLock> l(_lock);
for (auto& file_writer : _file_writers) {
for (auto& [segment_id, file_writer] : _file_writers) {
Status status = file_writer->close();
if (!status.ok()) {
LOG(WARNING) << "failed to close file writer, path=" << file_writer->path()
Expand Down Expand Up @@ -205,7 +205,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
_context->max_rows_per_segment, writer_options, _context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
_file_writers.emplace(segment_id, std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
Expand Down Expand Up @@ -236,7 +236,7 @@ Status SegmentFlusher::_create_segment_writer(
_context->max_rows_per_segment, writer_options, _context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
_file_writers.emplace(segment_id, std::move(file_writer));
}
auto s = writer->init();
if (!s.ok()) {
Expand Down Expand Up @@ -345,6 +345,11 @@ Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& wr
return Status::OK();
}

io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) {
DCHECK(_file_writers.contains(segment_id));
return _file_writers[segment_id].get();
}

SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
: _flusher(flusher), _writer(std::move(segment_writer)) {};
Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>

#include <string>
#include <unordered_map>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -102,6 +103,8 @@ class SegmentFlusher {

int64_t num_rows_filtered() const { return _num_rows_filtered; }

io::FileWriter* get_file_writer(int32_t segment_id);

Status close();

public:
Expand Down Expand Up @@ -153,7 +156,7 @@ class SegmentFlusher {
RowsetWriterContext* _context;

mutable SpinLock _lock; // protect following vectors.
std::vector<io::FileWriterPtr> _file_writers;
std::unordered_map<int32_t, io::FileWriterPtr> _file_writers;

// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
Expand Down Expand Up @@ -196,6 +199,10 @@ class SegmentCreator {

Status close();

io::FileWriter* get_file_writer(int32_t segment_id) {
return _segment_flusher.get_file_writer(segment_id);
}

private:
std::atomic<int32_t> _next_segment_id = 0;
SegmentFlusher _segment_flusher;
Expand Down

0 comments on commit 9300360

Please sign in to comment.