Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Opt](flexible partial update) remove flexible partial update support from SegmentWriter #41950

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 4 additions & 327 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,332 +695,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block));
return Status::OK();
}
// Appends rows [row_pos, row_pos + num_rows) of `block` to this segment for a
// *flexible* partial update on a merge-on-write (MOW) tablet: each row may be
// missing an arbitrary subset of non-key columns, recorded per row in the
// hidden skip-bitmap column. Missing cells are filled from the row's previous
// version (located via primary-key lookup in the specified rowsets) or from
// default/null values, and the fully materialized rows are then written out.
// Preconditions (DCHECKed): MOW tablet, flexible partial-update info attached,
// and row_pos == 0, i.e. the whole block is consumed in one call.
Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block,
                                                                 size_t row_pos, size_t num_rows) {
    DCHECK(_is_mow());
    DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
    DCHECK(_opts.rowset_ctx->partial_update_info->is_flexible_partial_update());
    DCHECK_EQ(row_pos, 0);

    // block has the same schema as full_block
    DCHECK(block->columns() == _tablet_schema->num_columns());

    // create full block and fill with sort key columns
    auto full_block = _tablet_schema->create_block();

    // first row id this batch will occupy inside the segment
    auto segment_start_pos = _column_writers.front()->get_next_rowid();

    DCHECK(_tablet_schema->has_skip_bitmap_col());
    auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
    // Per-row bitmap of the column unique ids whose cells are absent in that row.
    auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* block) {
        return &(assert_cast<vectorized::ColumnBitmap*>(
                         block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get())
                         ->get_data());
    };
    std::vector<BitmapValue>* skip_bitmaps = get_skip_bitmaps(block);

    bool has_default_or_nullable = false;
    std::vector<bool> use_default_or_null_flag;
    use_default_or_null_flag.reserve(num_rows);

    // -1 when no sequence-map column is involved in this load
    int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid();

    DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
                    { sleep(60); })
    const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());

    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns {};
    vectorized::IOlapColumnDataAccessor* seq_column {nullptr};

    // Converts the sort-key columns to storage representation and mirrors them
    // into full_block; re-invoked if the block is rewritten by row merging below.
    auto encode_key_columns =
            [&full_block, &block, &row_pos, &num_rows,
             this](std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns) -> Status {
        key_columns.clear();
        for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) {
            full_block.replace_by_position(cid, block->get_by_position(cid).column);
            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                    full_block.get_by_position(cid), row_pos, num_rows, cid));
            auto [status, column] = _olap_data_convertor->convert_column_data(cid);
            if (!status.ok()) {
                return status;
            }
            key_columns.push_back(column);
        }
        return Status::OK();
    };

    // Converts the sequence column (if the schema has one) to storage
    // representation; leaves seq_column null otherwise.
    auto encode_seq_column = [&block, &row_pos, &num_rows,
                              this](vectorized::IOlapColumnDataAccessor*& seq_column) -> Status {
        seq_column = nullptr;
        if (_tablet_schema->has_sequence_col()) {
            auto seq_col_idx = _tablet_schema->sequence_col_idx();
            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                    block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx));
            auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx);
            if (!status.ok()) {
                return status;
            }
            seq_column = column;
        }
        return Status::OK();
    };

    // 1. encode key columns
    // we can only encode sort key columns currently because all non-key columns in flexible partial update
    // can have missing cells
    RETURN_IF_ERROR(encode_key_columns(key_columns));

    // 2. encode sequence column
    // We encode the sequence column even though it may have invalid values in some rows because we need to
    // encode the value of sequence column in key for rows that have a valid value in sequence column during
    // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have
    // a valid sequence column to encode the key with seq col.
    RETURN_IF_ERROR(encode_seq_column(seq_column));

    // 3. merge duplicate rows when table has sequence column
    // When there are multiple rows with the same keys in memtable, some of them specify the sequence column,
    // some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here.
    if (_tablet_schema->has_sequence_col()) {
        std::size_t origin_rows = num_rows;
        RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, num_rows, skip_bitmaps,
                                                       key_columns, seq_column, specified_rowsets,
                                                       segment_caches));
        if (origin_rows != num_rows) {
            // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps
            _olap_data_convertor->clear_source_content();
            RETURN_IF_ERROR(encode_key_columns(key_columns));
            RETURN_IF_ERROR(encode_seq_column(seq_column));
            skip_bitmaps = get_skip_bitmaps(block);
        }
    }

    const auto* delete_sign_column_data =
            BaseTablet::get_delete_sign_column_data(*block, row_pos + num_rows);
    DCHECK(delete_sign_column_data != nullptr);

    // 4. write key columns data
    for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) {
        const auto& column = key_columns[cid];
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
        RETURN_IF_ERROR(
                _column_writers[cid]->append(column->get_nullmap(), column->get_data(), num_rows));
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows);
    }

    // 5. generate read plan
    FlexibleReadPlan read_plan {_tablet_schema->has_row_store_for_all_columns()};
    PartialUpdateStats stats;
    RETURN_IF_ERROR(generate_flexible_read_plan(
            read_plan, row_pos, num_rows, segment_start_pos, _tablet_schema->has_sequence_col(),
            seq_map_col_unique_id, skip_bitmaps, key_columns, seq_column, delete_sign_column_data,
            specified_rowsets, segment_caches, has_default_or_nullable, use_default_or_null_flag,
            stats));
    CHECK_EQ(use_default_or_null_flag.size(), num_rows);

    if (config::enable_merge_on_write_correctness_check) {
        _tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
                                                    _mow_context->rowset_ids);
    }

    // 6. read according plan to fill full_block
    RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
            _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
            use_default_or_null_flag, has_default_or_nullable, segment_start_pos, row_pos, block,
            skip_bitmaps));

    // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation?
    // this column is not needed in read path for merge-on-write table

    // 7. fill row store column
    _serialize_block_to_row_column(full_block);

    // 8. encode and write all non-primary key columns(including sequence column if exists)
    for (auto cid = _num_sort_key_columns; cid < _tablet_schema->num_columns(); cid++) {
        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                full_block.get_by_position(cid), row_pos, num_rows, cid));
        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
        if (!status.ok()) {
            return status;
        }
        if (cid == _tablet_schema->sequence_col_idx()) {
            // should use the latest encoded sequence column to build the primary index
            seq_column = column;
        }
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
        RETURN_IF_ERROR(
                _column_writers[cid]->append(column->get_nullmap(), column->get_data(), num_rows));
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows);
    }

    _num_rows_updated += stats.num_rows_updated;
    _num_rows_deleted += stats.num_rows_deleted;
    _num_rows_new_added += stats.num_rows_new_added;
    _num_rows_filtered += stats.num_rows_filtered;

    // sanity check: rows written so far must match both the caller's offset and
    // the primary key index builder (row_pos is 0 per the precondition above)
    if (_num_rows_written != row_pos ||
        _primary_key_index_builder->num_rows() != _num_rows_written) {
        return Status::InternalError(
                "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
                "index builder num rows: {}",
                _num_rows_written, row_pos, _primary_key_index_builder->num_rows());
    }

    // 9. build primary key index
    RETURN_IF_ERROR(
            _generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false));

    _num_rows_written += num_rows;
    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
    _olap_data_convertor->clear_source_content();
    return Status::OK();
}

// Builds the read plan for a flexible partial-update batch: for every row it
// encodes the key (appending the sequence value when the row has one), probes
// existing data via probe_key_for_mow(), and records in `read_plan` which
// old-row cells must later be fetched to fill that row's skip-bitmap-marked
// (missing) cells. Also accumulates per-batch statistics into `stats` and the
// default/null bookkeeping vectors passed by reference.
Status SegmentWriter::generate_flexible_read_plan(
        FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos,
        bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
        std::vector<BitmapValue>* skip_bitmaps,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
        PartialUpdateStats& stats) {
    int32_t delete_sign_col_unique_id =
            _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
    int32_t seq_col_unique_id =
            (_tablet_schema->has_sequence_col()
                     ? _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id()
                     : -1);
    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
        size_t delta_pos = block_pos - row_pos;
        size_t segment_pos = segment_start_pos + delta_pos;
        auto& skip_bitmap = skip_bitmaps->at(block_pos);

        // the hidden sequence column should have the same mark as the sequence map column
        if (seq_map_col_unique_id != -1) {
            DCHECK(schema_has_sequence_col);
            if (skip_bitmap.contains(seq_map_col_unique_id)) {
                skip_bitmap.add(seq_col_unique_id);
            }
        }

        std::string key = _full_encode_keys(key_columns, delta_pos);
        _maybe_invalid_row_cache(key);
        bool row_has_sequence_col =
                (schema_has_sequence_col && !skip_bitmap.contains(seq_col_unique_id));
        if (row_has_sequence_col) {
            _encode_seq_column(seq_column, delta_pos, &key);
        }

        // mark key with delete sign as deleted.
        bool have_delete_sign = (!skip_bitmap.contains(delete_sign_col_unique_id) &&
                                 delete_sign_column_data[block_pos] != 0);

        // invoked when the key has no previous version; the partial-update info
        // decides whether that is an error (strict mode) or tolerated
        auto not_found_cb = [&]() {
            return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
                    *_tablet_schema, &skip_bitmap);
        };
        // invoked when the old row is found: remember where its cells live so
        // the missing ones can be read later
        auto update_read_plan = [&](const RowLocation& loc) {
            read_plan.prepare_to_read(loc, segment_pos, skip_bitmap);
        };

        RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, row_has_sequence_col,
                                          have_delete_sign, specified_rowsets, segment_caches,
                                          has_default_or_nullable, use_default_or_null_flag,
                                          update_read_plan, not_found_cb, stats));
    }
    return Status::OK();
}

// For tables with a sequence column, the same key can occur twice in one batch:
// once with an explicit sequence value and once without (the latter would
// inherit the previously stored value). Such pairs cannot be de-duplicated in
// the memtable, so this method resolves them here: it looks up the stored
// sequence value, compares it with the explicit one, and filters the losing row
// out of `block` in place. On return `num_rows` holds the (possibly reduced)
// row count. NOTE: `block` is mutated despite the const pointer (via
// const_cast) — callers must re-encode columns if num_rows changed.
Status SegmentWriter::merge_rows_for_sequence_column(
        const vectorized::Block* block, size_t row_pos, size_t& num_rows,
        std::vector<BitmapValue>* skip_bitmaps,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
    auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id();
    std::string previous_key {};
    bool previous_has_seq_col {false};
    int duplicate_keys {0};

    // 1 = keep the row, 0 = drop it
    auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1);
    auto* __restrict filter_map = filter_column->get_data().data();

    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
        size_t delta_pos = block_pos - row_pos;
        auto& skip_bitmap = skip_bitmaps->at(block_pos);
        std::string key = _full_encode_keys(key_columns, delta_pos);
        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
        Status st;
        if (delta_pos > 0 && previous_key == key) {
            // adjacent duplicates must be one row with and one row without a
            // sequence value — presumably guaranteed by upstream ordering;
            // NOTE(review): confirm this invariant holds for >2 duplicates
            DCHECK(previous_has_seq_col == !row_has_sequence_col);
            ++duplicate_keys;
            RowLocation loc;
            RowsetSharedPtr rowset;
            size_t rid_missing_seq {};
            size_t rid_with_seq {};
            if (row_has_sequence_col) {
                rid_missing_seq = block_pos - 1;
                rid_with_seq = block_pos;
            } else {
                rid_missing_seq = block_pos;
                rid_with_seq = block_pos - 1;
            }
            std::string previous_encoded_seq_value {};

            // fetch the stored sequence value the seq-less row would inherit
            st = _tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc,
                                         _mow_context->max_version, segment_caches, &rowset, true,
                                         &previous_encoded_seq_value);
            DCHECK(st.is<KEY_NOT_FOUND>() || st.ok());

            Slice previous_seq_slice {};
            if (st.is<KEY_NOT_FOUND>()) {
                // TODO: handle default value
            } else {
                // remember this rowset so old cells can be read from it later
                _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
            }
            std::string cur_encoded_seq_value {};
            _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value);
            // compare encoded sequence values (assumes encoding is
            // order-preserving — see _encode_seq_column)
            int res = Slice {previous_encoded_seq_value}.compare(Slice {cur_encoded_seq_value});
            VLOG_DEBUG << fmt::format(
                    "SegmentWriter::merge_rows_for_sequence_column: rid_with_seq={}, "
                    "rid_missing_seq={}, res={}",
                    rid_with_seq, rid_missing_seq, res);
            if (res > 0) {
                // stored value is larger: drop the row carrying the explicit value
                filter_map[rid_with_seq] = 0;
            } else if (res < 0) {
                // explicit value is larger: drop the row that would inherit the stored one
                filter_map[rid_missing_seq] = 0;
            } else {
                // equal: keep only the later of the two rows in the batch
                filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0;
            }
        }
        previous_key = std::move(key);
        previous_has_seq_col = row_has_sequence_col;
    }
    if (duplicate_keys > 0) {
        auto num_cols = block->columns();
        // append the filter as a temporary column, filter the block in place,
        // then verify exactly `duplicate_keys` rows were removed
        auto* mutable_block = const_cast<vectorized::Block*>(block);
        mutable_block->insert({std::move(filter_column),
                               std::make_shared<vectorized::DataTypeUInt8>(),
                               "__dup_key_filter_col__"});
        RETURN_IF_ERROR(vectorized::Block::filter_block(mutable_block, num_cols, num_cols));
        int merged_rows = num_rows - mutable_block->rows();
        if (duplicate_keys != merged_rows) {
            auto msg = fmt::format(
                    "duplicate_keys != merged_rows, duplicate_keys={}, merged_rows={}, "
                    "num_rows={}, mutable_block->rows()={}",
                    duplicate_keys, merged_rows, num_rows, block->rows());
            DCHECK(false) << msg;
            return Status::InternalError<false>(msg);
        }
        num_rows = mutable_block->rows();
    }
    return Status::OK();
}

Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
Expand All @@ -1031,7 +705,10 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
if (_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
} else {
RETURN_IF_ERROR(append_block_with_flexible_partial_content(block, row_pos, num_rows));
return Status::NotSupported<false>(
"SegmentWriter doesn't support flexible partial update, please set "
"enable_vertical_segment_writer=true in be.conf on all BEs to use "
"VerticalSegmentWriter.");
}
return Status::OK();
}
Expand Down
20 changes: 0 additions & 20 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,6 @@ class SegmentWriter {
PartialUpdateStats& stats);
Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos,
size_t num_rows);
// Entry point for flexible partial update: rows may be missing arbitrary
// non-key columns, tracked by the hidden skip-bitmap column.
Status append_block_with_flexible_partial_content(const vectorized::Block* block,
                                                  size_t row_pos, size_t num_rows);
// Builds the per-row plan describing which old-row cells must be read to
// fill each new row's skip-bitmap-marked (missing) cells.
Status generate_flexible_read_plan(
        FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos,
        bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
        std::vector<BitmapValue>* skip_bitmaps,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column,
        const signed char* delete_sign_column_data,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
        PartialUpdateStats& stats);
// De-duplicates adjacent same-key rows where exactly one of the pair carries
// an explicit sequence value; may shrink num_rows in place.
Status merge_rows_for_sequence_column(
        const vectorized::Block* block, size_t row_pos, size_t& num_rows,
        std::vector<BitmapValue>* skip_bitmaps,
        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
        vectorized::IOlapColumnDataAccessor* seq_column,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches);
Status append_block_with_variant_subcolumns(vectorized::Block& data);

int64_t max_row_to_add(size_t row_avg_size_in_bytes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 0 0 0 0 0
1 1 1 1 1 1
2 2 2 2 2 2
3 3 3 3 3 3
4 4 4 4 4 4
5 5 5 5 5 5

-- !sql --
0 0 0 0 0 0
1 1 1 1 1 1
2 2 2 2 2 2
3 3 3 3 3 3
4 4 4 4 4 4
5 5 5 5 5 5

Loading
Loading