Skip to content
21 changes: 21 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,27 @@ namespace config {
CONF_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");

// Configures the cumulative compaction policy.
// Valid configs: num_based, size_based
// num_based policy: the original version of cumulative compaction; cumulative versions are compacted once.
// size_based policy: an optimized version of cumulative compaction, targeting use cases that require
// lower write amplification, trading off read amplification and space amplification.
CONF_String(cumulative_compaction_policy, "num_based");

// In size_based policy, when the total disk size of the output rowset of cumulative compaction exceeds
// this config size, the rowset will be given to base compaction. The unit is MB.
CONF_mInt64(cumulative_size_based_promotion_size_mbytes, "1024");
// In size_based policy, when the total disk size of the output rowset of cumulative compaction exceeds
// this ratio of the base rowset's total disk size, the rowset will be given to base compaction.
// The value must be between 0 and 1.
CONF_mDouble(cumulative_size_based_promotion_ratio, "0.05");
// In size_based policy, the smallest size for rowset promotion. When the rowset is smaller than this
// config, it will not be given to base compaction. The unit is MB.
CONF_mInt64(cumulative_size_based_promotion_min_size_mbytes, "64");
// The lower bound size to do cumulative compaction. When the total disk size of candidate rowsets is
// less than this size, size_based policy still does cumulative compaction. The unit is MB.
CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");

// cumulative compaction policy: max delta file's size unit:B
CONF_mInt32(cumulative_compaction_check_interval_seconds, "10");
CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ add_library(Olap STATIC
comparison_predicate.cpp
compress.cpp
cumulative_compaction.cpp
cumulative_compaction_policy.cpp
delete_handler.cpp
delta_writer.cpp
file_helper.cpp
Expand Down
63 changes: 16 additions & 47 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet, const std::st
: Compaction(tablet, label, parent_tracker),
_cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes) {}

CumulativeCompaction::~CumulativeCompaction() { }
CumulativeCompaction::~CumulativeCompaction() {}

OLAPStatus CumulativeCompaction::compact() {
if (!_tablet->init_succeeded()) {
Expand All @@ -41,7 +41,7 @@ OLAPStatus CumulativeCompaction::compact() {
}
TRACE("got cumulative compaction lock");

// 1.calculate cumulative point
// 1.calculate cumulative point
_tablet->calculate_cumulative_point();
TRACE("calculated cumulative point");
LOG(INFO) << "after calculate, current cumulative point is " << _tablet->cumulative_layer_point()
Expand All @@ -60,9 +60,10 @@ OLAPStatus CumulativeCompaction::compact() {
_state = CompactionState::SUCCESS;

// 5. set cumulative point
_tablet->set_cumulative_layer_point(_input_rowsets.back()->end_version() + 1);
LOG(INFO) << "after cumulative compaction, current cumulative point is "
<< _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name() ;
_tablet->cumulative_compaction_policy()->update_cumulative_point(_input_rowsets, _output_rowset,
_last_delete_version);
LOG(INFO) << "after cumulative compaction, current cumulative point is "
<< _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name();

// 6. add metric to cumulative compaction
DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size());
Expand All @@ -74,62 +75,30 @@ OLAPStatus CumulativeCompaction::compact() {

OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
std::vector<RowsetSharedPtr> candidate_rowsets;

_tablet->pick_candicate_rowsets_to_cumulative_compaction(
config::cumulative_compaction_skip_window_seconds, &candidate_rowsets);
config::cumulative_compaction_skip_window_seconds, &candidate_rowsets);

if (candidate_rowsets.empty()) {
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
}

std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
RETURN_NOT_OK(check_version_continuity(candidate_rowsets));

std::vector<RowsetSharedPtr> transient_rowsets;
size_t compaction_score = 0;
// the last delete version we meet when traversing candidate_rowsets
Version last_delete_version { -1, -1 };

for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
RowsetSharedPtr rowset = candidate_rowsets[i];
if (_tablet->version_for_delete_predicate(rowset->version())) {
last_delete_version = rowset->version();
if (!transient_rowsets.empty()) {
// we meet a delete version, and there were other versions before.
// we should compact those version before handling them over to base compaction
_input_rowsets = transient_rowsets;
break;
}

// we meet a delete version, and no other versions before, skip it and continue
transient_rowsets.clear();
compaction_score = 0;
continue;
}

if (compaction_score >= config::max_cumulative_compaction_num_singleton_deltas) {
// got enough segments
break;
}

compaction_score += rowset->rowset_meta()->get_compaction_score();
transient_rowsets.push_back(rowset);
}

// if we have a sufficient number of segments,
// or have other versions before encountering the delete version, we should process the compaction.
if (compaction_score >= config::min_cumulative_compaction_num_singleton_deltas
|| (last_delete_version.first != -1 && !transient_rowsets.empty())) {
_input_rowsets = transient_rowsets;
}
int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets(
candidate_rowsets, config::max_cumulative_compaction_num_singleton_deltas,
config::min_cumulative_compaction_num_singleton_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS:
if (_input_rowsets.empty()) {
if (last_delete_version.first != -1) {
if (_last_delete_version.first != -1) {
// we meet a delete version, should increase the cumulative point to let base compaction handle the delete version.
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
_tablet->set_cumulative_layer_point(last_delete_version.first + 1);
_tablet->set_cumulative_layer_point(_last_delete_version.first + 1);
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
}

Expand All @@ -149,8 +118,8 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
if (cumu_interval > interval_threshold && base_interval > interval_threshold) {
// before increasing cumulative point, we should make sure all rowsets are non-overlapping.
// if at least one rowset is overlapping, we should compact them first.
CHECK(candidate_rowsets.size() == transient_rowsets.size())
<< "tablet: " << _tablet->full_name() << ", "<< candidate_rowsets.size() << " vs. " << transient_rowsets.size();
CHECK(candidate_rowsets.size() == transient_size)
<< "tablet: " << _tablet->full_name() << ", "<< candidate_rowsets.size() << " vs. " << transient_size;
for (auto& rs : candidate_rowsets) {
if (rs->rowset_meta()->is_segments_overlapping()) {
_input_rowsets = candidate_rowsets;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_H

#include <string>

#include "olap/compaction.h"
#include "olap/cumulative_compaction_policy.h"

namespace doris {

Expand All @@ -46,6 +46,8 @@ class CumulativeCompaction : public Compaction {
private:
int64_t _cumulative_rowset_size_threshold;

Version _last_delete_version { -1, -1 };

DISALLOW_COPY_AND_ASSIGN(CumulativeCompaction);
};

Expand Down
Loading