Skip to content

Commit 1acd70c

Browse files
committed
1
1 parent b039e0d commit 1acd70c

File tree

16 files changed

+133
-66
lines changed

16 files changed

+133
-66
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,6 +1315,7 @@ DEFINE_String(user_files_secure_path, "${DORIS_HOME}");
13151315
DEFINE_Int32(fe_expire_duration_seconds, "60");
13161316

13171317
DEFINE_Int32(grace_shutdown_wait_seconds, "120");
1318+
DEFINE_Int32(grace_shutdown_post_delay_seconds, "30");
13181319

13191320
DEFINE_Int16(bitmap_serialize_version, "1");
13201321

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,12 @@ DECLARE_Int32(fe_expire_duration_seconds);
13671367
// , but if the waiting time exceed the limit, then be will exit directly.
13681368
// During this period, FE will not send any queries to BE and waiting for all running queries to stop.
13691369
DECLARE_Int32(grace_shutdown_wait_seconds);
1370+
// When using the graceful stop feature, after the main process waits for
1371+
// all currently running tasks to finish, it will continue to wait for
1372+
// an additional period to ensure that queries still running on other nodes have also completed.
1373+
// Since a BE node cannot detect the task execution status on other BE nodes,
1374+
// you may need to increase this threshold to allow for a longer waiting time.
1375+
DECLARE_Int32(grace_shutdown_post_delay_seconds);
13701376

13711377
// BitmapValue serialize version.
13721378
DECLARE_Int16(bitmap_serialize_version);

be/src/http/action/health_action.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@ void HealthAction::handle(HttpRequest* req) {
3434
std::string status;
3535
std::string msg;
3636
HttpStatus st;
37+
// always return HttpStatus::OK
38+
// because in k8s, we don't want the pod to be removed
39+
// from service during shutdown
3740
if (!doris::k_is_server_ready) {
3841
status = "Server is not available";
3942
msg = "Server is not ready";
40-
st = HttpStatus::SERVICE_UNAVAILABLE;
43+
st = HttpStatus::OK;
4144
} else if (doris::k_doris_exit) {
4245
status = "Server is not available";
4346
msg = "Server is shutting down";
44-
st = HttpStatus::SERVICE_UNAVAILABLE;
47+
st = HttpStatus::OK;
4548
} else {
4649
status = "OK";
4750
msg = "OK";

be/src/runtime/exec_env.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,13 @@ void ExecEnv::wait_for_all_tasks_done() {
177177
sleep(1);
178178
++wait_seconds_passed;
179179
}
180+
// This is a conservative strategy.
181+
// Because a query might still have fragments running on other BE nodes.
182+
// In other words, the query hasn't truly terminated.
183+
// If the current BE is shut down at this point,
184+
// the FE will detect the downtime of a related BE and cancel the entire query,
185+
// defeating the purpose of a graceful stop.
186+
sleep(config::grace_shutdown_post_delay_seconds);
180187
}
181188

182189
bool ExecEnv::check_auth_token(const std::string& auth_token) {

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3654,12 +3654,6 @@ public static int metaServiceRpcRetryTimes() {
36543654
})
36553655
public static int first_error_msg_max_length = 256;
36563656

3657-
@ConfField(mutable = true, masterOnly = false, description = {
3658-
"如果设置为 false,则 health 接口将返回 503,表示此 FE 不应再接收服务请求",
3659-
"If set to false, the health endpoint will return 503, "
3660-
+ "indicating that this FE should no longer receive service requests"})
3661-
public static boolean frontend_service_available = true;
3662-
36633657
@ConfField
36643658
public static String cloud_snapshot_handler_class = "org.apache.doris.cloud.snapshot.CloudSnapshotHandler";
36653659
@ConfField

fe/fe-core/src/main/java/org/apache/doris/DorisFE.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.doris.journal.bdbje.BDBTool;
3737
import org.apache.doris.journal.bdbje.BDBToolOptions;
3838
import org.apache.doris.persist.meta.MetaReader;
39+
import org.apache.doris.qe.Coordinator;
40+
import org.apache.doris.qe.QeProcessorImpl;
3941
import org.apache.doris.qe.QeService;
4042
import org.apache.doris.qe.SimpleScheduler;
4143
import org.apache.doris.service.ExecuteEnv;
@@ -63,6 +65,7 @@
6365
import java.nio.channels.OverlappingFileLockException;
6466
import java.nio.file.StandardOpenOption;
6567
import java.time.LocalDate;
68+
import java.util.List;
6669
import java.util.concurrent.TimeUnit;
6770
import java.util.concurrent.atomic.AtomicBoolean;
6871

@@ -84,7 +87,10 @@ public class DorisFE {
8487
private static FileLock processFileLock;
8588

8689
// set to true when all servers are ready.
87-
private static AtomicBoolean serverReady = new AtomicBoolean(false);
90+
private static final AtomicBoolean serverReady = new AtomicBoolean(false);
91+
92+
// HTTP server instance, used for graceful shutdown
93+
private static HttpServer httpServer;
8894

8995
public static void main(String[] args) {
9096
// Every doris version should have a final meta version, it should not change
@@ -148,7 +154,19 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
148154
}
149155

150156
Log4jConfig.initLogging(dorisHomeDir + "/conf/");
151-
Runtime.getRuntime().addShutdownHook(new Thread(LogManager::shutdown));
157+
// Add shutdown hook for graceful exit
158+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
159+
LOG.info("Received shutdown signal, starting graceful shutdown...");
160+
serverReady.set(false);
161+
gracefulShutdown();
162+
163+
// Shutdown HTTP server after main process graceful shutdown is complete
164+
if (httpServer != null) {
165+
httpServer.shutdown();
166+
}
167+
168+
LogManager.shutdown();
169+
}));
152170

153171
// set dns cache ttl
154172
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
@@ -211,7 +229,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
211229
feServer.start();
212230

213231
if (options.enableHttpServer) {
214-
HttpServer httpServer = new HttpServer();
232+
httpServer = new HttpServer();
215233
httpServer.setPort(Config.http_port);
216234
httpServer.setHttpsPort(Config.https_port);
217235
httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
@@ -242,11 +260,13 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
242260
startMonitor();
243261

244262
serverReady.set(true);
263+
// JVM will exit when shutdown hook is completed
245264
while (true) {
246265
Thread.sleep(2000);
247266
}
267+
LOG.info("Doris FE main loop exited, shutting down gracefully...");
248268
} catch (Throwable e) {
249-
// Some exception may thrown before LOG is inited.
269+
// Some exception may throw before LOG is inited.
250270
// So need to print to stdout
251271
e.printStackTrace();
252272
LOG.error("", e);
@@ -599,4 +619,21 @@ public static class StartupOptions {
599619
public static boolean isServerReady() {
600620
return serverReady.get();
601621
}
622+
623+
private static void gracefulShutdown() {
624+
// wait for all queries to finish
625+
try {
626+
long now = System.currentTimeMillis();
627+
List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
628+
while (!allCoordinators.isEmpty() && System.currentTimeMillis() - now < 300 * 1000L) {
629+
Thread.sleep(1000);
630+
allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
631+
LOG.info("waiting {} queries to finish before shutdown", allCoordinators.size());
632+
}
633+
} catch (Throwable t) {
634+
LOG.error("", t);
635+
}
636+
637+
LOG.info("graceful shutdown finished");
638+
}
602639
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
422422
long lastUpdateMs = be.getLastUpdateMs();
423423
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
424424
// be core or restart must in heartbeat_interval_second
425-
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
425+
if ((be.isQueryAvailable() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
426426
&& !be.isSmoothUpgradeSrc()) {
427427
if (be.isDecommissioned()) {
428428
decommissionAvailBes.add(be);

fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,11 @@ public class FeConstants {
3636

3737
public static int checkpoint_interval_second = 60; // 1 minutes
3838

39-
// dpp version
40-
public static String dpp_version = "3_2_0";
41-
4239
// bloom filter false positive probability
4340
public static double default_bloom_filter_fpp = 0.05;
4441

4542
// set to true to skip some step when running FE unit test
4643
public static boolean runningUnitTest = false;
47-
// use to set some mocked values for FE unit test
48-
public static Object unitTestConstant = null;
4944

5045
// set to false to disable internal schema db
5146
public static boolean enableInternalSchemaDb = true;
@@ -68,29 +63,8 @@ public class FeConstants {
6863
// use for copy into test
6964
public static boolean disablePreHeat = false;
7065

71-
public static final String FS_PREFIX_S3 = "s3";
72-
public static final String FS_PREFIX_S3A = "s3a";
73-
public static final String FS_PREFIX_S3N = "s3n";
74-
public static final String FS_PREFIX_OSS = "oss";
75-
public static final String FS_PREFIX_GCS = "gs";
76-
public static final String FS_PREFIX_BOS = "bos";
77-
public static final String FS_PREFIX_COS = "cos";
78-
public static final String FS_PREFIX_COSN = "cosn";
79-
public static final String FS_PREFIX_LAKEFS = "lakefs";
80-
public static final String FS_PREFIX_OBS = "obs";
81-
public static final String FS_PREFIX_OFS = "ofs";
82-
public static final String FS_PREFIX_GFS = "gfs";
83-
public static final String FS_PREFIX_JFS = "jfs";
84-
public static final String FS_PREFIX_HDFS = "hdfs";
85-
public static final String FS_PREFIX_VIEWFS = "viewfs";
86-
public static final String FS_PREFIX_FILE = "file";
87-
8866
public static final String INTERNAL_DB_NAME = "__internal_schema";
8967
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot";
9068

9169
public static String METADATA_FAILURE_RECOVERY_KEY = "metadata_failure_recovery";
92-
93-
public static String CLOUD_RETRY_E230 = "E-230";
94-
95-
public static String BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault";
9670
}

fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323
import org.apache.doris.httpv2.config.SpringLog4j2Config;
2424
import org.apache.doris.service.FrontendOptions;
2525

26+
import org.apache.logging.log4j.LogManager;
27+
import org.apache.logging.log4j.Logger;
2628
import org.springframework.boot.autoconfigure.SpringBootApplication;
2729
import org.springframework.boot.builder.SpringApplicationBuilder;
2830
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2931
import org.springframework.boot.web.servlet.ServletComponentScan;
3032
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
33+
import org.springframework.context.ConfigurableApplicationContext;
3134

3235
import java.util.HashMap;
3336
import java.util.Map;
@@ -36,6 +39,8 @@
3639
@EnableConfigurationProperties
3740
@ServletComponentScan
3841
public class HttpServer extends SpringBootServletInitializer {
42+
private static final Logger LOG = LogManager.getLogger(HttpServer.class);
43+
private ConfigurableApplicationContext applicationContext;
3944
private int port;
4045
private int httpsPort;
4146
private int acceptors;
@@ -176,9 +181,26 @@ public void start() {
176181
} else {
177182
properties.put("logging.config", Config.custom_config_dir + "/" + SpringLog4j2Config.SPRING_LOG_XML_FILE);
178183
}
179-
new SpringApplicationBuilder()
184+
// Disable automatic shutdown hook registration
185+
// This prevents Spring Boot from responding to SIGTERM automatically
186+
// allowing the main process (DorisFE) to control when the HTTP server shuts down
187+
this.applicationContext = new SpringApplicationBuilder()
180188
.sources(HttpServer.class)
181189
.properties(properties)
190+
// Disable the automatic shutdown hook registration, there is a shutdown hook in DorisFE.
191+
.registerShutdownHook(false)
182192
.run(new String[]{});
183193
}
194+
195+
/**
196+
* Explicitly shutdown the HTTP server.
197+
* This method should be called by the main process (DorisFE) after its graceful shutdown is complete.
198+
*/
199+
public void shutdown() {
200+
if (applicationContext != null) {
201+
LOG.info("Shutting down HTTP server gracefully...");
202+
applicationContext.close();
203+
LOG.info("HTTP server shutdown complete");
204+
}
205+
}
184206
}

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@ public Object execute(HttpServletRequest request, HttpServletResponse response)
4444
return ResponseEntityBuilder.serviceUnavailable("Server is not ready");
4545
}
4646

47-
if (!Config.frontend_service_available) {
48-
return ResponseEntityBuilder.serviceUnavailable("Server is not available");
49-
}
50-
5147
Map<String, Object> result = new HashMap<>();
5248
result.put("total_backend_num", Env.getCurrentSystemInfo().getAllBackendIds(false).size());
5349
result.put("online_backend_num", Env.getCurrentSystemInfo().getAllBackendIds(true).size());

0 commit comments

Comments
 (0)