Skip to content

Commit 02dd27f

Browse files
committed
[opt](scheduler) Improve Graceful Shutdown Behavior for BE and FE, and Optimize Query Retry During BE Shutdown (#56601)
Related PR: #23865

This PR includes the following main changes:

1. New BE parameter: `grace_shutdown_post_delay_seconds`. When using the BE graceful stop feature, after the main process has waited for all currently running tasks to complete, it continues to wait for an additional period to ensure that queries still running on other nodes have also finished. Since a BE node cannot detect the execution status of tasks on other BE nodes, this threshold may need to be increased to allow a longer waiting time.

2. Enhanced BE `api/health` endpoint:
   * When the BE has not yet fully started or is in the process of shutting down, the endpoint returns:
     * Message: `"Server is not available"`
     * HTTP code: `200`
   * Under normal circumstances, the endpoint returns:
     * Message: `"OK"`
     * HTTP code: `200`

When using `stop_fe.sh --grace`, the FE waits for currently running queries to finish before exiting. Note: currently, only query tasks are waited for; import and other types of tasks are not yet included.

In cloud mode, when encountering the error `"No backend available as scan node"`, the FE now internally retries the query to reassign it to other available BE nodes.
1 parent 72c4294 commit 02dd27f

File tree

24 files changed

+302
-77
lines changed

24 files changed

+302
-77
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1295,6 +1295,7 @@ DEFINE_String(user_files_secure_path, "${DORIS_HOME}");
12951295
DEFINE_Int32(fe_expire_duration_seconds, "60");
12961296

12971297
DEFINE_Int32(grace_shutdown_wait_seconds, "120");
1298+
DEFINE_Int32(grace_shutdown_post_delay_seconds, "30");
12981299

12991300
DEFINE_Int16(bitmap_serialize_version, "1");
13001301

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,6 +1356,12 @@ DECLARE_Int32(fe_expire_duration_seconds);
13561356
// , but if the waiting time exceeds the limit, the BE will exit directly.
13571357
// During this period, FE will not send any queries to BE and waiting for all running queries to stop.
13581358
DECLARE_Int32(grace_shutdown_wait_seconds);
1359+
// When using the graceful stop feature, after the main process waits for
1360+
// all currently running tasks to finish, it will continue to wait for
1361+
// an additional period to ensure that queries still running on other nodes have also completed.
1362+
// Since a BE node cannot detect the task execution status on other BE nodes,
1363+
// you may need to increase this threshold to allow for a longer waiting time.
1364+
DECLARE_Int32(grace_shutdown_post_delay_seconds);
13591365

13601366
// BitmapValue serialize version.
13611367
DECLARE_Int16(bitmap_serialize_version);

be/src/http/action/health_action.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,42 @@
2424
#include "http/http_headers.h"
2525
#include "http/http_request.h"
2626
#include "http/http_status.h"
27+
#include "runtime/exec_env.h"
2728

2829
namespace doris {
2930

3031
const static std::string HEADER_JSON = "application/json";
3132

3233
void HealthAction::handle(HttpRequest* req) {
34+
std::string status;
35+
std::string msg;
36+
HttpStatus st;
37+
// always return HttpStatus::OK
38+
// because in k8s, we don't want the pod to be removed
39+
// from service during shutdown
40+
if (!doris::k_is_server_ready) {
41+
status = "Server is not available";
42+
msg = "Server is not ready";
43+
st = HttpStatus::OK;
44+
} else if (doris::k_doris_exit) {
45+
status = "Server is not available";
46+
msg = "Server is shutting down";
47+
st = HttpStatus::OK;
48+
} else {
49+
status = "OK";
50+
msg = "OK";
51+
st = HttpStatus::OK;
52+
}
53+
3354
std::stringstream ss;
3455
ss << "{";
35-
ss << "\"status\": \"OK\",";
36-
ss << "\"msg\": \"To Be Added\"";
56+
ss << "\"status\": \"" << status << "\",";
57+
ss << "\"msg\": \"" << msg << "\"";
3758
ss << "}";
3859
std::string result = ss.str();
3960

4061
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
41-
HttpChannel::send_reply(req, HttpStatus::OK, result);
62+
HttpChannel::send_reply(req, st, result);
4263
}
4364

4465
} // end namespace doris

be/src/runtime/exec_env.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,13 @@ void ExecEnv::wait_for_all_tasks_done() {
176176
sleep(1);
177177
++wait_seconds_passed;
178178
}
179+
// This is a conservative strategy.
180+
// Because a query might still have fragments running on other BE nodes.
181+
// In other words, the query hasn't truly terminated.
182+
// If the current BE is shut down at this point,
183+
// the FE will detect the downtime of a related BE and cancel the entire query,
184+
// defeating the purpose of a graceful stop.
185+
sleep(config::grace_shutdown_post_delay_seconds);
179186
}
180187

181188
bool ExecEnv::check_auth_token(const std::string& auth_token) {

be/src/runtime/exec_env.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ class IndexPolicyMgr;
131131
struct SyncRowsetStats;
132132
class DeleteBitmapAggCache;
133133

134+
// set to true when BE is shutting down
134135
inline bool k_doris_exit = false;
136+
// set to true once BE startup has completed
137+
inline bool k_is_server_ready = false;
135138

136139
// Execution environment for queries/plan fragments.
137140
// Contains all required global structures, and handles to

be/src/service/doris_main.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,12 +602,15 @@ int main(int argc, char** argv) {
602602

603603
exec_env->storage_engine().notify_listeners();
604604

605+
doris::k_is_server_ready = true;
606+
605607
while (!doris::k_doris_exit) {
606608
#if defined(LEAK_SANITIZER)
607609
__lsan_do_leak_check();
608610
#endif
609611
sleep(3);
610612
}
613+
doris::k_is_server_ready = false;
611614
LOG(INFO) << "Doris main exiting.";
612615
#if defined(LLVM_PROFILE)
613616
__llvm_profile_write_file();

be/test/http/http_client_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
391391
st = client.execute(&response);
392392
EXPECT_TRUE(st.ok());
393393
std::cout << "response = " << response << "\n";
394-
EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
394+
EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
395395
}
396396

397397
{
@@ -422,7 +422,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
422422
st = client.execute(&response);
423423
EXPECT_TRUE(st.ok());
424424
std::cout << "response = " << response << "\n";
425-
EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
425+
EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
426426
}
427427

428428
{

fe/fe-core/src/main/java/org/apache/doris/DorisFE.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.doris.journal.bdbje.BDBTool;
3636
import org.apache.doris.journal.bdbje.BDBToolOptions;
3737
import org.apache.doris.persist.meta.MetaReader;
38+
import org.apache.doris.qe.Coordinator;
39+
import org.apache.doris.qe.QeProcessorImpl;
3840
import org.apache.doris.qe.QeService;
3941
import org.apache.doris.qe.SimpleScheduler;
4042
import org.apache.doris.service.ExecuteEnv;
@@ -61,7 +63,10 @@
6163
import java.nio.channels.FileLock;
6264
import java.nio.channels.OverlappingFileLockException;
6365
import java.nio.file.StandardOpenOption;
66+
67+
import java.util.List;
6468
import java.util.concurrent.TimeUnit;
69+
import java.util.concurrent.atomic.AtomicBoolean;
6570

6671
public class DorisFE {
6772
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
@@ -80,6 +85,12 @@ public class DorisFE {
8085
private static FileChannel processLockFileChannel;
8186
private static FileLock processFileLock;
8287

88+
// set to true when all servers are ready.
89+
private static final AtomicBoolean serverReady = new AtomicBoolean(false);
90+
91+
// HTTP server instance, used for graceful shutdown
92+
private static HttpServer httpServer;
93+
8394
public static void main(String[] args) {
8495
// Every doris version should have a final meta version, it should not change
8596
// between small releases. Add a check here to avoid mistake.
@@ -142,7 +153,19 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
142153
}
143154

144155
Log4jConfig.initLogging(dorisHomeDir + "/conf/");
145-
Runtime.getRuntime().addShutdownHook(new Thread(LogManager::shutdown));
156+
// Add shutdown hook for graceful exit
157+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
158+
LOG.info("Received shutdown signal, starting graceful shutdown...");
159+
serverReady.set(false);
160+
gracefulShutdown();
161+
162+
// Shutdown HTTP server after main process graceful shutdown is complete
163+
if (httpServer != null) {
164+
httpServer.shutdown();
165+
}
166+
167+
LogManager.shutdown();
168+
}));
146169

147170
// set dns cache ttl
148171
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
@@ -195,7 +218,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
195218
feServer.start();
196219

197220
if (options.enableHttpServer) {
198-
HttpServer httpServer = new HttpServer();
221+
httpServer = new HttpServer();
199222
httpServer.setPort(Config.http_port);
200223
httpServer.setHttpsPort(Config.https_port);
201224
httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
@@ -224,11 +247,14 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
224247

225248
ThreadPoolManager.registerAllThreadPoolMetric();
226249
startMonitor();
250+
251+
serverReady.set(true);
252+
// JVM will exit when shutdown hook is completed
227253
while (true) {
228254
Thread.sleep(2000);
229255
}
230256
} catch (Throwable e) {
231-
// Some exception may thrown before LOG is inited.
257+
// Some exception may throw before LOG is inited.
232258
// So need to print to stdout
233259
e.printStackTrace();
234260
LOG.error("", e);
@@ -538,4 +564,25 @@ public static class StartupOptions {
538564
public boolean enableHttpServer = true;
539565
public boolean enableQeService = true;
540566
}
567+
568+
public static boolean isServerReady() {
569+
return serverReady.get();
570+
}
571+
572+
private static void gracefulShutdown() {
573+
// wait for all queries to finish
574+
try {
575+
long now = System.currentTimeMillis();
576+
List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
577+
while (!allCoordinators.isEmpty() && System.currentTimeMillis() - now < 300 * 1000L) {
578+
Thread.sleep(1000);
579+
allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
580+
LOG.info("waiting {} queries to finish before shutdown", allCoordinators.size());
581+
}
582+
} catch (Throwable t) {
583+
LOG.error("", t);
584+
}
585+
586+
LOG.info("graceful shutdown finished");
587+
}
541588
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ private boolean isColocated() {
102102
public long getColocatedBeId(String clusterId) throws ComputeGroupException {
103103
CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
104104
List<Backend> bes = infoService.getBackendsByClusterId(clusterId).stream()
105-
.filter(be -> !be.isQueryDisabled()).collect(Collectors.toList());
105+
.filter(be -> be.isQueryAvailable()).collect(Collectors.toList());
106106
String clusterName = infoService.getClusterNameByClusterId(clusterId);
107107
if (bes.isEmpty()) {
108108
LOG.warn("failed to get available be, cluster: {}-{}", clusterName, clusterId);
@@ -418,7 +418,7 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
418418
long lastUpdateMs = be.getLastUpdateMs();
419419
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
420420
// be core or restart must in heartbeat_interval_second
421-
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
421+
if ((be.isQueryAvailable() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
422422
&& !be.isSmoothUpgradeSrc()) {
423423
if (be.isDecommissioned()) {
424424
decommissionAvailBes.add(be);

fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,11 @@ public class FeConstants {
3636

3737
public static int checkpoint_interval_second = 60; // 1 minutes
3838

39-
// dpp version
40-
public static String dpp_version = "3_2_0";
41-
4239
// bloom filter false positive probability
4340
public static double default_bloom_filter_fpp = 0.05;
4441

4542
// set to true to skip some step when running FE unit test
4643
public static boolean runningUnitTest = false;
47-
// use to set some mocked values for FE unit test
48-
public static Object unitTestConstant = null;
4944

5045
// set to false to disable internal schema db
5146
public static boolean enableInternalSchemaDb = true;
@@ -66,30 +61,9 @@ public class FeConstants {
6661
// use for copy into test
6762
public static boolean disablePreHeat = false;
6863

69-
public static final String FS_PREFIX_S3 = "s3";
70-
public static final String FS_PREFIX_S3A = "s3a";
71-
public static final String FS_PREFIX_S3N = "s3n";
72-
public static final String FS_PREFIX_OSS = "oss";
73-
public static final String FS_PREFIX_GCS = "gs";
74-
public static final String FS_PREFIX_BOS = "bos";
75-
public static final String FS_PREFIX_COS = "cos";
76-
public static final String FS_PREFIX_COSN = "cosn";
77-
public static final String FS_PREFIX_LAKEFS = "lakefs";
78-
public static final String FS_PREFIX_OBS = "obs";
79-
public static final String FS_PREFIX_OFS = "ofs";
80-
public static final String FS_PREFIX_GFS = "gfs";
81-
public static final String FS_PREFIX_JFS = "jfs";
82-
public static final String FS_PREFIX_HDFS = "hdfs";
83-
public static final String FS_PREFIX_VIEWFS = "viewfs";
84-
public static final String FS_PREFIX_FILE = "file";
85-
8664
public static final String INTERNAL_DB_NAME = "__internal_schema";
8765
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot";
8866
public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";
8967

9068
public static String METADATA_FAILURE_RECOVERY_KEY = "metadata_failure_recovery";
91-
92-
public static String CLOUD_RETRY_E230 = "E-230";
93-
94-
public static String BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault";
9569
}

0 commit comments

Comments
 (0)