diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index af065f332a00e4..061cc7b6681286 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -23,9 +23,10 @@ #include #include #include -#include #include +#include +#include #include #include #include @@ -40,13 +41,12 @@ #include "runtime/primitive_type.h" #include "runtime/raw_value.h" #include "runtime/types.h" -#include "util/hash_util.hpp" #include "util/string_parser.hpp" #include "util/string_util.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" -#include "vec/common/assert_cast.h" -#include "vec/common/string_ref.h" +// NOLINTNEXTLINE(unused-includes) +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vliteral.h" #include "vec/runtime/vdatetime_value.h" @@ -313,6 +313,12 @@ VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptrroot(); } } + + if (t_param.__isset.enable_auto_detect_overwrite && t_param.enable_auto_detect_overwrite) { + _is_auto_detect_overwrite = true; + DCHECK(t_param.__isset.overwrite_group_id); + _overwrite_group_id = t_param.overwrite_group_id; + } } VOlapTablePartitionParam::~VOlapTablePartitionParam() { @@ -393,6 +399,7 @@ Status VOlapTablePartitionParam::_create_partition_keys(const std::vectorid = t_part.id; part_result->is_mutable = t_part.is_mutable; @@ -452,6 +459,7 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos) { auto column = std::move(*part_key->first->get_by_position(pos).column).mutate(); + //TODO: use assert_cast before insert_data switch (t_expr.node_type) { case TExprNodeType::DATE_LITERAL: { if (TypeDescriptor::from_thrift(t_expr.type).is_date_v2_type()) { @@ -587,10 +595,6 @@ Status VOlapTablePartitionParam::add_partitions( // check index for (int j = 0; j < num_indexes; ++j) { if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { - std::stringstream ss; - ss << "partition's index is not equal with schema's" - << ", part_index=" << part->indexes[j].index_id - << ", schema_index=" << _schema->indexes()[j]->index_id; return Status::InternalError( "partition's index is not equal with schema's" ", part_index={}, schema_index={}", @@ -612,4 +616,88 @@ Status VOlapTablePartitionParam::add_partitions( return Status::OK(); } +Status VOlapTablePartitionParam::replace_partitions( + std::vector& old_partition_ids, + const std::vector& new_partitions) { + // remove old replaced partitions + DCHECK(old_partition_ids.size() == new_partitions.size()); + + // init and add new partitions. insert into _partitions + for (int i = 0; i < new_partitions.size(); i++) { + const auto& t_part = new_partitions[i]; + // pair old_partition_ids and new_partitions one by one. TODO: sort to opt performance + VOlapTablePartition* old_part = nullptr; + auto old_part_id = old_partition_ids[i]; + if (auto it = std::find_if( + _partitions.begin(), _partitions.end(), + [=](const VOlapTablePartition* lhs) { return lhs->id == old_part_id; }); + it != _partitions.end()) { + old_part = *it; + } else { + return Status::InternalError("Cannot find old tablet {} in replacing", old_part_id); + } + + auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); + part->id = t_part.id; + part->is_mutable = t_part.is_mutable; + + /// just substitute directly. no need to remove and reinsert keys. + // range partition + part->start_key = std::move(old_part->start_key); + part->end_key = std::move(old_part->end_key); + // list partition + part->in_keys = std::move(old_part->in_keys); + if (t_part.__isset.is_default_partition && t_part.is_default_partition) { + _default_partition = part; + } + + part->num_buckets = t_part.num_buckets; + auto num_indexes = _schema->indexes().size(); + if (t_part.indexes.size() != num_indexes) { + return Status::InternalError( + "number of partition's index is not equal with schema's" + ", num_part_indexes={}, num_schema_indexes={}", + t_part.indexes.size(), num_indexes); + } + part->indexes = t_part.indexes; + std::sort(part->indexes.begin(), part->indexes.end(), + [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) { + return lhs.index_id < rhs.index_id; + }); + // check index + for (int j = 0; j < num_indexes; ++j) { + if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { + return Status::InternalError( + "partition's index is not equal with schema's" + ", part_index={}, schema_index={}", + part->indexes[j].index_id, _schema->indexes()[j]->index_id); + } + } + + // add new partitions with new id. + _partitions.emplace_back(part); + + // replace items in _partition_maps + if (_is_in_partition) { + for (auto& in_key : part->in_keys) { + (*_partitions_map)[std::tuple {in_key.first, in_key.second, false}] = part; + } + } else { + (*_partitions_map)[std::tuple {part->end_key.first, part->end_key.second, false}] = + part; + } + } + // remove old partitions by id + std::ranges::sort(old_partition_ids); + for (auto it = _partitions.begin(); it != _partitions.end();) { + if (std::ranges::binary_search(old_partition_ids, (*it)->id)) { + it = _partitions.erase(it); + } else { + it++; + } + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index a157b57948721f..9c3a1b6db44073 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -32,6 +32,7 @@ #include #include +#include "common/logging.h" #include "common/object_pool.h" #include "common/status.h" #include "runtime/descriptors.h" @@ -40,7 +41,6 @@ #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr_fwd.h" namespace doris { @@ -119,7 +119,7 @@ using OlapTableIndexTablets = TOlapTableIndexTablets; using BlockRow = std::pair; using BlockRowWithIndicator = - std::tuple; // [block, column, is_transformed] + std::tuple; // [block, row, is_transformed] struct VOlapTablePartition { int64_t id = 0; @@ -133,6 +133,7 @@ struct VOlapTablePartition { int64_t load_tablet_idx = -1; VOlapTablePartition(vectorized::Block* partition_block) + // the default value of partition bound is -1. : start_key {partition_block, -1}, end_key {partition_block, -1} {} }; @@ -172,6 +173,10 @@ class VOlapTablePartitionParam { VOlapTablePartition*& partition) const { auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true}) : _partitions_map->upper_bound(std::tuple {block, row, true}); + VLOG_TRACE << "find row " << row << " of\n" + << block->dump_data() << "in:\n" + << _partition_block.dump_data() << "result line row: " << std::get<1>(it->first); + // for list partition it might result in default partition if (_is_in_partition) { partition = (it != _partitions_map->end()) ? it->second : _default_partition; @@ -246,9 +251,15 @@ class VOlapTablePartitionParam { bool is_projection_partition() const { return _is_auto_partition; } bool is_auto_partition() const { return _is_auto_partition; } + bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; } + int64_t get_overwrite_group_id() const { return _overwrite_group_id; } + std::vector get_partition_keys() const { return _partition_slot_locs; } Status add_partitions(const std::vector& partitions); + // no need to del/reinsert partition keys, but change the link. reset the _partitions items + Status replace_partitions(std::vector& old_partition_ids, + const std::vector& new_partitions); vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; } vectorized::VExprSPtrs get_partition_function() { return _partition_function; } @@ -293,11 +304,13 @@ class VOlapTablePartitionParam { // only works when using list partition, the resource is owned by _partitions VOlapTablePartition* _default_partition = nullptr; - // for auto partition, now only support 1 column. TODO: use vector to save them when we support multi column auto-partition. bool _is_auto_partition = false; vectorized::VExprContextSPtrs _part_func_ctx = {nullptr}; vectorized::VExprSPtrs _partition_function = {nullptr}; TPartitionType::type _part_type; // support list or range + // "insert overwrite partition(*)", detect which partitions by BE + bool _is_auto_detect_overwrite = false; + int64_t _overwrite_group_id = 0; }; // indicate where's the tablet and all its replications (node-wise) @@ -360,13 +373,13 @@ class DorisNodesInfo { public: DorisNodesInfo() = default; DorisNodesInfo(const TPaloNodesInfo& t_nodes) { - for (auto& node : t_nodes.nodes) { + for (const auto& node : t_nodes.nodes) { _nodes.emplace(node.id, node); } } void setNodes(const TPaloNodesInfo& t_nodes) { _nodes.clear(); - for (auto& node : t_nodes.nodes) { + for (const auto& node : t_nodes.nodes) { _nodes.emplace(node.id, node); } } @@ -380,7 +393,7 @@ class DorisNodesInfo { void add_nodes(const std::vector& t_nodes) { for (const auto& node : t_nodes) { - auto node_info = find_node(node.id); + const auto* node_info = find_node(node.id); if (node_info == nullptr) { _nodes.emplace(node.id, node); } diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 358f8d3783ede8..d98131613a12f9 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include "common/logging.h" @@ -88,7 +89,7 @@ Status VRowDistribution::automatic_create_partition() { request.__set_partitionValues(_partitions_need_create); request.__set_be_endpoint(be_endpoint); - VLOG(1) << "automatic partition rpc begin request " << request; + VLOG_NOTICE << "automatic partition rpc begin request " << request; TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; int time_out = _state->execution_timeout() * 1000; RETURN_IF_ERROR(ThriftRpcHelper::rpc( @@ -99,7 +100,7 @@ Status VRowDistribution::automatic_create_partition() { time_out)); Status status(Status::create(result.status)); - VLOG(1) << "automatic partition rpc end response " << result; + VLOG_NOTICE << "automatic partition rpc end response " << result; if (result.status.status_code == TStatusCode::OK) { // add new created partitions RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); @@ -109,6 +110,77 @@ Status VRowDistribution::automatic_create_partition() { return status; } +// for reuse the same create callback of create-partition +static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { + TCreatePartitionResult result; + result.status = arg.status; + result.nodes = std::move(arg.nodes); + result.partitions = std::move(arg.partitions); + result.tablets = std::move(arg.tablets); + return result; +} + +// use _partitions and replace them +Status VRowDistribution::_replace_overwriting_partition() { + SCOPED_TIMER(_add_partition_request_timer); + TReplacePartitionRequest request; + TReplacePartitionResult result; + request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id()); + request.__set_db_id(_vpartition->db_id()); + request.__set_table_id(_vpartition->table_id()); + + // only request for partitions not recorded for replacement + std::set id_deduper; + for (const auto* part : _partitions) { + if (part == nullptr) [[unlikely]] { + return Status::EndOfFile( + "Cannot found origin partitions in auto detect overwriting, stop processing"); + } + if (_new_partition_ids.contains(part->id)) { + // this is a new partition. dont replace again. + } else { + // request for replacement + id_deduper.insert(part->id); + } + } + if (id_deduper.empty()) { + return Status::OK(); // no need to request + } + // de-duplicate. there's no check in FE + std::vector request_part_ids(id_deduper.begin(), id_deduper.end()); + + request.__set_partition_ids(request_part_ids); + + string be_endpoint = BackendOptions::get_be_endpoint(); + request.__set_be_endpoint(be_endpoint); + + VLOG_NOTICE << "auto detect replace partition request: " << request; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + int time_out = _state->execution_timeout() * 1000; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->replacePartition(result, request); + }, + time_out)); + + Status status(Status::create(result.status)); + VLOG_NOTICE << "auto detect replace partition result: " << result; + if (result.status.status_code == TStatusCode::OK) { + // record new partitions + for (const auto& part : result.partitions) { + _new_partition_ids.insert(part.id); + } + // replace data in _partitions + RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); + // reuse the function as the args' structure are same. it add nodes/locations and incremental_open + auto result_as_create = cast_as_create_result(result); + RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); + } + + return status; +} + void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx, std::vector& tablet_ids) { tablet_ids.reserve(block->rows()); @@ -284,6 +356,29 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition( return Status::OK(); } +Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_ids) { + auto num_rows = block->rows(); + + bool stop_processing = false; + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + RETURN_IF_ERROR(_replace_overwriting_partition()); + + // regenerate locations for new partitions & tablets + _reset_find_tablets(num_rows); + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; + } + } + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); + return Status::OK(); +} + void VRowDistribution::_reset_row_part_tablet_ids( std::vector& row_part_tablet_ids, int64_t rows) { row_part_tablet_ids.resize(_schema->indexes().size()); @@ -321,12 +416,7 @@ Status VRowDistribution::generate_rows_distribution( } auto num_rows = block->rows(); - _tablet_finder->filter_bitmap().Reset(num_rows); - - //reuse vars for find_tablets - _partitions.assign(num_rows, nullptr); - _skip.assign(num_rows, false); - _tablet_indexes.assign(num_rows, 0); + _reset_find_tablets(num_rows); // if there's projection of partition calc, we need to calc it first. auto [part_ctxs, part_funcs] = _get_partition_function(); @@ -345,7 +435,11 @@ Status VRowDistribution::generate_rows_distribution( _vpartition->set_transformed_slots(partition_cols_idx); } - if (_vpartition->is_auto_partition() && !_deal_batched) { + if (_vpartition->is_auto_detect_overwrite()) { + // when overwrite, no auto create partition allowed. + RETURN_IF_ERROR(_generate_rows_distribution_for_auto_overwrite( + block.get(), has_filtered_rows, row_part_tablet_ids)); + } else if (_vpartition->is_auto_partition() && !_deal_batched) { RETURN_IF_ERROR(_generate_rows_distribution_for_auto_partition( block.get(), partition_cols_idx, has_filtered_rows, row_part_tablet_ids, rows_stat_val)); @@ -359,4 +453,12 @@ Status VRowDistribution::generate_rows_distribution( return Status::OK(); } +// reuse vars for find_tablets +void VRowDistribution::_reset_find_tablets(int64_t rows) { + _tablet_finder->filter_bitmap().Reset(rows); + _partitions.assign(rows, nullptr); + _skip.assign(rows, false); + _tablet_indexes.assign(rows, 0); +} + } // namespace doris::vectorized diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 12acda73ed2033..19a6538cc12f20 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -18,23 +18,21 @@ #pragma once // IWYU pragma: no_include +#include #include #include #include #include #include -#include #include #include "common/status.h" #include "exec/tablet_info.h" #include "runtime/runtime_state.h" -#include "runtime/types.h" #include "util/runtime_profile.h" -#include "util/stopwatch.hpp" #include "vec/core/block.h" -#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -50,6 +48,15 @@ class RowPartTabletIds { std::vector row_ids; std::vector partition_ids; std::vector tablet_ids; + + std::string debug_string() const { + std::string value; + value.reserve(row_ids.size() * 15); + for (int i = 0; i < row_ids.size(); i++) { + value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); + } + return value; + } }; // void* for caller @@ -153,8 +160,14 @@ class VRowDistribution { vectorized::Block* block, bool has_filtered_rows, std::vector& row_part_tablet_ids); + Status _generate_rows_distribution_for_auto_overwrite( + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_ids); + Status _replace_overwriting_partition(); + void _reset_row_part_tablet_ids(std::vector& row_part_tablet_ids, int64_t rows); + void _reset_find_tablets(int64_t rows); RuntimeState* _state = nullptr; int _batch_size = 0; @@ -177,16 +190,19 @@ class VRowDistribution { OlapTableLocationParam* _location = nullptr; // int64_t _number_output_rows = 0; const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr; + // generally it's writer's on_partitions_created CreatePartitionCallback _create_partition_callback = nullptr; void* _caller = nullptr; std::shared_ptr _schema; - // reuse for find_tablet. + // reuse for find_tablet. save partitions found by find_tablets std::vector _partitions; std::vector _skip; std::vector _tablet_indexes; std::vector _tablet_ids; std::vector _missing_map; // indice of missing values in partition_col + // for auto detect overwrite partition + std::set _new_partition_ids; // if contains, not to replace it again. }; } // namespace doris::vectorized diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp index 865a3066d62f8c..2e0d278fa4fe01 100644 --- a/be/src/vec/sink/vtablet_finder.cpp +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -22,22 +22,14 @@ #include #include -#include #include -#include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" #include "exec/tablet_info.h" -#include "exprs/runtime_filter.h" -#include "gutil/integral_types.h" -#include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" -#include "vec/core/columns_with_type_and_name.h" -#include "vec/data_types/data_type.h" -#include "vec/functions/simple_function_factory.h" namespace doris::vectorized { Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows, diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index 537e38d86c3d34..24f8e357e28976 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -17,8 +17,8 @@ #pragma once +#include #include -#include #include "common/status.h" #include "exec/tablet_info.h" @@ -53,6 +53,7 @@ class OlapTabletFinder { bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; } + // all partitions for multi find-processes of its relative writer. const vectorized::flat_hash_set& partition_ids() { return _partition_ids; } int64_t num_filtered_rows() const { return _num_filtered_rows; } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index c06303e6a9ed88..404ca882be3db5 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -677,9 +677,8 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { request->add_partition_ids(pid); } - request->set_write_single_replica(false); + request->set_write_single_replica(_parent->_write_single_replica); if (_parent->_write_single_replica) { - request->set_write_single_replica(true); for (auto& _slave_tablet_node : _slave_tablet_nodes) { PSlaveTabletNodes slave_tablet_nodes; for (auto node_id : _slave_tablet_node.second) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 68506ca161e015..1e5b0e2d0cfbc9 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -24,13 +24,14 @@ #include #include -#include +#include #include #include #include #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" #include "common/object_pool.h" #include "common/signal_handler.h" #include "common/status.h" @@ -40,17 +41,14 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" -#include "service/brpc.h" -#include "util/brpc_client_cache.h" #include "util/debug_points.h" #include "util/defer_op.h" #include "util/doris_metrics.h" -#include "util/threadpool.h" -#include "util/thrift_util.h" #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/sink/delta_writer_v2_pool.h" -#include "vec/sink/load_stream_stub.h" +// NOLINTNEXTLINE(unused-includes) +#include "vec/sink/load_stream_stub.h" // IWYU pragma: keep #include "vec/sink/load_stream_stub_pool.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -107,6 +105,8 @@ Status VTabletWriterV2::_incremental_open_streams( } _indexes_from_node[node].emplace_back(tablet); known_indexes.insert(index.index_id); + VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id + << ")"; } } } @@ -207,7 +207,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { "output_tuple_slot_num {} should be equal to output_expr_num {}", _output_tuple_desc->slots().size() + 1, _vec_output_expr_ctxs.size()); }); - if (_vec_output_expr_ctxs.size() > 0 && + if (!_vec_output_expr_ctxs.empty() && _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) { LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " << "output_tuple_slot_num " << _output_tuple_desc->slots().size() @@ -298,7 +298,7 @@ Status VTabletWriterV2::_build_tablet_node_mapping() { for (const auto& partition : _vpartition->get_partitions()) { for (const auto& index : partition->indexes) { for (const auto& tablet_id : index.tablets) { - auto tablet_location = _location->find_tablet(tablet_id); + auto* tablet_location = _location->find_tablet(tablet_id); DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", { tablet_location = nullptr; }); if (tablet_location == nullptr) { @@ -359,6 +359,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_partition_id(partition_id); tablet.set_index_id(index_id); tablet.set_tablet_id(tablet_id); + VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); @@ -632,11 +633,23 @@ Status VTabletWriterV2::close(Status exec_status) { Status VTabletWriterV2::_close_load(const Streams& streams) { auto node_id = streams[0]->dst_id(); std::vector tablets_to_commit; + std::vector partition_ids; for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { + if (VLOG_DEBUG_IS_ON) { + partition_ids.push_back(tablet.partition_id()); + } tablets_to_commit.push_back(tablet); } } + if (VLOG_DEBUG_IS_ON) { + std::string msg("close load partitions: "); + msg.reserve(partition_ids.size() * 7); + for (auto v : partition_ids) { + msg.append(std::to_string(v) + ", "); + } + LOG(WARNING) << msg; + } for (const auto& stream : streams) { RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); } diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md index f94e37f5e01ca2..7293785ba536ef 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md @@ -33,11 +33,11 @@ INSERT OVERWRITE ### Description -The function of this statement is to overwrite a table or a partition of a table +The function of this statement is to overwrite a table or some partitions of a table ```sql INSERT OVERWRITE table table_name - [ PARTITION (p1, ...) ] + [ PARTITION (p1, ... | *) ] [ WITH LABEL label] [ (column [, ...]) ] [ [ hint [, ...] ] ] @@ -48,7 +48,10 @@ INSERT OVERWRITE table table_name > table_name: the destination table to overwrite. This table must exist. It can be of the form `db_name.table_name` > -> partitions: the table partition that needs to be overwritten must be one of the existing partitions in `table_name` separated by a comma +> partitions: the table partitions that needs to be overwritten. The following two formats are supported +> +>> 1. partition names. must be one of the existing partitions in `table_name` separated by a comma +>> 2. asterisk(*)。Enable [auto-detect-partition](#overwrite-auto-detect-partition). The write operation will automatically detect the partitions involved in the data and overwrite those partitions. > > label: specify a label for the Insert task > @@ -69,7 +72,7 @@ INSERT OVERWRITE table table_name Notice: 1. In the current version, the session variable `enable_insert_strict` is set to `true` by default. If some data that does not conform to the format of the target table is filtered out during the execution of the `INSERT OVERWRITE` statement, such as when overwriting a partition and not all partition conditions are satisfied, overwriting the target table will fail. -2. If the target table of the INSERT OVERWRITE is an [AUTO-PARTITION-table](../../../../advanced/partition/auto-partition), then new partitions can be created if PARTITION is not specified (that is, rewrite the whole table). If PARTITION for overwrite is specified, then the AUTO PARTITION table behaves as if it were a normal partitioned table during this process, and data that does not satisfy the existing partition conditions is filtered instead of creating a new partition. +2. If the target table of the INSERT OVERWRITE is an [AUTO-PARTITION-table](../../../../advanced/partition/auto-partition), then new partitions can be created if PARTITION is not specified (that is, rewrite the whole table). If PARTITION for overwrite is specified(Includes automatic detection and overwriting of partitions through the `partition(*)` syntax), then the AUTO PARTITION table behaves as if it were a normal partitioned table during this process, and data that does not satisfy the existing partition conditions is filtered instead of creating a new partition. 3. The `INSERT OVERWRITE` statement first creates a new table, inserts the data to be overwritten into the new table, and then atomically replaces the old table with the new table and modifies its name. Therefore, during the process of overwriting the table, the data in the old table can still be accessed normally until the overwriting is completed. ### Example @@ -138,6 +141,13 @@ PROPERTIES ( #### Overwrite Table Partition +When using INSERT OVERWRITE to rewrite partitions, we actually encapsulate the following three steps into a single transaction and execute it. If it fails halfway through, the operations that have been performed will be rolled back: +1. Assuming that partition `p1` is specified to be rewritten, first create an empty temporary partition `pTMP` with the same structure as the target partition to be rewritten. +2. Write data to `pTMP`. +3. replace `p1` with the `pTMP` atom + +The following is examples: + 1. Overwrite partitions `P1` and `P2` of the `test` table using the form of `VALUES`. ```sql @@ -175,6 +185,56 @@ PROPERTIES ( INSERT OVERWRITE table test PARTITION(p1,p2) WITH LABEL `label4` (c1, c2) SELECT * from test2; ``` + +#### Overwrite Auto Detect Partition + +When the PARTITION clause specified by the INSERT OVERWRITE command is `PARTITION(*)`, this overwrite will automatically detect the partition where the data is located. Example: + +```sql +mysql> create table test( + -> k0 int null + -> ) + -> partition by range (k0) + -> ( + -> PARTITION p10 values less than (10), + -> PARTITION p100 values less than (100), + -> PARTITION pMAX values less than (maxvalue) + -> ) + -> DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + -> properties("replication_num" = "1"); +Query OK, 0 rows affected (0.11 sec) + +mysql> insert into test values (1), (2), (15), (100), (200); +Query OK, 5 rows affected (0.29 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 1 | +| 2 | +| 15 | +| 100 | +| 200 | ++------+ +5 rows in set (0.23 sec) + +mysql> insert overwrite table test partition(*) values (3), (1234); +Query OK, 2 rows affected (0.24 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 3 | +| 15 | +| 1234 | ++------+ +3 rows in set (0.20 sec) +``` + +As you can see, all data in partitions `p10` and `pMAX`, where data 3 and 1234 are located, are overwritten, while partition `p100` remains unchanged. This operation can be interpreted as syntactic sugar for specifying a specific partition to be overwritten by the PARTITION clause during an INSERT OVERWRITE operation, which is implemented in the same way as [specify a partition to overwrite](#overwrite-table-partition). The `PARTITION(*)` syntax eliminates the need to manually fill in all the partition names when overwriting a large number of partitions. + ### Keywords - INSERT OVERWRITE, OVERWRITE + INSERT OVERWRITE, OVERWRITE, AUTO DETECT diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md index c506029155b19e..587a981c24b1a1 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT-OVERWRITE.md @@ -33,11 +33,11 @@ INSERT OVERWRITE ### Description -该语句的功能是重写表或表的某个分区 +该语句的功能是重写表或表的某些分区 ```sql INSERT OVERWRITE table table_name - [ PARTITION (p1, ...) ] + [ PARTITION (p1, ... | *) ] [ WITH LABEL label] [ (column [, ...]) ] [ [ hint [, ...] ] ] @@ -48,7 +48,10 @@ INSERT OVERWRITE table table_name > table_name: 需要重写的目的表。这个表必须存在。可以是 `db_name.table_name` 形式 > -> partitions: 需要重写的表分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔 +> partitions: 需要重写的目标分区,支持两种形式: +> +>> 1. 分区名。必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔。 +>> 2. 星号(*)。开启[自动检测分区](#overwrite-auto-detect-partition)功能。写入操作将会自动检测数据所涉及的分区,并覆写这些分区。 > > label: 为 Insert 任务指定一个 label > @@ -69,7 +72,7 @@ INSERT OVERWRITE table table_name 注意: 1. 在当前版本中,会话变量 `enable_insert_strict` 默认为 `true`,如果执行 `INSERT OVERWRITE` 语句时,对于有不符合目标表格式的数据被过滤掉的话会重写目标表失败(比如重写分区时,不满足所有分区条件的数据会被过滤)。 -2. 如果INSERT OVERWRITE的目标表是[AUTO-PARTITION表](../../../../advanced/partition/auto-partition),若未指定PARTITION(重写整表),那么可以创建新的分区。如果指定了覆写的PARTITION,那么在此过程中,AUTO PARTITION表表现得如同普通分区表一样,不满足现有分区条件的数据将被过滤,而非创建新的分区。 +2. 如果INSERT OVERWRITE的目标表是[AUTO-PARTITION表](../../../../advanced/partition/auto-partition),若未指定PARTITION(重写整表),那么可以创建新的分区。如果指定了覆写的PARTITION(包括通过 `partition(*)` 语法自动检测并覆盖分区),那么在此过程中,AUTO PARTITION表表现得如同普通分区表一样,不满足现有分区条件的数据将被过滤,而非创建新的分区。 3. INSERT OVERWRITE语句会首先创建一个新表,将需要重写的数据插入到新表中,最后原子性的用新表替换旧表并修改名称。因此,在重写表的过程中,旧表中的数据在重写完毕之前仍然可以正常访问。 ### Example @@ -139,6 +142,13 @@ PROPERTIES ( #### Overwrite Table Partition +使用 INSERT OVERWRITE 重写分区时,实际我们是将如下三步操作封装为一个事务并执行,如果中途失败,已进行的操作将会回滚: +1. 假设指定重写分区 p1,首先创建一个与重写的目标分区结构相同的空临时分区 `pTMP` +2. 向 `pTMP` 中写入数据 +3. 使用 `pTMP` 原子替换 `p1` 分区 + +举例如下: + 1. VALUES的形式重写`test`表分区`P1`和`p2` ```sql @@ -176,7 +186,56 @@ PROPERTIES ( INSERT OVERWRITE table test PARTITION(p1,p2) WITH LABEL `label4` (c1, c2) SELECT * from test2; ``` +#### Overwrite Auto Detect Partition + +当 INSERT OVERWRITE 命令指定的 PARTITION 子句为 `PARTITION(*)` 时,此次覆写将会自动检测分区数据所在的分区。例如: + +```sql +mysql> create table test( + -> k0 int null + -> ) + -> partition by range (k0) + -> ( + -> PARTITION p10 values less than (10), + -> PARTITION p100 values less than (100), + -> PARTITION pMAX values less than (maxvalue) + -> ) + -> DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + -> properties("replication_num" = "1"); +Query OK, 0 rows affected (0.11 sec) + +mysql> insert into test values (1), (2), (15), (100), (200); +Query OK, 5 rows affected (0.29 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 1 | +| 2 | +| 15 | +| 100 | +| 200 | ++------+ +5 rows in set (0.23 sec) + +mysql> insert overwrite table test partition(*) values (3), (1234); +Query OK, 2 rows affected (0.24 sec) + +mysql> select * from test order by k0; ++------+ +| k0 | ++------+ +| 3 | +| 15 | +| 1234 | ++------+ +3 rows in set (0.20 sec) +``` + +可以看到,数据 3、1234 所在的分区 `p10` 和 `pMAX` 中的全部数据均被覆写,而 `p100` 分区未发生变化。该操作可以理解为 INSERT OVERWRITE 操作时通过 PARTITION 子句指定覆写特定分区的语法糖,它的实现原理与[指定重写特定分区](#overwrite-table-partition)相同。通过 `PARTITION(*)` 的语法,在覆写大量分区数据时我们可以免于手动填写全部分区名的繁琐。 + ### Keywords - INSERT OVERWRITE, OVERWRITE + INSERT OVERWRITE, OVERWRITE, AUTO DETECT diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 921059637a596c..e16fa2d092d926 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -124,9 +124,9 @@ constraint partitionSpec : TEMPORARY? (PARTITION | PARTITIONS) partitions=identifierList | TEMPORARY? PARTITION partition=errorCapturingIdentifier - // TODO: support analyze external table partition spec https://github.com/apache/doris/pull/24154 - // | PARTITIONS LEFT_PAREN ASTERISK RIGHT_PAREN - // | PARTITIONS WITH RECENT + | (PARTITION | PARTITIONS) LEFT_PAREN ASTERISK RIGHT_PAREN // for auto detect partition in overwriting + // TODO: support analyze external table partition spec https://github.com/apache/doris/pull/24154 + // | PARTITIONS WITH RECENT ; partitionTable diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 22dd017b68a41e..a2b4b78dea9bb9 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -6071,6 +6071,10 @@ partition_names ::= {: RESULT = new PartitionNames(true); :} + | KW_PARTITION LPAREN STAR RPAREN + {: + RESULT = new PartitionNames(true); + :} | KW_PARTITIONS KW_WITH KW_RECENT INTEGER_LITERAL:count {: RESULT = new PartitionNames(count); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index c6c95da9de0ed8..08efb31d6631fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -255,11 +255,15 @@ public Set getPartitionNames() { return partitions; } - public boolean isAllPartitions() { + /** + * @return for OLAP table, only in overwrite situation, overwrite auto detect partition + * for External table, all partitions. + */ + public boolean isStarPartition() { if (partitionNames == null) { return false; } - return partitionNames.isAllPartitions(); + return partitionNames.isStar(); } public long getPartitionCount() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java index 17cca1cecc5e06..3b4e651cc02071 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertOverwriteTableStmt.java @@ -73,6 +73,13 @@ public List getPartitionNames() { return target.getPartitionNames().getPartitionNames(); } + /* + * auto detect which partitions to replace. enable by partition(*) grammer + */ + public boolean isAutoDetectPartition() { + return target.getPartitionNames().isStar(); + } + @Override public void analyze(Analyzer analyzer) throws UserException { target.getTblName().analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 3f36746444127e..1c4b13aa235c32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -163,6 +163,7 @@ public class NativeInsertStmt extends InsertStmt { boolean hasEmptyTargetColumns = false; private boolean allowAutoPartition = true; + private boolean withAutoDetectOverwrite = false; enum InsertType { NATIVE_INSERT("insert_"), @@ -317,6 +318,11 @@ public boolean isTransactionBegin() { return isTransactionBegin; } + public NativeInsertStmt withAutoDetectOverwrite() { + this.withAutoDetectOverwrite = true; + return this; + } + protected void preCheckAnalyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java index ca26a2978e0e54..f82f497176bec2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionNames.java @@ -48,7 +48,7 @@ public class PartitionNames implements ParseNode, Writable { // true if these partitions are temp partitions @SerializedName(value = "isTemp") private final boolean isTemp; - private final boolean allPartitions; + private final boolean isStar; private final long count; // Default partition count to collect statistic for external table. private static final long DEFAULT_PARTITION_COUNT = 100; @@ -56,28 +56,28 @@ public class PartitionNames implements ParseNode, Writable { public PartitionNames(boolean isTemp, List partitionNames) { this.partitionNames = partitionNames; this.isTemp = isTemp; - this.allPartitions = false; + this.isStar = false; this.count = 0; } public PartitionNames(PartitionNames other) { this.partitionNames = Lists.newArrayList(other.partitionNames); this.isTemp = other.isTemp; - this.allPartitions = other.allPartitions; + this.isStar = other.isStar; this.count = 0; } - public PartitionNames(boolean allPartitions) { + public PartitionNames(boolean isStar) { this.partitionNames = null; this.isTemp = false; - this.allPartitions = allPartitions; + this.isStar = isStar; this.count = 0; } public PartitionNames(long partitionCount) { this.partitionNames = null; this.isTemp = false; - this.allPartitions = false; + this.isStar = false; this.count = partitionCount; } @@ -89,8 +89,12 @@ public boolean isTemp() { return isTemp; } - public boolean isAllPartitions() { - return allPartitions; + /** + * @return for OLAP table, only in overwrite situation, overwrite auto detect partition + * for External table, all partitions. + */ + public boolean isStar() { + return isStar; } public long getCount() { @@ -99,10 +103,10 @@ public long getCount() { @Override public void analyze(Analyzer analyzer) throws AnalysisException { - if (allPartitions && count > 0) { + if (isStar && count > 0) { throw new AnalysisException("All partition and partition count couldn't be set at the same time."); } - if (allPartitions || count > 0) { + if (isStar || count > 0) { return; } if (partitionNames == null || partitionNames.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 463b3b1f19d8e9..b6239f486cdeec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1106,6 +1106,14 @@ public Set getPartitionNames() { return Sets.newHashSet(nameToPartition.keySet()); } + public List uncheckedGetPartNamesById(List partitionIds) { + List names = new ArrayList(); + for (Long id : partitionIds) { + names.add(idToPartition.get(id).getName()); + } + return names; + } + public List getPartitionIds() { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index 9835efa531b2ef..32d52d7e7af100 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -26,6 +26,7 @@ import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; @@ -34,10 +35,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantLock; public class InsertOverwriteManager extends MasterDaemon implements Writable { private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class); @@ -47,6 +50,16 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { @SerializedName(value = "tasks") private Map tasks = Maps.newConcurrentMap(); + // > + // for iot auto detect tasks. a txn will make many task by different rpc + @SerializedName(value = "taskGroups") + private Map> taskGroups = Maps.newConcurrentMap(); + // for one task group, there may be different requests about changing a partition to new. + // but we only change one time and save the relations in partitionPairs. they're protected by taskLocks + private Map taskLocks = Maps.newConcurrentMap(); + // > + private Map> partitionPairs = Maps.newConcurrentMap(); + public InsertOverwriteManager() { super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000); } @@ -69,12 +82,85 @@ public long registerTask(long dbId, long tableId, List tempPartitionName return taskId; } + /** + * register insert overwrite task group for auto detect partition. + * it may have many tasks by FrontendService rpc deal. + * all of them will be involved in one txn.(success or fallback) + * + * @return group id, like a transaction id. + */ + public long preRegisterTask() { + long groupId = Env.getCurrentEnv().getNextId(); + taskGroups.put(groupId, new ArrayList()); + taskLocks.put(groupId, new ReentrantLock()); + partitionPairs.put(groupId, Maps.newConcurrentMap()); + return groupId; + } + + /** + * for iot auto detect. register task first. then put in group. + */ + public void registerTaskInGroup(long groupId, long taskId) { + LOG.info("register task " + taskId + " in group " + groupId); + taskGroups.get(groupId).add(taskId); + } + + public List tryReplacePartitionIds(long groupId, List oldPartitionIds) { + Map relations = partitionPairs.get(groupId); + List newIds = new ArrayList(); + for (Long id : oldPartitionIds) { + if (relations.containsKey(id)) { + // if we replaced it. then return new one. + newIds.add(relations.get(id)); + } else { + // otherwise itself. we will deal it soon. + newIds.add(id); + } + } + return newIds; + } + + public void recordPartitionPairs(long groupId, List oldIds, List newIds) { + Map relations = partitionPairs.get(groupId); + Preconditions.checkArgument(oldIds.size() == newIds.size()); + for (int i = 0; i < oldIds.size(); i++) { + relations.put(oldIds.get(i), newIds.get(i)); + } + } + + public ReentrantLock getLock(long groupId) { + return taskLocks.get(groupId); + } + + public void taskGroupFail(long groupId) { + LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed"); + for (Long taskId : taskGroups.get(groupId)) { + taskFail(taskId); + } + cleanTaskGroup(groupId); + } + + public void taskGroupSuccess(long groupId) { + LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed"); + for (Long taskId : taskGroups.get(groupId)) { + taskSuccess(taskId); + } + cleanTaskGroup(groupId); + } + + private void cleanTaskGroup(long groupId) { + partitionPairs.remove(groupId); + taskLocks.remove(groupId); + taskGroups.remove(groupId); + } + /** * when insert overwrite fail, try drop temp partition * * @param taskId */ public void taskFail(long taskId) { + LOG.info("insert overwrite task [" + taskId + "] failed"); boolean rollback = rollback(taskId); if (rollback) { removeTask(taskId); @@ -89,6 +175,7 @@ public void taskFail(long taskId) { * @param taskId */ public void taskSuccess(long taskId) { + LOG.info("insert overwrite task [" + taskId + "] succeed"); removeTask(taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index af906d2653e7d9..54f9895ab2c977 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -83,7 +83,7 @@ public static void replacePartition(OlapTable olapTable, List partitionN } /** - * generate temp partitionName + * generate temp partitionName. must keep same order. * * @param partitionNames * @return diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 2a4416686cc9c3..8ecd417691de02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -50,6 +50,7 @@ public class UnboundTableSink extends UnboundLogicalSin private final List partitions; private final boolean isPartialUpdate; private final DMLCommandType dmlCommandType; + private final boolean autoDetectPartition; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { @@ -70,6 +71,25 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, + boolean temporaryPartition, List partitions, boolean isAutoDetectPartition, + boolean isPartialUpdate, DMLCommandType dmlCommandType, + Optional groupExpression, Optional logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child); + this.hints = Utils.copyRequiredList(hints); + this.temporaryPartition = temporaryPartition; + this.partitions = Utils.copyRequiredList(partitions); + this.autoDetectPartition = isAutoDetectPartition; this.isPartialUpdate = isPartialUpdate; this.dmlCommandType = dmlCommandType; } @@ -78,6 +98,10 @@ public boolean isTemporaryPartition() { return temporaryPartition; } + public boolean isAutoDetectPartition() { + return autoDetectPartition; + } + public List getPartitions() { return partitions; } @@ -93,8 +117,8 @@ public boolean isPartialUpdate() { @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); - return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, - dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, + isPartialUpdate, dmlCommandType, groupExpression, Optional.empty(), children.get(0)); } @Override @@ -134,14 +158,14 @@ public int hashCode() { @Override public Plan withGroupExpression(Optional groupExpression) { - return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, - dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, + isPartialUpdate, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, autoDetectPartition, isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 335d2f58035d62..974bec90a2cdcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -22,6 +22,8 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -29,6 +31,8 @@ import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.qe.ConnectContext; +import com.google.common.collect.ImmutableList; + import java.util.List; import java.util.Optional; @@ -71,4 +75,37 @@ public static LogicalSink createUnboundTableSink(List na } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } + + /** + * create unbound sink for DML plan with auto detect overwrite partition enable. + */ + public static LogicalSink createUnboundTableSinkMaybeOverwrite(List nameParts, + List colNames, List hints, boolean temporaryPartition, List partitions, + boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, DMLCommandType dmlCommandType, + LogicalPlan plan) { + if (isAutoDetectPartition) { // partitions is null + if (!isOverwrite) { + throw new ParseException("ASTERISK is only supported in overwrite partition for OLAP table"); + } + temporaryPartition = false; + partitions = ImmutableList.of(); + } + + String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); + CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (curCatalog instanceof InternalCatalog) { + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, + isAutoDetectPartition, + isPartialUpdate, dmlCommandType, Optional.empty(), + Optional.empty(), plan); + } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { + return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); + } + throw new AnalysisException( + "Auto overwrite data to " + curCatalog.getClass().getSimpleName() + " is not supported." + + (isAutoDetectPartition + ? " PARTITION(*) is only supported in overwrite partition for OLAP table" + : "")); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index bb7f316defb936..2985664f8b818a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -527,14 +527,18 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { Optional labelName = ctx.labelName == null ? Optional.empty() : Optional.of(ctx.labelName.getText()); List colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols); // TODO visit partitionSpecCtx - Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); LogicalPlan plan = visitQuery(ctx.query()); - LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSink( + // partitionSpec may be NULL. means auto detect partition. only available when IOT + Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); + boolean isAutoDetect = partitionSpec.second == null; + LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite( tableName.build(), colNames, - ImmutableList.of(), - partitionSpec.first, - partitionSpec.second, + ImmutableList.of(), // hints + partitionSpec.first, // isTemp + partitionSpec.second, // partition names + isAutoDetect, + isOverwrite, ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), DMLCommandType.INSERT, plan); @@ -567,7 +571,9 @@ public Pair> visitPartitionSpec(PartitionSpecContext ctx) boolean temporaryPartition = false; if (ctx != null) { temporaryPartition = ctx.TEMPORARY() != null; - if (ctx.partition != null) { + if (ctx.ASTERISK() != null) { + partitions = null; + } else if (ctx.partition != null) { partitions = ImmutableList.of(ctx.partition.getText()); } else { partitions = visitIdentifierList(ctx.partitions); @@ -849,6 +855,10 @@ public LogicalPlan visitUpdate(UpdateContext ctx) { public LogicalPlan visitDelete(DeleteContext ctx) { List tableName = visitMultipartIdentifier(ctx.tableName); Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); + // TODO: now dont support delete auto detect partition. + if (partitionSpec == null) { + throw new ParseException("Now don't support auto detect partitions in deleting", ctx); + } LogicalPlan query = withTableAlias(LogicalPlanBuilderAssistant.withCheckPolicy( new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableName, partitionSpec.second, partitionSpec.first)), ctx.tableAlias()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 3e4ed3afd78d9d..8bf454f0980d2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -162,6 +162,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor throw new AnalysisException("group commit is not supported in Nereids now"); } OlapTable olapTable = (OlapTable) targetTableIf; + // the insertCtx contains some variables to adjust SinkNode insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx); boolean isEnableMemtableOnSinkNode = olapTable.getTableProperty().getUseSchemaLightChange() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 788871c744e384..0741982c968fc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -87,6 +87,10 @@ public void setLabelName(Optional labelName) { this.labelName = labelName; } + public boolean isAutoDetectOverwrite() { + return ((UnboundTableSink) this.logicalQuery).isAutoDetectPartition(); + } + @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { if (!ctx.getSessionVariable().isEnableNereidsDML()) { @@ -134,17 +138,32 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { if (CollectionUtils.isEmpty(partitionNames)) { partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); } - List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - long taskId = Env.getCurrentEnv().getInsertOverwriteManager() - .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + + long taskId = 0; try { - InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); - insertInto(ctx, executor, tempPartitionNames); - InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); - Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + if (isAutoDetectOverwrite()) { + // taskId here is a group id. it contains all replace tasks made and registered in rpc process. + taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask(); + // When inserting, BE will call to replace partition by FrontendService. FE do the real + // add&replacement and return replace result. So there's no need to do anything else. + insertInto(ctx, executor, taskId); + Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId); + } else { + List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); + taskId = Env.getCurrentEnv().getInsertOverwriteManager() + .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + insertInto(ctx, executor, tempPartitionNames); + InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); + Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + } } catch (Exception e) { LOG.warn("insert into overwrite failed"); - Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + if (isAutoDetectOverwrite()) { + Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); + } else { + Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + } throw e; } finally { ConnectContext.get().setSkipAuth(false); @@ -152,14 +171,15 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } /** - * insert into select + * insert into select. for sepecified temp partitions * - * @param ctx ctx - * @param executor executor + * @param ctx ctx + * @param executor executor * @param tempPartitionNames tempPartitionNames */ private void insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames) throws Exception { + // copy sink tot replace by tempPartitions UnboundLogicalSink copySink; InsertCommandContext insertCtx; if (logicalQuery instanceof UnboundTableSink) { @@ -173,9 +193,9 @@ private void insertInto(ConnectContext ctx, StmtExecutor executor, List sink.isPartialUpdate(), sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); - // for overwrite situation, we disable auto create partition. - insertCtx = new OlapInsertCommandContext(); - ((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false); + // 1. for overwrite situation, we disable auto create partition. + // 2. we save and pass overwrite auto detect by insertCtx + insertCtx = new OlapInsertCommandContext(false); } else if (logicalQuery instanceof UnboundHiveTableSink) { UnboundHiveTableSink sink = (UnboundHiveTableSink) logicalQuery; copySink = (UnboundLogicalSink) UnboundTableSinkCreator.createUnboundTableSink( @@ -202,6 +222,26 @@ private void insertInto(ConnectContext ctx, StmtExecutor executor, List } } + /** + * insert into auto detect partition. + * + * @param ctx ctx + * @param executor executor + */ + private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception { + UnboundTableSink sink = (UnboundTableSink) logicalQuery; + // 1. for overwrite situation, we disable auto create partition. + // 2. we save and pass overwrite auto detect by insertCtx + OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(false, sink.isAutoDetectPartition(), groupId); + InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(sink, labelName, Optional.of(insertCtx)); + insertCommand.run(ctx, executor); + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); + LOG.warn("InsertInto state error:{}", errMsg); + throw new UserException(errMsg); + } + } + @Override public Plan getExplainPlan(ConnectContext ctx) { return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java index 23dd8d13d9182f..bebade142d97e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java @@ -21,12 +21,32 @@ * For Olap Table */ public class OlapInsertCommandContext extends InsertCommandContext { - private boolean allowAutoPartition = true; + private boolean allowAutoPartition; + private boolean autoDetectOverwrite = false; + private long overwriteGroupId = 0; + + public OlapInsertCommandContext(boolean allowAutoPartition) { + this.allowAutoPartition = allowAutoPartition; + } + + public OlapInsertCommandContext(boolean allowAutoPartition, boolean autoDetectOverwrite, long overwriteGroupId) { + this.allowAutoPartition = allowAutoPartition; + this.autoDetectOverwrite = autoDetectOverwrite; + this.overwriteGroupId = overwriteGroupId; + } public boolean isAllowAutoPartition() { return allowAutoPartition; } + public boolean isAutoDetectOverwrite() { + return autoDetectOverwrite; + } + + public long getOverwriteGroupId() { + return overwriteGroupId; + } + public void setAllowAutoPartition(boolean allowAutoPartition) { this.allowAutoPartition = allowAutoPartition; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 0598b0d46f6b2c..270800b6afec19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -113,7 +113,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys OlapTableSink olapTableSink = (OlapTableSink) sink; PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) physicalSink; OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) insertCtx.orElse( - new OlapInsertCommandContext()); + new OlapInsertCommandContext(true)); boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict() && physicalOlapTableSink.isPartialUpdate() @@ -127,10 +127,15 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys false, isStrictMode, timeout); + // complete and set commands both modify thrift struct olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); if (!olapInsertCtx.isAllowAutoPartition()) { olapTableSink.setAutoPartition(false); } + if (olapInsertCtx.isAutoDetectOverwrite()) { + olapTableSink.setAutoDetectOverwite(true); + olapTableSink.setOverwriteGroupId(olapInsertCtx.getOverwriteGroupId()); + } // update // set schema and partition info for tablet id shuffle exchange diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 19c6e7e1d5a752..aa3c0b0d555e50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -190,6 +190,14 @@ public void setAutoPartition(boolean var) { tDataSink.getOlapTableSink().getPartition().setEnableAutomaticPartition(var); } + public void setAutoDetectOverwite(boolean var) { + tDataSink.getOlapTableSink().getPartition().setEnableAutoDetectOverwrite(var); + } + + public void setOverwriteGroupId(long var) { + tDataSink.getOlapTableSink().getPartition().setOverwriteGroupId(var); + } + // must called after tupleDescriptor is computed public void complete(Analyzer analyzer) throws UserException { for (Long partitionId : partitionIds) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 72073e1e8478e4..820513132c5f3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2794,7 +2794,10 @@ private void handleIotStmt() { ConnectContext.get().setSkipAuth(true); try { InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt; - if (iotStmt.getPartitionNames().size() == 0) { + if (iotStmt.isAutoDetectPartition()) { + // insert overwrite table auto detect which partitions need to replace + handleAutoOverwritePartition(iotStmt); + } else if (iotStmt.getPartitionNames().size() == 0) { // insert overwrite table handleOverwriteTable(iotStmt); } else { @@ -2943,6 +2946,26 @@ private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) { } } + /* + * TODO: support insert overwrite auto detect partition in legacy planner + */ + private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) { + // TODO: + TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl()); + try { + parsedStmt = new NativeInsertStmt(targetTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()), + iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoDetectOverwrite(); + parsedStmt.setUserInfo(context.getCurrentUserIdentity()); + execute(); + } catch (Exception e) { + LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); + handleIotRollback(targetTableName); + return; + } + + } + private void handleIotRollback(TableName table) { // insert error drop the tmp table DropTableStmt dropTableStmt = new DropTableStmt(true, table, true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 04b7ea43997d59..dc058156b603a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -72,6 +72,8 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.insertoverwrite.InsertOverwriteManager; +import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.load.routineload.ErrorReason; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadJob.JobState; @@ -200,6 +202,8 @@ import org.apache.doris.thrift.TPrivilegeType; import org.apache.doris.thrift.TQueryStatsResult; import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TReplacePartitionRequest; +import org.apache.doris.thrift.TReplacePartitionResult; import org.apache.doris.thrift.TReplicaInfo; import org.apache.doris.thrift.TReportCommitTxnResultRequest; import org.apache.doris.thrift.TReportExecStatusParams; @@ -267,7 +271,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.stream.IntStream; // Frontend service used to serve all request for this frontend through // thrift protocol @@ -3552,6 +3558,166 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t return result; } + @Override + public TReplacePartitionResult replacePartition(TReplacePartitionRequest request) throws TException { + LOG.info("Receive create partition request: {}", request); + long dbId = request.getDbId(); + long tableId = request.getTableId(); + List partitionIds = request.getPartitionIds(); + long taskGroupId = request.getOverwriteGroupId(); + TReplacePartitionResult result = new TReplacePartitionResult(); + TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); + if (!Env.getCurrentEnv().isMaster()) { + errorStatus.setStatusCode(TStatusCode.NOT_MASTER); + errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); + LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG); + return result; + } + + Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId); + if (db == null) { + errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId))); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + + Table table = db.getTable(tableId).get(); + if (table == null) { + errorStatus.setErrorMsgs( + (Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId)))); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + + if (!(table instanceof OlapTable)) { + errorStatus.setErrorMsgs( + Lists.newArrayList(String.format("dbId=%d tableId=%d is not olap table", dbId, tableId))); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + + OlapTable olapTable = (OlapTable) table; + InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + ReentrantLock taskLock = overwriteManager.getLock(taskGroupId); + List allReqPartNames; // all request partitions + try { + taskLock.lock(); + // we dont lock the table. other thread in this txn will be controled by taskLock. + // if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // if not by this txn, just let it fail naturally is ok. + List replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); + // here if replacedPartIds still have null. this will throw exception. + allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds); + + List pendingPartitionIds = IntStream.range(0, partitionIds.size()) + .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced + .mapToObj(partitionIds::get) + .collect(Collectors.toList()); + // from here we ONLY deal the pending partitions. not include the dealed(by others). + if (!pendingPartitionIds.isEmpty()) { + // below two must have same order inner. + List pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); + List tempPartitionNames = InsertOverwriteUtil + .generateTempPartitionNames(pendingPartitionNames); + + long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames); + overwriteManager.registerTaskInGroup(taskGroupId, taskId); + InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); + InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); + // now temp partitions are bumped up and use new names. we get their ids and record them. + List newPartitionIds = new ArrayList(); + for (String newPartName : pendingPartitionNames) { + newPartitionIds.add(olapTable.getPartition(newPartName).getId()); + } + overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds); + if (LOG.isDebugEnabled()) { + LOG.debug("partition replacement: "); + for (int i = 0; i < pendingPartitionIds.size(); i++) { + LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], "); + } + } + } + } catch (DdlException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } finally { + taskLock.unlock(); + } + + // build partition & tablets. now all partitions in allReqPartNames are replaced an recorded. + // so they won't be changed again. if other transaction changing it. just let it fail. + List partitions = Lists.newArrayList(); + List tablets = Lists.newArrayList(); + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + for (String partitionName : allReqPartNames) { + Partition partition = table.getPartition(partitionName); + TOlapTablePartition tPartition = new TOlapTablePartition(); + tPartition.setId(partition.getId()); + + // set partition keys + int partColNum = partitionInfo.getPartitionColumns().size(); + OlapTableSink.setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum); + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList( + index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); + tPartition.setNumBuckets(index.getTablets().size()); + } + tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); + partitions.add(tPartition); + // tablet + int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + + 1; + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + for (Tablet tablet : index.getTablets()) { + // we should ensure the replica backend is alive + // otherwise, there will be a 'unknown node id, id=xxx' error for stream load + // BE id -> path hash + Multimap bePathsMap; + try { + if (Config.isCloudMode() && request.isSetBeEndpoint()) { + bePathsMap = ((CloudTablet) tablet) + .getNormalReplicaBackendPathMapCloud(request.be_endpoint); + } else { + bePathsMap = tablet.getNormalReplicaBackendPathMap(); + } + } catch (UserException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + if (bePathsMap.keySet().size() < quorum) { + LOG.warn("auto go quorum exception"); + } + tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet()))); + } + } + } + result.setPartitions(partitions); + result.setTablets(tablets); + + // build nodes + List nodeInfos = Lists.newArrayList(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + for (Long id : systemInfoService.getAllBackendIds(false)) { + Backend backend = systemInfoService.getBackend(id); + nodeInfos.add(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); + } + result.setNodes(nodeInfos); + + // successfully return + result.setStatus(new TStatus(TStatusCode.OK)); + if (LOG.isDebugEnabled()) { + LOG.debug("send replace partition result: {}", result); + } + return result; + } + public TGetMetaResult getMeta(TGetMetaRequest request) throws TException { String clientAddr = getClientAddrAsString(); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index f90ce83a088185..258c33305afc4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -348,7 +348,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio Set partitionNames = stmt.getPartitionNames(); boolean partitionOnly = stmt.isPartitionOnly(); boolean isSamplingPartition = stmt.isSamplingPartition(); - boolean isAllPartition = stmt.isAllPartitions(); + boolean isAllPartition = stmt.isStarPartition(); long partitionCount = stmt.getPartitionCount(); int samplePercent = stmt.getSamplePercent(); int sampleRows = stmt.getSampleRows(); diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 76c8b1b7b068ec..895feb8b5aa2c4 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -205,6 +205,9 @@ struct TOlapTablePartitionParam { 8: optional list partition_function_exprs 9: optional bool enable_automatic_partition 10: optional Partitions.TPartitionType partition_type + // insert overwrite partition(*) + 11: optional bool enable_auto_detect_overwrite + 12: optional i64 overwrite_group_id } struct TOlapTableIndex { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c7bdbb4fc0ed03..b1dcb4defd7d5f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1283,6 +1283,23 @@ struct TCreatePartitionResult { 4: optional list nodes } +// these two for auto detect replacing partition +struct TReplacePartitionRequest { + 1: optional i64 overwrite_group_id + 2: optional i64 db_id + 3: optional i64 table_id + 4: optional list partition_ids // partition to replace. + // be_endpoint = : to distinguish a particular BE + 5: optional string be_endpoint +} + +struct TReplacePartitionResult { + 1: optional Status.TStatus status + 2: optional list partitions + 3: optional list tablets + 4: optional list nodes +} + struct TGetMetaReplica { 1: optional i64 id } @@ -1498,6 +1515,8 @@ service FrontendService { TAutoIncrementRangeResult getAutoIncrementRange(1: TAutoIncrementRangeRequest request) TCreatePartitionResult createPartition(1: TCreatePartitionRequest request) + // insert overwrite partition(*) + TReplacePartitionResult replacePartition(1: TReplacePartitionRequest request) TGetMetaResult getMeta(1: TGetMetaRequest request) diff --git a/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out new file mode 100644 index 00000000000000..3cde86880473dc --- /dev/null +++ b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out @@ -0,0 +1,83 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 +2 +15 +100 +200 + +-- !sql -- +3 +15 +1234 + +-- !sql -- +1 +2 +15 +333 +444 +555 + +-- !sql -- +-100 +-100 +15 +333 +444 +555 + +-- !sql -- +-100 +-100 +15 +333 +444 +555 + +-- !sql -- +1234567 +Beijing +Shanghai +list +xxx + +-- !sql -- +1234567 +BEIJING +Shanghai +list +xxx + +-- !sql -- +7654321 +7654321 +7654321 +BEIJING +Shanghai +list +xxx + +-- !sql -- +7654321 +BEIJING +LIST +LIST +Shanghai +list +list +xxx + +-- !sql -- +7654321 +BEIJING +LIST +SHANGHAI +XXX + +-- !sql -- +2008-01-01 +2008-02-02 +2013-02-02 +2022-03-03 + diff --git a/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy b/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy new file mode 100644 index 00000000000000..367aaa9d536ec5 --- /dev/null +++ b/regression-test/suites/insert_overwrite_p0/insert_overwrite_auto_detect.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iot_auto_detect") { + sql """set enable_nereids_planner = true""" + sql """set enable_fallback_to_original_planner = false""" + sql """set enable_nereids_dml = true""" + + sql " drop table if exists range1; " + sql """ + create table range1( + k0 int null + ) + partition by range (k0) + ( + PARTITION p10 values less than (10), + PARTITION p100 values less than (100), + PARTITION pMAX values less than (maxvalue) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql " insert into range1 values (1), (2), (15), (100), (200); " + qt_sql " select * from range1 order by k0; " + sql " insert overwrite table range1 partition(*) values (3), (1234); " + qt_sql " select * from range1 order by k0; " + sql " insert overwrite table range1 partition(*) values (1), (2), (333), (444), (555); " + qt_sql " select * from range1 order by k0; " + sql " insert overwrite table range1 partition(*) values (-100), (-100), (333), (444), (555); " + qt_sql " select * from range1 order by k0; " + sql " insert into range1 values (-12345), (12345); " + sql " insert overwrite table range1 partition(*) values (-100), (-100), (333), (444), (555); " + qt_sql " select * from range1 order by k0; " + + sql " drop table if exists list1; " + sql """ + create table list1( + k0 varchar null + ) + partition by list (k0) + ( + PARTITION p1 values in (("Beijing"), ("BEIJING")), + PARTITION p2 values in (("Shanghai"), ("SHANGHAI")), + PARTITION p3 values in (("xxx"), ("XXX")), + PARTITION p4 values in (("list"), ("LIST")), + PARTITION p5 values in (("1234567"), ("7654321")) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql """ insert into list1 values ("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("BEIJING"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("7654321"), ("7654321"), ("7654321"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("7654321"), ("list"), ("list"), ("LIST"), ("LIST"); """ + qt_sql " select * from list1 order by k0; " + sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("SHANGHAI"), ("XXX"), ("LIST"), ("7654321"); """ + qt_sql " select * from list1 order by k0; " + try { + sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("invalid"); """ + } catch (Exception e) { + log.info(e.getMessage()) + assertTrue(e.getMessage().contains('Insert has filtered data in strict mode') || + e.getMessage().contains('Cannot found origin partitions in auto detect overwriting')) + } + + sql " drop table if exists dt; " + sql """ + create table dt( + k0 date null + ) + partition by range (k0) + ( + PARTITION p10 values less than ("2010-01-01"), + PARTITION p100 values less than ("2020-01-01"), + PARTITION pMAX values less than ("2030-01-01") + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql """ insert into dt values ("2005-01-01"), ("2013-02-02"), ("2022-03-03"); """ + sql """ insert overwrite table dt partition(*) values ("2008-01-01"), ("2008-02-02"); """ + qt_sql " select * from dt order by k0; " + try { + sql """ insert overwrite table dt partition(*) values ("2023-02-02"), ("3000-12-12"); """ + } catch (Exception e) { + log.info(e.getMessage()) + assertTrue(e.getMessage().contains('Insert has filtered data in strict mode') || + e.getMessage().contains('Cannot found origin partitions in auto detect overwriting')) + } + +}