diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java index bff2dfd1340061..02b19ae0ba103d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java @@ -103,6 +103,15 @@ public ImmutableSet acceptDialects() { if (Strings.isNullOrEmpty(targetURL)) { return null; } + // TODO: support multiple URLs load balancing, here we just use the first one + String[] urlArray = targetURL.split(","); + for (String url : urlArray) { + String trimmedUrl = url.trim(); + if (!trimmedUrl.isEmpty()) { + targetURL = trimmedUrl; + break; + } + } return HttpDialectUtils.convertSql(targetURL, originSql, sessionVariable.getSqlDialect(), sessionVariable.getSqlConvertorFeatures(), sessionVariable.getSqlConvertorConfig()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index 5131cf82bf66fb..89acd66658d6f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -17,9 +17,6 @@ package org.apache.doris.plugin.dialect; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Data; @@ -33,113 +30,35 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; /** * This class is used to convert sql with different dialects using sql convertor service. * The sql convertor service is a http service which is used to convert sql. - *

- * Features: - * - Support multiple URLs (comma separated) - * - Blacklist mechanism for failed URLs - * - Automatic failover and retry - * - URL caching and smart selection */ public class HttpDialectUtils { private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class); - // Cache URL manager instances to avoid duplicate parsing with automatic expiration - private static final Cache urlManagerCache = Caffeine.newBuilder() - .maximumSize(10) - .expireAfterAccess(8, TimeUnit.HOURS) - .build(); - - // Blacklist recovery time (ms): 1 minute - private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000; - // Connection timeout period (ms): 3 seconds - private static final int CONNECTION_TIMEOUT_MS = 3000; - // Read timeout period (ms): 10 seconds - private static final int READ_TIMEOUT_MS = 10000; - - public static String convertSql(String targetURLs, String originStmt, String dialect, + public static String convertSql(String targetURL, String originStmt, String dialect, String[] features, String config) { - UrlManager urlManager = getOrCreateUrlManager(targetURLs); ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect, features, config); - String requestStr = convertRequest.toJson(); - - // Try to convert SQL using intelligent URL selection strategy - return tryConvertWithIntelligentSelection(urlManager, requestStr, originStmt); - } - - /** - * Try to convert SQL using intelligent URL selection strategy - * CRITICAL: This method ensures 100% success rate when ANY service is available - */ - private static String tryConvertWithIntelligentSelection( - UrlManager urlManager, String requestStr, String originStmt) { - // Strategy: Try ALL URLs in intelligent order, regardless of blacklist status - // This ensures 100% success rate when any service is actually available - List allUrls = urlManager.getAllUrlsInPriorityOrder(); - - for (String url : allUrls) { - try { - String result = doConvertSql(url, requestStr); - // If no exception thrown, HTTP response was successful (200) - // Mark URL as healthy and return result (even if empty) - urlManager.markUrlAsHealthy(url); - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully converted SQL using URL: {}", url); - } - return result; - } catch (Exception e) { - LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.getMessage()); - // Add failed URL to blacklist for future optimization - urlManager.markUrlAsBlacklisted(url); - // Continue trying next URL - this is CRITICAL for 100% success rate - } - } - - return originStmt; - } - - /** - * Get or create a URL manager - */ - private static UrlManager getOrCreateUrlManager(String targetURLs) { - return urlManagerCache.get(targetURLs, UrlManager::new); - } - - /** - * Perform SQL conversion for individual URL - */ - private static String doConvertSql(String targetURL, String requestStr) throws Exception { HttpURLConnection connection = null; try { - if (targetURL == null || targetURL.trim().isEmpty()) { - throw new Exception("Target URL is null or empty"); - } - URL url = new URL(targetURL.trim()); + URL url = new URL(targetURL); connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); connection.setUseCaches(false); connection.setDoOutput(true); - connection.setConnectTimeout(CONNECTION_TIMEOUT_MS); - connection.setReadTimeout(READ_TIMEOUT_MS); + String requestStr = convertRequest.toJson(); try (OutputStream outputStream = connection.getOutputStream()) { outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8)); } int responseCode = connection.getResponseCode(); if (LOG.isDebugEnabled()) { - LOG.debug("POST Response Code: {}, URL: {}, post data: {}", responseCode, targetURL, requestStr); + LOG.debug("POST Response Code: {}, post data: {}", responseCode, requestStr); } if (responseCode == HttpURLConnection.HTTP_OK) { @@ -157,20 +76,26 @@ private static String doConvertSql(String targetURL, String requestStr) throws E }.getType(); ConvertResponse result = new Gson().fromJson(response.toString(), type); if (LOG.isDebugEnabled()) { - LOG.debug("Convert response: {}, URL: {}", result, targetURL); + LOG.debug("convert response: {}", result); } if (result.code == 0) { if (!"v1".equals(result.version)) { - throw new Exception("Unsupported version: " + result.version); + LOG.warn("failed to convert sql, response version is not v1: {}", result.version); + return originStmt; } return result.data; } else { - throw new Exception("Conversion failed: " + result.message); + LOG.warn("failed to convert sql, response: {}", result); + return originStmt; } } } else { - throw new Exception("HTTP response code: " + responseCode); + LOG.warn("failed to convert sql, response code: {}", responseCode); + return originStmt; } + } catch (Exception e) { + LOG.warn("failed to convert sql", e); + return originStmt; } finally { if (connection != null) { connection.disconnect(); @@ -178,171 +103,6 @@ private static String doConvertSql(String targetURL, String requestStr) throws E } } - /** - * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection - */ - private static class UrlManager { - private final List parsedUrls; - private final ConcurrentHashMap blacklist; - - public UrlManager(String urls) { - this.parsedUrls = parseUrls(urls); - this.blacklist = new ConcurrentHashMap<>(); - if (LOG.isDebugEnabled()) { - LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls); - } - } - - /** - * Parse comma separated URL strings - */ - private List parseUrls(String urls) { - List result = Lists.newArrayList(); - if (urls != null && !urls.trim().isEmpty()) { - String[] urlArray = urls.split(","); - for (String url : urlArray) { - String trimmedUrl = url.trim(); - if (!trimmedUrl.isEmpty()) { - result.add(trimmedUrl); - } - } - } - return result; - } - - /** - * Mark URL as healthy (remove from blacklist) - */ - public void markUrlAsHealthy(String url) { - if (blacklist.remove(url) != null) { - LOG.info("Removed URL from blacklist due to successful request: {}", url); - } - } - - /** - * Add URL to blacklist - */ - public void markUrlAsBlacklisted(String url) { - // If URL is already in blacklist, just return - if (blacklist.containsKey(url)) { - return; - } - - long currentTime = System.currentTimeMillis(); - long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS; - blacklist.put(url, new BlacklistEntry(currentTime, recoverTime)); - LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime)); - } - - /** - * Check if URL is localhost (127.0.0.1 or localhost) - */ - private boolean isLocalhost(String url) { - return url.contains("127.0.0.1") || url.contains("localhost"); - } - - /** - * Get ALL URLs in priority order for 100% success guarantee - * CRITICAL: This method ensures we try every URL when any service might be available - *

- * Priority order: - * 1. Localhost URLs (127.0.0.1 or localhost) that are healthy - * 2. Other healthy URLs (randomly selected) - * 3. Localhost URLs in blacklist - * 4. Other blacklisted URLs (sorted by recovery time) - */ - public List getAllUrlsInPriorityOrder() { - List prioritizedUrls = Lists.newArrayList(); - List healthyLocalhost = Lists.newArrayList(); - List healthyOthers = Lists.newArrayList(); - List blacklistedLocalhost = Lists.newArrayList(); - List blacklistedOthers = Lists.newArrayList(); - - long currentTime = System.currentTimeMillis(); - - // Single traversal to categorize all URLs - for (String url : parsedUrls) { - BlacklistEntry entry = blacklist.get(url); - boolean isHealthy = false; - - if (entry == null) { - // URL is not in blacklist, consider it healthy - isHealthy = true; - } else if (currentTime >= entry.recoverTime) { - // URL has reached recovery time, remove from blacklist and consider healthy - blacklist.remove(url); - isHealthy = true; - if (LOG.isDebugEnabled()) { - LOG.debug("URL recovered from blacklist: {}", url); - } - } - - boolean isLocal = isLocalhost(url); - - if (isHealthy) { - if (isLocal) { - healthyLocalhost.add(url); - } else { - healthyOthers.add(url); - } - } else { - if (isLocal) { - blacklistedLocalhost.add(url); - } else { - blacklistedOthers.add(url); - } - } - } - - // Add URLs in priority order - // 1. Healthy localhost URLs first - prioritizedUrls.addAll(healthyLocalhost); - - // 2. Other healthy URLs (randomly shuffled for load balancing) - Collections.shuffle(healthyOthers, ThreadLocalRandom.current()); - prioritizedUrls.addAll(healthyOthers); - - // 3. Blacklisted localhost URLs - prioritizedUrls.addAll(blacklistedLocalhost); - - // 4. Other blacklisted URLs (sorted by recovery time) - blacklistedOthers.sort((url1, url2) -> { - BlacklistEntry entry1 = blacklist.get(url1); - BlacklistEntry entry2 = blacklist.get(url2); - if (entry1 == null && entry2 == null) { - return 0; - } - if (entry1 == null) { - return -1; - } - if (entry2 == null) { - return 1; - } - return Long.compare(entry1.recoverTime, entry2.recoverTime); - }); - prioritizedUrls.addAll(blacklistedOthers); - - if (LOG.isDebugEnabled()) { - LOG.debug("All URLs in priority order: {}", prioritizedUrls); - } - - return prioritizedUrls; - } - } - - /** - * Blacklist entry - */ - private static class BlacklistEntry { - final long blacklistedTime; - final long recoverTime; - - BlacklistEntry(long blacklistedTime, long recoverTime) { - this.blacklistedTime = blacklistedTime; - this.recoverTime = recoverTime; - } - } - @Data private static class ConvertRequest { private String version; // CHECKSTYLE IGNORE THIS LINE diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java index 4ce71e196a33e1..de359f79475cc4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java @@ -28,379 +28,49 @@ import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; -import java.util.ArrayList; -import java.util.List; public class HttpDialectUtilsTest { - private List ports = new ArrayList<>(); - private List servers = new ArrayList<>(); + private int port; + private SimpleHttpServer server; @Before public void setUp() throws Exception { - // Create three test servers - for (int i = 0; i < 3; i++) { - int port = findValidPort(); - ports.add(port); - SimpleHttpServer server = new SimpleHttpServer(port); - server.start("/api/v1/convert"); - servers.add(server); - } + port = findValidPort(); + server = new SimpleHttpServer(port); + server.start("/api/v1/convert"); } @After public void tearDown() { - for (SimpleHttpServer server : servers) { - if (server != null) { - server.stop(); - } + if (server != null) { + server.stop(); } - servers.clear(); - ports.clear(); } @Test - public void testSingleUrlConvert() { + public void testSqlConvert() { String originSql = "select * from t1 where \"k1\" = 1"; String expectedSql = "select * from t1 where `k1` = 1"; String[] features = new String[] {"ctas"}; - String targetURL = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert"; - - // Test with no response (should return original SQL) + String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert"; String res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); - - // Test successful conversion - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + // test presto + server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(expectedSql, res); - - // Test version error - servers.get(0).setResponse( - "{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + // test response version error + server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); - - // Test code error - servers.get(0).setResponse( + // test response code error + server.setResponse( "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 400, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); } - @Test - public void testMultipleUrlsConvert() { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert"; - - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - - String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - } - - @Test - public void testFailoverMechanism() throws InterruptedException { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; - - // First server returns error, second server succeeds - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 500, \"message\": \"error\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - - String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - } - - @Test - public void testBlacklistMechanism() throws InterruptedException { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; - - // Stop first server, set second server to work - servers.get(0).stop(); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - - // First call should succeed via second server - String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - - // Restart first server - servers.set(0, new SimpleHttpServer(ports.get(0))); - try { - servers.get(0).start("/api/v1/convert"); - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } catch (IOException e) { - return; // Skip test if port is occupied - } - - // Should still work with blacklist recovery - res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - } - - @Test - public void testAllUrlsFailure() { - String originSql = "select * from t1 where \"k1\" = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; - - // All servers return error - servers.get(0).setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(1).setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - - String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(originSql, res); - } - - @Test - public void testUrlParsing() { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - // Test URL parsing with spaces and empty items - String targetURLs = " http://127.0.0.1:" + ports.get(0) + "/api/v1/convert , ," - + " http://127.0.0.1:" + ports.get(1) + "/api/v1/convert "; - - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - - String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - } - - @Test - public void testSeamlessFailover() throws IOException { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; - - // Both servers start healthy - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - - String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - - // Stop first server - servers.get(0).stop(); - res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - - // Restart first server, stop second - servers.set(0, new SimpleHttpServer(ports.get(0))); - servers.get(0).start("/api/v1/convert"); - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(1).stop(); - - // Should seamlessly switch to first server - res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - Assert.assertEquals(expectedSql, res); - } - - @Test - public void testConcurrentRequests() throws InterruptedException { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; - - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - - // Test with multiple concurrent threads - Thread[] threads = new Thread[10]; - String[] results = new String[10]; - - for (int i = 0; i < 10; i++) { - final int index = i; - threads[i] = new Thread(() -> { - results[index] = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - }); - threads[i].start(); - } - - for (Thread thread : threads) { - thread.join(); - } - - // Verify all results - for (String result : results) { - Assert.assertEquals(expectedSql, result); - } - } - - @Test - public void testZeroFailureGuarantee() throws InterruptedException { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert"; - - int totalRequests = 30; // Reduced for faster testing with production timeouts - int successCount = 0; - - // Test various failure scenarios while ensuring at least one service is always available - for (int i = 0; i < totalRequests; i++) { - if (i < 6) { - // All servers healthy - setAllServersHealthy(expectedSql); - } else if (i < 12) { - // Server 0 fails, others healthy - servers.get(0) - .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(2).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } else if (i < 18) { - // Servers 0,1 fail, server 2 healthy - servers.get(0) - .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(1) - .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"error\"}"); - servers.get(2).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } else if (i < 24) { - // Only server 1 healthy - servers.get(0) - .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(2) - .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - } else { - // Alternating recovery - if (i % 2 == 0) { - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(2).setResponse( - "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - } else { - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); - servers.get(2).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } - } - - String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - if (expectedSql.equals(result)) { - successCount++; - } - - Thread.sleep(50); // Small delay between requests - } - - System.out.println("Zero Failure Guarantee Test Results:"); - System.out.println("Total requests: " + totalRequests); - System.out.println("Successful: " + successCount); - System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%"); - - // Must achieve 100% success rate when at least one service is available - Assert.assertEquals("Must achieve 100% success rate when service is available", - totalRequests, successCount); - } - - @Test - public void testNetworkJitterStress() throws InterruptedException { - String originSql = "select * from t1 where \"k1\" = 1"; - String expectedSql = "select * from t1 where `k1` = 1"; - String[] features = new String[] {"ctas"}; - - String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," - + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; - - int totalRequests = 15; // Reduced for faster testing with production timeouts - int successCount = 0; - - // Simulate network jitter while ensuring at least one server is always available - for (int i = 0; i < totalRequests; i++) { - double random = Math.random(); - if (random < 0.3) { - // Server 0 fails, Server 1 works - servers.get(0) - .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"timeout\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } else if (random < 0.5) { - // Server 1 fails, Server 0 works - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"service unavailable\"}"); - } else { - // Both servers work - servers.get(0).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - servers.get(1).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } - - String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); - if (expectedSql.equals(result)) { - successCount++; - } - - Thread.sleep(100); // Delay between requests for production timeouts - } - - System.out.println("Network Jitter Test Results:"); - System.out.println("Total requests: " + totalRequests); - System.out.println("Successful: " + successCount); - System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%"); - - // Must achieve 100% success rate since we ensure at least one server is always available - Assert.assertEquals("Must handle network jitter with 100% success when service is available", - totalRequests, successCount); - } - - private void setAllServersHealthy(String expectedSql) { - for (int i = 0; i < 3; i++) { - servers.get(i).setResponse( - "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - } - } - private static int findValidPort() { int port; while (true) {