Skip to content

Commit

Permalink
[refactor](load) remove FlushContext from SegmentWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Jul 6, 2023
1 parent 79221a5 commit a8782e6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 24 deletions.
11 changes: 9 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -818,15 +818,22 @@ Status BetaRowsetWriter::_do_create_segment_writer(
}
_segcompaction_worker.get_file_writer().reset(file_writer.release());
} else {
const auto& tablet_schema = flush_ctx && flush_ctx->flush_schema ? flush_ctx->flush_schema
: _context.tablet_schema;
writer_options.compression_type =
(flush_ctx == nullptr || flush_ctx->block == nullptr ||
flush_ctx->block->bytes() > config::segment_compression_threshold_kb * 1024)
? tablet_schema->compression_type()
: NO_COMPRESSION;
writer->reset(new segment_v2::SegmentWriter(
file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
file_writer.get(), segment_id, tablet_schema, _context.tablet,
_context.data_dir, _context.max_rows_per_segment, writer_options,
_context.mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.push_back(std::move(file_writer));
}
auto s = (*writer)->init(flush_ctx);
auto s = (*writer)->init();
if (!s.ok()) {
LOG(WARNING) << "failed to init segment writer: " << s.to_string();
writer->reset(nullptr);
Expand Down
22 changes: 4 additions & 18 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "olap/primary_key_index.h"
#include "olap/row_cursor.h" // IWYU pragma: keep
#include "olap/row_cursor.h" // RowCursor
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/page_io.h"
Expand Down Expand Up @@ -118,31 +117,22 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
}
}

Status SegmentWriter::init(const FlushContext* flush_ctx) {
Status SegmentWriter::init() {
std::vector<uint32_t> column_ids;
int column_cnt = _tablet_schema->num_columns();
if (flush_ctx && flush_ctx->flush_schema) {
column_cnt = flush_ctx->flush_schema->num_columns();
}
for (uint32_t i = 0; i < column_cnt; ++i) {
column_ids.emplace_back(i);
}
return init(column_ids, true, flush_ctx);
return init(column_ids, true);
}

Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
const FlushContext* flush_ctx) {
Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
DCHECK(_column_writers.empty());
DCHECK(_column_ids.empty());
_has_key = has_key;
_column_writers.reserve(_tablet_schema->columns().size());
_column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end());
_olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
_opts.compression_type =
(flush_ctx == nullptr || flush_ctx->block == nullptr ||
flush_ctx->block->bytes() > config::segment_compression_threshold_kb * 1024)
? _tablet_schema->compression_type()
: NO_COMPRESSION;
auto create_column_writer = [&](uint32_t cid, const auto& column) -> auto {
ColumnWriterOptions opts;
opts.meta = _footer.add_columns();
Expand Down Expand Up @@ -242,11 +232,7 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
return Status::OK();
};

if (flush_ctx && flush_ctx->flush_schema) {
RETURN_IF_ERROR(_create_writers(*flush_ctx->flush_schema, col_ids, create_column_writer));
} else {
RETURN_IF_ERROR(_create_writers(*_tablet_schema, col_ids, create_column_writer));
}
RETURN_IF_ERROR(_create_writers(*_tablet_schema, col_ids, create_column_writer));

// we don't need the short key index for unique key merge on write table.
if (_has_key) {
Expand Down
6 changes: 2 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class ShortKeyIndexBuilder;
class PrimaryKeyIndexBuilder;
class KeyCoder;
struct RowsetWriterContext;
struct FlushContext;

namespace io {
class FileWriter;
Expand Down Expand Up @@ -89,11 +88,10 @@ class SegmentWriter {
std::shared_ptr<MowContext> mow_context);
~SegmentWriter();

Status init(const FlushContext* flush_ctx = nullptr);
Status init();

// for vertical compaction
Status init(const std::vector<uint32_t>& col_ids, bool has_key,
const FlushContext* flush_ctx = nullptr);
Status init(const std::vector<uint32_t>& col_ids, bool has_key);

template <typename RowType>
Status append_row(const RowType& row);
Expand Down

0 comments on commit a8782e6

Please sign in to comment.