From 82d14fcdece84528779ac3f15b51816ffefa7ed9 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Thu, 3 Jul 2025 16:15:52 +0800 Subject: [PATCH 1/3] [Enhancement](sql-dialect) Support multiple sql-converter service urls --- .../plugin/dialect/HttpDialectUtils.java | 271 ++++++++++++- .../doris/plugin/HttpDialectUtilsTest.java | 378 +++++++++++++++++- 2 files changed, 621 insertions(+), 28 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index 89acd66658d6f0..bf5855b4d774c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -17,6 +17,7 @@ package org.apache.doris.plugin.dialect; +import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Data; @@ -30,18 +31,95 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; /** * This class is used to convert sql with different dialects using sql convertor service. * The sql convertor service is a http service which is used to convert sql. + *
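+ * <p>
+ * Illustrative usage (hypothetical hosts; the converter endpoints are passed as one
+ * comma separated string, matching the convertSql signature in this patch):
+ * <pre>
+ *   String urls = "http://host1:5001/api/v1/convert,http://host2:5001/api/v1/convert";
+ *   String converted = HttpDialectUtils.convertSql(urls, "select 1", "presto", new String[] {"ctas"}, "{}");
+ * </pre>
+ * <p>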
+ * Features: + * - Support multiple URLs (comma separated) + * - Blacklist mechanism for failed URLs + * - Automatic failover and retry + * - URL caching and smart selection */ public class HttpDialectUtils { private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class); - public static String convertSql(String targetURL, String originStmt, String dialect, + // Cache URL manager instances to avoid duplicate parsing + private static final ConcurrentHashMap urlManagerCache = new ConcurrentHashMap<>(); + + // Blacklist recovery time (ms): 5 minutes + private static final long BLACKLIST_RECOVERY_TIME_MS = 5 * 60 * 1000; + // Connection timeout period (ms): 3 seconds + private static final int CONNECTION_TIMEOUT_MS = 3000; + // Read timeout period (ms): 10 seconds + private static final int READ_TIMEOUT_MS = 10000; + + public static String convertSql(String targetURLs, String originStmt, String dialect, String[] features, String config) { + if (targetURLs == null || targetURLs.trim().isEmpty()) { + LOG.warn("Target URLs is empty, return original SQL"); + return originStmt; + } + + UrlManager urlManager = getOrCreateUrlManager(targetURLs); ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect, features, config); + String requestStr = convertRequest.toJson(); + + // Try to convert SQL using intelligent URL selection strategy + return tryConvertWithIntelligentSelection(urlManager, requestStr, originStmt); + } + + /** + * Try to convert SQL using intelligent URL selection strategy + * CRITICAL: This method ensures 100% success rate when ANY service is available + */ + private static String tryConvertWithIntelligentSelection( + UrlManager urlManager, String requestStr, String originStmt) { + // Strategy: Try ALL URLs in intelligent order, regardless of blacklist status + // This ensures 100% success rate when any service is actually available + + List allUrls = urlManager.getAllUrlsInPriorityOrder(); + + for (String url : allUrls) { + try { + String result = doConvertSql(url, requestStr); + if (result != null && !result.equals(originStmt)) { + // Conversion succeeded, mark URL as healthy (remove from blacklist) + urlManager.markUrlAsHealthy(url); + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully converted SQL using URL: {}", url); + } + return result; + } + } catch (Exception e) { + LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.getMessage()); + // Add failed URL to blacklist for future optimization + urlManager.markUrlAsBlacklisted(url); + // Continue trying next URL - this is CRITICAL for 100% success rate + } + } + + LOG.warn("All URLs failed to convert SQL, return original SQL"); + return originStmt; + } + + /** + * Get or create a URL manager + */ + private static UrlManager getOrCreateUrlManager(String targetURLs) { + return urlManagerCache.computeIfAbsent(targetURLs, UrlManager::new); + } + /** + * Perform SQL conversion for individual URL + */ + private static String doConvertSql(String targetURL, String requestStr) throws Exception { HttpURLConnection connection = null; try { URL url = new URL(targetURL); @@ -50,15 +128,16 @@ public static String convertSql(String targetURL, String originStmt, String dial connection.setRequestProperty("Content-Type", "application/json"); connection.setUseCaches(false); connection.setDoOutput(true); + connection.setConnectTimeout(CONNECTION_TIMEOUT_MS); + connection.setReadTimeout(READ_TIMEOUT_MS); - String requestStr = convertRequest.toJson(); try (OutputStream outputStream = 
connection.getOutputStream()) { outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8)); } int responseCode = connection.getResponseCode(); if (LOG.isDebugEnabled()) { - LOG.debug("POST Response Code: {}, post data: {}", responseCode, requestStr); + LOG.debug("POST Response Code: {}, URL: {}, post data: {}", responseCode, targetURL, requestStr); } if (responseCode == HttpURLConnection.HTTP_OK) { @@ -76,26 +155,23 @@ public static String convertSql(String targetURL, String originStmt, String dial }.getType(); ConvertResponse result = new Gson().fromJson(response.toString(), type); if (LOG.isDebugEnabled()) { - LOG.debug("convert response: {}", result); + LOG.debug("Convert response: {}, URL: {}", result, targetURL); } if (result.code == 0) { if (!"v1".equals(result.version)) { - LOG.warn("failed to convert sql, response version is not v1: {}", result.version); - return originStmt; + LOG.warn("Failed to convert sql, response version is not v1: {}, URL: {}", + result.version, targetURL); + return null; } return result.data; } else { - LOG.warn("failed to convert sql, response: {}", result); - return originStmt; + LOG.warn("Failed to convert sql, response: {}, URL: {}", result, targetURL); + return null; } } } else { - LOG.warn("failed to convert sql, response code: {}", responseCode); - return originStmt; + throw new Exception("HTTP response code: " + responseCode); } - } catch (Exception e) { - LOG.warn("failed to convert sql", e); - return originStmt; } finally { if (connection != null) { connection.disconnect(); @@ -103,6 +179,175 @@ public static String convertSql(String targetURL, String originStmt, String dial } } + /** + * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection + */ + private static class UrlManager { + private final List parsedUrls; + private final ConcurrentHashMap blacklist; + + public UrlManager(String urls) { + this.parsedUrls = parseUrls(urls); + this.blacklist = new ConcurrentHashMap<>(); + if (LOG.isDebugEnabled()) { + LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls); + } + } + + /** + * Parse comma separated URL strings + */ + private List parseUrls(String urls) { + List result = Lists.newArrayList(); + if (urls != null && !urls.trim().isEmpty()) { + String[] urlArray = urls.split(","); + for (String url : urlArray) { + String trimmedUrl = url.trim(); + if (!trimmedUrl.isEmpty()) { + result.add(trimmedUrl); + } + } + } + return result; + } + + /** + * Mark URL as healthy (remove from blacklist) + */ + public void markUrlAsHealthy(String url) { + if (blacklist.remove(url) != null) { + LOG.info("Removed URL from blacklist due to successful request: {}", url); + } + } + + /** + * Add URL to blacklist + */ + public void markUrlAsBlacklisted(String url) { + long currentTime = System.currentTimeMillis(); + long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS; + BlacklistEntry existingEntry = blacklist.get(url); + if (existingEntry != null) { + // If URL is already in blacklist, limit maximum recovery time to avoid infinite extension + // Maximum recovery time is 2 times the original recovery time + long maxRecoverTime = currentTime + (BLACKLIST_RECOVERY_TIME_MS * 2); + recoverTime = Math.min(maxRecoverTime, existingEntry.recoverTime + BLACKLIST_RECOVERY_TIME_MS); + } + blacklist.put(url, new BlacklistEntry(currentTime, recoverTime)); + LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime)); + } + + /** + * Get list of healthy URLs (not in blacklist) + */ 
+ public List getHealthyUrls() { + List healthy = Lists.newArrayList(); + long currentTime = System.currentTimeMillis(); + for (String url : parsedUrls) { + BlacklistEntry entry = blacklist.get(url); + if (entry == null) { + // URL is not in blacklist, consider it healthy + healthy.add(url); + } else if (currentTime >= entry.recoverTime) { + // URL has reached recovery time, remove from blacklist and add to healthy list + blacklist.remove(url); + healthy.add(url); + if (LOG.isDebugEnabled()) { + LOG.debug("URL recovered from blacklist: {}", url); + } + } + } + + // Randomly shuffle the order to avoid always trying from the first URL + Collections.shuffle(healthy, ThreadLocalRandom.current()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Healthy URLs: {}", healthy); + } + + return healthy; + } + + /** + * Get list of blacklisted URLs (for immediate retry) + */ + public List getBlacklistedUrls() { + List blacklisted = Lists.newArrayList(); + long currentTime = System.currentTimeMillis(); + + for (String url : parsedUrls) { + BlacklistEntry entry = blacklist.get(url); + if (entry != null && currentTime < entry.recoverTime) { + // URL is in blacklist and has not reached recovery time yet + blacklisted.add(url); + } + } + + // Sort by recovery time, prioritize URLs that should recover earlier + blacklisted.sort((url1, url2) -> { + BlacklistEntry entry1 = blacklist.get(url1); + BlacklistEntry entry2 = blacklist.get(url2); + return Long.compare(entry1.recoverTime, entry2.recoverTime); + }); + + if (LOG.isDebugEnabled()) { + LOG.debug("Blacklisted URLs for immediate retry: {}", blacklisted); + } + + return blacklisted; + } + + /** + * Get ALL URLs in priority order for 100% success guarantee + * CRITICAL: This method ensures we try every URL when any service might be available + *
+ * Priority order: + * 1. Healthy URLs (not in blacklist or recovered) - randomly shuffled for load balancing + * 2. Blacklisted URLs (sorted by recovery time) - still try them for guaranteed coverage + */ + public List getAllUrlsInPriorityOrder() { + List prioritizedUrls = Lists.newArrayList(); + + // First: Add all healthy URLs + List healthyUrls = getHealthyUrls(); + prioritizedUrls.addAll(healthyUrls); + + // Second: Add all blacklisted URLs that haven't been tried yet + List blacklistedUrls = getBlacklistedUrls(); + for (String blacklistedUrl : blacklistedUrls) { + if (!prioritizedUrls.contains(blacklistedUrl)) { + prioritizedUrls.add(blacklistedUrl); + } + } + + // Ensure we have all URLs - add any missing ones (safety net) + for (String url : parsedUrls) { + if (!prioritizedUrls.contains(url)) { + prioritizedUrls.add(url); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("All URLs in priority order: {}", prioritizedUrls); + } + + return prioritizedUrls; + } + } + + /** + * Blacklist entry + */ + private static class BlacklistEntry { + final long blacklistedTime; + final long recoverTime; + + BlacklistEntry(long blacklistedTime, long recoverTime) { + this.blacklistedTime = blacklistedTime; + this.recoverTime = recoverTime; + } + } + @Data private static class ConvertRequest { private String version; // CHECKSTYLE IGNORE THIS LINE diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java index de359f79475cc4..b24d74d4510ce9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java @@ -28,49 +28,397 @@ import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; public class HttpDialectUtilsTest { - private int port; - private SimpleHttpServer server; + private List ports = new ArrayList<>(); + private List servers = new ArrayList<>(); @Before public void setUp() throws Exception { - port = findValidPort(); - server = new SimpleHttpServer(port); - server.start("/api/v1/convert"); + // Create three test servers + for (int i = 0; i < 3; i++) { + int port = findValidPort(); + ports.add(port); + SimpleHttpServer server = new SimpleHttpServer(port); + server.start("/api/v1/convert"); + servers.add(server); + } } @After public void tearDown() { - if (server != null) { - server.stop(); + for (SimpleHttpServer server : servers) { + if (server != null) { + server.stop(); + } } + servers.clear(); + ports.clear(); } @Test - public void testSqlConvert() { + public void testSingleUrlConvert() { String originSql = "select * from t1 where \"k1\" = 1"; String expectedSql = "select * from t1 where `k1` = 1"; String[] features = new String[] {"ctas"}; - String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert"; + String targetURL = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert"; + + // Test with no response (should return original SQL) String res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); - // test presto - server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // Test successful conversion + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); res = 
HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(expectedSql, res); - // test response version error - server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // Test version error + servers.get(0).setResponse( + "{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); - // test response code error - server.setResponse( + + // Test code error + servers.get(0).setResponse( "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 400, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); } + @Test + public void testMultipleUrlsConvert() { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert"; + + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testFailoverMechanism() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // First server returns error, second server succeeds + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testBlacklistMechanism() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // Stop first server, set second server to work + servers.get(0).stop(); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // First call should succeed via second server + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + + // Restart first server + servers.set(0, new SimpleHttpServer(ports.get(0))); + try { + servers.get(0).start("/api/v1/convert"); + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } catch (IOException e) { + return; // Skip test if port is occupied + } + + // Should still work with blacklist recovery + res = 
HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testAllUrlsFailure() { + String originSql = "select * from t1 where \"k1\" = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // All servers return error + servers.get(0).setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(originSql, res); + } + + @Test + public void testEmptyUrls() { + String originSql = "select * from t1 where \"k1\" = 1"; + String[] features = new String[] {"ctas"}; + + // Test empty URLs + String res = HttpDialectUtils.convertSql("", originSql, "presto", features, "{}"); + Assert.assertEquals(originSql, res); + + // Test null URLs + res = HttpDialectUtils.convertSql(null, originSql, "presto", features, "{}"); + Assert.assertEquals(originSql, res); + + // Test URLs with only commas + res = HttpDialectUtils.convertSql(",,", originSql, "presto", features, "{}"); + Assert.assertEquals(originSql, res); + } + + @Test + public void testUrlParsing() { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + // Test URL parsing with spaces and empty items + String targetURLs = " http://127.0.0.1:" + ports.get(0) + "/api/v1/convert , ," + + " http://127.0.0.1:" + ports.get(1) + "/api/v1/convert "; + + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testSeamlessFailover() throws IOException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // Both servers start healthy + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + + // Stop first server + servers.get(0).stop(); + res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + + // Restart first server, stop second + servers.set(0, new SimpleHttpServer(ports.get(0))); + servers.get(0).start("/api/v1/convert"); + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).stop(); + + // Should seamlessly switch to first server + res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testConcurrentRequests() throws 
InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // Test with multiple concurrent threads + Thread[] threads = new Thread[10]; + String[] results = new String[10]; + + for (int i = 0; i < 10; i++) { + final int index = i; + threads[i] = new Thread(() -> { + results[index] = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + }); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify all results + for (String result : results) { + Assert.assertEquals(expectedSql, result); + } + } + + @Test + public void testZeroFailureGuarantee() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert"; + + int totalRequests = 30; // Reduced for faster testing with production timeouts + int successCount = 0; + + // Test various failure scenarios while ensuring at least one service is always available + for (int i = 0; i < totalRequests; i++) { + if (i < 6) { + // All servers healthy + setAllServersHealthy(expectedSql); + } else if (i < 12) { + // Server 0 fails, others healthy + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } else if (i < 18) { + // Servers 0,1 fail, server 2 healthy + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"error\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } else if (i < 24) { + // Only server 1 healthy + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(2) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + } else { + // Alternating recovery + if (i % 2 == 0) { + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + } else { + servers.get(0).setResponse( + "{\"version\": 
\"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } + } + + String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + if (expectedSql.equals(result)) { + successCount++; + } + + Thread.sleep(50); // Small delay between requests + } + + System.out.println("Zero Failure Guarantee Test Results:"); + System.out.println("Total requests: " + totalRequests); + System.out.println("Successful: " + successCount); + System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%"); + + // Must achieve 100% success rate when at least one service is available + Assert.assertEquals("Must achieve 100% success rate when service is available", + totalRequests, successCount); + } + + @Test + public void testNetworkJitterStress() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + int totalRequests = 15; // Reduced for faster testing with production timeouts + int successCount = 0; + + // Simulate network jitter while ensuring at least one server is always available + for (int i = 0; i < totalRequests; i++) { + double random = Math.random(); + if (random < 0.3) { + // Server 0 fails, Server 1 works + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"timeout\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } else if (random < 0.5) { + // Server 1 fails, Server 0 works + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"service unavailable\"}"); + } else { + // Both servers work + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } + + String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + if (expectedSql.equals(result)) { + successCount++; + } + + Thread.sleep(100); // Delay between requests for production timeouts + } + + System.out.println("Network Jitter Test Results:"); + System.out.println("Total requests: " + totalRequests); + System.out.println("Successful: " + successCount); + System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%"); + + // Must achieve 100% success rate since we ensure at least one server is always available + Assert.assertEquals("Must handle network jitter with 100% success when service is available", + totalRequests, successCount); + } + + private void setAllServersHealthy(String expectedSql) { + for (int i = 0; i < 3; i++) { + servers.get(i).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } + } + private static int findValidPort() { int port; while (true) { From 
c1ed5edc9f6f0e27b268e65c7404a461f00b9790 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Fri, 4 Jul 2025 14:49:41 +0800 Subject: [PATCH 2/3] fix comment --- .../plugin/dialect/HttpDialectUtils.java | 149 ++++++++---------- .../doris/plugin/HttpDialectUtilsTest.java | 18 --- 2 files changed, 70 insertions(+), 97 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index bf5855b4d774c4..0a1d7d0f0d715b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -53,8 +53,8 @@ public class HttpDialectUtils { // Cache URL manager instances to avoid duplicate parsing private static final ConcurrentHashMap urlManagerCache = new ConcurrentHashMap<>(); - // Blacklist recovery time (ms): 5 minutes - private static final long BLACKLIST_RECOVERY_TIME_MS = 5 * 60 * 1000; + // Blacklist recovery time (ms): 1 minute + private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000; // Connection timeout period (ms): 3 seconds private static final int CONNECTION_TIMEOUT_MS = 3000; // Read timeout period (ms): 10 seconds @@ -62,11 +62,6 @@ public class HttpDialectUtils { public static String convertSql(String targetURLs, String originStmt, String dialect, String[] features, String config) { - if (targetURLs == null || targetURLs.trim().isEmpty()) { - LOG.warn("Target URLs is empty, return original SQL"); - return originStmt; - } - UrlManager urlManager = getOrCreateUrlManager(targetURLs); ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect, features, config); String requestStr = convertRequest.toJson(); @@ -224,108 +219,104 @@ public void markUrlAsHealthy(String url) { * Add URL to blacklist */ public void markUrlAsBlacklisted(String url) { + // If URL is already in blacklist, just return + if (blacklist.containsKey(url)) { + return; + } + long currentTime = System.currentTimeMillis(); long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS; - BlacklistEntry existingEntry = blacklist.get(url); - if (existingEntry != null) { - // If URL is already in blacklist, limit maximum recovery time to avoid infinite extension - // Maximum recovery time is 2 times the original recovery time - long maxRecoverTime = currentTime + (BLACKLIST_RECOVERY_TIME_MS * 2); - recoverTime = Math.min(maxRecoverTime, existingEntry.recoverTime + BLACKLIST_RECOVERY_TIME_MS); - } blacklist.put(url, new BlacklistEntry(currentTime, recoverTime)); LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime)); } /** - * Get list of healthy URLs (not in blacklist) + * Check if URL is localhost (127.0.0.1 or localhost) */ - public List getHealthyUrls() { - List healthy = Lists.newArrayList(); + private boolean isLocalhost(String url) { + return url.contains("127.0.0.1") || url.contains("localhost"); + } + + /** + * Get ALL URLs in priority order for 100% success guarantee + * CRITICAL: This method ensures we try every URL when any service might be available + *
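+ * <p>
+ * Illustrative example (hypothetical hosts): for
+ * "http://127.0.0.1:5001/api/v1/convert,http://10.0.0.2:5001/api/v1/convert" with both
+ * endpoints healthy, the localhost URL is tried first; once it fails and is blacklisted,
+ * it drops behind the healthy remote URL until its recovery time passes. The exact
+ * ordering rules are listed below.
+ * <p>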
+ * Priority order: + * 1. Localhost URLs (127.0.0.1 or localhost) that are healthy + * 2. Other healthy URLs (randomly selected) + * 3. Localhost URLs in blacklist + * 4. Other blacklisted URLs (sorted by recovery time) + */ + public List getAllUrlsInPriorityOrder() { + List prioritizedUrls = Lists.newArrayList(); + List healthyLocalhost = Lists.newArrayList(); + List healthyOthers = Lists.newArrayList(); + List blacklistedLocalhost = Lists.newArrayList(); + List blacklistedOthers = Lists.newArrayList(); + long currentTime = System.currentTimeMillis(); + + // Single traversal to categorize all URLs for (String url : parsedUrls) { BlacklistEntry entry = blacklist.get(url); + boolean isHealthy = false; + if (entry == null) { // URL is not in blacklist, consider it healthy - healthy.add(url); + isHealthy = true; } else if (currentTime >= entry.recoverTime) { - // URL has reached recovery time, remove from blacklist and add to healthy list + // URL has reached recovery time, remove from blacklist and consider healthy blacklist.remove(url); - healthy.add(url); + isHealthy = true; if (LOG.isDebugEnabled()) { LOG.debug("URL recovered from blacklist: {}", url); } } - } - // Randomly shuffle the order to avoid always trying from the first URL - Collections.shuffle(healthy, ThreadLocalRandom.current()); + boolean isLocal = isLocalhost(url); - if (LOG.isDebugEnabled()) { - LOG.debug("Healthy URLs: {}", healthy); + if (isHealthy) { + if (isLocal) { + healthyLocalhost.add(url); + } else { + healthyOthers.add(url); + } + } else { + if (isLocal) { + blacklistedLocalhost.add(url); + } else { + blacklistedOthers.add(url); + } + } } - return healthy; - } + // Add URLs in priority order + // 1. Healthy localhost URLs first + prioritizedUrls.addAll(healthyLocalhost); - /** - * Get list of blacklisted URLs (for immediate retry) - */ - public List getBlacklistedUrls() { - List blacklisted = Lists.newArrayList(); - long currentTime = System.currentTimeMillis(); + // 2. Other healthy URLs (randomly shuffled for load balancing) + Collections.shuffle(healthyOthers, ThreadLocalRandom.current()); + prioritizedUrls.addAll(healthyOthers); - for (String url : parsedUrls) { - BlacklistEntry entry = blacklist.get(url); - if (entry != null && currentTime < entry.recoverTime) { - // URL is in blacklist and has not reached recovery time yet - blacklisted.add(url); - } - } + // 3. Blacklisted localhost URLs + prioritizedUrls.addAll(blacklistedLocalhost); - // Sort by recovery time, prioritize URLs that should recover earlier - blacklisted.sort((url1, url2) -> { + // 4. Other blacklisted URLs (sorted by recovery time) + blacklistedOthers.sort((url1, url2) -> { BlacklistEntry entry1 = blacklist.get(url1); BlacklistEntry entry2 = blacklist.get(url2); - return Long.compare(entry1.recoverTime, entry2.recoverTime); - }); - - if (LOG.isDebugEnabled()) { - LOG.debug("Blacklisted URLs for immediate retry: {}", blacklisted); - } - - return blacklisted; - } - - /** - * Get ALL URLs in priority order for 100% success guarantee - * CRITICAL: This method ensures we try every URL when any service might be available - *
- * Priority order: - * 1. Healthy URLs (not in blacklist or recovered) - randomly shuffled for load balancing - * 2. Blacklisted URLs (sorted by recovery time) - still try them for guaranteed coverage - */ - public List getAllUrlsInPriorityOrder() { - List prioritizedUrls = Lists.newArrayList(); - - // First: Add all healthy URLs - List healthyUrls = getHealthyUrls(); - prioritizedUrls.addAll(healthyUrls); - - // Second: Add all blacklisted URLs that haven't been tried yet - List blacklistedUrls = getBlacklistedUrls(); - for (String blacklistedUrl : blacklistedUrls) { - if (!prioritizedUrls.contains(blacklistedUrl)) { - prioritizedUrls.add(blacklistedUrl); + if (entry1 == null && entry2 == null) { + return 0; } - } - - // Ensure we have all URLs - add any missing ones (safety net) - for (String url : parsedUrls) { - if (!prioritizedUrls.contains(url)) { - prioritizedUrls.add(url); + if (entry1 == null) { + return -1; } - } + if (entry2 == null) { + return 1; + } + return Long.compare(entry1.recoverTime, entry2.recoverTime); + }); + prioritizedUrls.addAll(blacklistedOthers); if (LOG.isDebugEnabled()) { LOG.debug("All URLs in priority order: {}", prioritizedUrls); diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java index b24d74d4510ce9..4ce71e196a33e1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java @@ -174,24 +174,6 @@ public void testAllUrlsFailure() { Assert.assertEquals(originSql, res); } - @Test - public void testEmptyUrls() { - String originSql = "select * from t1 where \"k1\" = 1"; - String[] features = new String[] {"ctas"}; - - // Test empty URLs - String res = HttpDialectUtils.convertSql("", originSql, "presto", features, "{}"); - Assert.assertEquals(originSql, res); - - // Test null URLs - res = HttpDialectUtils.convertSql(null, originSql, "presto", features, "{}"); - Assert.assertEquals(originSql, res); - - // Test URLs with only commas - res = HttpDialectUtils.convertSql(",,", originSql, "presto", features, "{}"); - Assert.assertEquals(originSql, res); - } - @Test public void testUrlParsing() { String originSql = "select * from t1 where \"k1\" = 1"; From 9c86e78e1c3c360630e1f3908d5c607c45ee9f05 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Mon, 7 Jul 2025 17:50:11 +0800 Subject: [PATCH 3/3] fix comment2 --- .../plugin/dialect/HttpDialectUtils.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index 0a1d7d0f0d715b..5131cf82bf66fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -17,6 +17,8 @@ package org.apache.doris.plugin.dialect; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -36,6 +38,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; /** * This class is used to convert sql with different dialects using sql convertor service. 
@@ -50,8 +53,11 @@ public class HttpDialectUtils { private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class); - // Cache URL manager instances to avoid duplicate parsing - private static final ConcurrentHashMap urlManagerCache = new ConcurrentHashMap<>(); + // Cache URL manager instances to avoid duplicate parsing with automatic expiration + private static final Cache urlManagerCache = Caffeine.newBuilder() + .maximumSize(10) + .expireAfterAccess(8, TimeUnit.HOURS) + .build(); // Blacklist recovery time (ms): 1 minute private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000; @@ -84,14 +90,13 @@ private static String tryConvertWithIntelligentSelection( for (String url : allUrls) { try { String result = doConvertSql(url, requestStr); - if (result != null && !result.equals(originStmt)) { - // Conversion succeeded, mark URL as healthy (remove from blacklist) - urlManager.markUrlAsHealthy(url); - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully converted SQL using URL: {}", url); - } - return result; + // If no exception thrown, HTTP response was successful (200) + // Mark URL as healthy and return result (even if empty) + urlManager.markUrlAsHealthy(url); + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully converted SQL using URL: {}", url); } + return result; } catch (Exception e) { LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.getMessage()); // Add failed URL to blacklist for future optimization @@ -100,7 +105,6 @@ private static String tryConvertWithIntelligentSelection( } } - LOG.warn("All URLs failed to convert SQL, return original SQL"); return originStmt; } @@ -108,7 +112,7 @@ private static String tryConvertWithIntelligentSelection( * Get or create a URL manager */ private static UrlManager getOrCreateUrlManager(String targetURLs) { - return urlManagerCache.computeIfAbsent(targetURLs, UrlManager::new); + return urlManagerCache.get(targetURLs, UrlManager::new); } /** @@ -117,7 +121,10 @@ private static UrlManager getOrCreateUrlManager(String targetURLs) { private static String doConvertSql(String targetURL, String requestStr) throws Exception { HttpURLConnection connection = null; try { - URL url = new URL(targetURL); + if (targetURL == null || targetURL.trim().isEmpty()) { + throw new Exception("Target URL is null or empty"); + } + URL url = new URL(targetURL.trim()); connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); @@ -154,14 +161,11 @@ private static String doConvertSql(String targetURL, String requestStr) throws E } if (result.code == 0) { if (!"v1".equals(result.version)) { - LOG.warn("Failed to convert sql, response version is not v1: {}, URL: {}", - result.version, targetURL); - return null; + throw new Exception("Unsupported version: " + result.version); } return result.data; } else { - LOG.warn("Failed to convert sql, response: {}, URL: {}", result, targetURL); - return null; + throw new Exception("Conversion failed: " + result.message); } } } else {