Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
const std::string VIRTUAL_COLUMN_PREFIX = "__DORIS_VIRTUAL_COL__";

/// The maximum precision representable by a 4-byte decimal (Decimal4Value)
constexpr int MAX_DECIMAL32_PRECISION = 9;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class StorageReadOptions {
std::map<std::string, PrimitiveType> target_cast_type_for_variants;
RowRanges row_ranges;
size_t topn_limit = 0;

std::map<ColumnId, vectorized::VExprContextSPtr> virtual_column_exprs;
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, vectorized::DataTypePtr> vir_col_idx_to_type;
};

struct CompactionSampleInfo {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.push_down_agg_type_opt = _read_context->push_down_agg_type_opt;
_read_options.remaining_conjunct_roots = _read_context->remaining_conjunct_roots;
_read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down;
_read_options.virtual_column_exprs = _read_context->virtual_column_exprs;
_read_options.vir_cid_to_idx_in_block = _read_context->vir_cid_to_idx_in_block;
_read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
_read_options.rowset_id = _rowset->rowset_id();
_read_options.version = _rowset->version();
_read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ struct RowsetReaderContext {
// slots that cast may be eliminated in storage layer
std::map<std::string, PrimitiveType> target_cast_type_for_variants;
int64_t ttl_seconds = 0;

std::map<ColumnId, vectorized::VExprContextSPtr> virtual_column_exprs;
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, vectorized::DataTypePtr> vir_col_idx_to_type;
};

} // namespace doris
Expand Down
331 changes: 279 additions & 52 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp

Large diffs are not rendered by default.

31 changes: 20 additions & 11 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,6 @@ struct ColumnPredicateInfo {
int32_t column_id;
};

class SegmentIterator;
struct FuncExprParams {
ColumnId _column_id = 0;
uint32_t _unique_id = 0;
std::string _column_name;
SegmentIterator* _segment_iterator = nullptr;
std::shared_ptr<roaring::Roaring> result;
};

class SegmentIterator : public RowwiseIterator {
public:
SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema);
Expand Down Expand Up @@ -221,8 +212,7 @@ class SegmentIterator : public RowwiseIterator {
// for vectorization implementation
[[nodiscard]] Status _read_columns(const std::vector<ColumnId>& column_ids,
vectorized::MutableColumns& column_block, size_t nrows);
[[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
bool set_block_rowid);
[[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read);
void _replace_version_col(size_t num_rows);
Status _init_current_block(vectorized::Block* block,
std::vector<vectorized::MutableColumnPtr>& non_pred_vector,
Expand Down Expand Up @@ -375,6 +365,11 @@ class SegmentIterator : public RowwiseIterator {

void _clear_iterators();

// Initialize virtual columns in the block, set all virtual columns in the block to ColumnNothing
void _init_virtual_columns(vectorized::Block* block);
// Fallback logic for virtual column materialization, materializing all unmaterialized virtual columns through expressions
Status _materialization_of_virtual_column(vectorized::Block* block);

class BitmapRangeIterator;
class BackwardBitmapRangeIterator;

Expand Down Expand Up @@ -426,8 +421,10 @@ class SegmentIterator : public RowwiseIterator {
// so we need a field to stand for columns first time to read
std::vector<ColumnId> _predicate_column_ids;
std::vector<ColumnId> _non_predicate_column_ids;
// TODO: Should use std::vector<size_t>
std::vector<ColumnId> _columns_to_filter;
std::vector<ColumnId> _converted_column_ids;
// TODO: Should use std::vector<size_t>
std::vector<int> _schema_block_id_map; // map from schema column id to column idx in Block

// the actual init process is delayed to the first call to next_batch()
Expand Down Expand Up @@ -475,11 +472,23 @@ class SegmentIterator : public RowwiseIterator {

std::vector<uint8_t> _ret_flags;

/*
* column and column_predicates on it.
* a boolean value to indicate whether the column has been read by the index.
*/
std::unordered_map<ColumnId, std::unordered_map<ColumnPredicate*, bool>>
_column_predicate_inverted_index_status;

/*
* column and common expr on it.
* a boolean value to indicate whether the column has been read by the index.
*/
std::unordered_map<ColumnId, std::unordered_map<const vectorized::VExpr*, bool>>
_common_expr_inverted_index_status;

// cid to virtual column expr
std::map<ColumnId, vectorized::VExprContextSPtr> _virtual_column_exprs;
std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
};

} // namespace segment_v2
Expand Down
168 changes: 168 additions & 0 deletions be/src/olap/rowset/segment_v2/virtual_column_iterator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "virtual_column_iterator.h"

#include <cstddef>
#include <cstring>
#include <memory>

#include "vec/columns/column.h"
#include "vec/columns/column_nothing.h"

namespace doris::segment_v2 {

// Starts out holding an empty ColumnNothing as a placeholder. seek_to_ordinal(),
// next_batch() and read_by_rowids() all check for that placeholder and return OK
// without reading anything until prepare_materialization() installs a real column.
VirtualColumnIterator::VirtualColumnIterator()
: _materialized_column_ptr(vectorized::ColumnNothing::create(0)) {}

// ColumnIterator::init override.
// Nothing has to be set up for a virtual column: the options are ignored and the
// call always succeeds.
Status VirtualColumnIterator::init(const ColumnIteratorOptions& /*opts*/) {
    return Status::OK();
}

// Installs the values this iterator will serve.
//
// `labels[i]` is the row id that `column[i]` belongs to. The column is re-ordered by
// ascending label so that next_batch() can copy a contiguous range and
// read_by_rowids() can select rows with a mask.
//
// Example:
//   column: [100, 101, 102, 99, 50, 49]
//   labels: [5,   4,   1,   10, 7,  2]
// sorted (label-original index) pairs:
//   {1-2, 2-5, 4-1, 5-0, 7-4, 10-3}
// state after the call:
//   _materialized_column_ptr: [102, 49, 101, 100, 50, 99]
//   _row_id_to_idx:           {1:0, 2:1, 4:2, 5:3, 7:4, 10:5}
//   _max_ordinal:             10 (largest label, inclusive)
//
// An empty `labels` puts the iterator back into the "nothing materialized" state
// (ColumnNothing placeholder, empty map, empty filter) instead of leaving the data of
// a previous call behind.
void VirtualColumnIterator::prepare_materialization(vectorized::IColumn::Ptr column,
                                                    std::unique_ptr<std::vector<uint64_t>> labels) {
    DCHECK(labels->size() == column->size()) << "labels size: " << labels->size()
                                             << ", materialized column size: " << column->size();
    const std::vector<uint64_t>& labels_ref = *labels;
    const size_t n = labels_ref.size();
    VLOG_DEBUG << fmt::format("Input labels {}", fmt::join(labels_ref, ", "));
    if (n == 0) {
        // Drop whatever an earlier call materialized so that stale rows can never be
        // served while _size says there are none.
        _materialized_column_ptr = vectorized::ColumnNothing::create(0);
        _row_id_to_idx.clear();
        _filter.clear();
        _size = 0;
        _max_ordinal = 0;
        return;
    }

    // 1. Pair every label with its original position and sort by label.
    std::vector<std::pair<size_t, size_t>> order(n);
    for (size_t i = 0; i < n; ++i) {
        order[i] = {labels_ref[i], i};
    }
    std::sort(order.begin(), order.end(),
              [](const auto& a, const auto& b) { return a.first < b.first; });
    _max_ordinal = order[n - 1].first;

    // 2. Scatter the column into label order and remember, for every row id, where
    //    its value ended up in the re-ordered column.
    auto scattered_column = column->clone_empty();
    scattered_column->reserve(n);
    _row_id_to_idx.clear();
    _row_id_to_idx.reserve(n);
    for (size_t i = 0; i < n; ++i) {
        const size_t row_id = order[i].first;            // row id (label)
        const size_t original_col_idx = order[i].second; // position in the input column
        _row_id_to_idx[row_id] = i;
        scattered_column->insert_from(*column, original_col_idx);
    }
    _materialized_column_ptr = std::move(scattered_column);
    _size = n;

    // The dump is built inside the log expression so that the per-row formatting only
    // runs when verbose logging is actually enabled.
    VLOG_DEBUG << fmt::format("virtual column iterator, row_idx_to_idx:\n{}", [this] {
        std::string msg;
        for (const auto& [row_id, idx] : _row_id_to_idx) {
            msg += fmt::format("{}: {}, ", row_id, idx);
        }
        return msg;
    }());

    // One mask slot per materialized row; read_by_rowids() re-zeroes it on every call.
    _filter = doris::vectorized::IColumn::Filter(_size, 0);
}

// Positions the iterator at row id `ord_idx`; the next next_batch() call starts there.
//
// Returns OK without moving when nothing has been materialized (no rows, or the
// ColumnNothing placeholder is still installed), and InternalError when `ord_idx` lies
// beyond the largest materialized row id.
Status VirtualColumnIterator::seek_to_ordinal(ordinal_t ord_idx) {
    if (_size == 0 ||
        vectorized::check_and_get_column<vectorized::ColumnNothing>(*_materialized_column_ptr)) {
        // _materialized_column is not set. do nothing.
        return Status::OK();
    }

    // prepare_materialization() stores the largest label itself in _max_ordinal, so the
    // bound is inclusive: seeking to the last materialized row must be accepted.
    if (ord_idx > _max_ordinal) {
        return Status::InternalError("Seek to ordinal out of range: {} out of {}", ord_idx,
                                     _max_ordinal);
    }

    _current_ordinal = ord_idx;

    return Status::OK();
}

// Appends `*n` consecutive materialized values, starting at the current ordinal, to
// `dst` and advances the current ordinal by `*n`.
//
// - Returns OK without touching `dst` when `*n` is 0 or nothing has been materialized
//   (same "not set" condition as seek_to_ordinal()).
// - Returns InternalError when the current ordinal is not a materialized row id, or
//   when the requested range runs past the end of the materialized column.
// - A ColumnNothing `dst` is replaced by an empty column of the materialized type
//   before the copy.
// `*n` and `*has_null` are not written by this implementation.
Status VirtualColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
                                         bool* has_null) {
    const size_t rows_num_to_read = *n;
    if (rows_num_to_read == 0 || _size == 0 ||
        vectorized::check_and_get_column<vectorized::ColumnNothing>(*_materialized_column_ptr)) {
        return Status::OK();
    }

    // Single lookup: the iterator from find() is reused for the start index.
    const auto it = _row_id_to_idx.find(_current_ordinal);
    if (it == _row_id_to_idx.end()) {
        return Status::InternalError("Current ordinal {} not found in row_id_to_idx map",
                                     _current_ordinal);
    }

    // Reject a range that would read past the materialized rows before dst is modified.
    const size_t start = it->second;
    if (start >= _size || rows_num_to_read > _size - start) {
        return Status::InternalError(
                "Virtual column read out of range: start {}, rows {}, materialized size {}",
                start, rows_num_to_read, _size);
    }

    // Update dst column
    if (vectorized::check_and_get_column<vectorized::ColumnNothing>(*dst)) {
        VLOG_DEBUG << fmt::format("Dst is nothing column, create new mutable column");
        dst = _materialized_column_ptr->clone_empty();
    }

    dst->insert_range_from(*_materialized_column_ptr, start, rows_num_to_read);

    VLOG_DEBUG << fmt::format("Virtual column iterators, next_batch, rows reads: {}, dst size: {}",
                              rows_num_to_read, dst->size());

    _current_ordinal += rows_num_to_read;
    return Status::OK();
}

// Appends the materialized values of the given row ids to `dst`.
//
// The row ids are turned into a 0/1 mask over the materialized column, so the values
// come out in the column's (ascending row id) order and a row id listed twice is
// selected once.
//
// - Returns OK without touching `dst` when `count` is 0 or nothing has been
//   materialized (same "not set" condition as seek_to_ordinal()).
// - Returns InternalError when a row id was never materialized. Looking it up with
//   operator[] instead would insert a default entry into the map and silently select
//   index 0.
Status VirtualColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
                                             vectorized::MutableColumnPtr& dst) {
    if (count == 0 || _size == 0 ||
        vectorized::check_and_get_column<vectorized::ColumnNothing>(*_materialized_column_ptr)) {
        return Status::OK();
    }

    // Start from an all-zero mask on every call.
    memset(_filter.data(), 0, _size);

    // Convert rowids to filter
    for (size_t i = 0; i < count; ++i) {
        const auto it = _row_id_to_idx.find(rowids[i]);
        if (it == _row_id_to_idx.end()) {
            return Status::InternalError("Rowid {} not found in row_id_to_idx map", rowids[i]);
        }
        _filter[it->second] = 1;
    }

    // Apply filter to materialized column
    doris::vectorized::IColumn::Ptr res_col = _materialized_column_ptr->filter(_filter, 0);
    // Update dst column
    if (vectorized::check_and_get_column<vectorized::ColumnNothing>(*dst)) {
        VLOG_DEBUG << fmt::format("Dst is nothing column, create new mutable column");
        dst = res_col->assume_mutable();
    } else {
        dst->insert_range_from(*res_col, 0, res_col->size());
    }

    VLOG_DEBUG << fmt::format(
            "Virtual column iterators, read_by_rowids, rowids size: {}, dst size: {}", count,
            dst->size());
    return Status::OK();
}

} // namespace doris::segment_v2
64 changes: 64 additions & 0 deletions be/src/olap/rowset/segment_v2/virtual_column_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <sys/types.h>

#include <cstdint>
#include <unordered_map>

#include "column_reader.h"
#include "common/be_mock_util.h"
#include "vec/columns/column.h"

namespace doris::segment_v2 {

// ColumnIterator that serves values from a column handed to it through
// prepare_materialization() instead of decoding them itself.
//
// Until prepare_materialization() has installed a column, the iterator holds a
// ColumnNothing placeholder and seek_to_ordinal(), next_batch() and read_by_rowids()
// return OK without reading anything.
class VirtualColumnIterator : public ColumnIterator {
public:
// Installs the ColumnNothing placeholder.
VirtualColumnIterator();
~VirtualColumnIterator() override = default;

// Installs the values to serve. labels[i] is the row id that column[i] belongs to;
// the column is stored re-ordered by ascending label together with a
// row id -> index map.
MOCK_FUNCTION void prepare_materialization(vectorized::IColumn::Ptr column,
std::unique_ptr<std::vector<uint64_t>> labels);

// No-op; always returns OK.
Status init(const ColumnIteratorOptions& opts) override;

// Sets the row id the next next_batch() call starts from.
Status seek_to_ordinal(ordinal_t ord_idx) override;

// Appends *n consecutive materialized values, starting at the current ordinal, to dst.
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;

// Appends the materialized values of the given row ids to dst.
Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;

// NOTE(review): always reports 0, independent of the position set by
// seek_to_ordinal() / advanced by next_batch() — confirm that no caller relies on it.
ordinal_t get_current_ordinal() const override { return 0; }

#ifdef BE_TEST
// Test-only accessors for the internal state.
vectorized::IColumn::Ptr get_materialized_column() const { return _materialized_column_ptr; }
const std::unordered_map<size_t, size_t>& get_row_id_to_idx() const { return _row_id_to_idx; }
#endif
private:
// Materialized values ordered by ascending row id; ColumnNothing while unset.
vectorized::IColumn::Ptr _materialized_column_ptr;
// segment rowid to index in column.
std::unordered_map<size_t, size_t> _row_id_to_idx;
// 0/1 selection mask over the materialized rows, reused by read_by_rowids().
doris::vectorized::IColumn::Filter _filter;
// Number of materialized rows.
size_t _size = 0;
// Largest materialized row id (inclusive), 0 when nothing is materialized.
size_t _max_ordinal = 0;
// Row id the next next_batch() call starts from.
ordinal_t _current_ordinal = 0;
};

} // namespace doris::segment_v2
4 changes: 4 additions & 0 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
_reader_context.ttl_seconds = _tablet->ttl_seconds();

_reader_context.virtual_column_exprs = read_params.virtual_column_exprs;
_reader_context.vir_cid_to_idx_in_block = read_params.vir_cid_to_idx_in_block;
_reader_context.vir_col_idx_to_type = read_params.vir_col_idx_to_type;

return Status::OK();
}

Expand Down
Loading
Loading