Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](compaction) optimizing memory usage for compaction #37099

Merged
merged 11 commits into from
Jul 2, 2024
Merged
10 changes: 10 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
13 changes: 11 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,20 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
return st;
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
_engine.cumu_compaction_policy(compaction_policy)
->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score,
config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

Expand Down Expand Up @@ -415,6 +416,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");

// This config can be set to limit thread number in multiget thread pool.
DEFINE_mInt32(multi_get_max_threads, "10");
Expand Down Expand Up @@ -1313,6 +1315,10 @@ DEFINE_Bool(enable_file_logger, "true");
// The minimum row group size when exporting Parquet files. default 128MB
DEFINE_Int64(min_row_group_size, "134217728");

DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");

DEFINE_mInt64(compaction_batch_size, "-1");

// If set to false, the parquet reader will not use page index to filter data.
// This is only for debug purpose, in case sometimes the page index
// filter wrong data.
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);

DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
DECLARE_mInt64(base_compaction_max_compaction_score);
DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);

Expand Down Expand Up @@ -468,6 +469,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
// cumulative compaction policy: min and max delta file's number
DECLARE_mInt64(cumulative_compaction_min_deltas);
DECLARE_mInt64(cumulative_compaction_max_deltas);
DECLARE_mInt32(cumulative_compaction_max_deltas_factor);

// This config can be set to limit thread number in multiget thread pool.
DECLARE_mInt32(multi_get_max_threads);
Expand Down Expand Up @@ -1399,6 +1401,10 @@ DECLARE_Bool(enable_file_logger);
// The minimum row group size when exporting Parquet files.
DECLARE_Int64(min_row_group_size);

DECLARE_mInt64(compaction_memory_bytes_limit);

DECLARE_mInt64(compaction_batch_size);

DECLARE_mBool(enable_parquet_page_index);

#ifdef BE_TEST
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ Status BaseCompaction::pick_rowsets_to_compact() {
"situation, no need to do base compaction.");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>

#include "common/status.h"
#include "olap/iterators.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
Expand Down Expand Up @@ -299,6 +300,10 @@ class BaseTablet {
std::atomic<int64_t> read_block_count = 0;
std::atomic<int64_t> write_count = 0;
std::atomic<int64_t> compaction_count = 0;

std::mutex sample_info_lock;
std::vector<CompactionSampleInfo> sample_infos;
Status last_compaction_status = Status::OK();
};

} /* namespace doris */
15 changes: 14 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ void Compaction::init_profile(const std::string& label) {
_merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency");
}

int64_t Compaction::merge_way_num() {
int64_t way_num = 0;
for (auto&& rowset : _input_rowsets) {
way_num += rowset->rowset_meta()->get_merge_way_num();
}

return way_num;
}

Status Compaction::merge_input_rowsets() {
std::vector<RowsetReaderSharedPtr> input_rs_readers;
input_rs_readers.reserve(_input_rowsets.size());
Expand All @@ -170,19 +179,23 @@ Status Compaction::merge_input_rowsets() {
_stats.rowid_conversion = &_rowid_conversion;
}

int64_t way_num = merge_way_num();

Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
if (_is_vertical) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &_stats);
get_avg_segment_rows(), way_num, &_stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
}
}

_tablet->last_compaction_status = res;

if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->tablet_id()
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class Compaction {

void _load_segment_to_cache();

int64_t merge_way_num();

// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;

Expand Down
15 changes: 12 additions & 3 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< ", tablet=" << _tablet->tablet_id();
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
tablet()->cumulative_compaction_policy()->pick_input_rowsets(
tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
&compaction_score, _allow_delete_in_cumu_compaction);
tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
&_input_rowsets, &_last_delete_version, &compaction_score,
_allow_delete_in_cumu_compaction);

// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
Expand Down
15 changes: 14 additions & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <cstddef>
#include <memory>

#include "common/status.h"
Expand Down Expand Up @@ -121,6 +122,12 @@ class StorageReadOptions {
size_t topn_limit = 0;
};

struct CompactionSampleInfo {
int64_t bytes = 0;
int64_t rows = 0;
int64_t group_data_size;
};

class RowwiseIterator;
using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
class RowwiseIterator {
Expand All @@ -133,7 +140,13 @@ class RowwiseIterator {
// Input options may contain scan range in which this scan.
// Return Status::OK() if init successfully,
// Return other error otherwise
virtual Status init(const StorageReadOptions& opts) = 0;
virtual Status init(const StorageReadOptions& opts) {
return Status::NotSupported("to be implemented");
}

virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
return Status::NotSupported("to be implemented");
}

// If there is any valid data, this function will load data
// into input batch with Status::OK() returned
Expand Down
67 changes: 63 additions & 4 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <algorithm>
#include <iterator>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <shared_mutex>
Expand All @@ -33,7 +34,9 @@

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/base_tablet.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
Expand All @@ -43,6 +46,7 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_reader.h"
#include "olap/utils.h"
#include "util/slice.h"
Expand Down Expand Up @@ -241,7 +245,8 @@ Status Merger::vertical_compact_one_group(
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes) {
std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
CompactionSampleInfo* sample_info) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
Expand Down Expand Up @@ -279,7 +284,8 @@ Status Merger::vertical_compact_one_group(

reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_IF_ERROR(reader.init(reader_params));
reader_params.batch_size = batch_size;
RETURN_IF_ERROR(reader.init(reader_params, sample_info));

if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
Expand Down Expand Up @@ -385,6 +391,55 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t
return Status::OK();
}

int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
CompactionSampleInfo info = tablet->sample_infos[group_index];
if (way_cnt <= 0) {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " way cnt: " << way_cnt;
return 4096 - 32;
}
int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
block_mem_limit /= 4;
}

int64_t group_data_size = 0;
if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
float smoothing_factor = 0.5;
group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
info.bytes / info.rows * smoothing_factor);
tablet->sample_infos[group_index].group_data_size = group_data_size;
} else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
group_data_size = info.group_data_size;
} else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
group_data_size = info.bytes / info.rows;
tablet->sample_infos[group_index].group_data_size = group_data_size;
} else {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " group data size: " << info.group_data_size
<< " row num: " << info.rows << " consume bytes: " << info.bytes;
return 1024 - 32;
}

if (group_data_size <= 0) {
LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " unexpected group data size: " << group_data_size;
return 4096 - 32;
}

tablet->sample_infos[group_index].bytes = 0;
tablet->sample_infos[group_index].rows = 0;

int64_t batch_size = block_mem_limit / group_data_size;
int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id()
<< " group data size: " << info.group_data_size << " row num: " << info.rows
<< " consume bytes: " << info.bytes << " way cnt: " << way_cnt
<< " batch size: " << res;
return res;
}

// steps to do vertical merge:
// 1. split columns into column groups
// 2. compact groups one by one, generate a row_source_buf when compact key group
Expand All @@ -394,7 +449,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
const TabletSchema& tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output) {
int64_t merge_way_num, Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);
Expand All @@ -405,14 +460,18 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t

vectorized::RowSourcesBuffer row_sources_buf(
tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
: estimate_batch_size(i, tablet, merge_way_num);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
key_group_cluster_key_idxes));
key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "common/status.h"
#include "io/io_common.h"
#include "olap/iterators.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"

Expand Down Expand Up @@ -59,7 +60,7 @@ class Merger {
static Status vertical_merge_rowsets(
BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
Statistics* stats_output);

// for vertical compaction
Expand All @@ -71,7 +72,8 @@ class Merger {
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes);
std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
CompactionSampleInfo* sample_info);

// for segcompaction
static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type,
Expand Down
Loading
Loading