Skip to content

Commit

Permalink
[Opt](flexible partial update) let SegmentWriter not support flexible partial update (#41950)
Browse files Browse the repository at this point in the history

Considering that `SegmentWriter` will be removed in the near future, we
stop supporting flexible partial update in `SegmentWriter` to reduce
maintenance costs.
  • Loading branch information
bobhan1 authored Oct 22, 2024
1 parent bd2d846 commit 4c09663
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 907 deletions.
331 changes: 4 additions & 327 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,332 +696,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block));
return Status::OK();
}
// Appends `num_rows` rows of `block` under *flexible* partial update for a
// merge-on-write (MOW) table: each non-key column may have missing cells per row,
// tracked by the row's skip bitmap. Key columns are encoded and written directly;
// missing non-key cells are filled into `full_block` by reading old values from
// previously committed rowsets before the non-key columns are written.
// Returns an error status if any encode/write/read-plan step fails.
Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block,
                                                                 size_t row_pos, size_t num_rows) {
    DCHECK(_is_mow());
    DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
    DCHECK(_opts.rowset_ctx->partial_update_info->is_flexible_partial_update());
    // this path always consumes the block from its first row
    DCHECK_EQ(row_pos, 0);

    // block has the same schema with full_block
    DCHECK(block->columns() == _tablet_schema->num_columns());

    // create full block and fill with sort key columns
    auto full_block = _tablet_schema->create_block();

    auto segment_start_pos = _column_writers.front()->get_next_rowid();

    DCHECK(_tablet_schema->has_skip_bitmap_col());
    auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
    // helper (not a one-shot read) because merge_rows_for_sequence_column() below may
    // rebuild the block, after which the bitmaps must be re-fetched
    auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* block) {
        return &(assert_cast<vectorized::ColumnBitmap*>(
                         block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get())
                         ->get_data());
    };
    std::vector<BitmapValue>* skip_bitmaps = get_skip_bitmaps(block);

    bool has_default_or_nullable = false;
    std::vector<bool> use_default_or_null_flag;
    use_default_or_null_flag.reserve(num_rows);

    int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid();

    DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
                    { sleep(60); })
    const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());

    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns {};
    vectorized::IOlapColumnDataAccessor* seq_column {nullptr};

    // Copies the sort key columns of `block` into `full_block`, converts them, and
    // collects their accessors into `key_columns`.
    auto encode_key_columns =
            [&full_block, &block, &row_pos, &num_rows,
             this](std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns) -> Status {
        key_columns.clear();
        for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) {
            full_block.replace_by_position(cid, block->get_by_position(cid).column);
            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                    full_block.get_by_position(cid), row_pos, num_rows, cid));
            auto [status, column] = _olap_data_convertor->convert_column_data(cid);
            if (!status.ok()) {
                return status;
            }
            key_columns.push_back(column);
        }
        return Status::OK();
    };

    // Converts the hidden sequence column (if the schema has one) and stores its
    // accessor into `seq_column`; otherwise leaves `seq_column` null.
    auto encode_seq_column = [&block, &row_pos, &num_rows,
                              this](vectorized::IOlapColumnDataAccessor*& seq_column) -> Status {
        seq_column = nullptr;
        if (_tablet_schema->has_sequence_col()) {
            auto seq_col_idx = _tablet_schema->sequence_col_idx();
            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                    block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx));
            auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx);
            if (!status.ok()) {
                return status;
            }
            seq_column = column;
        }
        return Status::OK();
    };

    // 1. encode key columns
    // we can only encode sort key columns currently because all non-key columns in flexible partial update
    // can have missing cells
    RETURN_IF_ERROR(encode_key_columns(key_columns));

    // 2. encode sequence column
    // We encode the sequence column even though it may have invalid values in some rows because we need to
    // encode the value of sequence column in key for rows that have a valid value in sequence column during
    // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have
    // a valid sequence column to encode the key with seq col.
    RETURN_IF_ERROR(encode_seq_column(seq_column));

    // 3. merge duplicate rows when table has sequence column
    // When there are multiple rows with the same keys in memtable, some of them specify the sequence column,
    // some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here.
    if (_tablet_schema->has_sequence_col()) {
        std::size_t origin_rows = num_rows;
        RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, num_rows, skip_bitmaps,
                                                       key_columns, seq_column, specified_rowsets,
                                                       segment_caches));
        if (origin_rows != num_rows) {
            // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps
            _olap_data_convertor->clear_source_content();
            RETURN_IF_ERROR(encode_key_columns(key_columns));
            RETURN_IF_ERROR(encode_seq_column(seq_column));
            skip_bitmaps = get_skip_bitmaps(block);
        }
    }

    const auto* delete_sign_column_data =
            BaseTablet::get_delete_sign_column_data(*block, row_pos + num_rows);
    DCHECK(delete_sign_column_data != nullptr);

    // 4. write key columns data
    for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) {
        const auto& column = key_columns[cid];
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
        RETURN_IF_ERROR(
                _column_writers[cid]->append(column->get_nullmap(), column->get_data(), num_rows));
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows);
    }

    // 5. generate read plan
    FlexibleReadPlan read_plan {_tablet_schema->has_row_store_for_all_columns()};
    PartialUpdateStats stats;
    RETURN_IF_ERROR(generate_flexible_read_plan(
            read_plan, row_pos, num_rows, segment_start_pos, _tablet_schema->has_sequence_col(),
            seq_map_col_unique_id, skip_bitmaps, key_columns, seq_column, delete_sign_column_data,
            specified_rowsets, segment_caches, has_default_or_nullable, use_default_or_null_flag,
            stats));
    CHECK_EQ(use_default_or_null_flag.size(), num_rows);

    if (config::enable_merge_on_write_correctness_check) {
        _tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
                                                    _mow_context->rowset_ids);
    }

    // 6. read according to the plan to fill full_block
    RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
            _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
            use_default_or_null_flag, has_default_or_nullable, segment_start_pos, row_pos, block,
            skip_bitmaps));

    // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation?
    // this column is not needed in read path for merge-on-write table

    // 7. fill row store column
    _serialize_block_to_row_column(full_block);

    // 8. encode and write all non-primary key columns(including sequence column if exists)
    for (auto cid = _num_sort_key_columns; cid < _tablet_schema->num_columns(); cid++) {
        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                full_block.get_by_position(cid), row_pos, num_rows, cid));
        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
        if (!status.ok()) {
            return status;
        }
        if (cid == _tablet_schema->sequence_col_idx()) {
            // should use the latest encoded sequence column to build the primary index
            seq_column = column;
        }
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
        RETURN_IF_ERROR(
                _column_writers[cid]->append(column->get_nullmap(), column->get_data(), num_rows));
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows);
    }

    _num_rows_updated += stats.num_rows_updated;
    _num_rows_deleted += stats.num_rows_deleted;
    _num_rows_new_added += stats.num_rows_new_added;
    _num_rows_filtered += stats.num_rows_filtered;

    if (_num_rows_written != row_pos ||
        _primary_key_index_builder->num_rows() != _num_rows_written) {
        return Status::InternalError(
                "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
                "index builder num rows: {}",
                _num_rows_written, row_pos, _primary_key_index_builder->num_rows());
    }

    // 9. build primary key index
    RETURN_IF_ERROR(
            _generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false));

    _num_rows_written += num_rows;
    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
    _olap_data_convertor->clear_source_content();
    return Status::OK();
}

// Builds the read plan for flexible partial update: for each row, probes the key
// against the specified rowsets (probe_key_for_mow); when the key is found, the
// old row location is recorded in `read_plan` together with the row's skip bitmap
// so that missing cells can later be filled from the old row. Also propagates the
// sequence-map column's "missing" mark to the hidden sequence column.
// `stats`, `has_default_or_nullable` and `use_default_or_null_flag` are output
// accumulators consumed by the caller.
Status SegmentWriter::generate_flexible_read_plan(
        FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos,
        bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
        std::vector<BitmapValue>* skip_bitmaps,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
        PartialUpdateStats& stats) {
    int32_t delete_sign_col_unique_id =
            _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
    // -1 when the schema has no sequence column
    int32_t seq_col_unique_id =
            (_tablet_schema->has_sequence_col()
                     ? _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id()
                     : -1);
    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
        size_t delta_pos = block_pos - row_pos;       // position relative to this batch
        size_t segment_pos = segment_start_pos + delta_pos; // row id within the segment
        auto& skip_bitmap = skip_bitmaps->at(block_pos);

        // the hidden sequence column should have the same mark with sequence map column
        if (seq_map_col_unique_id != -1) {
            DCHECK(schema_has_sequence_col);
            if (skip_bitmap.contains(seq_map_col_unique_id)) {
                skip_bitmap.add(seq_col_unique_id);
            }
        }

        std::string key = _full_encode_keys(key_columns, delta_pos);
        _maybe_invalid_row_cache(key);
        // only append the sequence value to the key when this row actually carries one
        bool row_has_sequence_col =
                (schema_has_sequence_col && !skip_bitmap.contains(seq_col_unique_id));
        if (row_has_sequence_col) {
            _encode_seq_column(seq_column, delta_pos, &key);
        }

        // mark key with delete sign as deleted.
        bool have_delete_sign = (!skip_bitmap.contains(delete_sign_col_unique_id) &&
                                 delete_sign_column_data[block_pos] != 0);

        // invoked when the key is absent from all specified rowsets
        auto not_found_cb = [&]() {
            return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
                    *_tablet_schema, &skip_bitmap);
        };
        // invoked with the old row's location when the key is found
        auto update_read_plan = [&](const RowLocation& loc) {
            read_plan.prepare_to_read(loc, segment_pos, skip_bitmap);
        };

        RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, row_has_sequence_col,
                                          have_delete_sign, specified_rowsets, segment_caches,
                                          has_default_or_nullable, use_default_or_null_flag,
                                          update_read_plan, not_found_cb, stats));
    }
    return Status::OK();
}

// De-duplicates adjacent rows that share the same key when exactly one of the pair
// carries an explicit sequence value (the other row's effective sequence value is
// looked up from the committed rowsets). The losing row is filtered out of `block`
// in place and `num_rows` is updated to the surviving row count.
// NOTE(review): rows with equal keys appear adjacent here — presumably guaranteed
// by upstream memtable sorting; verify against the caller.
Status SegmentWriter::merge_rows_for_sequence_column(
        const vectorized::Block* block, size_t row_pos, size_t& num_rows,
        std::vector<BitmapValue>* skip_bitmaps,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
    auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id();
    std::string previous_key {};
    bool previous_has_seq_col {false};
    int duplicate_keys {0};

    // filter_map[i] == 1 keeps row i, 0 drops it; initialized to keep everything
    auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1);
    auto* __restrict filter_map = filter_column->get_data().data();

    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
        size_t delta_pos = block_pos - row_pos;
        auto& skip_bitmap = skip_bitmaps->at(block_pos);
        std::string key = _full_encode_keys(key_columns, delta_pos);
        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
        Status st;
        if (delta_pos > 0 && previous_key == key) {
            // within a duplicate pair, exactly one row has an explicit sequence value
            DCHECK(previous_has_seq_col == !row_has_sequence_col);
            ++duplicate_keys;
            RowLocation loc;
            RowsetSharedPtr rowset;
            size_t rid_missing_seq {};
            size_t rid_with_seq {};
            if (row_has_sequence_col) {
                rid_missing_seq = block_pos - 1;
                rid_with_seq = block_pos;
            } else {
                rid_missing_seq = block_pos;
                rid_with_seq = block_pos - 1;
            }
            std::string previous_encoded_seq_value {};

            // fetch the key's historical encoded sequence value so the row without an
            // explicit sequence value can be compared against the row that has one
            st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc,
                                         _mow_context->max_version, segment_caches, &rowset, true,
                                         &previous_encoded_seq_value);
            DCHECK(st.is<KEY_NOT_FOUND>() || st.ok());

            Slice previous_seq_slice {};
            if (st.is<KEY_NOT_FOUND>()) {
                // TODO: handle default value
            } else {
                _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
            }
            std::string cur_encoded_seq_value {};
            _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value);
            // keep the row whose encoded sequence value compares larger;
            // on a tie, keep the row at the larger block position
            int res = Slice {previous_encoded_seq_value}.compare(Slice {cur_encoded_seq_value});
            VLOG_DEBUG << fmt::format(
                    "SegmentWriter::merge_rows_for_sequence_column: rid_with_seq={}, "
                    "rid_missing_seq={}, res={}",
                    rid_with_seq, rid_missing_seq, res);
            if (res > 0) {
                filter_map[rid_with_seq] = 0;
            } else if (res < 0) {
                filter_map[rid_missing_seq] = 0;
            } else {
                filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0;
            }
        }
        previous_key = std::move(key);
        previous_has_seq_col = row_has_sequence_col;
    }
    if (duplicate_keys > 0) {
        // append the filter column and let filter_block() drop the losing rows in place;
        // const_cast is deliberate — the caller re-encodes the block afterwards
        auto num_cols = block->columns();
        auto* mutable_block = const_cast<vectorized::Block*>(block);
        mutable_block->insert({std::move(filter_column),
                               std::make_shared<vectorized::DataTypeUInt8>(),
                               "__dup_key_filter_col__"});
        RETURN_IF_ERROR(vectorized::Block::filter_block(mutable_block, num_cols, num_cols));
        int merged_rows = num_rows - mutable_block->rows();
        if (duplicate_keys != merged_rows) {
            // every duplicate pair must have dropped exactly one row
            auto msg = fmt::format(
                    "duplicate_keys != merged_rows, duplicate_keys={}, merged_rows={}, "
                    "num_rows={}, mutable_block->rows()={}",
                    duplicate_keys, merged_rows, num_rows, block->rows());
            DCHECK(false) << msg;
            return Status::InternalError<false>(msg);
        }
        num_rows = mutable_block->rows();
    }
    return Status::OK();
}

Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
Expand All @@ -1032,7 +706,10 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
if (_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
} else {
RETURN_IF_ERROR(append_block_with_flexible_partial_content(block, row_pos, num_rows));
return Status::NotSupported<false>(
"SegmentWriter doesn't support flexible partial update, please set "
"enable_vertical_segment_writer=true in be.conf on all BEs to use "
"VerticalSegmentWriter.");
}
return Status::OK();
}
Expand Down
20 changes: 0 additions & 20 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,6 @@ class SegmentWriter {
PartialUpdateStats& stats);
Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos,
size_t num_rows);
Status append_block_with_flexible_partial_content(const vectorized::Block* block,
size_t row_pos, size_t num_rows);
Status generate_flexible_read_plan(
FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos,
bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
std::vector<BitmapValue>* skip_bitmaps,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
vectorized::IOlapColumnDataAccessor* seq_column,
const signed char* delete_sign_column_data,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
PartialUpdateStats& stats);
Status merge_rows_for_sequence_column(
const vectorized::Block* block, size_t row_pos, size_t& num_rows,
std::vector<BitmapValue>* skip_bitmaps,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
vectorized::IOlapColumnDataAccessor* seq_column,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches);
Status append_block_with_variant_subcolumns(vectorized::Block& data);

int64_t max_row_to_add(size_t row_avg_size_in_bytes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 0 0 0 0 0
1 1 1 1 1 1
2 2 2 2 2 2
3 3 3 3 3 3
4 4 4 4 4 4
5 5 5 5 5 5

-- !sql --
0 0 0 0 0 0
1 1 1 1 1 1
2 2 2 2 2 2
3 3 3 3 3 3
4 4 4 4 4 4
5 5 5 5 5 5

Loading

0 comments on commit 4c09663

Please sign in to comment.