Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.doris.plugin.dialect;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Data;
Expand All @@ -30,35 +33,113 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
* This class is used to convert sql with different dialects using sql convertor service.
* The sql convertor service is a http service which is used to convert sql.
* <p>
* Features:
* - Support multiple URLs (comma separated)
* - Blacklist mechanism for failed URLs
* - Automatic failover and retry
* - URL caching and smart selection
*/
public class HttpDialectUtils {
private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class);

public static String convertSql(String targetURL, String originStmt, String dialect,
// Cache URL manager instances to avoid duplicate parsing with automatic expiration
private static final Cache<String, UrlManager> urlManagerCache = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterAccess(8, TimeUnit.HOURS)
.build();

// Blacklist recovery time (ms): 1 minute
private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000;
// Connection timeout period (ms): 3 seconds
private static final int CONNECTION_TIMEOUT_MS = 3000;
// Read timeout period (ms): 10 seconds
private static final int READ_TIMEOUT_MS = 10000;

public static String convertSql(String targetURLs, String originStmt, String dialect,
String[] features, String config) {
UrlManager urlManager = getOrCreateUrlManager(targetURLs);
ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect, features, config);
String requestStr = convertRequest.toJson();

// Try to convert SQL using intelligent URL selection strategy
return tryConvertWithIntelligentSelection(urlManager, requestStr, originStmt);
}

/**
* Try to convert SQL using intelligent URL selection strategy
* CRITICAL: This method ensures 100% success rate when ANY service is available
*/
private static String tryConvertWithIntelligentSelection(
UrlManager urlManager, String requestStr, String originStmt) {
// Strategy: Try ALL URLs in intelligent order, regardless of blacklist status
// This ensures 100% success rate when any service is actually available

List<String> allUrls = urlManager.getAllUrlsInPriorityOrder();

for (String url : allUrls) {
try {
String result = doConvertSql(url, requestStr);
// If no exception thrown, HTTP response was successful (200)
// Mark URL as healthy and return result (even if empty)
urlManager.markUrlAsHealthy(url);
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully converted SQL using URL: {}", url);
}
return result;
} catch (Exception e) {
LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.getMessage());
// Add failed URL to blacklist for future optimization
urlManager.markUrlAsBlacklisted(url);
// Continue trying next URL - this is CRITICAL for 100% success rate
}
}

return originStmt;
}

/**
* Get or create a URL manager
*/
private static UrlManager getOrCreateUrlManager(String targetURLs) {
return urlManagerCache.get(targetURLs, UrlManager::new);
}

/**
* Perform SQL conversion for individual URL
*/
private static String doConvertSql(String targetURL, String requestStr) throws Exception {
HttpURLConnection connection = null;
try {
URL url = new URL(targetURL);
if (targetURL == null || targetURL.trim().isEmpty()) {
throw new Exception("Target URL is null or empty");
}
URL url = new URL(targetURL.trim());
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setUseCaches(false);
connection.setDoOutput(true);
connection.setConnectTimeout(CONNECTION_TIMEOUT_MS);
connection.setReadTimeout(READ_TIMEOUT_MS);

String requestStr = convertRequest.toJson();
try (OutputStream outputStream = connection.getOutputStream()) {
outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8));
}

int responseCode = connection.getResponseCode();
if (LOG.isDebugEnabled()) {
LOG.debug("POST Response Code: {}, post data: {}", responseCode, requestStr);
LOG.debug("POST Response Code: {}, URL: {}, post data: {}", responseCode, targetURL, requestStr);
}

if (responseCode == HttpURLConnection.HTTP_OK) {
Expand All @@ -76,33 +157,192 @@ public static String convertSql(String targetURL, String originStmt, String dial
}.getType();
ConvertResponse result = new Gson().fromJson(response.toString(), type);
if (LOG.isDebugEnabled()) {
LOG.debug("convert response: {}", result);
LOG.debug("Convert response: {}, URL: {}", result, targetURL);
}
if (result.code == 0) {
if (!"v1".equals(result.version)) {
LOG.warn("failed to convert sql, response version is not v1: {}", result.version);
return originStmt;
throw new Exception("Unsupported version: " + result.version);
}
return result.data;
} else {
LOG.warn("failed to convert sql, response: {}", result);
return originStmt;
throw new Exception("Conversion failed: " + result.message);
}
}
} else {
LOG.warn("failed to convert sql, response code: {}", responseCode);
return originStmt;
throw new Exception("HTTP response code: " + responseCode);
}
} catch (Exception e) {
LOG.warn("failed to convert sql", e);
return originStmt;
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

/**
* URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection
*/
private static class UrlManager {
private final List<String> parsedUrls;
private final ConcurrentHashMap<String, BlacklistEntry> blacklist;

public UrlManager(String urls) {
this.parsedUrls = parseUrls(urls);
this.blacklist = new ConcurrentHashMap<>();
if (LOG.isDebugEnabled()) {
LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls);
}
}

/**
* Parse comma separated URL strings
*/
private List<String> parseUrls(String urls) {
List<String> result = Lists.newArrayList();
if (urls != null && !urls.trim().isEmpty()) {
String[] urlArray = urls.split(",");
for (String url : urlArray) {
String trimmedUrl = url.trim();
if (!trimmedUrl.isEmpty()) {
result.add(trimmedUrl);
}
}
}
return result;
}

/**
* Mark URL as healthy (remove from blacklist)
*/
public void markUrlAsHealthy(String url) {
if (blacklist.remove(url) != null) {
LOG.info("Removed URL from blacklist due to successful request: {}", url);
}
}

/**
* Add URL to blacklist
*/
public void markUrlAsBlacklisted(String url) {
// If URL is already in blacklist, just return
if (blacklist.containsKey(url)) {
return;
}

long currentTime = System.currentTimeMillis();
long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
blacklist.put(url, new BlacklistEntry(currentTime, recoverTime));
LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime));
}

/**
* Check if URL is localhost (127.0.0.1 or localhost)
*/
private boolean isLocalhost(String url) {
return url.contains("127.0.0.1") || url.contains("localhost");
}

/**
* Get ALL URLs in priority order for 100% success guarantee
* CRITICAL: This method ensures we try every URL when any service might be available
* <p>
* Priority order:
* 1. Localhost URLs (127.0.0.1 or localhost) that are healthy
* 2. Other healthy URLs (randomly selected)
* 3. Localhost URLs in blacklist
* 4. Other blacklisted URLs (sorted by recovery time)
*/
public List<String> getAllUrlsInPriorityOrder() {
List<String> prioritizedUrls = Lists.newArrayList();
List<String> healthyLocalhost = Lists.newArrayList();
List<String> healthyOthers = Lists.newArrayList();
List<String> blacklistedLocalhost = Lists.newArrayList();
List<String> blacklistedOthers = Lists.newArrayList();

long currentTime = System.currentTimeMillis();

// Single traversal to categorize all URLs
for (String url : parsedUrls) {
BlacklistEntry entry = blacklist.get(url);
boolean isHealthy = false;

if (entry == null) {
// URL is not in blacklist, consider it healthy
isHealthy = true;
} else if (currentTime >= entry.recoverTime) {
// URL has reached recovery time, remove from blacklist and consider healthy
blacklist.remove(url);
isHealthy = true;
if (LOG.isDebugEnabled()) {
LOG.debug("URL recovered from blacklist: {}", url);
}
}

boolean isLocal = isLocalhost(url);

if (isHealthy) {
if (isLocal) {
healthyLocalhost.add(url);
} else {
healthyOthers.add(url);
}
} else {
if (isLocal) {
blacklistedLocalhost.add(url);
} else {
blacklistedOthers.add(url);
}
}
}

// Add URLs in priority order
// 1. Healthy localhost URLs first
prioritizedUrls.addAll(healthyLocalhost);

// 2. Other healthy URLs (randomly shuffled for load balancing)
Collections.shuffle(healthyOthers, ThreadLocalRandom.current());
prioritizedUrls.addAll(healthyOthers);

// 3. Blacklisted localhost URLs
prioritizedUrls.addAll(blacklistedLocalhost);

// 4. Other blacklisted URLs (sorted by recovery time)
blacklistedOthers.sort((url1, url2) -> {
BlacklistEntry entry1 = blacklist.get(url1);
BlacklistEntry entry2 = blacklist.get(url2);
if (entry1 == null && entry2 == null) {
return 0;
}
if (entry1 == null) {
return -1;
}
if (entry2 == null) {
return 1;
}
return Long.compare(entry1.recoverTime, entry2.recoverTime);
});
prioritizedUrls.addAll(blacklistedOthers);

if (LOG.isDebugEnabled()) {
LOG.debug("All URLs in priority order: {}", prioritizedUrls);
}

return prioritizedUrls;
}
}

/**
* Blacklist entry
*/
private static class BlacklistEntry {
final long blacklistedTime;
final long recoverTime;

BlacklistEntry(long blacklistedTime, long recoverTime) {
this.blacklistedTime = blacklistedTime;
this.recoverTime = recoverTime;
}
}

@Data
private static class ConvertRequest {
private String version; // CHECKSTYLE IGNORE THIS LINE
Expand Down
Loading
Loading