diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index cc37ced5b5feb4..f573b59f6027ad 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -903,11 +903,21 @@ void StorageEngine::start_delete_unused_rowset() {
     {
         std::lock_guard<std::mutex> lock(_gc_mutex);
+        // Read the clock once per GC pass instead of once per rowset.
+        const uint64_t now = UnixSeconds();
         for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
-            if (it->second.use_count() == 1 && it->second->need_delete_file()) {
+            // We delay the GC time of this rowset since it's maybe still needed, see #20732
+            const bool delay_expired = now > it->second->delayed_expired_timestamp();
+            if (delay_expired) {
+                // `_querying_rowsets` keeps its own shared_ptr to the rowset. Release it before
+                // testing `use_count() == 1`, otherwise a rowset registered through
+                // `add_quering_rowset` never reaches a use count of 1 and is never reclaimed.
+                evict_querying_rowset(it->second->rowset_id());
+            }
+            if (delay_expired && it->second.use_count() == 1 && it->second->need_delete_file()) {
                 if (it->second->is_local()) {
                     unused_rowsets_copy[it->first] = it->second;
                 }
                 // remote rowset data will be reclaimed by `remove_unused_remote_files`
                 it = _unused_rowsets.erase(it);
             } else {
                 ++it;
@@ -1169,4 +1179,27 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
     return Status::OK();
 }
 
+// Registers `rs` under its rowset id so that it can later be looked up with
+// `get_quering_rowset`. An entry that already exists for that id is left untouched.
+void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
+    std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+    _querying_rowsets.emplace(rs->rowset_id(), rs);
+}
+
+// Returns the rowset registered under `rs_id`, or nullptr when there is none.
+RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) {
+    std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+    auto it = _querying_rowsets.find(rs_id);
+    if (it != _querying_rowsets.end()) {
+        return it->second;
+    }
+    return nullptr;
+}
+
+// Removes the entry registered under `rs_id`, if any, releasing the reference it held.
+void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
+    std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+    _querying_rowsets.erase(rs_id);
+}
+
 } // namespace doris
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index d6215586eda9df..2bd01ba2cb29a2 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -224,6 +224,12 @@ class StorageEngine {
                                     int64_t transaction_id, bool is_recover);
     int64_t get_pending_publish_min_version(int64_t tablet_id);
 
+    void add_quering_rowset(RowsetSharedPtr rs);
+
+    RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
+
+    void evict_querying_rowset(RowsetId rs_id);
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -372,6 +378,10 @@ class StorageEngine {
     // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func
     std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
 
+    // Holds a reference to every rowset registered as being queried; guarded by the mutex below
+    std::mutex _quering_rowsets_mutex;
+    std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> _querying_rowsets;
+
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
     // This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index eea7c666328c34..efaaf2fffe8923 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -781,11 +781,6 @@ void Tablet::delete_expired_stale_rowset() {
             for (auto& timestampedVersion : to_delete_version) {
                 auto it = _stale_rs_version_map.find(timestampedVersion->version());
                 if (it != _stale_rs_version_map.end()) {
-                    uint64_t now = UnixSeconds();
-                    if (now <= it->second->delayed_expired_timestamp()) {
-                        // Some rowsets gc time was delayed, ignore
-                        continue;
-                    }
                     // delete rowset
                     StorageEngine::instance()->add_unused_rowset(it->second);
                     _stale_rs_version_map.erase(it);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 05d9f16cfe9daa..745de1bb71abf5 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1540,8 +1540,10 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
         if (!tablet) {
             continue;
         }
-        BetaRowsetSharedPtr rowset =
-                std::static_pointer_cast<BetaRowset>(tablet->get_rowset(rowset_id));
+        // The rowset is kept alive by NewOlapScanner::_init_tablet_reader_params, which calls
+        // update_delayed_expired_timestamp() and add_quering_rowset() on it.
+        BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
+                StorageEngine::instance()->get_quering_rowset(rowset_id));
         if (!rowset) {
             LOG(INFO) << "no such rowset " << rowset_id;
             continue;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 0b631b143e58f9..b945d42f6c798d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -413,6 +413,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
                         UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
                         delayed_s;
                 rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp);
+                StorageEngine::instance()->add_quering_rowset(rs_reader->rowset());
             }
         }
 