Commit
Merge branch 'master' into nereids_alter_database_set_quota
vinlee19 authored Jan 10, 2025

2 parents 9b4dcbd + 7a6c125 commit 60530f5
Showing 1,352 changed files with 32,288 additions and 8,136 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
@@ -50,7 +50,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
sh_checker_comment: true
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples
sh_checker_exclude: .git .github ^docker/compilation ^docker/runtime ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples

preparation:
name: "Clang Tidy Preparation"
2 changes: 1 addition & 1 deletion README.md
@@ -202,7 +202,7 @@ See how to compile 🔗[Compilation](https://doris.apache.org/docs/dev/install/

### 📮 Install

See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/standard-deployment)
See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/cluster-deployment/standard-deployment)

## 🧩 Components

2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
@@ -454,6 +454,7 @@ bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
@@ -481,6 +482,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
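
The new CALCULATE_DELETE_BITMAP counter follows the existing pattern in this file: one bvar::Adder per task type, registered under the "task" prefix and wired into the counting switch through the ADD_TASK_COUNT macro. A minimal standalone sketch of that pattern, using an illustrative enum in place of the Thrift TTaskType:

#include <bvar/bvar.h>

#include <cstdint>

// Illustrative stand-in for the Thrift-generated TTaskType enum.
enum class TaskType { CLONE, GC_BINLOG, CALCULATE_DELETE_BITMAP };

// One monotonically increasing counter per task type, exported as task_<name>.
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");

void add_task_count(TaskType type, int n) {
    switch (type) {
// Each macro use expands to: case TaskType::X: X_count << n; return;
#define ADD_TASK_COUNT(t) \
    case TaskType::t:     \
        t##_count << n;   \
        return;
        ADD_TASK_COUNT(CLONE)
        ADD_TASK_COUNT(GC_BINLOG)
        ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
#undef ADD_TASK_COUNT
    default:
        return;
    }
}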
146 changes: 9 additions & 137 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -26,6 +26,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -221,146 +222,17 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
if (tablet->tablet_state() == TABLET_NOTREADY) {
return 0;
}

input_rowsets->clear();
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();

int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;

for (const auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
// we should compact those version before handling them over to base compaction
break;
} else {
// we meet a delete version, and no other versions before, skip it and continue
input_rowsets->clear();
*compaction_score = 0;
transient_size = 0;
total_size = 0;
continue;
}
}

*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

transient_size += 1;
input_rowsets->push_back(rowset);

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
input_rowsets->clear();
*compaction_score = 0;
total_size = 0;
continue;
}
return transient_size;
} else if (
*compaction_score >=
config::compaction_max_rowset_count) { // If the number of rowsets is too large: FDB_ERROR_CODE_TXN_TOO_LARGE
return transient_size;
}
}

// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
// if there is only one rowset and not overlapping,
// we do not need to do cumulative compaction
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
return transient_size;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return transient_size;
}

// Condition 3: level1 achieve compaction_goal_size
std::vector<RowsetSharedPtr> level1_rowsets;
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
int64_t continuous_size = 0;
for (const auto& rowset : candidate_rowsets) {
const auto& rs_meta = rowset->rowset_meta();
if (rs_meta->compaction_level() == 0) {
break;
}
level1_rowsets.push_back(rowset);
continuous_size += rs_meta->total_disk_size();
if (level1_rowsets.size() >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
return transient_size;
}
}

input_rowsets->clear();
// Condition 5: If their are many empty rowsets, maybe should be compacted
tablet->calc_consecutive_empty_rowsets(
input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets->empty()) {
VLOG_NOTICE << "tablet is " << tablet->tablet_id()
<< ", there are too many consecutive empty rowsets, size is "
<< input_rowsets->size();
return 0;
}
*compaction_score = 0;

return 0;
return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
tablet, last_cumu, candidate_rowsets, max_compaction_score, min_compaction_score,
input_rowsets, last_delete_version, compaction_score, allow_delete);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
const std::vector<RowsetSharedPtr>& input_rowsets) {
int64_t first_level = 0;
for (size_t i = 0; i < input_rowsets.size(); i++) {
int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level();
if (i == 0) {
first_level = cur_level;
} else {
if (first_level != cur_level) {
LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level
<< ", cur_level: " << cur_level;
}
}
}
return first_level + 1;
int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) {
return TimeSeriesCumulativeCompactionPolicy::get_compaction_level((BaseTablet*)tablet,
input_rowsets, output_rowset);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
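
With this change the cloud policy no longer carries its own copy of the time-series selection logic; pick_input_rowsets and the renamed get_compaction_level both forward to the shared TimeSeriesCumulativeCompactionPolicy, upcasting the CloudTablet to the common BaseTablet. A simplified sketch of that delegation shape, with stand-in types rather than the real Doris signatures:

#include <cstdint>

// Stand-ins for the Doris types; CloudTablet derives from the shared BaseTablet.
struct BaseTablet {};
struct CloudTablet : BaseTablet {};

// Stand-in for the single shared implementation (TimeSeriesCumulativeCompactionPolicy).
struct SharedTimeSeriesPolicy {
    static int64_t get_compaction_level(BaseTablet* /*tablet*/) { return 1; }
};

struct CloudTimeSeriesPolicy {
    // The cloud override recomputes nothing: it upcasts and forwards.
    int64_t get_compaction_level(CloudTablet* tablet) {
        return SharedTimeSeriesPolicy::get_compaction_level(static_cast<BaseTablet*>(tablet));
    }
};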
18 changes: 15 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
@@ -43,7 +43,9 @@ class CloudCumulativeCompactionPolicy {
Version& last_delete_version,
int64_t last_cumulative_point) = 0;

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;
virtual int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) = 0;

virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
@@ -52,6 +54,8 @@ class CloudCumulativeCompactionPolicy {
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) = 0;

virtual std::string name() = 0;
};

class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
@@ -68,7 +72,9 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override {
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override {
return 0;
}

@@ -80,6 +86,8 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "size_based"; }

private:
int64_t _level_size(const int64_t size);

@@ -105,7 +113,9 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override;

int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
@@ -114,6 +124,8 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "time_series"; }
};

#include "common/compile_check_end.h"
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -226,7 +226,11 @@ class MetaServiceProxy {
static size_t num_proxies = 1;
static std::atomic<size_t> index(0);
static std::unique_ptr<MetaServiceProxy[]> proxies;

if (config::meta_service_endpoint.empty()) {
return Status::InvalidArgument(
"Meta service endpoint is empty. Please configure manually or wait for "
"heartbeat to obtain.");
}
std::call_once(
proxies_flag, +[]() {
if (config::meta_service_connection_pooled) {
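
The added check fails fast with InvalidArgument when no meta service endpoint has been configured, before the once-only construction of the proxy pool runs. A rough standalone sketch of that guard-then-initialize shape (the Status type and config namespace here are simplified stand-ins, not the Doris ones):

#include <mutex>
#include <string>
#include <utility>
#include <vector>

namespace config {
inline std::string meta_service_endpoint;  // empty until configured or delivered by heartbeat
}

// Simplified stand-in for doris::Status.
struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InvalidArgument(std::string m) { return {false, std::move(m)}; }
};

struct MetaServiceProxy {};

Status get_proxy(MetaServiceProxy** out) {
    static std::once_flag proxies_flag;
    static std::vector<MetaServiceProxy> proxies;

    // Fail fast instead of building proxies against an empty endpoint.
    if (config::meta_service_endpoint.empty()) {
        return Status::InvalidArgument(
                "Meta service endpoint is empty. Please configure manually or wait for "
                "heartbeat to obtain.");
    }
    // Build the proxy pool exactly once across all calling threads.
    std::call_once(proxies_flag, [] { proxies.resize(1); });
    *out = &proxies[0];
    return Status::OK();
}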
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_rowset_builder.cpp
@@ -51,8 +51,8 @@ Status CloudRowsetBuilder::init() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema()));

RowsetWriterContext context;
context.txn_id = _req.txn_id;
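
Wrapping the call in RETURN_IF_ERROR means a failed schema build now propagates to the caller instead of being dropped. A generic sketch of how such a macro typically behaves, with a simplified Status type rather than the actual Doris definitions:

#include <string>
#include <utility>

// Simplified stand-in for doris::Status.
struct Status {
    bool is_ok = true;
    std::string msg;
    bool ok() const { return is_ok; }
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};

// Evaluate a Status-returning expression; on failure, return that Status immediately.
#define RETURN_IF_ERROR(stmt)  \
    do {                       \
        Status _st = (stmt);   \
        if (!_st.ok()) {       \
            return _st;        \
        }                      \
    } while (false)

Status build_current_tablet_schema() {
    return Status::Error("invalid table_schema_param");
}

Status init() {
    RETURN_IF_ERROR(build_current_tablet_schema());  // early-returns the error
    return Status::OK();                             // only reached on success
}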
8 changes: 7 additions & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
@@ -283,7 +283,13 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}

context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, false));
// TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
bool vertical = false;
if (sc_sorting && !_new_tablet->tablet_schema()->cluster_key_uids().empty()) {
// see VBaseSchemaChangeWithSorting::_external_sorting
vertical = true;
}
auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, vertical));

RowsetMetaSharedPtr existed_rs_meta;
auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(),
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.cpp
@@ -17,6 +17,8 @@

#include "cloud/cloud_stream_load_executor.h"

#include <bvar/bvar.h>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
@@ -27,6 +29,10 @@

namespace doris {

bvar::Adder<uint64_t> stream_load_commit_retry_counter;
bvar::Window<bvar::Adder<uint64_t>> stream_load_commit_retry_counter_minute(
&stream_load_commit_retry_counter, 60);

enum class TxnOpParamType : int {
ILLEGAL,
WITH_TXN_ID,
@@ -114,6 +120,7 @@ Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
.tag("retry_times", retry_times)
.error(st);
retry_times++;
stream_load_commit_retry_counter << 1;
}
return st;
}
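
The new metric pairs a lifetime bvar::Adder with a 60-second bvar::Window, so both the total number of commit retries and a trailing per-minute count are exported. A minimal standalone sketch of that pairing (names here are illustrative):

#include <bvar/bvar.h>

#include <cstdint>

// Lifetime total of commit retries.
bvar::Adder<uint64_t> retry_counter;
// Count of retries observed over the trailing 60 seconds.
bvar::Window<bvar::Adder<uint64_t>> retry_counter_minute(&retry_counter, 60);

void on_commit_retry() {
    retry_counter << 1;  // a single increment feeds both the total and the window
}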
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
@@ -84,6 +84,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_has_variant_type_in_schema(in.has_has_variant_type_in_schema());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}

@@ -136,6 +137,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_has_variant_type_in_schema(in.has_variant_type_in_schema());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}

@@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}

@@ -287,6 +290,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
}
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->set_compaction_level(in.compaction_level());
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}

2 changes: 1 addition & 1 deletion be/src/clucene
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
@@ -529,7 +529,7 @@ DEFINE_String(ssl_private_key_path, "");
// Whether to check authorization
DEFINE_Bool(enable_all_http_auth, "false");
// Number of webserver workers
DEFINE_Int32(webserver_num_workers, "48");
DEFINE_Int32(webserver_num_workers, "128");

DEFINE_Bool(enable_single_replica_load, "true");
// Number of download workers for single replica load
@@ -915,7 +915,7 @@ DEFINE_String(rpc_load_balancer, "rr");

// a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
// so we set a soft limit, default is 1MB
DEFINE_mInt32(string_type_length_soft_limit_bytes, "1048576");
DEFINE_Int32(string_type_length_soft_limit_bytes, "1048576");

DEFINE_Validator(string_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });
2 changes: 1 addition & 1 deletion be/src/common/config.h
@@ -586,7 +586,7 @@ DECLARE_mInt64(load_error_log_limit_bytes);

// be brpc interface is classified into two categories: light and heavy
// each category has diffrent thread number
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
// threads to handle heavy api interface, such as transmit_block etc
DECLARE_Int32(brpc_heavy_work_pool_threads);
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
DECLARE_Int32(brpc_light_work_pool_threads);
3 changes: 1 addition & 2 deletions be/src/exec/es/es_scroll_parser.cpp
@@ -401,8 +401,7 @@ Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
};

if (pure_doc_value && col.IsArray() && !col.Empty()) {
if (col.IsNumber()) {
RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col[0], type);
if (col[0].IsNumber()) {
T value = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
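
The fix checks the first element of the array rather than the array itself: in rapidjson, a value that IsArray() never reports IsNumber(), so the old check could not succeed for a pure-doc value such as [42]. A small standalone illustration of the distinction (not Doris code):

#include <cassert>

#include <rapidjson/document.h>

int main() {
    rapidjson::Document doc;
    doc.Parse("[42]");

    assert(doc.IsArray() && !doc.Empty());
    assert(!doc.IsNumber());    // the array value itself is not a number
    assert(doc[0].IsNumber());  // but its first element is

    return doc[0].GetInt() == 42 ? 0 : 1;
}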