Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,7 @@ DEFINE_String(user_files_secure_path, "${DORIS_HOME}");
DEFINE_Int32(fe_expire_duration_seconds, "60");

DEFINE_Int32(grace_shutdown_wait_seconds, "120");
DEFINE_Int32(grace_shutdown_post_delay_seconds, "30");

DEFINE_Int16(bitmap_serialize_version, "1");

Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,12 @@ DECLARE_Int32(fe_expire_duration_seconds);
// , but if the waiting time exceeds the limit, then BE will exit directly.
// During this period, FE will not send any queries to BE and will wait for all running queries to stop.
DECLARE_Int32(grace_shutdown_wait_seconds);
// When using the graceful stop feature, after the main process waits for
// all currently running tasks to finish, it will continue to wait for
// an additional period to ensure that queries still running on other nodes have also completed.
// Since a BE node cannot detect the task execution status on other BE nodes,
// you may need to increase this threshold to allow for a longer waiting time.
DECLARE_Int32(grace_shutdown_post_delay_seconds);

// BitmapValue serialize version.
DECLARE_Int16(bitmap_serialize_version);
Expand Down
27 changes: 24 additions & 3 deletions be/src/http/action/health_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,42 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/exec_env.h"

namespace doris {

const static std::string HEADER_JSON = "application/json";

// Health-check endpoint: replies with a JSON document of the form
//   {"status": "<status>","msg": "<msg>"}
// describing whether this BE is ready, still starting, or shutting down.
//
// The HTTP status code is always HttpStatus::OK, because in k8s we don't want
// the pod to be removed from the service during shutdown. Callers that need
// the real state must inspect the JSON body.
void HealthAction::handle(HttpRequest* req) {
    std::string status;
    std::string msg;
    // Check the shutdown flag first: the main loop clears k_is_server_ready
    // after k_doris_exit has been set, so testing readiness first would
    // report a shutting-down server as "not ready".
    if (doris::k_doris_exit) {
        status = "Server is not available";
        msg = "Server is shutting down";
    } else if (!doris::k_is_server_ready) {
        status = "Server is not available";
        msg = "Server is not ready";
    } else {
        status = "OK";
        msg = "OK";
    }

    // Emit exactly one "status"/"msg" pair so the body is valid JSON.
    std::stringstream ss;
    ss << "{";
    ss << "\"status\": \"" << status << "\",";
    ss << "\"msg\": \"" << msg << "\"";
    ss << "}";
    std::string result = ss.str();

    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
    // Reply once; see the header comment for why the code is always OK.
    HttpChannel::send_reply(req, HttpStatus::OK, result);
}

} // end namespace doris
7 changes: 7 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ void ExecEnv::wait_for_all_tasks_done() {
sleep(1);
++wait_seconds_passed;
}
// This is a conservative strategy.
// Because a query might still have fragments running on other BE nodes.
// In other words, the query hasn't truly terminated.
// If the current BE is shut down at this point,
// the FE will detect the downtime of a related BE and cancel the entire query,
// defeating the purpose of a graceful stop.
sleep(config::grace_shutdown_post_delay_seconds);
}

bool ExecEnv::check_auth_token(const std::string& auth_token) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ class IndexPolicyMgr;
struct SyncRowsetStats;
class DeleteBitmapAggCache;

// Process-wide flag: set to true when BE is shutting down.
// The main loop keeps running until this becomes true.
// NOTE(review): this is a plain, non-atomic bool. If it is written and read
// from different threads, std::atomic<bool> would be the safer type — confirm.
inline bool k_doris_exit = false;
// Process-wide flag: set to true after BE start ready, and reset to false
// once the main loop has exited. The health HTTP action reads it to decide
// whether to report "Server is not ready".
inline bool k_is_server_ready = false;

// Execution environment for queries/plan fragments.
// Contains all required global structures, and handles to
Expand Down
3 changes: 3 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,15 @@ int main(int argc, char** argv) {

exec_env->storage_engine().notify_listeners();

doris::k_is_server_ready = true;

while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
#endif
sleep(3);
}
doris::k_is_server_ready = false;
LOG(INFO) << "Doris main exiting.";
#if defined(LLVM_PROFILE)
__llvm_profile_write_file();
Expand Down
4 changes: 2 additions & 2 deletions be/test/http/http_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
st = client.execute(&response);
EXPECT_TRUE(st.ok());
std::cout << "response = " << response << "\n";
EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
}

{
Expand Down Expand Up @@ -423,7 +423,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
st = client.execute(&response);
EXPECT_TRUE(st.ok());
std::cout << "response = " << response << "\n";
EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
}

{
Expand Down
52 changes: 49 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.doris.journal.bdbje.BDBTool;
import org.apache.doris.journal.bdbje.BDBToolOptions;
import org.apache.doris.persist.meta.MetaReader;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeService;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.service.ExecuteEnv;
Expand Down Expand Up @@ -63,7 +65,9 @@
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DorisFE {
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
Expand All @@ -82,6 +86,12 @@ public class DorisFE {
private static FileChannel processLockFileChannel;
private static FileLock processFileLock;

// set to true when all servers are ready.
private static final AtomicBoolean serverReady = new AtomicBoolean(false);

// HTTP server instance, used for graceful shutdown
private static HttpServer httpServer;

public static void main(String[] args) {
// Every doris version should have a final meta version, it should not change
// between small releases. Add a check here to avoid mistake.
Expand Down Expand Up @@ -144,7 +154,19 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
}

Log4jConfig.initLogging(dorisHomeDir + "/conf/");
Runtime.getRuntime().addShutdownHook(new Thread(LogManager::shutdown));
// Add shutdown hook for graceful exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Received shutdown signal, starting graceful shutdown...");
serverReady.set(false);
gracefulShutdown();

// Shutdown HTTP server after main process graceful shutdown is complete
if (httpServer != null) {
httpServer.shutdown();
}

LogManager.shutdown();
}));

// set dns cache ttl
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
Expand Down Expand Up @@ -207,7 +229,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
feServer.start();

if (options.enableHttpServer) {
HttpServer httpServer = new HttpServer();
httpServer = new HttpServer();
httpServer.setPort(Config.http_port);
httpServer.setHttpsPort(Config.https_port);
httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
Expand Down Expand Up @@ -236,11 +258,14 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star

ThreadPoolManager.registerAllThreadPoolMetric();
startMonitor();

serverReady.set(true);
// JVM will exit when shutdown hook is completed
while (true) {
Thread.sleep(2000);
}
} catch (Throwable e) {
// Some exception may thrown before LOG is inited.
// Some exception may throw before LOG is inited.
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammatically incorrect: 'may throw' should be 'may be thrown'.

Suggested change
// Some exception may throw before LOG is inited.
// Some exception may be thrown before LOG is inited.

Copilot uses AI. Check for mistakes.
// So need to print to stdout
e.printStackTrace();
LOG.error("", e);
Expand Down Expand Up @@ -589,4 +614,25 @@ public static class StartupOptions {
public boolean enableHttpServer = true;
public boolean enableQeService = true;
}

/**
 * Reports whether the FE is ready to serve.
 *
 * @return the current value of the {@code serverReady} flag, which is set to true
 *         at the end of startup and set back to false by the shutdown hook.
 */
public static boolean isServerReady() {
return serverReady.get();
}

/**
 * Waits for all queries known to {@link QeProcessorImpl} to finish before shutdown.
 *
 * <p>Polls the coordinator list once per second and returns as soon as it is empty,
 * or after at most 300 seconds, whichever comes first. Never throws: an interrupt
 * ends the wait early (the interrupt status is restored), and any other failure is
 * logged and ignored so that the rest of the shutdown sequence still runs.
 */
private static void gracefulShutdown() {
    final long maxWaitMs = 300 * 1000L;
    final long startMs = System.currentTimeMillis();
    try {
        List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
        while (!allCoordinators.isEmpty() && System.currentTimeMillis() - startMs < maxWaitMs) {
            // Log before sleeping so the reported count is the one we are waiting on.
            LOG.info("waiting {} queries to finish before shutdown", allCoordinators.size());
            Thread.sleep(1000);
            allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
        }
        if (!allCoordinators.isEmpty()) {
            // The wait budget ran out; make the forced exit visible in the log.
            LOG.warn("graceful shutdown timed out, {} queries are still running", allCoordinators.size());
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status instead of swallowing it.
        Thread.currentThread().interrupt();
        LOG.warn("graceful shutdown interrupted while waiting for queries to finish", e);
    } catch (Throwable t) {
        // Best effort: a failure here must not abort the remaining shutdown steps.
        LOG.error("", t);
    }

    LOG.info("graceful shutdown finished");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private boolean isColocated() {
public long getColocatedBeId(String clusterId) throws ComputeGroupException {
CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
List<Backend> bes = infoService.getBackendsByClusterId(clusterId).stream()
.filter(be -> !be.isQueryDisabled()).collect(Collectors.toList());
.filter(be -> be.isQueryAvailable()).collect(Collectors.toList());
String clusterName = infoService.getClusterNameByClusterId(clusterId);
if (bes.isEmpty()) {
LOG.warn("failed to get available be, cluster: {}-{}", clusterName, clusterId);
Expand Down Expand Up @@ -422,7 +422,7 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
long lastUpdateMs = be.getLastUpdateMs();
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
// be core or restart must in heartbeat_interval_second
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
if ((be.isQueryAvailable() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
&& !be.isSmoothUpgradeSrc()) {
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
Expand Down
26 changes: 0 additions & 26 deletions fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,11 @@ public class FeConstants {

public static int checkpoint_interval_second = 60; // 1 minutes

// dpp version
public static String dpp_version = "3_2_0";

// bloom filter false positive probability
public static double default_bloom_filter_fpp = 0.05;

// set to true to skip some step when running FE unit test
public static boolean runningUnitTest = false;
// use to set some mocked values for FE unit test
public static Object unitTestConstant = null;

// set to false to disable internal schema db
public static boolean enableInternalSchemaDb = true;
Expand All @@ -68,29 +63,8 @@ public class FeConstants {
// use for copy into test
public static boolean disablePreHeat = false;

public static final String FS_PREFIX_S3 = "s3";
public static final String FS_PREFIX_S3A = "s3a";
public static final String FS_PREFIX_S3N = "s3n";
public static final String FS_PREFIX_OSS = "oss";
public static final String FS_PREFIX_GCS = "gs";
public static final String FS_PREFIX_BOS = "bos";
public static final String FS_PREFIX_COS = "cos";
public static final String FS_PREFIX_COSN = "cosn";
public static final String FS_PREFIX_LAKEFS = "lakefs";
public static final String FS_PREFIX_OBS = "obs";
public static final String FS_PREFIX_OFS = "ofs";
public static final String FS_PREFIX_GFS = "gfs";
public static final String FS_PREFIX_JFS = "jfs";
public static final String FS_PREFIX_HDFS = "hdfs";
public static final String FS_PREFIX_VIEWFS = "viewfs";
public static final String FS_PREFIX_FILE = "file";

public static final String INTERNAL_DB_NAME = "__internal_schema";
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot";

public static String METADATA_FAILURE_RECOVERY_KEY = "metadata_failure_recovery";

public static String CLOUD_RETRY_E230 = "E-230";

public static String BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault";
}
24 changes: 23 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.apache.doris.httpv2.config.SpringLog4j2Config;
import org.apache.doris.service.FrontendOptions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -36,6 +39,8 @@
@EnableConfigurationProperties
@ServletComponentScan
public class HttpServer extends SpringBootServletInitializer {
private static final Logger LOG = LogManager.getLogger(HttpServer.class);
private ConfigurableApplicationContext applicationContext;
private int port;
private int httpsPort;
private int acceptors;
Expand Down Expand Up @@ -176,9 +181,26 @@ public void start() {
} else {
properties.put("logging.config", Config.custom_config_dir + "/" + SpringLog4j2Config.SPRING_LOG_XML_FILE);
}
new SpringApplicationBuilder()
// Disable automatic shutdown hook registration
// This prevents Spring Boot from responding to SIGTERM automatically
// allowing the main process (DorisFE) to control when the HTTP server shuts down
this.applicationContext = new SpringApplicationBuilder()
.sources(HttpServer.class)
.properties(properties)
// Disable the automatic shutdown hook registration, there is a shutdown hook in DorisFE.
.registerShutdownHook(false)
.run();
}

/**
 * Stops the HTTP server by closing the Spring application context created in
 * {@code start()}.
 *
 * <p>Automatic shutdown-hook registration is disabled for that context, so the
 * main process (DorisFE) is expected to call this itself once its own graceful
 * shutdown is complete. Does nothing if the context was never created.
 */
public void shutdown() {
    if (applicationContext == null) {
        // Server was never started; nothing to close.
        return;
    }
    LOG.info("Shutting down HTTP server gracefully...");
    applicationContext.close();
    LOG.info("HTTP server shutdown complete");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ public static ResponseEntity notFound(Object data) {
ResponseBody body = new ResponseBody().code(RestApiStatusCode.NOT_FOUND).msg("Not Found").data(data);
return ResponseEntity.status(HttpStatus.OK).body(body);
}

/**
 * Builds a "service unavailable" response.
 *
 * @param msg message to place in the response body
 * @return a response with HTTP status {@code SERVICE_UNAVAILABLE} whose body carries
 *         {@code RestApiStatusCode.SERVICE_UNAVAILABLE} and {@code msg}
 */
public static ResponseEntity serviceUnavailable(String msg) {
    return ResponseEntity
            .status(HttpStatus.SERVICE_UNAVAILABLE)
            .body(new ResponseBody().code(RestApiStatusCode.SERVICE_UNAVAILABLE).msg(msg));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.httpv2.rest;

import org.apache.doris.DorisFE;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
Expand All @@ -39,6 +40,10 @@ public Object execute(HttpServletRequest request, HttpServletResponse response)
executeCheckPassword(request, response);
}

if (!DorisFE.isServerReady()) {
return ResponseEntityBuilder.serviceUnavailable("Server is not ready");
}

Map<String, Object> result = new HashMap<>();
result.put("total_backend_num", Env.getCurrentSystemInfo().getAllBackendIds(false).size());
result.put("online_backend_num", Env.getCurrentSystemInfo().getAllBackendIds(true).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum RestApiStatusCode {
UNAUTHORIZED(401),
BAD_REQUEST(403),
NOT_FOUND(404),
INTERNAL_SERVER_ERROR(500);
INTERNAL_SERVER_ERROR(500),
SERVICE_UNAVAILABLE(503);

public int code;

Expand Down
Loading
Loading