Skip to content

Commit 507f29b

Browse files
branch-4.0: [fix](cloud) avoid broadcast remote read in topn query #58044 (#58156)
Cherry-picked from #58044 Signed-off-by: zhengyu <zhangzhengyu@selectdb.com> Co-authored-by: zhengyu <zhangzhengyu@selectdb.com>
1 parent 9812408 commit 507f29b

File tree

8 files changed

+162
-13
lines changed

8 files changed

+162
-13
lines changed

be/src/cloud/cloud_storage_engine.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ bool CloudStorageEngine::stopped() {
280280

281281
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
282282
SyncRowsetStats* sync_stats,
283-
bool force_use_cache) {
284-
return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, force_use_cache)
283+
bool force_use_only_cached) {
284+
return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached)
285285
.transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
286286
}
287287

be/src/cloud/cloud_storage_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
6363
bool stopped() override;
6464

6565
Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
66-
bool force_use_cache = false) override;
66+
bool force_use_only_cached = false) override;
6767

6868
Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
6969

be/src/cloud/cloud_tablet_mgr.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
160160
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
161161
bool sync_delete_bitmap,
162162
SyncRowsetStats* sync_stats,
163-
bool local_only) {
163+
bool force_use_only_cached) {
164164
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
165165
class Value : public LRUCacheValueBase {
166166
public:
@@ -179,13 +179,14 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
179179
auto* handle = _cache->lookup(key);
180180

181181
if (handle == nullptr) {
182-
if (local_only) {
182+
if (force_use_only_cached) {
183183
LOG(INFO) << "tablet=" << tablet_id
184-
<< "does not exists in local tablet cache, because param local_only=true, "
184+
<< "does not exists in local tablet cache, because param "
185+
"force_use_only_cached=true, "
185186
"treat it as an error";
186187
return ResultError(Status::InternalError(
187188
"tablet={} does not exists in local tablet cache, because param "
188-
"local_only=true, "
189+
"force_use_only_cached=true, "
189190
"treat it as an error",
190191
tablet_id));
191192
}

be/src/olap/storage_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1351,7 +1351,7 @@ Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProf
13511351
}
13521352

13531353
Result<BaseTabletSPtr> StorageEngine::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats,
1354-
bool force_use_cache) {
1354+
bool force_use_only_cached) {
13551355
BaseTabletSPtr tablet;
13561356
std::string err;
13571357
tablet = _tablet_manager->get_tablet(tablet_id, true, &err);

be/src/olap/storage_engine.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class BaseStorageEngine {
114114

115115
virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
116116
SyncRowsetStats* sync_stats = nullptr,
117-
bool force_use_cache = false) = 0;
117+
bool force_use_only_cached = false) = 0;
118118

119119
void register_report_listener(ReportWorker* listener);
120120
void deregister_report_listener(ReportWorker* listener);
@@ -238,7 +238,7 @@ class StorageEngine final : public BaseStorageEngine {
238238
Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile);
239239

240240
Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
241-
bool force_use_cache = false) override;
241+
bool force_use_only_cached = false) override;
242242

243243
void clear_transaction_task(const TTransactionId transaction_id);
244244
void clear_transaction_task(const TTransactionId transaction_id,

be/src/runtime/exec_env.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ void ExecEnv::set_write_cooldown_meta_executors() {
5353
#endif // BE_TEST
5454

5555
Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats,
56-
bool force_use_cache) {
56+
bool force_use_only_cached) {
5757
auto storage_engine = GetInstance()->_storage_engine.get();
5858
return storage_engine != nullptr
59-
? storage_engine->get_tablet(tablet_id, sync_stats)
59+
? storage_engine->get_tablet(tablet_id, sync_stats, force_use_only_cached)
6060
: ResultError(Status::InternalError("failed to get tablet {}", tablet_id));
6161
}
6262

be/src/runtime/exec_env.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class ExecEnv {
166166
// Requires ExenEnv ready
167167
static Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
168168
SyncRowsetStats* sync_stats = nullptr,
169-
bool force_use_cache = false);
169+
bool force_use_only_cached = false);
170170

171171
static bool ready() { return _s_ready.load(std::memory_order_acquire); }
172172
static bool tracking_memory() { return _s_tracking_memory.load(std::memory_order_acquire); }
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.util.Http
19+
20+
import org.apache.doris.regression.suite.ClusterOptions
21+
22+
suite("test_topn_broadcast", "docker") {
23+
24+
25+
def options = new ClusterOptions()
26+
27+
options.feNum = 1
28+
options.beNum = 3
29+
options.msNum = 1
30+
options.cloudMode = true
31+
options.feConfigs += ['example_conf_k1=v1', 'example_conf_k2=v2']
32+
options.beConfigs += ['enable_file_cache=true', 'enable_java_support=false', 'file_cache_enter_disk_resource_limit_mode_percent=99',
33+
'file_cache_background_lru_dump_interval_ms=2000', 'file_cache_background_lru_log_replay_interval_ms=500',
34+
'disable_auto_compation=true', 'file_cache_enter_need_evict_cache_in_advance_percent=99',
35+
'file_cache_background_lru_dump_update_cnt_threshold=0'
36+
]
37+
38+
docker(options) {
39+
// define a sql table
40+
def indexTbName = "test_topn_broadcast"
41+
42+
sql "set global enable_two_phase_read_opt = true"
43+
sql " set global enable_common_expr_pushdown = true "
44+
sql " set global topn_opt_limit_threshold = 1024 "
45+
sql "DROP TABLE IF EXISTS ${indexTbName}"
46+
sql """
47+
CREATE TABLE ${indexTbName} (
48+
`@timestamp` int(11) NULL COMMENT "",
49+
`clientip` varchar(20) NULL COMMENT "",
50+
`request` text NULL COMMENT "",
51+
`status` int(11) NULL COMMENT "",
52+
`size` int(11) NULL COMMENT "",
53+
INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
54+
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
55+
) ENGINE=OLAP
56+
DUPLICATE KEY(`@timestamp`)
57+
COMMENT "OLAP"
58+
DISTRIBUTED BY RANDOM BUCKETS 3
59+
PROPERTIES (
60+
"replication_allocation" = "tag.location.default: 1",
61+
"disable_auto_compaction" = "true"
62+
);
63+
"""
64+
65+
66+
def load_httplogs_data = {table_name, label, read_flag, format_flag, ignore_failure=false,
67+
expected_succ_rows = -1, load_to_single_tablet = 'true' ->
68+
// load the json data
69+
streamLoad {
70+
table "${table_name}"
71+
72+
// set http request header params
73+
set 'label', label + "_" + UUID.randomUUID().toString()
74+
set 'read_json_by_line', read_flag
75+
set 'format', format_flag
76+
file context.config.dataPath + "/fault_injection_p0/documents-1000.json"
77+
time 10000 // limit inflight 10s
78+
if (expected_succ_rows >= 0) {
79+
set 'max_filter_ratio', '1'
80+
}
81+
82+
// if declared a check callback, the default check condition will ignore.
83+
// So you must check all condition
84+
check { result, exception, startTime, endTime ->
85+
if (ignore_failure && expected_succ_rows < 0) { return }
86+
if (exception != null) {
87+
throw exception
88+
}
89+
log.info("Stream load result: ${result}".toString())
90+
def json = parseJson(result)
91+
}
92+
}
93+
}
94+
95+
try {
96+
load_httplogs_data.call(indexTbName, 'test_topn_broadcast1', 'true', 'json')
97+
load_httplogs_data.call(indexTbName, 'test_topn_broadcast2', 'true', 'json')
98+
load_httplogs_data.call(indexTbName, 'test_topn_broadcast3', 'true', 'json')
99+
sql "sync"
100+
101+
def explain_result = sql """ explain select * from ${indexTbName} order by `@timestamp` limit 512; """
102+
println explain_result
103+
104+
sql """ select * from ${indexTbName} order by `@timestamp` limit 512; """
105+
106+
// TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
107+
List<List<Object>> tabletRows = sql """ show tablets from ${indexTbName}; """
108+
def tabletIds = tabletRows.collect { row -> row[0].toString() }.unique()
109+
assertTrue(tabletIds.size() > 0, "table ${indexTbName} should contain at least one tablet")
110+
// print tabletIds
111+
println "Tablet IDs: ${tabletIds}"
112+
113+
List<List<Object>> backendRows = sql """ show backends """
114+
def bes = backendRows
115+
.findAll { row -> row[9].toString().equalsIgnoreCase("true") }
116+
.collect { row ->
117+
[
118+
host : row[1].toString(),
119+
httpPort: row[4].toString().toInteger()
120+
]
121+
}
122+
assertTrue(!bes.isEmpty(), "no alive backend hosts available for verification")
123+
def expectedHostCount = bes.size()
124+
125+
// Collect which backends report each tablet. New requirement:
126+
// If any tablet appears in more than one backend's tablets_json, fail the test.
127+
def tabletPresence = [:].withDefault { [] as List<String> }
128+
bes.each { be ->
129+
def response = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true)
130+
assertEquals(0, response.code as Integer)
131+
def data = response.data
132+
def beTablets = data.tablets.collect { it.tablet_id as String }
133+
tabletIds.each { tabletId ->
134+
if (beTablets.contains(tabletId)) {
135+
tabletPresence[tabletId] << be.host
136+
}
137+
}
138+
}
139+
140+
tabletIds.each { tabletId ->
141+
def hosts = tabletPresence[tabletId].unique()
142+
assertFalse(hosts.size() > 1, "tablet ${tabletId} appears on multiple backends: ${hosts}")
143+
}
144+
145+
} finally {
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)