Skip to content

Commit 4573e26

Browse files
committed
[fix](scheduler) disable query and load when BE is shutting down
1 parent e80eaf6 commit 4573e26

File tree

4 files changed

+31
-25
lines changed

4 files changed

+31
-25
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private boolean isColocated() {
101101
public long getColocatedBeId(String clusterId) throws ComputeGroupException {
102102
CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
103103
List<Backend> bes = infoService.getBackendsByClusterId(clusterId).stream()
104-
.filter(be -> !be.isQueryDisabled()).collect(Collectors.toList());
104+
.filter(be -> be.isQueryAvailable()).collect(Collectors.toList());
105105
String clusterName = infoService.getClusterNameByClusterId(clusterId);
106106
if (bes.isEmpty()) {
107107
LOG.warn("failed to get available be, cluster: {}-{}", clusterName, clusterId);

fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId,
9292
public static Backend selectBackend(String clusterName) throws LoadException {
9393
List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
9494
.getBackendsByClusterName(clusterName)
95-
.stream().filter(Backend::isAlive)
95+
.stream().filter(Backend::isLoadAvailable)
9696
.collect(Collectors.toList());
9797

9898
if (backends.isEmpty()) {

fe/fe-core/src/main/java/org/apache/doris/system/Backend.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,20 +293,38 @@ public void setLastStreamLoadTime(long lastStreamLoadTime) {
293293
this.backendStatus.lastStreamLoadTime = lastStreamLoadTime;
294294
}
295295

296+
// ATTN: This method only the value of "isQueryDisabled",
297+
// it does not determine the backend IS queryable or not, use isQueryAvailable instead.
296298
public boolean isQueryDisabled() {
297299
return backendStatus.isQueryDisabled;
298300
}
299301

300-
public void setQueryDisabled(boolean isQueryDisabled) {
301-
this.backendStatus.isQueryDisabled = isQueryDisabled;
302+
// return true if be status is changed
303+
public boolean setQueryDisabled(boolean isQueryDisabled) {
304+
if (this.backendStatus.isQueryDisabled != isQueryDisabled) {
305+
this.backendStatus.isQueryDisabled = isQueryDisabled;
306+
return true;
307+
}
308+
return false;
302309
}
303310

311+
// ATTN: This method only the value of "isLoadDisabled",
312+
// it does not determine the backend IS loadable or not, use isLoadAvailable instead.
304313
public boolean isLoadDisabled() {
305314
return backendStatus.isLoadDisabled;
306315
}
307316

308-
public void setLoadDisabled(boolean isLoadDisabled) {
309-
this.backendStatus.isLoadDisabled = isLoadDisabled;
317+
// return true if be status is changed
318+
public boolean setLoadDisabled(boolean isLoadDisabled) {
319+
if (this.backendStatus.isLoadDisabled != isLoadDisabled) {
320+
this.backendStatus.isLoadDisabled = isLoadDisabled;
321+
return true;
322+
}
323+
return false;
324+
}
325+
326+
public boolean isShutDown() {
327+
return isShutDown.get();
310328
}
311329

312330
public void setActive(boolean isActive) {
@@ -524,15 +542,15 @@ public boolean isDecommissioning() {
524542
}
525543

526544
public boolean isQueryAvailable() {
527-
return isAlive() && !isQueryDisabled() && !isShutDown.get();
545+
return isAlive() && !isQueryDisabled() && !isShutDown();
528546
}
529547

530548
public boolean isScheduleAvailable() {
531-
return isAlive() && !isDecommissioned();
549+
return isAlive() && !isDecommissioned() && !isShutDown();
532550
}
533551

534552
public boolean isLoadAvailable() {
535-
return isAlive() && !isLoadDisabled();
553+
return isAlive() && !isLoadDisabled() && !isShutDown();
536554
}
537555

538556
public void setDisks(ImmutableMap<String, DiskInfo> disks) {

fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -998,17 +998,11 @@ public void modifyBackends(ModifyBackendClause alterClause) throws UserException
998998
}
999999

10001000
if (alterClause.isQueryDisabled() != null) {
1001-
if (!alterClause.isQueryDisabled().equals(be.isQueryDisabled())) {
1002-
be.setQueryDisabled(alterClause.isQueryDisabled());
1003-
shouldModify = true;
1004-
}
1001+
shouldModify = be.setQueryDisabled(alterClause.isQueryDisabled());
10051002
}
10061003

10071004
if (alterClause.isLoadDisabled() != null) {
1008-
if (!alterClause.isLoadDisabled().equals(be.isLoadDisabled())) {
1009-
be.setLoadDisabled(alterClause.isLoadDisabled());
1010-
shouldModify = true;
1011-
}
1005+
shouldModify = be.setLoadDisabled(alterClause.isLoadDisabled());
10121006
}
10131007

10141008
if (shouldModify) {
@@ -1052,17 +1046,11 @@ public void modifyBackends(ModifyBackendOp op) throws UserException {
10521046
}
10531047

10541048
if (op.isQueryDisabled() != null) {
1055-
if (!op.isQueryDisabled().equals(be.isQueryDisabled())) {
1056-
be.setQueryDisabled(op.isQueryDisabled());
1057-
shouldModify = true;
1058-
}
1049+
shouldModify = be.setQueryDisabled(op.isQueryDisabled());
10591050
}
10601051

10611052
if (op.isLoadDisabled() != null) {
1062-
if (!op.isLoadDisabled().equals(be.isLoadDisabled())) {
1063-
be.setLoadDisabled(op.isLoadDisabled());
1064-
shouldModify = true;
1065-
}
1053+
shouldModify = be.setLoadDisabled(op.isLoadDisabled());
10661054
}
10671055

10681056
if (shouldModify) {

0 commit comments

Comments
 (0)