Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -903,11 +903,15 @@ void StorageEngine::start_delete_unused_rowset() {
{
std::lock_guard<std::mutex> lock(_gc_mutex);
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
if (it->second.use_count() == 1 && it->second->need_delete_file()) {
uint64_t now = UnixSeconds();
if (it->second.use_count() == 1 && it->second->need_delete_file() &&
// We delay the GC time of this rowset since it's maybe still needed, see #20732
now > it->second->delayed_expired_timestamp()) {
if (it->second->is_local()) {
unused_rowsets_copy[it->first] = it->second;
}
// remote rowset data will be reclaimed by `remove_unused_remote_files`
evict_querying_rowset(it->second->rowset_id());
it = _unused_rowsets.erase(it);
} else {
++it;
Expand Down Expand Up @@ -1169,4 +1173,23 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
return Status::OK();
}

void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
_querying_rowsets.emplace(rs->rowset_id(), rs);
}

RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
auto it = _querying_rowsets.find(rs_id);
if (it != _querying_rowsets.end()) {
return it->second;
}
return nullptr;
}

void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
_querying_rowsets.erase(rs_id);
}

} // namespace doris
10 changes: 10 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ class StorageEngine {
int64_t transaction_id, bool is_recover);
int64_t get_pending_publish_min_version(int64_t tablet_id);

void add_quering_rowset(RowsetSharedPtr rs);

RowsetSharedPtr get_quering_rowset(RowsetId rs_id);

void evict_querying_rowset(RowsetId rs_id);

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -372,6 +378,10 @@ class StorageEngine {
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;

// Hold reference of quering rowsets
std::mutex _quering_rowsets_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> _querying_rowsets;

// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,11 +781,6 @@ void Tablet::delete_expired_stale_rowset() {
for (auto& timestampedVersion : to_delete_version) {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
uint64_t now = UnixSeconds();
if (now <= it->second->delayed_expired_timestamp()) {
// Some rowsets gc time was delayed, ignore
continue;
}
// delete rowset
StorageEngine::instance()->add_unused_rowset(it->second);
_stale_rs_version_map.erase(it);
Expand Down
5 changes: 3 additions & 2 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1540,8 +1540,9 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
if (!tablet) {
continue;
}
BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(tablet->get_rowset(rowset_id));
// We ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp();
BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
StorageEngine::instance()->get_quering_rowset(rowset_id));
if (!rowset) {
LOG(INFO) << "no such rowset " << rowset_id;
continue;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
delayed_s;
rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp);
StorageEngine::instance()->add_quering_rowset(rs_reader->rowset());
}
}

Expand Down