Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 96 additions & 8 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <glog/logging.h>
#include <stddef.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>
#include <tuple>
Expand All @@ -40,13 +41,12 @@
#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
#include "runtime/types.h"
#include "util/hash_util.hpp"
#include "util/string_parser.hpp"
#include "util/string_util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
// NOLINTNEXTLINE(unused-includes)
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vliteral.h"
#include "vec/runtime/vdatetime_value.h"

Expand Down Expand Up @@ -313,6 +313,12 @@ VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSche
_partition_function[i] = _part_func_ctx[i]->root();
}
}

if (t_param.__isset.enable_auto_detect_overwrite && t_param.enable_auto_detect_overwrite) {
_is_auto_detect_overwrite = true;
DCHECK(t_param.__isset.overwrite_group_id);
_overwrite_group_id = t_param.overwrite_group_id;
}
}

VOlapTablePartitionParam::~VOlapTablePartitionParam() {
Expand Down Expand Up @@ -393,6 +399,7 @@ Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprN
Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartition& t_part,
VOlapTablePartition*& part_result) {
DCHECK(part_result == nullptr);
// here we set the default value of partition bounds first! if it doesn't have some key, it will be -1.
part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block));
part_result->id = t_part.id;
part_result->is_mutable = t_part.is_mutable;
Expand Down Expand Up @@ -452,6 +459,7 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti
Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, BlockRow* part_key,
uint16_t pos) {
auto column = std::move(*part_key->first->get_by_position(pos).column).mutate();
//TODO: use assert_cast before insert_data
switch (t_expr.node_type) {
case TExprNodeType::DATE_LITERAL: {
if (TypeDescriptor::from_thrift(t_expr.type).is_date_v2_type()) {
Expand Down Expand Up @@ -587,10 +595,6 @@ Status VOlapTablePartitionParam::add_partitions(
// check index
for (int j = 0; j < num_indexes; ++j) {
if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
std::stringstream ss;
ss << "partition's index is not equal with schema's"
<< ", part_index=" << part->indexes[j].index_id
<< ", schema_index=" << _schema->indexes()[j]->index_id;
return Status::InternalError(
"partition's index is not equal with schema's"
", part_index={}, schema_index={}",
Expand All @@ -612,4 +616,88 @@ Status VOlapTablePartitionParam::add_partitions(
return Status::OK();
}

Status VOlapTablePartitionParam::replace_partitions(
std::vector<int64_t>& old_partition_ids,
const std::vector<TOlapTablePartition>& new_partitions) {
// remove old replaced partitions
DCHECK(old_partition_ids.size() == new_partitions.size());

// init and add new partitions. insert into _partitions
for (int i = 0; i < new_partitions.size(); i++) {
const auto& t_part = new_partitions[i];
// pair old_partition_ids and new_partitions one by one. TODO: sort to opt performance
VOlapTablePartition* old_part = nullptr;
auto old_part_id = old_partition_ids[i];
if (auto it = std::find_if(
_partitions.begin(), _partitions.end(),
[=](const VOlapTablePartition* lhs) { return lhs->id == old_part_id; });
it != _partitions.end()) {
old_part = *it;
} else {
return Status::InternalError("Cannot find old tablet {} in replacing", old_part_id);
}

auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
part->id = t_part.id;
part->is_mutable = t_part.is_mutable;

/// just substitute directly. no need to remove and reinsert keys.
// range partition
part->start_key = std::move(old_part->start_key);
part->end_key = std::move(old_part->end_key);
// list partition
part->in_keys = std::move(old_part->in_keys);
if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
_default_partition = part;
}

part->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
return Status::InternalError(
"number of partition's index is not equal with schema's"
", num_part_indexes={}, num_schema_indexes={}",
t_part.indexes.size(), num_indexes);
}
part->indexes = t_part.indexes;
std::sort(part->indexes.begin(), part->indexes.end(),
[](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
return lhs.index_id < rhs.index_id;
});
// check index
for (int j = 0; j < num_indexes; ++j) {
if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
return Status::InternalError(
"partition's index is not equal with schema's"
", part_index={}, schema_index={}",
part->indexes[j].index_id, _schema->indexes()[j]->index_id);
}
}

// add new partitions with new id.
_partitions.emplace_back(part);

// replace items in _partition_maps
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
(*_partitions_map)[std::tuple {in_key.first, in_key.second, false}] = part;
}
} else {
(*_partitions_map)[std::tuple {part->end_key.first, part->end_key.second, false}] =
part;
}
}
// remove old partitions by id
std::ranges::sort(old_partition_ids);
for (auto it = _partitions.begin(); it != _partitions.end();) {
if (std::ranges::binary_search(old_partition_ids, (*it)->id)) {
it = _partitions.erase(it);
} else {
it++;
}
}

return Status::OK();
}

} // namespace doris
25 changes: 19 additions & 6 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <utility>
#include <vector>

#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
Expand All @@ -40,7 +41,6 @@
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
Expand Down Expand Up @@ -119,7 +119,7 @@ using OlapTableIndexTablets = TOlapTableIndexTablets;

using BlockRow = std::pair<vectorized::Block*, int32_t>;
using BlockRowWithIndicator =
std::tuple<vectorized::Block*, int32_t, bool>; // [block, column, is_transformed]
std::tuple<vectorized::Block*, int32_t, bool>; // [block, row, is_transformed]

struct VOlapTablePartition {
int64_t id = 0;
Expand All @@ -133,6 +133,7 @@ struct VOlapTablePartition {
int64_t load_tablet_idx = -1;

VOlapTablePartition(vectorized::Block* partition_block)
// the default value of partition bound is -1.
: start_key {partition_block, -1}, end_key {partition_block, -1} {}
};

Expand Down Expand Up @@ -172,6 +173,10 @@ class VOlapTablePartitionParam {
VOlapTablePartition*& partition) const {
auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
: _partitions_map->upper_bound(std::tuple {block, row, true});
VLOG_TRACE << "find row " << row << " of\n"
<< block->dump_data() << "in:\n"
<< _partition_block.dump_data() << "result line row: " << std::get<1>(it->first);

// for list partition it might result in default partition
if (_is_in_partition) {
partition = (it != _partitions_map->end()) ? it->second : _default_partition;
Expand Down Expand Up @@ -246,9 +251,15 @@ class VOlapTablePartitionParam {
bool is_projection_partition() const { return _is_auto_partition; }
bool is_auto_partition() const { return _is_auto_partition; }

bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; }
int64_t get_overwrite_group_id() const { return _overwrite_group_id; }

std::vector<uint16_t> get_partition_keys() const { return _partition_slot_locs; }

Status add_partitions(const std::vector<TOlapTablePartition>& partitions);
// no need to del/reinsert partition keys, but change the link. reset the _partitions items
Status replace_partitions(std::vector<int64_t>& old_partition_ids,
const std::vector<TOlapTablePartition>& new_partitions);

vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; }
vectorized::VExprSPtrs get_partition_function() { return _partition_function; }
Expand Down Expand Up @@ -293,11 +304,13 @@ class VOlapTablePartitionParam {
// only works when using list partition, the resource is owned by _partitions
VOlapTablePartition* _default_partition = nullptr;

// for auto partition, now only support 1 column. TODO: use vector to save them when we support multi column auto-partition.
bool _is_auto_partition = false;
vectorized::VExprContextSPtrs _part_func_ctx = {nullptr};
vectorized::VExprSPtrs _partition_function = {nullptr};
TPartitionType::type _part_type; // support list or range
// "insert overwrite partition(*)", detect which partitions by BE
bool _is_auto_detect_overwrite = false;
int64_t _overwrite_group_id = 0;
};

// indicate where's the tablet and all its replications (node-wise)
Expand Down Expand Up @@ -360,13 +373,13 @@ class DorisNodesInfo {
public:
DorisNodesInfo() = default;
DorisNodesInfo(const TPaloNodesInfo& t_nodes) {
for (auto& node : t_nodes.nodes) {
for (const auto& node : t_nodes.nodes) {
_nodes.emplace(node.id, node);
}
}
void setNodes(const TPaloNodesInfo& t_nodes) {
_nodes.clear();
for (auto& node : t_nodes.nodes) {
for (const auto& node : t_nodes.nodes) {
_nodes.emplace(node.id, node);
}
}
Expand All @@ -380,7 +393,7 @@ class DorisNodesInfo {

void add_nodes(const std::vector<TNodeInfo>& t_nodes) {
for (const auto& node : t_nodes) {
auto node_info = find_node(node.id);
const auto* node_info = find_node(node.id);
if (node_info == nullptr) {
_nodes.emplace(node.id, node);
}
Expand Down
Loading