43 changes: 39 additions & 4 deletions be/src/agent/task_worker_pool.cpp
@@ -1657,11 +1657,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r
.tag("tablet_id", drop_tablet_req.tablet_id);
return;
});
// 1. erase the LRU entry from the tablet mgr
// TODO(dx): clean tablet file cache
// get the tablet's info (such as cache key, tablet id, rowset ids)
MonotonicStopWatch watch;
watch.start();
auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
std::ostringstream rowset_ids_stream;
bool found = false;
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
continue;
}
found = true;
auto clean_rowsets = tablet->get_snapshot_rowset(true);
// Collect the first 10 rowset IDs as a comma-separated string, for logging only
int count = 0;
for (const auto& rowset : clean_rowsets) {
if (count >= 10) break;
if (count > 0) {
rowset_ids_stream << ",";
}
rowset_ids_stream << rowset->rowset_id().to_string();
count++;
}

CloudTablet::recycle_cached_data(std::move(clean_rowsets));
break;
}

if (!found) {
LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
<< ", cost " << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
// 2. gen clean file cache task
LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
<< " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
<< static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}
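
The new lookup in drop_tablet_callback walks the tablet manager's weak references, locks each one, and acts only on the tablet whose id matches. A minimal standalone sketch of that pattern is below; Tablet and Rowset here are hypothetical stand-ins, not the Doris classes:

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

struct Rowset {
    std::string id;
};

struct Tablet {
    int64_t tablet_id;
    std::vector<std::shared_ptr<Rowset>> rowsets;
};

int main() {
    auto t1 = std::make_shared<Tablet>(Tablet{101, {std::make_shared<Rowset>(Rowset{"rs_a"}),
                                                    std::make_shared<Rowset>(Rowset{"rs_b"})}});
    auto t2 = std::make_shared<Tablet>(Tablet{102, {}});
    std::vector<std::weak_ptr<Tablet>> weak_tablets{t1, t2};

    int64_t target_id = 101;
    std::ostringstream rowset_ids;
    bool found = false;
    for (auto& weak_tablet : weak_tablets) {
        auto tablet = weak_tablet.lock(); // may be nullptr if the tablet was already destroyed
        if (tablet == nullptr || tablet->tablet_id != target_id) {
            continue;
        }
        found = true;
        int count = 0;
        for (const auto& rowset : tablet->rowsets) { // log at most the first 10 rowset ids
            if (count >= 10) break;
            if (count > 0) rowset_ids << ",";
            rowset_ids << rowset->id;
            ++count;
        }
        break;
    }
    std::cout << (found ? "found, rowsets {" + rowset_ids.str() + "}" : std::string("not found"))
              << "\n";
    return 0;
}
```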

6 changes: 5 additions & 1 deletion be/src/cloud/cloud_tablet.cpp
@@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,

uint64_t CloudTablet::delete_expired_stale_rowsets() {
std::vector<RowsetSharedPtr> expired_rowsets;
// ATTN: trick. stale_rowsets temporarily holds an extra reference to each rowset shared_ptr taken from _stale_rs_version_map, so that recycle_cached_data() can check whether the reference count is exactly 2.
std::vector<RowsetSharedPtr> stale_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
std::vector<std::string> version_to_delete;
@@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
stale_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
@@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
if (rs.use_count() >= 1) {
// The rowsets argument and the tablet's _rs_version_map each hold a shared_ptr to the rowset, so at this point its reference count is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
@@ -210,12 +210,12 @@ class CloudTablet final : public BaseTablet {

void build_tablet_report_info(TTabletInfo* tablet_info);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;
@@ -49,6 +49,7 @@ import org.apache.doris.regression.action.HttpCliAction
import org.apache.doris.regression.util.DataUtils
import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.SuiteUtils
import org.apache.doris.regression.util.DebugPoint
import org.apache.doris.regression.RunMode
@@ -2766,4 +2767,94 @@ class Suite implements GroovyInterceptable {
assertEquals(re_fe, re_be)
assertEquals(re_fe, re_no_fold)
}

def backendIdToHost = { ->
def spb = sql_return_maparray """SHOW BACKENDS"""
def beIdToHost = [:]
spb.each {
beIdToHost[it.BackendId] = it.Host
}
beIdToHost
}

def getTabletAndBeHostFromBe = { bes ->
def ret = [:]
bes.each { be ->
// {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
def tablets = data.tablets.collect { it.tablet_id as String }
tablets.each{
ret[it] = data.host
}
}
ret
}

def getTabletAndBeHostFromFe = { table ->
def result = sql_return_maparray """SHOW TABLETS FROM $table"""
def bes = backendIdToHost.call()
// tablet : [backendId, host]
def ret = [:]
result.each {
ret[it.TabletId] = [it.BackendId, bes[it.BackendId]]
}
ret
}

// get the rowset meta (rowset_id, segment count) from the meta service (ms)
// curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2'
def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func ->
httpTest {
endpoint msHttpPort
op "get"
uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}"
check check_func
}
}

def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version ->
def hashValues = []
def segmentFiles = []
getSegmentFilesFromMs(msHttpPort, tabletId, version) {
respCode, body ->
def json = parseJson(body)
logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json)
// {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false}
def segmentNum = json.num_segments as int
def rowsetId = json.rowset_id_v2 as String
segmentFiles = (0..<segmentNum).collect { i -> "${rowsetId}_${i}.dat" }
}

segmentFiles.each {
// curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat'
def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true)
// {"hash":"2b79c649a1766dad371054ee168f0574"}
logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data)
hashValues << data.hash
}
hashValues
}

// get table's tablet file cache
def getTabletFileCacheDirFromBe = { msHttpPort, table, version ->
// beHost -> list of file cache hash dirs
def beHostToHashFile = [:]

def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table)
getTabletsAndHostFromFe.each {
def beHost = it.Value[1]
def tabletId = it.Key
def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version)
hashRet.each {
def hashFile = it
if (beHostToHashFile.containsKey(beHost)) {
beHostToHashFile[beHost].add(hashFile)
} else {
beHostToHashFile[beHost] = [hashFile]
}
}
}
beHostToHashFile
}

}
@@ -53,7 +53,7 @@ class Http {
conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root
def code = conn.responseCode
def text = conn.content.text
logger.info("http post url=${url}, isJson=${isJson}, response code=${code}, text=${text}")
logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}")
Assert.assertEquals(200, code)
if (isJson) {
def json = new JsonSlurper()
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.Http

suite('test_clean_stale_rs_file_cache', 'docker') {
if (!isCloudMode()) {
return;
}
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_tablet_rebalancer_interval_second=1',
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1'
]
options.beConfigs += [
'report_tablet_interval_seconds=1',
'cumulative_compaction_min_deltas=5',
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10'
]
options.setFeNum(1)
options.setBeNum(1)
options.cloudMode = true

def table = "test_clean_stale_rs_file_cache"

docker(options) {
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
sql """CREATE TABLE $table (
`k1` int(11) NULL,
`k2` int(11) NULL,
`v1` varchar(2048)
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
// version 2
sql """
insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
"""
def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
// version 3
sql """
insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3')
"""
def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
// version 4
sql """
insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3')
"""
// version 5
sql """
insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3')
"""
// version 6
sql """
insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3')
"""

def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles ->
[(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles]
}
for (int i = 0; i < 5; i++) {
sql """
select count(*) from $table
"""
}
def beforeGetFromFe = getTabletAndBeHostFromFe(table)
logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir)
// wait for compaction to finish and for vacuum_stale_rowsets to run
sleep(80 * 1000)

// check that the cache files have been deleted
beforeGetFromFe.each {
def tabletId = it.Key
def backendId = it.Value[0]
def backendHost = it.Value[1]
def be = cluster.getBeByBackendId(backendId.toLong())
def dataPath = new File("${be.path}/storage/file_cache")
def subDirs = []

def collectDirs
collectDirs = { File dir ->
if (dir.exists()) {
dir.eachDir { subDir ->
subDirs << subDir.name
collectDirs(subDir)
}
}
}


collectDirs(dataPath)
logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs)
def cacheDir = mergedCacheDir[backendHost]

// assert that none of the recorded cache hash dirs still exist on this BE
cacheDir.each { hashFile ->
assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) },
"Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " +
"Matching subdir found in: ${subDirs}")
}
}

}
}
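
The final assertion in the test above walks each BE's file_cache directory and verifies that no subdirectory name starts with one of the recorded hash values. A minimal standalone sketch of the same check using C++17 std::filesystem, with hypothetical paths and inputs:

```cpp
#include <filesystem>
#include <iostream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

int main() {
    // Hash values previously obtained via /api/file_cache?op=hash (assumed inputs).
    std::vector<std::string> expected_gone = {"2b79c649a1766dad371054ee168f0574"};
    fs::path cache_root = "/tmp/be_storage/file_cache"; // hypothetical BE cache path

    // Recursively collect subdirectory names under the cache root.
    std::vector<std::string> sub_dirs;
    if (fs::exists(cache_root)) {
        for (const auto& entry : fs::recursive_directory_iterator(cache_root)) {
            if (entry.is_directory()) {
                sub_dirs.push_back(entry.path().filename().string());
            }
        }
    }

    // Fail if any subdirectory name still starts with a recorded hash.
    bool leaked = false;
    for (const auto& hash : expected_gone) {
        for (const auto& dir : sub_dirs) {
            if (dir.rfind(hash, 0) == 0) {
                std::cout << "stale cache dir still present: " << dir << "\n";
                leaked = true;
            }
        }
    }
    std::cout << (leaked ? "FAIL" : "OK: no stale cache dirs") << "\n";
    return leaked ? 1 : 0;
}
```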