Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ OLAPStatus CumulativeCompaction::compact() {
_state = CompactionState::SUCCESS;

// 5. set cumulative point
_tablet->cumulative_compaction_policy()->update_cumulative_point(_input_rowsets, _output_rowset,
_tablet->cumulative_compaction_policy()->update_cumulative_point(_tablet.get(), _input_rowsets, _output_rowset,
_last_delete_version);
LOG(INFO) << "after cumulative compaction, current cumulative point is "
<< _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name();
Expand All @@ -87,7 +87,8 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {

size_t compaction_score = 0;
int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets(
candidate_rowsets, config::max_cumulative_compaction_num_singleton_deltas,
_tablet.get(), candidate_rowsets,
config::max_cumulative_compaction_num_singleton_deltas,
config::min_cumulative_compaction_num_singleton_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);

Expand Down
44 changes: 23 additions & 21 deletions be/src/olap/cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
namespace doris {

SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
std::shared_ptr<Tablet> tablet, int64_t size_based_promotion_size, double size_based_promotion_ratio,
int64_t size_based_promotion_size, double size_based_promotion_ratio,
int64_t size_based_promotion_min_size, int64_t size_based_compaction_lower_bound_size)
: CumulativeCompactionPolicy(tablet),
: CumulativeCompactionPolicy(),
_size_based_promotion_size(size_based_promotion_size),
_size_based_promotion_ratio(size_based_promotion_ratio),
_size_based_promotion_min_size(size_based_promotion_min_size),
Expand All @@ -42,7 +42,7 @@ SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
}
}

void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(Tablet* tablet,
const std::vector<RowsetMetaSharedPtr>& all_metas, int64_t current_cumulative_point,
int64_t* ret_cumulative_point) {
*ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
Expand Down Expand Up @@ -82,7 +82,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
break;
}

bool is_delete = _tablet->version_for_delete_predicate(rs->version());
bool is_delete = tablet->version_for_delete_predicate(rs->version());

// break the loop if segments in this rowset is overlapping, or is a singleton.
if (!is_delete && (rs->is_segments_overlapping() || rs->is_singleton_delta())) {
Expand All @@ -101,7 +101,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
}
VLOG(3) << "cumulative compaction size_based policy, calculate cumulative point value = "
<< *ret_cumulative_point << ", calc promotion size value = " << promotion_size
<< " tablet = " << _tablet->full_name();
<< " tablet = " << tablet->full_name();
}

void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta,
Expand All @@ -123,19 +123,18 @@ void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_size_based_promotion_s
_tablet_size_based_promotion_size = promotion_size;
}

void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(Tablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr output_rowset,
Version& last_delete_version) {

// if rowsets have delete version, move to the last directly
if (last_delete_version.first != -1) {
_tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
} else {
// if rowsets have not delete version, check output_rowset total disk size
// satisfies promotion size.
size_t total_size = output_rowset->rowset_meta()->total_disk_size();
if (total_size >= _tablet_size_based_promotion_size) {
_tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
}
}
}
Expand Down Expand Up @@ -201,6 +200,7 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
}

int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
Tablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets, const int64_t max_compaction_score,
const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score) {
Expand All @@ -211,7 +211,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
RowsetSharedPtr rowset = candidate_rowsets[i];
// check whether this rowset is delete version
if (_tablet->version_for_delete_predicate(rowset->version())) {
if (tablet->version_for_delete_predicate(rowset->version())) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
Expand Down Expand Up @@ -273,7 +273,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
LOG(INFO) << "cumulative compaction size_based policy, compaction_score = " << *compaction_score
<< ", total_size = " << total_size
<< ", calc promotion size value = " << promotion_size
<< ", tablet = " << _tablet->full_name() << ", input_rowset size "
<< ", tablet = " << tablet->full_name() << ", input_rowset size "
<< input_rowsets->size();

// empty return
Expand Down Expand Up @@ -309,15 +309,16 @@ int SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
return 0;
}

void NumBasedCumulativeCompactionPolicy::update_cumulative_point(
void NumBasedCumulativeCompactionPolicy::update_cumulative_point(Tablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr _output_rowset,
Version& last_delete_version) {
// use the version after end version of the last input rowsets to update cumulative point
int64_t cumulative_point = input_rowsets.back()->end_version() + 1;
_tablet->set_cumulative_layer_point(cumulative_point);
tablet->set_cumulative_layer_point(cumulative_point);
}

int NumBasedCumulativeCompactionPolicy::pick_input_rowsets(
Tablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets, const int64_t max_compaction_score,
const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score) {
Expand All @@ -326,7 +327,7 @@ int NumBasedCumulativeCompactionPolicy::pick_input_rowsets(
for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
RowsetSharedPtr rowset = candidate_rowsets[i];
// check whether this rowset is delete version
if (_tablet->version_for_delete_predicate(rowset->version())) {
if (tablet->version_for_delete_predicate(rowset->version())) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
Expand Down Expand Up @@ -384,7 +385,8 @@ void NumBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(const
}
}

void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(const std::vector<RowsetMetaSharedPtr>& all_metas,
void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(Tablet* tablet,
const std::vector<RowsetMetaSharedPtr>& all_metas,
int64_t current_cumulative_point, int64_t* ret_cumulative_point) {

*ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
Expand Down Expand Up @@ -437,20 +439,19 @@ void CumulativeCompactionPolicy::pick_candicate_rowsets(int64_t skip_window_sec,

}

std::unique_ptr<CumulativeCompactionPolicy> CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type,
std::shared_ptr<Tablet> tablet) {
std::unique_ptr<CumulativeCompactionPolicy> CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) {

CompactionPolicy policy_type;
_parse_cumulative_compaction_policy(type, &policy_type);

if (policy_type == NUM_BASED_POLICY) {
return std::unique_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy(tablet));
return std::unique_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy());
}
else if(policy_type == SIZE_BASED_POLICY) {
return std::unique_ptr<CumulativeCompactionPolicy>(new SizeBasedCumulativeCompactionPolicy(tablet));
return std::unique_ptr<CumulativeCompactionPolicy>(new SizeBasedCumulativeCompactionPolicy());
}

return std::unique_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy(tablet));
return std::unique_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy());
}

void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std::string type, CompactionPolicy *policy_type) {
Expand All @@ -462,7 +463,8 @@ void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std:
else if (type == CUMULATIVE_SIZE_BASED_POLICY) {
*policy_type = SIZE_BASED_POLICY;
} else {
LOG(FATAL) << "parse cumulative compaction policy error " << type;
LOG(WARNING) << "parse cumulative compaction policy error " << type << ", default use " << CUMULATIVE_NUM_BASED_POLICY;
*policy_type = NUM_BASED_POLICY;
}
}
}
Loading