Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ private long selectBackendForCloudGroupCommitInternal(long tableId, String clust
}
List<String> backendsInfo = backends.stream()
.map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+ ", decommission=" + be.isDecommissioned() + " }")
+ ", decommissioned=" + be.isDecommissioned() + ", decommissioning=" + be.isDecommissioning()
+ " }")
.collect(Collectors.toList());
throw new LoadException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo);
}
Expand Down Expand Up @@ -348,7 +349,8 @@ private Long getCachedBackend(String cluster, long tableId) {
return null;
}
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend != null && backend.isAlive() && !backend.isDecommissioned()) {
if (backend != null && backend.isAlive() && !backend.isDecommissioned()
&& (!Config.isCloudMode() || !backend.isDecommissioning())) {
return backend.getId();
} else {
tableToBeMap.remove(encode(cluster, tableId));
Expand All @@ -365,7 +367,8 @@ private Long getRandomBackend(String cluster, long tableId, List<Backend> backen
OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
Collections.shuffle(backends);
for (Backend backend : backends) {
if (backend.isAlive() && !backend.isDecommissioned()) {
if (backend.isAlive() && !backend.isDecommissioned() && (!Config.isCloudMode()
|| !backend.isDecommissioning())) {
tableToBeMap.put(encode(cluster, tableId), backend.getId());
tableToPressureMap.put(tableId,
new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ suite("insert_group_commit_into") {
sql """ set insert_max_filter_ratio = 0.05; """
test {
sql """ insert into ${table} values('a', 'a'), ('10', 'a'), ('11', 'a'), ('12', 'a'); """
exception """too many filtered rows"""
exception """null value for not null column"""
}
getRowCount(2)
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ suite("test_group_commit_interval_ms_property") {
DISTRIBUTED BY HASH (k) BUCKETS 8
PROPERTIES(
"replication_num" = "1",
"group_commit_interval_ms"="10000"
"group_commit_interval_ms"="5000"
);
"""

Expand All @@ -61,11 +61,11 @@ suite("test_group_commit_interval_ms_property") {
sql """ set group_commit = async_mode; """

def res1 = sql """show create table ${test_table}"""
assertTrue(res1.toString().contains("\"group_commit_interval_ms\" = \"10000\""))
assertTrue(res1.toString().contains("\"group_commit_interval_ms\" = \"5000\""))

def msg1 = group_commit_insert """insert into ${test_table} values(1,1); """, 1

Thread.sleep(2000);
Thread.sleep(100);

def msg2 = group_commit_insert """insert into ${test_table} values(2,2) """, 1

Expand All @@ -78,7 +78,7 @@ suite("test_group_commit_interval_ms_property") {

def msg3 = group_commit_insert """insert into ${test_table} values(3,3); """, 1

Thread.sleep(8000);
Thread.sleep(6000);

def msg4 = group_commit_insert """insert into ${test_table} values(4,4); """, 1

Expand Down
Loading