Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ Status CloudBaseCompaction::execute_compact() {
.tag("output_segments", _output_rowset->num_segments())
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size());
.tag("output_rowset_total_size", _output_rowset->total_disk_size())
.tag("local_read_bytes", _local_read_bytes_total)
.tag("remote_read_bytes", _remote_read_bytes_total);

//_compaction_succeed = true;
_state = CompactionState::SUCCESS;
Expand Down
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ Status CloudCumulativeCompaction::execute_compact() {
.tag("tablet_max_version", _tablet->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0))
.tag("local_read_bytes", _local_read_bytes_total)
.tag("remote_read_bytes", _remote_read_bytes_total);

_state = CompactionState::SUCCESS;

Expand Down
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ Status CloudFullCompaction::execute_compact() {
.tag("output_segments", _output_rowset->num_segments())
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size());
.tag("output_rowset_total_size", _output_rowset->total_disk_size())
.tag("local_read_bytes", _local_read_bytes_total)
.tag("remote_read_bytes", _remote_read_bytes_total);

_state = CompactionState::SUCCESS;

Expand Down
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "olap/storage_policy.h"
#include "runtime/memory/cache_manager.h"
#include "util/parse_util.h"
#include "util/time.h"
#include "vec/common/assert_cast.h"

namespace doris {
Expand Down Expand Up @@ -444,14 +445,14 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() {

int64_t interval = config::generate_compaction_tasks_interval_ms;
do {
int64_t cur_time = UnixMillis();
if (!config::disable_auto_compaction) {
Status st = _adjust_compaction_thread_num();
if (!st.ok()) {
break;
}

bool check_score = false;
int64_t cur_time = UnixMillis();
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
compaction_type = CompactionType::CUMULATIVE_COMPACTION;
round++;
Expand Down Expand Up @@ -503,6 +504,9 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() {
} else {
interval = config::check_auto_compaction_interval_seconds * 1000;
}
int64_t end_time = UnixMillis();
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
cur_time);
} while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}

Expand Down
41 changes: 39 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_fwd.h"
Expand All @@ -74,6 +75,7 @@
#include "olap/utils.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "util/time.h"
#include "util/trace.h"

Expand Down Expand Up @@ -241,6 +243,14 @@ Status Compaction::merge_input_rowsets() {
}
}

_local_read_bytes_total = _stats.bytes_read_from_local;
_remote_read_bytes_total = _stats.bytes_read_from_remote;
DorisMetrics::instance()->local_compaction_read_bytes_total->increment(_local_read_bytes_total);
DorisMetrics::instance()->remote_compaction_read_bytes_total->increment(
_remote_read_bytes_total);
DorisMetrics::instance()->local_compaction_write_bytes_total->increment(
_stats.cached_bytes_total);

COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());
COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows());
COUNTER_UPDATE(_output_segments_num_counter, _output_rowset->num_segments());
Expand Down Expand Up @@ -346,6 +356,8 @@ void CompactionMixin::build_basic_info() {
COUNTER_UPDATE(_input_row_num_counter, _input_row_num);
COUNTER_UPDATE(_input_segments_num_counter, _input_num_segments);

TEST_SYNC_POINT_RETURN_WITH_VOID("compaction::CompactionMixin::build_basic_info");

_output_version =
Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());

Expand Down Expand Up @@ -448,6 +460,17 @@ Status CompactionMixin::execute_compact() {
}
}

DorisMetrics::instance()->local_compaction_read_rows_total->increment(_input_row_num);
DorisMetrics::instance()->local_compaction_read_bytes_total->increment(
_input_rowsets_total_size);

TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact", Status::OK());

DorisMetrics::instance()->local_compaction_write_rows_total->increment(
_output_rowset->num_rows());
DorisMetrics::instance()->local_compaction_write_bytes_total->increment(
_output_rowset->total_disk_size());

_load_segment_to_cache();
return Status::OK();
}
Expand All @@ -474,6 +497,9 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
}
build_basic_info();

TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact_impl",
Status::OK());

VLOG_DEBUG << "dump tablet schema: " << _cur_tablet_schema->dump_structure();

LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id()
Expand All @@ -489,8 +515,12 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
<< ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version
<< ", current_max_version=" << tablet()->max_version().second
<< ", disk=" << tablet()->data_dir()->path() << ", segments=" << _input_num_segments
<< ", input_data_size=" << _input_rowsets_data_size
<< ", output_rowset_size=" << _output_rowset->total_disk_size()
<< ", input_rowsets_data_size=" << _input_rowsets_data_size
<< ", input_rowsets_index_size=" << _input_rowsets_index_size
<< ", input_rowsets_total_size=" << _input_rowsets_total_size
<< ", output_rowset_data_size=" << _output_rowset->data_disk_size()
<< ", output_rowset_index_size=" << _output_rowset->index_disk_size()
<< ", output_rowset_total_size=" << _output_rowset->total_disk_size()
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ", filtered_row_num=" << _stats.filtered_rows
Expand Down Expand Up @@ -1346,6 +1376,13 @@ Status CloudCompactionMixin::execute_compact() {
_tablet->tablet_id());
}
});

DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num);
DorisMetrics::instance()->remote_compaction_write_rows_total->increment(
_output_rowset->num_rows());
DorisMetrics::instance()->remote_compaction_write_bytes_total->increment(
_output_rowset->total_disk_size());

_load_segment_to_cache();
return Status::OK();
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class Compaction {
int64_t _input_row_num {0};
int64_t _input_num_segments {0};

int64_t _local_read_bytes_total {};
int64_t _remote_read_bytes_total {};

Merger::Statistics _stats;

RowsetSharedPtr _output_rowset;
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/cumulative_compaction.h"

#include <cpp/sync_point.h>

#include <memory>
#include <mutex>
#include <ostream>
Expand Down Expand Up @@ -152,6 +154,9 @@ Status CumulativeCompaction::execute_compact() {
st = CompactionMixin::execute_compact();
RETURN_IF_ERROR(st);

TEST_SYNC_POINT_RETURN_WITH_VALUE(
"cumulative_compaction::CumulativeCompaction::execute_compact", Status::OK());

DCHECK_EQ(_state, CompactionState::SUCCESS);
if (tablet()->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(), _input_rowsets,
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
stats_output->output_rows = output_rows;
stats_output->merged_rows = reader.merged_rows();
stats_output->filtered_rows = reader.filtered_rows();
stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local;
stats_output->bytes_read_from_remote =
reader.stats().file_cache_stats.bytes_read_from_remote;
stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache;
}

RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(),
Expand Down Expand Up @@ -312,6 +316,10 @@ Status Merger::vertical_compact_one_group(
stats_output->output_rows = output_rows;
stats_output->merged_rows = reader.merged_rows();
stats_output->filtered_rows = reader.filtered_rows();
stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local;
stats_output->bytes_read_from_remote =
reader.stats().file_cache_stats.bytes_read_from_remote;
stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache;
}
RETURN_IF_ERROR(dst_rowset_writer->flush_columns(is_key));

Expand Down Expand Up @@ -356,6 +364,12 @@ Status Merger::vertical_compact_one_group(
stats_output->output_rows = output_rows;
stats_output->merged_rows = src_block_reader.merged_rows();
stats_output->filtered_rows = src_block_reader.filtered_rows();
stats_output->bytes_read_from_local =
src_block_reader.stats().file_cache_stats.bytes_read_from_local;
stats_output->bytes_read_from_remote =
src_block_reader.stats().file_cache_stats.bytes_read_from_remote;
stats_output->cached_bytes_total =
src_block_reader.stats().file_cache_stats.bytes_write_into_cache;
}

// segcompaction produce only one segment at once
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class Merger {
int64_t merged_rows = 0;
int64_t filtered_rows = 0;
RowIdConversion* rowid_conversion = nullptr;
// these data for trans
int64_t cached_bytes_total = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
};

// merge rows from `src_rowset_readers` and write into `dst_rowset_writer`.
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {

int64_t interval = config::generate_compaction_tasks_interval_ms;
do {
int64_t cur_time = UnixMillis();
if (!config::disable_auto_compaction &&
(!config::enable_compaction_pause_on_high_memory ||
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
Expand Down Expand Up @@ -716,6 +717,17 @@ void StorageEngine::_compaction_tasks_producer_callback() {
} else {
interval = 5000; // 5s to check disable_auto_compaction
}

// wait some seconds for ut test
{
std ::vector<std ::any> args {};
args.emplace_back(1);
doris ::SyncPoint ::get_instance()->process(
"StorageEngine::_compaction_tasks_producer_callback", std ::move(args));
}
int64_t end_time = UnixMillis();
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
cur_time);
} while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class BetaRowsetReader : public RowsetReader {

void set_topn_limit(size_t topn_limit) override { _topn_limit = topn_limit; }

OlapReaderStatistics* get_stats() { return _stats; }

private:
[[nodiscard]] Status _init_iterator_once();
[[nodiscard]] Status _init_iterator();
Expand Down
31 changes: 31 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,26 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_receive_bytes_total, MetricUnit::BYT
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, "", stream_load,
Labels({{"type", "load_rows"}}));

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(compaction_producer_callback_a_round_time,
MetricUnit::ROWSETS);

DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_rows_total, MetricUnit::ROWS, "",
compaction_rows_total, Labels({{"type", "read"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_bytes_total, MetricUnit::BYTES, "",
compaction_bytes_total, Labels({{"type", "read"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_rows_total, MetricUnit::ROWS, "",
compaction_rows_total, Labels({{"type", "write"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_bytes_total, MetricUnit::BYTES, "",
compaction_bytes_total, Labels({{"type", "write"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_rows_total, MetricUnit::ROWS, "",
compaction_rows_total, Labels({{"type", "read"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_bytes_total, MetricUnit::BYTES, "",
compaction_bytes_total, Labels({{"type", "read"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_rows_total, MetricUnit::ROWS, "",
compaction_rows_total, Labels({{"type", "write"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_bytes_total, MetricUnit::BYTES, "",
compaction_bytes_total, Labels({{"type", "write"}}));

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);

Expand Down Expand Up @@ -225,6 +245,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_bytes);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_rows);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, compaction_producer_callback_a_round_time);

// engine_requests_total
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed);
Expand Down Expand Up @@ -255,6 +277,15 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_failed);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_read_rows_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_read_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_write_rows_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_compaction_write_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_read_rows_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_read_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_write_rows_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, remote_compaction_write_bytes_total);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total);
Expand Down
11 changes: 11 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class DorisMetrics {
IntCounter* finish_task_requests_total = nullptr;
IntCounter* finish_task_requests_failed = nullptr;

IntCounter* compaction_producer_callback_a_round_time = nullptr;

IntCounter* base_compaction_request_total = nullptr;
IntCounter* base_compaction_request_failed = nullptr;
IntCounter* cumulative_compaction_request_total = nullptr;
Expand All @@ -88,6 +90,15 @@ class DorisMetrics {
IntCounter* single_compaction_request_failed = nullptr;
IntCounter* single_compaction_request_cancelled = nullptr;

IntCounter* local_compaction_read_rows_total = nullptr;
IntCounter* local_compaction_read_bytes_total = nullptr;
IntCounter* local_compaction_write_rows_total = nullptr;
IntCounter* local_compaction_write_bytes_total = nullptr;
IntCounter* remote_compaction_read_rows_total = nullptr;
IntCounter* remote_compaction_read_bytes_total = nullptr;
IntCounter* remote_compaction_write_rows_total = nullptr;
IntCounter* remote_compaction_write_bytes_total = nullptr;

IntCounter* base_compaction_deltas_total = nullptr;
IntCounter* base_compaction_bytes_total = nullptr;
IntCounter* cumulative_compaction_deltas_total = nullptr;
Expand Down
Loading
Loading