Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 231 additions & 125 deletions be/src/exec/tablet_info.cpp

Large diffs are not rendered by default.

31 changes: 21 additions & 10 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <utility>
#include <vector>

#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
Expand All @@ -40,7 +41,6 @@
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
Expand Down Expand Up @@ -119,7 +119,7 @@ using OlapTableIndexTablets = TOlapTableIndexTablets;

using BlockRow = std::pair<vectorized::Block*, int32_t>;
using BlockRowWithIndicator =
std::tuple<vectorized::Block*, int32_t, bool>; // [block, column, is_transformed]
std::tuple<vectorized::Block*, int32_t, bool>; // [block, row, is_transformed]

struct VOlapTablePartition {
int64_t id = 0;
Expand All @@ -133,6 +133,7 @@ struct VOlapTablePartition {
int64_t load_tablet_idx = -1;

VOlapTablePartition(vectorized::Block* partition_block)
// the default value of partition bound is -1.
: start_key {partition_block, -1}, end_key {partition_block, -1} {}
};

Expand All @@ -145,7 +146,7 @@ class VOlapTablePartKeyComparator {

// return true if lhs < rhs
// a 'row' of -1 means the maximal boundary
bool operator()(const BlockRowWithIndicator lhs, const BlockRowWithIndicator rhs) const;
bool operator()(const BlockRowWithIndicator& lhs, const BlockRowWithIndicator& rhs) const;

private:
const std::vector<uint16_t>& _slot_locs;
Expand All @@ -167,11 +168,14 @@ class VOlapTablePartitionParam {
int64_t version() const { return _t_param.version; }

// return true if we found this block_row in partition
//TODO: use virtual function to refactor it
ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
VOlapTablePartition*& partition) const {
auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
: _partitions_map->upper_bound(std::tuple {block, row, true});
VLOG_TRACE << "find row " << row << " of\n"
<< block->dump_data() << "in:\n"
<< _partition_block.dump_data() << "result line row: " << std::get<1>(it->first);

// for list partition it might result in default partition
if (_is_in_partition) {
partition = (it != _partitions_map->end()) ? it->second : _default_partition;
Expand Down Expand Up @@ -246,9 +250,15 @@ class VOlapTablePartitionParam {
bool is_projection_partition() const { return _is_auto_partition; }
bool is_auto_partition() const { return _is_auto_partition; }

bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; }
int64_t get_overwrite_group_id() const { return _overwrite_group_id; }

std::vector<uint16_t> get_partition_keys() const { return _partition_slot_locs; }

Status add_partitions(const std::vector<TOlapTablePartition>& partitions);
// no need to delete/reinsert partition keys, only change the link; resets the _partitions items
Status replace_partitions(std::vector<int64_t>& old_partition_ids,
const std::vector<TOlapTablePartition>& new_partitions);

vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; }
vectorized::VExprSPtrs get_partition_function() { return _partition_function; }
Expand All @@ -264,8 +274,6 @@ class VOlapTablePartitionParam {
private:
Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key);

Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos);

// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;

Expand All @@ -284,6 +292,7 @@ class VOlapTablePartitionParam {
std::vector<VOlapTablePartition*> _partitions;
// For all partition value rows saved in this map, the indicator is false. Whenever we use a value to look up in it, the indicator is true,
// so that we can distinguish which column index to use (origin slots or transformed slots).
// For range partitions we ONLY SAVE RIGHT ENDS. After we find a partition's RIGHT end by a value, we then check whether the partition's left end covers it.
std::unique_ptr<
std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>
_partitions_map;
Expand All @@ -293,11 +302,13 @@ class VOlapTablePartitionParam {
// only works when using list partition, the resource is owned by _partitions
VOlapTablePartition* _default_partition = nullptr;

// for auto partition, now only support 1 column. TODO: use vector to save them when we support multi column auto-partition.
bool _is_auto_partition = false;
vectorized::VExprContextSPtrs _part_func_ctx = {nullptr};
vectorized::VExprSPtrs _partition_function = {nullptr};
TPartitionType::type _part_type; // support list or range
// for "insert overwrite partition(*)": BE detects which partitions to overwrite
bool _is_auto_detect_overwrite = false;
int64_t _overwrite_group_id = 0;
};

// indicate where's the tablet and all its replications (node-wise)
Expand Down Expand Up @@ -360,13 +371,13 @@ class DorisNodesInfo {
public:
DorisNodesInfo() = default;
DorisNodesInfo(const TPaloNodesInfo& t_nodes) {
for (auto& node : t_nodes.nodes) {
for (const auto& node : t_nodes.nodes) {
_nodes.emplace(node.id, node);
}
}
void setNodes(const TPaloNodesInfo& t_nodes) {
_nodes.clear();
for (auto& node : t_nodes.nodes) {
for (const auto& node : t_nodes.nodes) {
_nodes.emplace(node.id, node);
}
}
Expand All @@ -380,7 +391,7 @@ class DorisNodesInfo {

void add_nodes(const std::vector<TNodeInfo>& t_nodes) {
for (const auto& node : t_nodes) {
auto node_info = find_node(node.id);
const auto* node_info = find_node(node.id);
if (node_info == nullptr) {
_nodes.emplace(node.id, node);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,10 @@ void StorageEngine::stop() {
_cold_data_compaction_thread_pool->shutdown();
}

if (_cooldown_thread_pool) {
_cooldown_thread_pool->shutdown();
}

_memtable_flush_executor.reset(nullptr);
_calc_delete_bitmap_executor.reset(nullptr);

Expand Down
34 changes: 9 additions & 25 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,30 +177,6 @@ void set_last_failure_time(Tablet* tablet, const Compaction& compaction, int64_t

} // namespace

// Holds the executors used for asynchronous writes of tablet cooldown meta.
// NOTE(review): submit() is defined out of line; presumably it routes each tablet to the
// executor slot returned by _get_executor_pos() — confirm against its definition.
struct WriteCooldownMetaExecutors {
// Defined out of line. `executor_nums` defaults to 5.
WriteCooldownMetaExecutors(size_t executor_nums = 5);

// Returns the address of a function-local static instance, constructed with the default
// `executor_nums`.
static WriteCooldownMetaExecutors* get_instance() {
static WriteCooldownMetaExecutors instance;
return &instance;
}

// Defined out of line.
void submit(TabletSharedPtr tablet);
// Maps `tablet_id` to an index in [0, _executor_nums): the std::hash of the id modulo the
// executor count. `_executor_nums` must be non-zero, otherwise the modulo is undefined.
size_t _get_executor_pos(int64_t tablet_id) const {
return std::hash<int64_t>()(tablet_id) % _executor_nums;
};
// Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
// FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
// We use PriorityThreadPool since it would call status inside its `shutdown` function.
// NOTE(review): "it" above presumably refers to `ThreadPool`, not PriorityThreadPool — confirm.
// Consider one situation where the StackTraceCache's singleton is destructed before
// this WriteCooldownMetaExecutors's singleton, then invoking the status would also call
// StackTraceCache which would then result in heap use after free like #23834
std::vector<std::unique_ptr<PriorityThreadPool>> _executors;
// NOTE(review): presumably the ids of tablets that already have a queued task, guarded by
// `_latch` — confirm in submit().
std::unordered_set<int64_t> _pending_tablets;
std::mutex _latch;
// Divisor used by _get_executor_pos().
size_t _executor_nums;
};

WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
: _executor_nums(executor_nums) {
for (size_t i = 0; i < _executor_nums; i++) {
Expand All @@ -214,6 +190,14 @@ WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
}
}

void WriteCooldownMetaExecutors::stop() {
for (auto& pool_ptr : _executors) {
if (pool_ptr) {
pool_ptr->shutdown();
}
}
}

void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletSharedPtr tablet) {
auto tablet_id = tablet->tablet_id();

Expand Down Expand Up @@ -2217,7 +2201,7 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas
// It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes
// one tablet would at most have one async task to be done
void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) {
WriteCooldownMetaExecutors::get_instance()->submit(std::move(tablet));
ExecEnv::GetInstance()->write_cooldown_meta_executors()->submit(std::move(tablet));
}

bool Tablet::update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) {
Expand Down
25 changes: 21 additions & 4 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -86,6 +82,27 @@ enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_R

static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = std::chrono::seconds(1);

// Holds the executors used for asynchronous writes of tablet cooldown meta.
// NOTE(review): submit() is defined out of line; presumably it routes each tablet to the
// executor slot returned by _get_executor_pos() — confirm against its definition.
struct WriteCooldownMetaExecutors {
// Defined out of line. `executor_nums` defaults to 5.
WriteCooldownMetaExecutors(size_t executor_nums = 5);

// Defined out of line.
void stop();

// Defined out of line.
void submit(TabletSharedPtr tablet);
// Maps `tablet_id` to an index in [0, _executor_nums): the std::hash of the id modulo the
// executor count. `_executor_nums` must be non-zero, otherwise the modulo is undefined.
size_t _get_executor_pos(int64_t tablet_id) const {
return std::hash<int64_t>()(tablet_id) % _executor_nums;
};
// Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
// FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
// We use PriorityThreadPool since it would call status inside its `shutdown` function.
// NOTE(review): "it" above presumably refers to `ThreadPool`, not PriorityThreadPool — confirm.
// Consider one situation where the StackTraceCache's singleton is destructed before
// this WriteCooldownMetaExecutors's singleton, then invoking the status would also call
// StackTraceCache which would then result in heap use after free like #23834
std::vector<std::unique_ptr<PriorityThreadPool>> _executors;
// NOTE(review): presumably the ids of tablets that already have a queued task, guarded by
// `_latch` — confirm in submit().
std::unordered_set<int64_t> _pending_tablets;
std::mutex _latch;
// Divisor used by _get_executor_pos().
size_t _executor_nums;
};

class Tablet final : public BaseTablet {
public:
Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
Expand Down
17 changes: 8 additions & 9 deletions be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,14 @@ constexpr bool is_numeric_type(const FieldType& field_type) {
}

// Util used to get string name of thrift enum item
#define EnumToString(enum_type, index, out) \
do { \
std::map<int, const char*>::const_iterator it = \
_##enum_type##_VALUES_TO_NAMES.find(index); \
if (it == _##enum_type##_VALUES_TO_NAMES.end()) { \
out = "NULL"; \
} else { \
out = it->second; \
} \
#define EnumToString(enum_type, index, out) \
do { \
auto it = _##enum_type##_VALUES_TO_NAMES.find(index); \
if (it == _##enum_type##_VALUES_TO_NAMES.end()) { \
out = "NULL"; \
} else { \
out = it->second; \
} \
} while (0)

struct RowLocation {
Expand Down
10 changes: 9 additions & 1 deletion be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "olap/tablet_manager.h"
#include "runtime/fragment_mgr.h"
#include "runtime/frontend_info.h"
#include "time.h"
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
Expand All @@ -45,6 +44,15 @@ ExecEnv::~ExecEnv() {
}

// TODO(plat1ko): template <class Engine>
#ifdef BE_TEST
// Test-only (compiled under BE_TEST): moves `engine` into _storage_engine.
void ExecEnv::set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine) {
_storage_engine = std::move(engine);
}
// Test-only (compiled under BE_TEST): installs a newly created, default-constructed
// WriteCooldownMetaExecutors into _write_cooldown_meta_executors.
void ExecEnv::set_write_cooldown_meta_executors() {
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
}
#endif // BE_TEST

Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id) {
BaseTabletSPtr tablet;
std::string err;
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@

#include <common/multi_version.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/tablet_fwd.h"
Expand All @@ -52,6 +48,7 @@ class BlockedTaskScheduler;
struct RuntimeFilterTimerQueue;
} // namespace pipeline
class WorkloadGroupMgr;
struct WriteCooldownMetaExecutors;
namespace io {
class FileCacheFactory;
} // namespace io
Expand Down Expand Up @@ -233,6 +230,9 @@ class ExecEnv {
MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); }
WalManager* wal_mgr() { return _wal_manager.get(); }
DNSCache* dns_cache() { return _dns_cache; }
// Returns a non-owning pointer to the WriteCooldownMetaExecutors held by this ExecEnv;
// the result is null when _write_cooldown_meta_executors holds nothing.
WriteCooldownMetaExecutors* write_cooldown_meta_executors() {
return _write_cooldown_meta_executors.get();
}

#ifdef BE_TEST
void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs> tmp_file_dirs) {
Expand Down Expand Up @@ -263,6 +263,7 @@ class ExecEnv {
void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();

#endif
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }
Expand Down Expand Up @@ -396,6 +397,7 @@ class ExecEnv {
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;

std::mutex _frontends_lock;
// ip:brpc_port -> frontend_info
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,16 @@
#include <cstdlib>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache_factory.h"
#include "io/fs/file_meta_cache.h"
#include "io/fs/s3_file_bufferpool.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
Expand Down Expand Up @@ -234,6 +230,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);

_backend_client_cache->init_metrics("backend");
Expand Down Expand Up @@ -567,8 +564,11 @@ void ExecEnv::destroy() {
_memtable_memory_limiter.reset();
_delta_writer_v2_pool.reset();
_load_stream_map_pool.reset();

SAFE_STOP(_storage_engine);
SAFE_STOP(_write_cooldown_meta_executors);
SAFE_STOP(_spill_stream_mgr);

SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
SAFE_SHUTDOWN(_join_node_thread_pool);
Expand Down Expand Up @@ -625,6 +625,7 @@ void ExecEnv::destroy() {
_buffered_reader_prefetch_thread_pool.reset(nullptr);
_s3_file_upload_thread_pool.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
_write_cooldown_meta_executors.reset(nullptr);

SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
Expand Down
Loading