Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ BaseCompaction::BaseCompaction(TabletSharedPtr tablet, const std::string& label,

BaseCompaction::~BaseCompaction() {}

OLAPStatus BaseCompaction::compact() {
RETURN_NOT_OK(prepare_compact());
RETURN_NOT_OK(execute_compact());

return OLAP_SUCCESS;
}

OLAPStatus BaseCompaction::prepare_compact() {
if (!_tablet->init_succeeded()) {
return OLAP_ERR_INPUT_PARAMETER_ERROR;
Expand All @@ -56,7 +49,7 @@ OLAPStatus BaseCompaction::prepare_compact() {
return OLAP_SUCCESS;
}

OLAPStatus BaseCompaction::execute_compact() {
OLAPStatus BaseCompaction::execute_compact_impl() {
MutexLock lock(_tablet->get_base_lock(), TRY_LOCK);
if (!lock.own_lock()) {
LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name();
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ class BaseCompaction : public Compaction {
const std::shared_ptr<MemTracker>& parent_tracker);
~BaseCompaction() override;

OLAPStatus compact() override;

OLAPStatus prepare_compact() override;
OLAPStatus execute_compact() override;
OLAPStatus execute_compact_impl() override;

std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }

Expand Down
27 changes: 17 additions & 10 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label,

Compaction::~Compaction() {}

OLAPStatus Compaction::compact() {
RETURN_NOT_OK(prepare_compact());
RETURN_NOT_OK(execute_compact());
return OLAP_SUCCESS;
}

OLAPStatus Compaction::execute_compact() {
OLAPStatus st = execute_compact_impl();
if (st != OLAP_SUCCESS) {
gc_output_rowset();
}
return st;
}

OLAPStatus Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
_tablet->data_dir()->disks_compaction_score_increment(permits);
Expand Down Expand Up @@ -165,17 +179,10 @@ void Compaction::modify_rowsets() {
_tablet->save_meta();
}

OLAPStatus Compaction::gc_unused_rowsets() {
StorageEngine* storage_engine = StorageEngine::instance();
if (_state != CompactionState::SUCCESS) {
storage_engine->add_unused_rowset(_output_rowset);
return OLAP_SUCCESS;
void Compaction::gc_output_rowset() {
if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) {
StorageEngine::instance()->add_unused_rowset(_output_rowset);
}
for (auto& rowset : _input_rowsets) {
storage_engine->add_unused_rowset(rowset);
}
_input_rowsets.clear();
return OLAP_SUCCESS;
}

// Find the longest consecutive version path in "rowset", from begining.
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ class Merger;
// 1. pick rowsets satisfied to compact
// 2. do compaction
// 3. modify rowsets
// 4. gc unused rowsets
// 4. gc output rowset if failed
class Compaction {
public:
Compaction(TabletSharedPtr tablet, const std::string& label,
const std::shared_ptr<MemTracker>& parent_tracker);
virtual ~Compaction();

virtual OLAPStatus compact() = 0;
// This is only for http CompactionAction
OLAPStatus compact();

virtual OLAPStatus prepare_compact() = 0;
virtual OLAPStatus execute_compact() = 0;
OLAPStatus execute_compact();
virtual OLAPStatus execute_compact_impl() = 0;

protected:
virtual OLAPStatus pick_rowsets_to_compact() = 0;
Expand All @@ -62,7 +64,7 @@ class Compaction {
OLAPStatus do_compaction_impl(int64_t permits);

void modify_rowsets();
OLAPStatus gc_unused_rowsets();
void gc_output_rowset();

OLAPStatus construct_output_rowset_writer();
OLAPStatus construct_input_rowset_readers();
Expand Down
9 changes: 1 addition & 8 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet, const std::st

CumulativeCompaction::~CumulativeCompaction() {}

OLAPStatus CumulativeCompaction::compact() {
RETURN_NOT_OK(prepare_compact());
RETURN_NOT_OK(execute_compact());

return OLAP_SUCCESS;
}

OLAPStatus CumulativeCompaction::prepare_compact() {
if (!_tablet->init_succeeded()) {
return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
Expand Down Expand Up @@ -64,7 +57,7 @@ OLAPStatus CumulativeCompaction::prepare_compact() {
return OLAP_SUCCESS;
}

OLAPStatus CumulativeCompaction::execute_compact() {
OLAPStatus CumulativeCompaction::execute_compact_impl() {
MutexLock lock(_tablet->get_cumulative_lock(), TRY_LOCK);
if (!lock.own_lock()) {
LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name();
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ class CumulativeCompaction : public Compaction {
const std::shared_ptr<MemTracker>& parent_tracker);
~CumulativeCompaction() override;

OLAPStatus compact() override;

OLAPStatus prepare_compact() override;
OLAPStatus execute_compact() override;
OLAPStatus execute_compact_impl() override;

std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }

Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ struct OlapReaderStatistics {
int64_t rows_bf_filtered = 0;
// Including the number of rows filtered out according to the Delete information in the Tablet,
// and the number of rows filtered for marked deleted rows under the unique key model.
// This metric is mainly used to record the number of rows filtered by the delete condition in Segment V1,
// and it is also used to record the replaced rows in the Unique key model in the "Reader" class.
// In segmentv2, if you want to get all filtered rows, you need the sum of "rows_del_filtered" and "rows_conditions_filtered".
int64_t rows_del_filtered = 0;
// the number of rows filtered by various column indexes.
int64_t rows_conditions_filtered = 0;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ class Reader {

uint64_t merged_rows() const { return _merged_rows; }

uint64_t filtered_rows() const { return _stats.rows_del_filtered; }
uint64_t filtered_rows() const {
return _stats.rows_del_filtered + _stats.rows_conditions_filtered;
}

const OlapReaderStatistics& stats() const { return _stats; }
OlapReaderStatistics* mutable_stats() { return &_stats; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void test() throws Exception {
"\"symbol\" = \"_ZN9doris_udf6AddUdfEPNS_15FunctionContextERKNS_9StringValE\",\n" +
"\"prepare_fn\" = \"_ZN9doris_udf13AddUdfPrepareEPNS_15FunctionContextENS0_18FunctionStateScopeE\",\n" +
"\"close_fn\" = \"_ZN9doris_udf11AddUdfCloseEPNS_15FunctionContextENS0_18FunctionStateScopeE\",\n" +
"\"object_file\" = \"http://nmg01-inf-dorishb00.nmg01.baidu.com:8456/libcmy_udf.so\"\n" +
"\"object_file\" = \"http://127.0.0.1:8008/libcmy_udf.so\"\n" +
");";

CreateFunctionStmt createFunctionStmt = (CreateFunctionStmt) UtFrameUtils.parseAndAnalyzeStmt(createFuncStr, ctx);
Expand Down