Rename the tablet-lookup flag (`force_use_cache` / `local_only`) to
`force_use_only_cached` across the storage engines, forward it from
ExecEnv::get_tablet() to the storage engine (it was previously dropped), and
add a docker regression suite checking that no tablet is reported by more than
one backend after a TopN query.

diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 83c30955b9dd2d..8529fb0ed92572 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -280,8 +280,8 @@ bool CloudStorageEngine::stopped() {
 
 Result CloudStorageEngine::get_tablet(int64_t tablet_id,
                                       SyncRowsetStats* sync_stats,
-                                      bool force_use_cache) {
-    return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, force_use_cache)
+                                      bool force_use_only_cached) {
+    return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached)
             .transform([](auto&& t) { return static_pointer_cast(std::move(t)); });
 }
 
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index fba9165c0f44c4..0b61fe2076200d 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -63,7 +63,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
     bool stopped() override;
 
     Result get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
-                      bool force_use_cache = false) override;
+                      bool force_use_only_cached = false) override;
 
     Status start_bg_threads(std::shared_ptr wg_sptr = nullptr) override;
 
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp
index 03c882e0352731..a3996d4e61eccd 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -160,7 +160,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
 Result> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
                                    bool sync_delete_bitmap,
                                    SyncRowsetStats* sync_stats,
-                                   bool local_only) {
+                                   bool force_use_only_cached) {
     // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
     class Value : public LRUCacheValueBase {
     public:
@@ -179,13 +179,14 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i
 
     auto* handle = _cache->lookup(key);
     if (handle == nullptr) {
-        if (local_only) {
+        if (force_use_only_cached) {
             LOG(INFO) << "tablet=" << tablet_id
-                      << "does not exists in local tablet cache, because param local_only=true, "
+                      << " does not exists in local tablet cache, because param "
+                         "force_use_only_cached=true, "
                          "treat it as an error";
             return ResultError(Status::InternalError(
                     "tablet={} does not exists in local tablet cache, because param "
-                    "local_only=true, "
+                    "force_use_only_cached=true, "
                     "treat it as an error",
                     tablet_id));
         }
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 83dabfc0a612c0..271742c49f889d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1351,7 +1351,7 @@ Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProf
 }
 
 Result StorageEngine::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats,
-                                 bool force_use_cache) {
+                                 bool force_use_only_cached) {
     BaseTabletSPtr tablet;
     std::string err;
     tablet = _tablet_manager->get_tablet(tablet_id, true, &err);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index b55b2f1863bc70..b973cb4830a254 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -114,7 +114,7 @@ class BaseStorageEngine {
 
     virtual Result get_tablet(int64_t tablet_id,
                               SyncRowsetStats* sync_stats = nullptr,
-                              bool force_use_cache = false) = 0;
+                              bool force_use_only_cached = false) = 0;
 
     void register_report_listener(ReportWorker* listener);
     void deregister_report_listener(ReportWorker* listener);
@@ -238,7 +238,7 @@ class StorageEngine final : public BaseStorageEngine {
     Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile);
 
     Result get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
-                      bool force_use_cache = false) override;
+                      bool force_use_only_cached = false) override;
 
     void clear_transaction_task(const TTransactionId transaction_id);
     void clear_transaction_task(const TTransactionId transaction_id,
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index a31a5b00d0b426..8c13157d03bcc7 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -53,10 +53,10 @@ void ExecEnv::set_write_cooldown_meta_executors() {
 #endif // BE_TEST
 
 Result ExecEnv::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats,
-                           bool force_use_cache) {
+                           bool force_use_only_cached) {
     auto storage_engine = GetInstance()->_storage_engine.get();
     return storage_engine != nullptr
-                   ? storage_engine->get_tablet(tablet_id, sync_stats)
+                   ? storage_engine->get_tablet(tablet_id, sync_stats, force_use_only_cached)
                    : ResultError(Status::InternalError("failed to get tablet {}", tablet_id));
 }
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a7a57a86d3253f..7ce356520b7dbb 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -166,7 +166,7 @@ class ExecEnv {
     // Requires ExenEnv ready
     static Result get_tablet(int64_t tablet_id,
                              SyncRowsetStats* sync_stats = nullptr,
-                             bool force_use_cache = false);
+                             bool force_use_only_cached = false);
 
     static bool ready() { return _s_ready.load(std::memory_order_acquire); }
     static bool tracking_memory() { return _s_tracking_memory.load(std::memory_order_acquire); }
diff --git a/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy b/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
new file mode 100644
index 00000000000000..2fb8d834f11009
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Http
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_topn_broadcast", "docker") {
+
+
+    def options = new ClusterOptions()
+
+    options.feNum = 1
+    options.beNum = 3
+    options.msNum = 1
+    options.cloudMode = true
+    options.feConfigs += ['example_conf_k1=v1', 'example_conf_k2=v2']
+    options.beConfigs += ['enable_file_cache=true', 'enable_java_support=false', 'file_cache_enter_disk_resource_limit_mode_percent=99',
+                          'file_cache_background_lru_dump_interval_ms=2000', 'file_cache_background_lru_log_replay_interval_ms=500',
+                          'disable_auto_compaction=true', 'file_cache_enter_need_evict_cache_in_advance_percent=99',
+                          'file_cache_background_lru_dump_update_cnt_threshold=0'
+    ]
+
+    docker(options) {
+        // define a sql table
+        def indexTbName = "test_topn_broadcast"
+
+        sql "set global enable_two_phase_read_opt = true"
+        sql " set global enable_common_expr_pushdown = true "
+        sql " set global topn_opt_limit_threshold = 1024 "
+        sql "DROP TABLE IF EXISTS ${indexTbName}"
+        sql """
+            CREATE TABLE ${indexTbName} (
+                `@timestamp` int(11) NULL COMMENT "",
+                `clientip` varchar(20) NULL COMMENT "",
+                `request` text NULL COMMENT "",
+                `status` int(11) NULL COMMENT "",
+                `size` int(11) NULL COMMENT "",
+                INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
+                INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`@timestamp`)
+            COMMENT "OLAP"
+            DISTRIBUTED BY RANDOM BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true"
+            );
+        """
+
+
+        def load_httplogs_data = {table_name, label, read_flag, format_flag, ignore_failure=false,
+                                  expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+            // load the json data
+            streamLoad {
+                table "${table_name}"
+
+                // set http request header params
+                set 'label', label + "_" + UUID.randomUUID().toString()
+                set 'read_json_by_line', read_flag
+                set 'format', format_flag
+                file context.config.dataPath + "/fault_injection_p0/documents-1000.json"
+                time 10000 // limit inflight 10s
+                if (expected_succ_rows >= 0) {
+                    set 'max_filter_ratio', '1'
+                }
+
+                // Declaring a check callback replaces the default check conditions,
+                // so every condition that matters must be verified here.
+                check { result, exception, startTime, endTime ->
+                    if (ignore_failure && expected_succ_rows < 0) { return }
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    assertEquals("success", parseJson(result).Status.toLowerCase())
+                }
+            }
+        }
+
+        try {
+            load_httplogs_data.call(indexTbName, 'test_topn_broadcast1', 'true', 'json')
+            load_httplogs_data.call(indexTbName, 'test_topn_broadcast2', 'true', 'json')
+            load_httplogs_data.call(indexTbName, 'test_topn_broadcast3', 'true', 'json')
+            sql "sync"
+
+            def explain_result = sql """ explain select * from ${indexTbName} order by `@timestamp` limit 512; """
+            println explain_result
+
+            sql """ select * from ${indexTbName} order by `@timestamp` limit 512; """
+
+            // TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
+            List<List<Object>> tabletRows = sql """ show tablets from ${indexTbName}; """
+            def tabletIds = tabletRows.collect { row -> row[0].toString() }.unique()
+            assertTrue(tabletIds.size() > 0, "table ${indexTbName} should contain at least one tablet")
+            // print tabletIds
+            println "Tablet IDs: ${tabletIds}"
+
+            List<List<Object>> backendRows = sql """ show backends """
+            def bes = backendRows
+                    .findAll { row -> row[9].toString().equalsIgnoreCase("true") }
+                    .collect { row ->
+                        [
+                            host    : row[1].toString(),
+                            httpPort: row[4].toString().toInteger()
+                        ]
+                    }
+            assertTrue(!bes.isEmpty(), "no alive backend hosts available for verification")
+            def expectedHostCount = bes.size()
+
+            // Collect which backends report each tablet. New requirement:
+            // If any tablet appears in more than one backend's tablets_json, fail the test.
+            def tabletPresence = [:].withDefault { [] as List }
+            bes.each { be ->
+                def response = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true)
+                assertEquals(0, response.code as Integer)
+                def data = response.data
+                def beTablets = data.tablets.collect { it.tablet_id as String }
+                tabletIds.each { tabletId ->
+                    if (beTablets.contains(tabletId)) {
+                        tabletPresence[tabletId] << be.host
+                    }
+                }
+            }
+
+            tabletIds.each { tabletId ->
+                def hosts = tabletPresence[tabletId].unique()
+                assertFalse(hosts.size() > 1, "tablet ${tabletId} appears on multiple backends: ${hosts}")
+            }
+
+        } finally {
+        }
+    }
+}