Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -744,15 +744,19 @@ public void toThrift(TTypeDesc container) {
case DECIMAL128:
case DECIMAL256:
case DATETIMEV2: {
Preconditions.checkArgument(precision >= scale,
String.format("given precision %d is out of scale bound %d", precision, scale));
if (precision < scale) {
throw new IllegalArgumentException(
String.format("given precision %d is out of scale bound %d", precision, scale));
}
scalarType.setScale(scale);
scalarType.setPrecision(precision);
break;
}
case TIMEV2: {
Preconditions.checkArgument(precision >= scale,
String.format("given precision %d is out of scale bound %d", precision, scale));
if (precision < scale) {
throw new IllegalArgumentException(
String.format("given precision %d is out of scale bound %d", precision, scale));
}
scalarType.setScale(scale);
scalarType.setPrecision(precision);
break;
Expand Down
20 changes: 15 additions & 5 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,11 @@ public class Config extends ConfigBase {
* Minimum interval between last version when caching results,
* This parameter distinguishes between offline and real-time updates
*/
@ConfField(mutable = true, masterOnly = false)
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
)
public static int cache_last_version_interval_second = 30;

/**
Expand Down Expand Up @@ -2011,10 +2015,16 @@ public class Config extends ConfigBase {
/**
* the plan cache num which can be reused for the next query
*/
@ConfField(mutable = false, varType = VariableAnnotation.EXPERIMENTAL, description = {
"当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。",
"Now default set to 100, this config is used to control the number of "
+ "sql cache managed by NereidsSqlCacheManager"})
@ConfField(
mutable = true,
varType = VariableAnnotation.EXPERIMENTAL,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig",
description = {
"当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。",
"Now default set to 100, this config is used to control the number of "
+ "sql cache managed by NereidsSqlCacheManager"
}
)
public static int sql_cache_manage_num = 100;

/**
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class ConfigBase {

Class<? extends ConfHandler> callback() default DefaultConfHandler.class;

String callbackClassString() default "";

// description for this config item.
// There should be 2 elements in the array.
// The first element is the description in Chinese.
Expand Down Expand Up @@ -329,6 +331,16 @@ public static synchronized void setMutableConfig(String key, String value) throw
throw new ConfigException("Failed to set config '" + key + "'. err: " + e.getMessage());
}

String callbackClassString = anno.callbackClassString();
if (!Strings.isNullOrEmpty(callbackClassString)) {
try {
ConfHandler confHandler = (ConfHandler) Class.forName(anno.callbackClassString()).newInstance();
confHandler.handle(field, value);
} catch (Exception e) {
throw new ConfigException("Failed to set config '" + key + "'. err: " + e.getMessage());
}
}

LOG.info("set config {} to {}", key, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package org.apache.doris.common;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
Expand All @@ -50,6 +49,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;
Expand All @@ -59,6 +59,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.commons.collections.CollectionUtils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Map.Entry;
Expand All @@ -69,11 +70,36 @@
/** NereidsSqlCacheManager */
public class NereidsSqlCacheManager {
// key: <user>:<sql>
// value: CacheAnalyzer
private final Cache<String, SqlCacheContext> sqlCache;
// value: SqlCacheContext
private volatile Cache<String, SqlCacheContext> sqlCaches;

public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCache = Caffeine.newBuilder()
sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
}

public static synchronized void updateConfig() {
Env currentEnv = Env.getCurrentEnv();
if (currentEnv == null) {
return;
}
NereidsSqlCacheManager sqlCacheManager = currentEnv.getSqlCacheManager();
if (sqlCacheManager == null) {
return;
}

Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
Config.sql_cache_manage_num,
Config.cache_last_version_interval_second
);
sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
sqlCacheManager.sqlCaches = sqlCaches;
}

private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCacheNum = sqlCacheNum <= 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds <= 0 ? 30 : cacheIntervalSeconds;

return Caffeine.newBuilder()
.maximumSize(sqlCacheNum)
.expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
// auto evict cache when jvm memory too low
Expand All @@ -95,10 +121,9 @@ public void tryAddCache(
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
if (analyzer.getCache() instanceof SqlCache
&& (currentMissParseSqlFromSqlCache || sqlCache.getIfPresent(key) == null)) {
if ((currentMissParseSqlFromSqlCache || sqlCaches.getIfPresent(key) == null)
&& sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setCacheKeyMd5(cache.getOrComputeCacheMd5());
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
sqlCacheContext.setLatestPartitionId(cache.getLatestId());
sqlCacheContext.setLatestPartitionVersion(cache.getLatestVersion());
Expand All @@ -109,23 +134,16 @@ public void tryAddCache(
sqlCacheContext.addScanTable(scanTable);
}

sqlCache.put(key, sqlCacheContext);
sqlCaches.put(key, sqlCacheContext);
}
}

/** invalidateCache */
public void invalidateCache(ConnectContext connectContext, String sql) {
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
sqlCache.invalidate(key);
}

/** tryParseSql */
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
Env env = connectContext.getEnv();
String key = currentUserIdentity.toString() + ":" + sql.trim();
SqlCacheContext sqlCacheContext = sqlCache.getIfPresent(key);
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
return Optional.empty();
}
Expand Down Expand Up @@ -161,16 +179,17 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri

try {
Status status = new Status();
PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
InternalService.PFetchCacheResult cacheData =
SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
sqlCacheContext.getCacheKeyMd5(), sqlCacheContext.getLatestPartitionId(),
cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
sqlCacheContext.getLatestPartitionVersion(), sqlCacheContext.getLatestPartitionTime(),
sqlCacheContext.getSumOfPartitionNum(), status);

if (status.ok() && cacheData != null && cacheData.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
List<InternalService.PCacheValue> cacheValues = cacheData.getValuesList();
String cachedPlan = sqlCacheContext.getPhysicalPlan();
String backendAddress = SqlCache.findCacheBe(sqlCacheContext.getCacheKeyMd5()).getAddress();
String backendAddress = SqlCache.findCacheBe(cacheKeyMd5).getAddress();

MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);

Expand All @@ -179,9 +198,9 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
sqlCacheContext.getResultExprs(), cacheValues, backendAddress, cachedPlan);
return Optional.of(logicalSqlCache);
}
return Optional.empty();
return invalidateCache(key);
} catch (Throwable t) {
return Optional.empty();
return invalidateCache(key);
}
}

Expand Down Expand Up @@ -309,7 +328,8 @@ private boolean nondeterministicFunctionChanged(
return true;
}

List<Pair<Expression, Expression>> nondeterministicFunctions = sqlCacheContext.getFoldNondeterministicPairs();
List<Pair<Expression, Expression>> nondeterministicFunctions
= sqlCacheContext.getFoldFullNondeterministicPairs();
if (nondeterministicFunctions.isEmpty()) {
return false;
}
Expand All @@ -328,21 +348,8 @@ private boolean nondeterministicFunctionChanged(
return false;
}

private boolean isValidDbAndTable(TableIf tableIf, Env env) {
return getTableFromEnv(tableIf, env) != null;
}

private TableIf getTableFromEnv(TableIf tableIf, Env env) {
Optional<Database> db = env.getInternalCatalog().getDb(tableIf.getDatabase().getId());
if (!db.isPresent()) {
return null;
}
Optional<Table> table = db.get().getTable(tableIf.getId());
return table.orElse(null);
}

private Optional<LogicalSqlCache> invalidateCache(String key) {
sqlCache.invalidate(key);
sqlCaches.invalidate(key);
return Optional.empty();
}

Expand All @@ -357,4 +364,15 @@ private TableIf findTableIf(Env env, FullTableName fullTableName) {
}
return db.get().getTable(fullTableName.table).orElse(null);
}

// NOTE: used in Config.sql_cache_manage_num.callbackClassString and
// Config.cache_last_version_interval_second.callbackClassString,
// don't remove it!
public static class UpdateConfig extends DefaultConfHandler {
@Override
public void handle(Field field, String confVal) throws Exception {
super.handle(field, confVal);
NereidsSqlCacheManager.updateConfig();
}
}
}
Loading