From 02dd27f433be68c233dffc84b757efc2a2334f0b Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Fri, 7 Nov 2025 14:26:22 +0800 Subject: [PATCH 1/3] [opt](scheduler) Improve Graceful Shutdown Behavior for BE and FE, and Optimize Query Retry During BE Shutdown (#56601) Related PR: #23865 This PR includes the following main changes: 1. New BE Parameter: `grace_shutdown_post_delay_seconds` When using the BE graceful stop feature, after the main process waits for all currently running tasks to complete, it will continue to wait for an additional period to ensure that queries still running on other nodes have also finished. Since a BE node cannot detect the execution status of tasks on other BE nodes, this threshold may need to be increased to allow a longer waiting time. 2. Enhanced BE `api/health` Endpoint * When the BE has not yet fully started or is in the process of shutting down, the endpoint will return: * Message: `"Server is not available"` * HTTP Code: `200` * Under normal circumstances: * Message: `"OK"` * HTTP Code: `200` When using `stop_fe.sh --grace`, the FE will wait for currently running queries to finish before exiting. Note, Currently, only query tasks are waited for; import and other types of tasks are not yet included. In cloud mode, when encountering the error `"No backend available as scan node"`, the FE will now internally retry the query to reassign it to other available BE nodes. --- be/src/common/config.cpp | 1 + be/src/common/config.h | 6 ++ be/src/http/action/health_action.cpp | 27 ++++++- be/src/runtime/exec_env.cpp | 7 ++ be/src/runtime/exec_env.h | 3 + be/src/service/doris_main.cpp | 3 + be/test/http/http_client_test.cpp | 4 +- .../main/java/org/apache/doris/DorisFE.java | 53 +++++++++++- .../doris/cloud/catalog/CloudReplica.java | 4 +- .../org/apache/doris/common/FeConstants.java | 26 ------ .../org/apache/doris/httpv2/HttpServer.java | 23 +++++- .../httpv2/entity/ResponseEntityBuilder.java | 5 ++ .../doris/httpv2/rest/HealthAction.java | 5 ++ .../doris/httpv2/rest/RestApiStatusCode.java | 3 +- .../doris/job/extensions/mtmv/MTMVTask.java | 3 +- .../apache/doris/load/StreamLoadHandler.java | 2 +- .../insert/AbstractInsertExecutor.java | 6 +- .../commands/insert/OlapInsertExecutor.java | 6 +- .../org/apache/doris/qe/StmtExecutor.java | 23 ++++-- .../java/org/apache/doris/system/Backend.java | 52 +++++++++--- .../doris/system/SystemInfoService.java | 30 +++++-- .../doris/utframe/DemoMultiBackendsTest.java | 3 +- regression-test/framework/pom.xml | 4 +- .../test_retry_no_scan_node.groovy | 80 +++++++++++++++++++ 24 files changed, 302 insertions(+), 77 deletions(-) create mode 100644 regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3c8511417a5fba..571b1879c80c2d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1295,6 +1295,7 @@ DEFINE_String(user_files_secure_path, "${DORIS_HOME}"); DEFINE_Int32(fe_expire_duration_seconds, "60"); DEFINE_Int32(grace_shutdown_wait_seconds, "120"); +DEFINE_Int32(grace_shutdown_post_delay_seconds, "30"); DEFINE_Int16(bitmap_serialize_version, "1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 455a2796637a50..b2e41b9f977d80 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1356,6 +1356,12 @@ DECLARE_Int32(fe_expire_duration_seconds); // , but if the waiting time exceed the limit, then be will exit directly. // During this period, FE will not send any queries to BE and waiting for all running queries to stop. DECLARE_Int32(grace_shutdown_wait_seconds); +// When using the graceful stop feature, after the main process waits for +// all currently running tasks to finish, it will continue to wait for +// an additional period to ensure that queries still running on other nodes have also completed. +// Since a BE node cannot detect the task execution status on other BE nodes, +// you may need to increase this threshold to allow for a longer waiting time. +DECLARE_Int32(grace_shutdown_post_delay_seconds); // BitmapValue serialize version. DECLARE_Int16(bitmap_serialize_version); diff --git a/be/src/http/action/health_action.cpp b/be/src/http/action/health_action.cpp index 91be5f78f306d5..8c1976dea2ecc0 100644 --- a/be/src/http/action/health_action.cpp +++ b/be/src/http/action/health_action.cpp @@ -24,21 +24,42 @@ #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_status.h" +#include "runtime/exec_env.h" namespace doris { const static std::string HEADER_JSON = "application/json"; void HealthAction::handle(HttpRequest* req) { + std::string status; + std::string msg; + HttpStatus st; + // always return HttpStatus::OK + // because in k8s, we don't want the pod to be removed + // from service during shutdown + if (!doris::k_is_server_ready) { + status = "Server is not available"; + msg = "Server is not ready"; + st = HttpStatus::OK; + } else if (doris::k_doris_exit) { + status = "Server is not available"; + msg = "Server is shutting down"; + st = HttpStatus::OK; + } else { + status = "OK"; + msg = "OK"; + st = HttpStatus::OK; + } + std::stringstream ss; ss << "{"; - ss << "\"status\": \"OK\","; - ss << "\"msg\": \"To Be Added\""; + ss << "\"status\": \"" << status << "\","; + ss << "\"msg\": \"" << msg << "\""; ss << "}"; std::string result = ss.str(); req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); - HttpChannel::send_reply(req, HttpStatus::OK, result); + HttpChannel::send_reply(req, st, result); } } // end namespace doris diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 5cd36368732acc..70835a7284f66f 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -176,6 +176,13 @@ void ExecEnv::wait_for_all_tasks_done() { sleep(1); ++wait_seconds_passed; } + // This is a conservative strategy. + // Because a query might still have fragments running on other BE nodes. + // In other words, the query hasn't truly terminated. + // If the current BE is shut down at this point, + // the FE will detect the downtime of a related BE and cancel the entire query, + // defeating the purpose of a graceful stop. + sleep(config::grace_shutdown_post_delay_seconds); } bool ExecEnv::check_auth_token(const std::string& auth_token) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 9f9ea7eb8700ca..d740ad3eed0677 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -131,7 +131,10 @@ class IndexPolicyMgr; struct SyncRowsetStats; class DeleteBitmapAggCache; +// set to true when BE is shutting down inline bool k_doris_exit = false; +// set to true after BE start ready +inline bool k_is_server_ready = false; // Execution environment for queries/plan fragments. // Contains all required global structures, and handles to diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 82dd743267407e..c6d9df630dc841 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -602,12 +602,15 @@ int main(int argc, char** argv) { exec_env->storage_engine().notify_listeners(); + doris::k_is_server_ready = true; + while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); #endif sleep(3); } + doris::k_is_server_ready = false; LOG(INFO) << "Doris main exiting."; #if defined(LLVM_PROFILE) __llvm_profile_write_file(); diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index 84e4d259ff5ccd..0416fd658de12c 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -391,7 +391,7 @@ TEST_F(HttpClientTest, enable_http_auth) { st = client.execute(&response); EXPECT_TRUE(st.ok()); std::cout << "response = " << response << "\n"; - EXPECT_TRUE(response.find("To Be Added") != std::string::npos); + EXPECT_TRUE(response.find("Server is not ready") != std::string::npos); } { @@ -422,7 +422,7 @@ TEST_F(HttpClientTest, enable_http_auth) { st = client.execute(&response); EXPECT_TRUE(st.ok()); std::cout << "response = " << response << "\n"; - EXPECT_TRUE(response.find("To Be Added") != std::string::npos); + EXPECT_TRUE(response.find("Server is not ready") != std::string::npos); } { diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index f6a70a13740c1c..679945f3dcd98c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -35,6 +35,8 @@ import org.apache.doris.journal.bdbje.BDBTool; import org.apache.doris.journal.bdbje.BDBToolOptions; import org.apache.doris.persist.meta.MetaReader; +import org.apache.doris.qe.Coordinator; +import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeService; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.service.ExecuteEnv; @@ -61,7 +63,10 @@ import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; + +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class DorisFE { private static final Logger LOG = LogManager.getLogger(DorisFE.class); @@ -80,6 +85,12 @@ public class DorisFE { private static FileChannel processLockFileChannel; private static FileLock processFileLock; + // set to true when all servers are ready. + private static final AtomicBoolean serverReady = new AtomicBoolean(false); + + // HTTP server instance, used for graceful shutdown + private static HttpServer httpServer; + public static void main(String[] args) { // Every doris version should have a final meta version, it should not change // between small releases. Add a check here to avoid mistake. @@ -142,7 +153,19 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star } Log4jConfig.initLogging(dorisHomeDir + "/conf/"); - Runtime.getRuntime().addShutdownHook(new Thread(LogManager::shutdown)); + // Add shutdown hook for graceful exit + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + LOG.info("Received shutdown signal, starting graceful shutdown..."); + serverReady.set(false); + gracefulShutdown(); + + // Shutdown HTTP server after main process graceful shutdown is complete + if (httpServer != null) { + httpServer.shutdown(); + } + + LogManager.shutdown(); + })); // set dns cache ttl java.security.Security.setProperty("networkaddress.cache.ttl", "60"); @@ -195,7 +218,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star feServer.start(); if (options.enableHttpServer) { - HttpServer httpServer = new HttpServer(); + httpServer = new HttpServer(); httpServer.setPort(Config.http_port); httpServer.setHttpsPort(Config.https_port); httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size); @@ -224,11 +247,14 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star ThreadPoolManager.registerAllThreadPoolMetric(); startMonitor(); + + serverReady.set(true); + // JVM will exit when shutdown hook is completed while (true) { Thread.sleep(2000); } } catch (Throwable e) { - // Some exception may thrown before LOG is inited. + // Some exception may throw before LOG is inited. // So need to print to stdout e.printStackTrace(); LOG.error("", e); @@ -538,4 +564,25 @@ public static class StartupOptions { public boolean enableHttpServer = true; public boolean enableQeService = true; } + + public static boolean isServerReady() { + return serverReady.get(); + } + + private static void gracefulShutdown() { + // wait for all queries to finish + try { + long now = System.currentTimeMillis(); + List allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators(); + while (!allCoordinators.isEmpty() && System.currentTimeMillis() - now < 300 * 1000L) { + Thread.sleep(1000); + allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators(); + LOG.info("waiting {} queries to finish before shutdown", allCoordinators.size()); + } + } catch (Throwable t) { + LOG.error("", t); + } + + LOG.info("graceful shutdown finished"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 56a08e881acf26..89de4f8b49d858 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -102,7 +102,7 @@ private boolean isColocated() { public long getColocatedBeId(String clusterId) throws ComputeGroupException { CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo()); List bes = infoService.getBackendsByClusterId(clusterId).stream() - .filter(be -> !be.isQueryDisabled()).collect(Collectors.toList()); + .filter(be -> be.isQueryAvailable()).collect(Collectors.toList()); String clusterName = infoService.getClusterNameByClusterId(clusterId); if (bes.isEmpty()) { LOG.warn("failed to get available be, cluster: {}-{}", clusterName, clusterId); @@ -418,7 +418,7 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu long lastUpdateMs = be.getLastUpdateMs(); long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis()); // be core or restart must in heartbeat_interval_second - if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L) + if ((be.isQueryAvailable() || missTimeMs <= Config.heartbeat_interval_second * 1000L) && !be.isSmoothUpgradeSrc()) { if (be.isDecommissioned()) { decommissionAvailBes.add(be); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 1c24ca69d4f191..1747baa758577a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -36,16 +36,11 @@ public class FeConstants { public static int checkpoint_interval_second = 60; // 1 minutes - // dpp version - public static String dpp_version = "3_2_0"; - // bloom filter false positive probability public static double default_bloom_filter_fpp = 0.05; // set to true to skip some step when running FE unit test public static boolean runningUnitTest = false; - // use to set some mocked values for FE unit test - public static Object unitTestConstant = null; // set to false to disable internal schema db public static boolean enableInternalSchemaDb = true; @@ -66,30 +61,9 @@ public class FeConstants { // use for copy into test public static boolean disablePreHeat = false; - public static final String FS_PREFIX_S3 = "s3"; - public static final String FS_PREFIX_S3A = "s3a"; - public static final String FS_PREFIX_S3N = "s3n"; - public static final String FS_PREFIX_OSS = "oss"; - public static final String FS_PREFIX_GCS = "gs"; - public static final String FS_PREFIX_BOS = "bos"; - public static final String FS_PREFIX_COS = "cos"; - public static final String FS_PREFIX_COSN = "cosn"; - public static final String FS_PREFIX_LAKEFS = "lakefs"; - public static final String FS_PREFIX_OBS = "obs"; - public static final String FS_PREFIX_OFS = "ofs"; - public static final String FS_PREFIX_GFS = "gfs"; - public static final String FS_PREFIX_JFS = "jfs"; - public static final String FS_PREFIX_HDFS = "hdfs"; - public static final String FS_PREFIX_VIEWFS = "viewfs"; - public static final String FS_PREFIX_FILE = "file"; - public static final String INTERNAL_DB_NAME = "__internal_schema"; public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot"; public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_"; public static String METADATA_FAILURE_RECOVERY_KEY = "metadata_failure_recovery"; - - public static String CLOUD_RETRY_E230 = "E-230"; - - public static String BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java index 88ec1a1b9f93e8..b769dc04631d48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java @@ -23,11 +23,14 @@ import org.apache.doris.httpv2.config.SpringLog4j2Config; import org.apache.doris.service.FrontendOptions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; +import org.springframework.context.ConfigurableApplicationContext; import java.util.HashMap; import java.util.Map; @@ -36,6 +39,8 @@ @EnableConfigurationProperties @ServletComponentScan public class HttpServer extends SpringBootServletInitializer { + private static final Logger LOG = LogManager.getLogger(HttpServer.class); + private ConfigurableApplicationContext applicationContext; private int port; private int httpsPort; private int acceptors; @@ -176,9 +181,25 @@ public void start() { } else { properties.put("logging.config", Config.custom_config_dir + "/" + SpringLog4j2Config.SPRING_LOG_XML_FILE); } - new SpringApplicationBuilder() + // Disable automatic shutdown hook registration + // This prevents Spring Boot from responding to SIGTERM automatically + // allowing the main process (DorisFE) to control when the HTTP server shuts down + this.applicationContext = new SpringApplicationBuilder() .sources(HttpServer.class) .properties(properties) + .registerShutdownHook(false) .run(new String[]{}); + } + + /** + * Explicitly shutdown the HTTP server. + * This method should be called by the main process (DorisFE) after its graceful shutdown is complete. + */ + public void shutdown() { + if (applicationContext != null) { + LOG.info("Shutting down HTTP server gracefully..."); + applicationContext.close(); + LOG.info("HTTP server shutdown complete"); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java index 30564945505383..7cc4b6774f7e43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java @@ -67,4 +67,9 @@ public static ResponseEntity notFound(Object data) { ResponseBody body = new ResponseBody().code(RestApiStatusCode.NOT_FOUND).msg("Not Found").data(data); return ResponseEntity.status(HttpStatus.OK).body(body); } + + public static ResponseEntity serviceUnavailable(String msg) { + ResponseBody body = new ResponseBody().code(RestApiStatusCode.SERVICE_UNAVAILABLE).msg(msg); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(body); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java index de8f1b235ecc29..ada1c88cac4379 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java @@ -17,6 +17,7 @@ package org.apache.doris.httpv2.rest; +import org.apache.doris.DorisFE; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; @@ -39,6 +40,10 @@ public Object execute(HttpServletRequest request, HttpServletResponse response) executeCheckPassword(request, response); } + if (!DorisFE.isServerReady()) { + return ResponseEntityBuilder.serviceUnavailable("Server is not ready"); + } + Map result = new HashMap<>(); result.put("total_backend_num", Env.getCurrentSystemInfo().getAllBackendIds(false).size()); result.put("online_backend_num", Env.getCurrentSystemInfo().getAllBackendIds(true).size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java index 6952f2abb7ce17..fc803b2d9552c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java @@ -23,7 +23,8 @@ public enum RestApiStatusCode { UNAUTHORIZED(401), BAD_REQUEST(403), NOT_FOUND(404), - INTERNAL_SERVER_ERROR(500); + INTERNAL_SERVER_ERROR(500), + SERVICE_UNAVAILABLE(503); public int code; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index d0ec0f47ede493..1effe8d259ef54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -62,6 +62,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TStatusCode; @@ -317,7 +318,7 @@ private void executeWithRetry(Set execPartitionNames, Map backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(clusterName) - .stream().filter(Backend::isAlive) + .stream().filter(Backend::isLoadAvailable) .collect(Collectors.toList()); if (backends.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 0dc5922794ea8a..c80618307d2675 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -23,7 +23,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; @@ -36,6 +35,7 @@ import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TStatusCode; @@ -202,8 +202,8 @@ public void executeSingleInsert(StmtExecutor executor, long jobId) throws Except onComplete(); } catch (Throwable t) { onFail(t); - // retry insert into from select when meet E-230 in cloud - if (Config.isCloudMode() && t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + // retry insert into from select when meet "need re-plan error" or no scan node in cloud + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(t.getMessage())) { throw t; } return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index bc4e4774d8e8a0..56990b5e404466 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -27,7 +27,6 @@ import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugPointUtil; @@ -50,6 +49,7 @@ import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TOlapTableLocationParam; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.transaction.BeginTransactionException; @@ -248,8 +248,8 @@ protected void onFail(Throwable t) { labelName, queryId, txnId, abortTxnException); } } - // retry insert into from select when meet E-230 in cloud - if (Config.isCloudMode() && t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + // retry insert into from select when meet "need re-plan error" in cloud + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(t.getMessage())) { return; } StringBuilder sb = new StringBuilder(t.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d9ccfbe7650237..5bcc6aeb825fae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -594,7 +594,10 @@ public void queryRetry(TUniqueId queryId) throws Exception { execute(queryId); return; } catch (UserException e) { - if (!e.getMessage().contains(FeConstants.CLOUD_RETRY_E230) || i == retryTime) { + if (!SystemInfoService.needRetryWithReplan(e.getMessage()) || i == retryTime) { + // We have retried internally(in handleQueryWithRetry()) for other kinds of exceptions. + // And for error in SystemInfoService.NEED_REPLAN_ERRORS, they are not handled internally but here + // so we just handle these errors, and throw exception for other errors. throw e; } if (this.coord != null && this.coord.isQueryCancelled()) { @@ -610,8 +613,8 @@ public void queryRetry(TUniqueId queryId) throws Exception { if (DebugPointUtil.isEnable("StmtExecutor.retry.longtime")) { randomMillis = 1000; } - LOG.warn("receive E-230 tried={} first queryId={} last queryId={} new queryId={} sleep={}ms", - i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId), + LOG.warn("receive '{}' tried={} first queryId={} last queryId={} new queryId={} sleep={}ms", + e.getMessage(), i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId), DebugUtil.printId(queryId), randomMillis); Thread.sleep(randomMillis); context.getState().reset(); @@ -806,7 +809,9 @@ private void executeByNereids(TUniqueId queryId) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Command({}) process failed.", originStmt.originStmt, e); } - if (Config.isCloudMode() && e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(e.getDetailMessage())) { + // For errors in SystemInfoService.NEED_REPLAN_ERRORS, + // throw exception directly to trigger a replan retry outside(in StmtExecutor.queryRetry()) throw e; } context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); @@ -957,15 +962,17 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception { handleQueryStmt(); break; } catch (RpcException | UserException e) { - if (Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(e.getMessage())) { + // For errors in SystemInfoService.NEED_REPLAN_ERRORS, + // throw exception directly to trigger a replan retry outside(in StmtExecutor.queryRetry()) throw e; } // If the previous try is timeout or cancelled, then do not need try again. if (this.coord != null && (this.coord.isQueryCancelled() || this.coord.isTimeout())) { throw e; } - // cloud mode retry - LOG.debug("due to exception {} retry {} rpc {} user {}", + + LOG.warn("retry due to exception {}. retried {} times. is rpc error: {}, is user error: {}.", e.getMessage(), i, e instanceof RpcException, e instanceof UserException); boolean isNeedRetry = false; if (Config.isCloudMode()) { @@ -2052,6 +2059,8 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, DebugUtil.printId(context.queryId()), e.getMessage()); LOG.warn(internalErrorSt.getErrorMsg()); coordBase.cancel(internalErrorSt); + // set to null so that the retry logic will generate a new coordinator + this.coord = null; throw e; } finally { coordBase.close(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index c42456acb639ce..24019306d5fcf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -26,6 +26,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; @@ -37,6 +38,7 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -51,6 +53,7 @@ import java.io.IOException; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -156,10 +159,6 @@ public class Backend implements Writable { // No need to persist, because only master FE handle heartbeat. private int heartbeatFailureCounter = 0; - // Not need serialize this field. If fe restart the state is reset to false. Maybe fe will - // send some queries to this BE, it is not an important problem. - private AtomicBoolean isShutDown = new AtomicBoolean(false); - private long nextForceEditlogHeartbeatTime = System.currentTimeMillis() + (new SecureRandom()).nextInt(60 * 1000); public Backend() { @@ -296,20 +295,38 @@ public void setLastStreamLoadTime(long lastStreamLoadTime) { this.backendStatus.lastStreamLoadTime = lastStreamLoadTime; } + // ATTN: This method only return the value of "isQueryDisabled", + // it does not determine the backend IS queryable or not, use isQueryAvailable instead. public boolean isQueryDisabled() { return backendStatus.isQueryDisabled; } - public void setQueryDisabled(boolean isQueryDisabled) { - this.backendStatus.isQueryDisabled = isQueryDisabled; + // return true if be status is changed + public boolean setQueryDisabled(boolean isQueryDisabled) { + if (this.backendStatus.isQueryDisabled != isQueryDisabled) { + this.backendStatus.isQueryDisabled = isQueryDisabled; + return true; + } + return false; } + // ATTN: This method only return the value of "isLoadDisabled", + // it does not determine the backend IS loadable or not, use isLoadAvailable instead. public boolean isLoadDisabled() { return backendStatus.isLoadDisabled; } - public void setLoadDisabled(boolean isLoadDisabled) { - this.backendStatus.isLoadDisabled = isLoadDisabled; + // return true if be status is changed + public boolean setLoadDisabled(boolean isLoadDisabled) { + if (this.backendStatus.isLoadDisabled != isLoadDisabled) { + this.backendStatus.isLoadDisabled = isLoadDisabled; + return true; + } + return false; + } + + private boolean isShutDown() { + return backendStatus.isShutdown; } public void setActive(boolean isActive) { @@ -523,15 +540,21 @@ public boolean isDecommissioning() { } public boolean isQueryAvailable() { - return isAlive() && !isQueryDisabled() && !isShutDown.get(); + String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault( + "Backend.isQueryAvailable", "unavailableBeIds", ""); + if (!Strings.isNullOrEmpty(debugDeadBeIds) + && Arrays.stream(debugDeadBeIds.split(",")).anyMatch(id -> Long.parseLong(id) == this.id)) { + return false; + } + return isAlive() && !isQueryDisabled() && !isShutDown(); } public boolean isScheduleAvailable() { - return isAlive() && !isDecommissioned(); + return isAlive() && !isDecommissioned() && !isShutDown(); } public boolean isLoadAvailable() { - return isAlive() && !isLoadDisabled(); + return isAlive() && !isLoadDisabled() && !isShutDown(); } public void setDisks(ImmutableMap disks) { @@ -871,10 +894,10 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay) this.arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort(); } - if (this.isShutDown.get() != hbResponse.isShutDown()) { + if (this.backendStatus.isShutdown != hbResponse.isShutDown()) { isChanged = true; LOG.info("{} shutdown state is changed", this.toString()); - this.isShutDown.set(hbResponse.isShutDown()); + this.backendStatus.isShutdown = hbResponse.isShutDown(); } if (!this.getNodeRoleTag().value.equals(hbResponse.getNodeRole()) && Tag.validNodeRoleTag( @@ -988,6 +1011,8 @@ public class BackendStatus { public volatile boolean isLoadDisabled = false; @SerializedName("isActive") public volatile boolean isActive = true; + @SerializedName("isShutdown") + public volatile boolean isShutdown = false; // cloud mode, cloud control just query master, so not need SerializedName public volatile long currentFragmentNum = 0; @@ -1123,3 +1148,4 @@ private void migrateEndpointTag(Map tagMap, String oldTagName, S } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index acace11f400644..07535da3953f77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -43,6 +43,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -79,6 +80,13 @@ public class SystemInfoService { public static final String NOT_USING_VALID_CLUSTER_MSG = "Not using valid cloud clusters, please use a cluster before issuing any queries"; + public static final String ERROR_E230 = "E-230"; + + public static final ImmutableSet NEED_REPLAN_ERRORS = ImmutableSet.of( + NO_SCAN_NODE_BACKEND_AVAILABLE_MSG, + ERROR_E230 + ); + protected volatile ImmutableMap idToBackendRef = ImmutableMap.of(); protected volatile ImmutableMap idToReportVersionRef = ImmutableMap.of(); @@ -965,17 +973,11 @@ public void modifyBackends(ModifyBackendClause alterClause) throws UserException } if (alterClause.isQueryDisabled() != null) { - if (!alterClause.isQueryDisabled().equals(be.isQueryDisabled())) { - be.setQueryDisabled(alterClause.isQueryDisabled()); - shouldModify = true; - } + shouldModify = be.setQueryDisabled(alterClause.isQueryDisabled()); } if (alterClause.isLoadDisabled() != null) { - if (!alterClause.isLoadDisabled().equals(be.isLoadDisabled())) { - be.setLoadDisabled(alterClause.isLoadDisabled()); - shouldModify = true; - } + shouldModify = be.setLoadDisabled(alterClause.isLoadDisabled()); } if (shouldModify) { @@ -1059,4 +1061,16 @@ public int getTabletNumByBackendId(long beId) { return Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId); } + // If the error msg contains certain keywords, we need to retry the query with re-plan. + public static boolean needRetryWithReplan(String errorMsg) { + if (Strings.isNullOrEmpty(errorMsg)) { + return false; + } + for (String keyword : NEED_REPLAN_ERRORS) { + if (errorMsg.contains(keyword)) { + return true; + } + } + return false; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 208fb2afaa1682..2b92cd80e75ebe 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -203,7 +203,8 @@ public void testCreateDbAndTable() throws Exception { result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 9)); Assert.assertEquals( "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false," - + "\"isLoadDisabled\":false,\"isActive\":true,\"currentFragmentNum\":0,\"lastFragmentUpdateTime\":0}", + + "\"isLoadDisabled\":false,\"isActive\":true,\"isShutdown\":false,\"currentFragmentNum\":0," + + "\"lastFragmentUpdateTime\":0}", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6)); Assert.assertEquals("0", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 5)); Assert.assertEquals(Tag.VALUE_MIX, result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 4)); diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml index 60746483928518..7ca39bb9b7e7d7 100644 --- a/regression-test/framework/pom.xml +++ b/regression-test/framework/pom.xml @@ -321,11 +321,11 @@ under the License. aliyun-java-sdk-ram 3.3.1 - + diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy new file mode 100644 index 00000000000000..ce5417924e5852 --- /dev/null +++ b/regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType +import org.apache.doris.regression.suite.SuiteCluster + +suite("test_retry_no_scan_node", "p0, docker") { + if (!isCloudMode()) { + return + } + def options = new ClusterOptions() + options.enableDebugPoints() + options.setFeNum(1) + options.feConfigs.add('max_query_retry_time=100') + options.feConfigs.add('sys_log_verbose_modules=org') + options.setBeNum(2) + options.cloudMode = true + // 1. connect to master + options.connectToFollower = false + + def queryTask = { + for (int i = 0; i < 100; i++) { + try { + log.info("query count: {}", i) + sql """select * from test_no_scan_node_table""" + Thread.sleep(100) + } catch (Exception e) { + logger.warn("select failed: ${e.message}") + assertFalse(true); + } + } + } + + docker(options) { + def be1 = cluster.getBeByIndex(1) + def beId = be1.backendId; + + try { + sql """ + CREATE TABLE test_no_scan_node_table + ( k1 TINYINT, k2 INT not null ) + DISTRIBUTED BY HASH(k2) BUCKETS 2 PROPERTIES ( "replication_num" = "1" ); + """ + sql """ + INSERT INTO test_no_scan_node_table VALUES (1, 1), (2, 2), (3, 3); + """ + + def result = sql """select * from test_no_scan_node_table order by k2;""" + log.info("insert result : {}", result) + assertEquals([[1, 1], [2, 2], [3, 3]], result) + + // this should be run at least 10 seconds + def queryThread = Thread.start(queryTask) + + // inject query not available error + cluster.injectDebugPoints(NodeType.FE, ['Backend.isQueryAvailable' : [unavailableBeIds:beId]]) + // query should have no failure + // wait query thread finish + queryThread.join(15000); + } finally { + cluster.clearFrontendDebugPoints() + cluster.clearBackendDebugPoints() + } + } +} + From c50d659c679faa49bedb1c70246e4942e104c17c Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Fri, 7 Nov 2025 14:43:07 +0800 Subject: [PATCH 2/3] 2 --- fe/fe-core/src/main/java/org/apache/doris/DorisFE.java | 1 - .../src/main/java/org/apache/doris/httpv2/HttpServer.java | 2 +- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index 679945f3dcd98c..cc29a28bfad827 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -63,7 +63,6 @@ import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; - import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java index b769dc04631d48..9d7dc79a8e10e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java @@ -189,7 +189,7 @@ public void start() { .properties(properties) .registerShutdownHook(false) .run(new String[]{}); - } + } /** * Explicitly shutdown the HTTP server. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5bcc6aeb825fae..0e384e55787fd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1182,7 +1182,7 @@ public void executeByLegacy(TUniqueId queryId) throws Exception { throw e; } catch (UserException e) { // insert into select - if (Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(e.getMessage())) { throw e; } // analysis exception only print message, not print the stack @@ -2533,7 +2533,7 @@ private void handleInsertStmt() throws Exception { } // cloud mode, insert into select meet -230, retry - if (Config.isCloudMode() && t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + if (Config.isCloudMode() && SystemInfoService.needRetryWithReplan(t.getMessage())) { LOG.warn("insert into select meet E-230, retry again"); resetAnalyzerAndStmt(); if (insertStmt instanceof NativeInsertStmt) { From b0811c6d236f87e91e761bd0a8a516eb8fa68ecc Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Fri, 7 Nov 2025 14:56:24 +0800 Subject: [PATCH 3/3] 2 --- .../src/main/java/org/apache/doris/common/FeConstants.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 1747baa758577a..34e7d7dbc5a0b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -41,6 +41,8 @@ public class FeConstants { // set to true to skip some step when running FE unit test public static boolean runningUnitTest = false; + // use to set some mocked values for FE unit test + public static Object unitTestConstant = null; // set to false to disable internal schema db public static boolean enableInternalSchemaDb = true;