Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ Status CloudBaseCompaction::modify_rowsets() {
stats.num_rows(), stats.data_size());
}
}
_tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset);
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
LOG(INFO) << "delete_expired_stale_rowsets for tablet=" << _tablet->tablet_id();
_engine.tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1));
});

_tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset);
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ Status CloudFullCompaction::modify_rowsets() {
stats.num_rows(), stats.data_size());
}
}
_tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset);
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,9 @@ DEFINE_mInt32(omp_threads_limit, "8");
// The capacity of segment partial column cache, used to cache column readers for each segment.
DEFINE_mInt32(max_segment_partial_column_cache_size, "100");

DEFINE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction, "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the pr desc writes the default value is false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

DEFINE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction, "true");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,9 @@ DECLARE_Int32(omp_threads_limit);
// The capacity of segment partial column cache, used to cache column readers for each segment.
DECLARE_mInt32(max_segment_partial_column_cache_size);

DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction);
DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
26 changes: 26 additions & 0 deletions be/src/http/action/delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,32 @@ Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
return Status::OK();
}

Status DeleteBitmapAction::_handle_show_agg_cache_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
bool verbose = false;
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &verbose), "check param failed");
BaseTabletSPtr tablet = nullptr;
if (config::is_cloud_mode()) {
tablet = DORIS_TRY(_engine.to_cloud().tablet_mgr().get_tablet(tablet_id));
DBUG_EXECUTE_IF(
"DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets",
{ _engine.to_cloud().tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1)); });
} else {
tablet = _engine.to_local().tablet_manager()->get_tablet(tablet_id);
DBUG_EXECUTE_IF(
"DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_"
"rowset",
{ _engine.to_local().start_delete_unused_rowset(); });
}
if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}
auto dbm = tablet->tablet_meta()->delete_bitmap().agg_cache_snapshot();
_show_delete_bitmap(dbm, verbose, json_result);
return Status::OK();
}

void DeleteBitmapAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/delete_bitmap_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HttpRequest;

class ExecEnv;

enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 };
enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2, COUNT_AGG_CACHE = 3 };

/// This action is used for viewing the delete bitmap status
class DeleteBitmapAction : public HttpHandlerWithAuth {
Expand All @@ -47,6 +47,7 @@ class DeleteBitmapAction : public HttpHandlerWithAuth {
private:
Status _handle_show_local_delete_bitmap_count(HttpRequest* req, std::string* json_result);
Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result);
Status _handle_show_agg_cache_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
BaseStorageEngine& _engine;
Expand Down
27 changes: 26 additions & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
return s;
}
if (s.ok() && tablet_delete_bitmap->contains_agg_without_cache(
if (s.ok() && tablet_delete_bitmap->contains_agg_with_cache_if_eligible(
{loc.rowset_id, loc.segment_id, version}, loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id of
// all rowsets, util we find an existing key.
Expand Down Expand Up @@ -2150,4 +2150,29 @@ int32_t BaseTablet::max_version_config() {
return max_version;
}

void BaseTablet::prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version) {
for (std::size_t i = 0; i < rowset->num_segments(); i++) {
tablet_meta()->delete_bitmap().get_agg({rowset->rowset_id(), i, version});
}
}

void BaseTablet::prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr& output_rowset) {
if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
(config::enable_prefill_output_dbm_agg_cache_after_compaction ||
config::enable_prefill_all_dbm_agg_cache_after_compaction)) {
int64_t cur_max_version {-1};
{
std::shared_lock rlock(get_header_lock());
cur_max_version = max_version_unlocked();
}
if (config::enable_prefill_all_dbm_agg_cache_after_compaction) {
traverse_rowsets(
[&](const RowsetSharedPtr& rs) { prefill_dbm_agg_cache(rs, cur_max_version); },
false);
} else if (config::enable_prefill_output_dbm_agg_cache_after_compaction) {
prefill_dbm_agg_cache(output_rowset, cur_max_version);
}
}
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
return Status::OK();
}

void prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version);
void prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr& output_rowset);

protected:
// Find the missed versions until the spec_version.
//
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,7 @@ Status CompactionMixin::modify_rowsets() {
}
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset",
{ tablet()->delete_expired_stale_rowset(); });
_tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset);
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ Status FullCompaction::modify_rowsets() {
DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
tablet()->save_meta();
}

_tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset);
return Status::OK();
}

Expand Down
16 changes: 16 additions & 0 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,16 @@ PrunedInfo LRUCache::prune_if(CachePrunePredicate pred, bool lazy_mode) {
return {pruned_count, pruned_size};
}

void LRUCache::for_each_entry(const std::function<void(const LRUHandle*)>& visitor) {
std::lock_guard l(_mutex);
for (LRUHandle* p = _lru_normal.next; p != &_lru_normal; p = p->next) {
visitor(p);
}
for (LRUHandle* p = _lru_durable.next; p != &_lru_durable; p = p->next) {
visitor(p);
}
}

void LRUCache::set_cache_value_time_extractor(CacheValueTimeExtractor cache_value_time_extractor) {
_cache_value_time_extractor = cache_value_time_extractor;
}
Expand Down Expand Up @@ -777,6 +787,12 @@ PrunedInfo ShardedLRUCache::prune_if(CachePrunePredicate pred, bool lazy_mode) {
return pruned_info;
}

void ShardedLRUCache::for_each_entry(const std::function<void(const LRUHandle*)>& visitor) {
for (int s = 0; s < _num_shards; s++) {
_shards[s]->for_each_entry(visitor);
}
}

int64_t ShardedLRUCache::get_usage() {
size_t total_usage = 0;
for (int i = 0; i < _num_shards; i++) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class Cache {
// may hold lock for a long time to execute predicate.
virtual PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) { return {0, 0}; }

virtual void for_each_entry(const std::function<void(const LRUHandle*)>& visitor) = 0;

virtual int64_t get_usage() = 0;

virtual PrunedInfo set_capacity(size_t capacity) = 0;
Expand Down Expand Up @@ -333,6 +335,7 @@ class LRUCache {
void erase(const CacheKey& key, uint32_t hash);
PrunedInfo prune();
PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false);
void for_each_entry(const std::function<void(const LRUHandle*)>& visitor);

void set_cache_value_time_extractor(CacheValueTimeExtractor cache_value_time_extractor);
void set_cache_value_check_timestamp(bool cache_value_check_timestamp);
Expand Down Expand Up @@ -406,6 +409,7 @@ class ShardedLRUCache : public Cache {
uint64_t new_id() override;
PrunedInfo prune() override;
PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) override;
void for_each_entry(const std::function<void(const LRUHandle*)>& visitor) override;
int64_t get_usage() override;
size_t get_element_count() override;
PrunedInfo set_capacity(size_t capacity) override;
Expand Down Expand Up @@ -471,6 +475,7 @@ class DummyLRUCache : public Cache {
PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) override {
return {0, 0};
};
void for_each_entry(const std::function<void(const LRUHandle*)>& visitor) override {}
int64_t get_usage() override { return 0; };
PrunedInfo set_capacity(size_t capacity) override { return {0, 0}; };
size_t get_capacity() override { return 0; };
Expand Down
Loading
Loading