diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
index 67346fc21609c1..adf064c0000324 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
@@ -744,15 +744,19 @@ public void toThrift(TTypeDesc container) {
             case DECIMAL128:
             case DECIMAL256:
             case DATETIMEV2: {
-                Preconditions.checkArgument(precision >= scale,
-                        String.format("given precision %d is out of scale bound %d", precision, scale));
+                if (precision < scale) {
+                    throw new IllegalArgumentException(
+                            String.format("given precision %d is out of scale bound %d", precision, scale));
+                }
                 scalarType.setScale(scale);
                 scalarType.setPrecision(precision);
                 break;
             }
             case TIMEV2: {
-                Preconditions.checkArgument(precision >= scale,
-                        String.format("given precision %d is out of scale bound %d", precision, scale));
+                if (precision < scale) {
+                    throw new IllegalArgumentException(
+                            String.format("given precision %d is out of scale bound %d", precision, scale));
+                }
                 scalarType.setScale(scale);
                 scalarType.setPrecision(precision);
                 break;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d318b77aa416dd..efe0285f5ee9b7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1300,7 +1300,11 @@ public class Config extends ConfigBase {
      * Minimum interval between last version when caching results,
      * This parameter distinguishes between offline and real-time updates
      */
-    @ConfField(mutable = true, masterOnly = false)
+    @ConfField(
+            mutable = true,
+            masterOnly = false,
+            callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
+    )
     public static int cache_last_version_interval_second = 30;
 
     /**
@@ -2011,10 +2015,16 @@ public class Config extends ConfigBase {
     /**
      * the plan cache num which can be reused for the next query
      */
-    @ConfField(mutable = false, varType = VariableAnnotation.EXPERIMENTAL, description = {
-            "当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。",
-            "Now default set to 100, this config is used to control the number of "
-                    + "sql cache managed by NereidsSqlCacheManager"})
+    @ConfField(
+            mutable = true,
+            varType = VariableAnnotation.EXPERIMENTAL,
+            callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig",
+            description = {
+                    "当前默认设置为 100,用来控制NereidsSqlCacheManager管理的sql cache数量。",
+                    "Now default set to 100, this config is used to control the number of "
+                            + "sql cache managed by NereidsSqlCacheManager"
+            }
+    )
     public static int sql_cache_manage_num = 100;
 
     /**
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java b/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java
index 2411ff997c7d0b..dd66fdcb0f593b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java
@@ -56,6 +56,8 @@ public class ConfigBase {
 
         Class<? extends ConfHandler> callback() default DefaultConfHandler.class;
 
+        String callbackClassString() default "";
+
         // description for this config item.
         // There should be 2 elements in the array.
         // The first element is the description in Chinese.
@@ -329,6 +331,16 @@ public static synchronized void setMutableConfig(String key, String value) throw
             throw new ConfigException("Failed to set config '" + key + "'. err: " + e.getMessage());
         }
 
+        String callbackClassString = anno.callbackClassString();
+        if (!Strings.isNullOrEmpty(callbackClassString)) {
+            try {
+                ConfHandler confHandler = (ConfHandler) Class.forName(callbackClassString).newInstance();
+                confHandler.handle(field, value);
+            } catch (Exception e) {
+                throw new ConfigException("Failed to set config '" + key + "'. err: " + e.getMessage());
+            }
+        }
+
         LOG.info("set config {} to {}", key, value);
     }
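A note on the mechanism introduced above: `@ConfField` gains a string-valued `callbackClassString` next to the existing `callback()` class attribute, presumably so that a config item declared in fe-common can name a handler class that lives in fe-core (here `NereidsSqlCacheManager$UpdateConfig`) without creating a compile-time dependency between the two modules. A minimal, self-contained sketch of that reflective dispatch — all names below are illustrative stand-ins, not Doris APIs:

```java
import java.lang.reflect.Field;

public class CallbackSketch {
    /** Stand-in for ConfHandler: reacts to a mutable config change. */
    public interface Handler {
        void handle(Field field, String newValue) throws Exception;
    }

    /** Stand-in for a handler that is referenced only by name. */
    public static class DemoHandler implements Handler {
        @Override
        public void handle(Field field, String newValue) throws Exception {
            // roughly what DefaultConfHandler does: write the parsed value back
            field.set(null, Integer.parseInt(newValue));
            System.out.println("rebuild state for " + field.getName());
        }
    }

    public static int demo_conf = 30;

    public static void main(String[] args) throws Exception {
        // Only a class *name* crosses the module boundary, so the declaring
        // module needs no compile-time dependency on the handler's class.
        String callbackClassString = CallbackSketch.class.getName() + "$DemoHandler";
        Handler handler = (Handler) Class.forName(callbackClassString).newInstance();
        handler.handle(CallbackSketch.class.getField("demo_conf"), "60");
        System.out.println(demo_conf); // prints 60
    }
}
```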
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
index f3643046cade04..ad047a1693eb7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
@@ -18,14 +18,13 @@
 package org.apache.doris.common;
 
 import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
+import org.apache.doris.common.ConfigBase.DefaultConfHandler;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.DataMaskPolicy;
@@ -50,6 +49,7 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.SqlCache;
@@ -59,6 +59,7 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.collections.CollectionUtils;
 
+import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map.Entry;
@@ -69,11 +70,36 @@
 /** NereidsSqlCacheManager */
 public class NereidsSqlCacheManager {
     // key: <user>:<sql>
-    // value: CacheAnalyzer
-    private final Cache<String, SqlCacheContext> sqlCache;
+    // value: SqlCacheContext
+    private volatile Cache<String, SqlCacheContext> sqlCaches;
 
     public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
-        sqlCache = Caffeine.newBuilder()
+        sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
+    }
+
+    public static synchronized void updateConfig() {
+        Env currentEnv = Env.getCurrentEnv();
+        if (currentEnv == null) {
+            return;
+        }
+        NereidsSqlCacheManager sqlCacheManager = currentEnv.getSqlCacheManager();
+        if (sqlCacheManager == null) {
+            return;
+        }
+
+        Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
+                Config.sql_cache_manage_num,
+                Config.cache_last_version_interval_second
+        );
+        sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
+        sqlCacheManager.sqlCaches = sqlCaches;
+    }
+
+    private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
+        sqlCacheNum = sqlCacheNum <= 0 ? 100 : sqlCacheNum;
+        cacheIntervalSeconds = cacheIntervalSeconds <= 0 ? 30 : cacheIntervalSeconds;
+
+        return Caffeine.newBuilder()
                 .maximumSize(sqlCacheNum)
                 .expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
                 // auto evict cache when jvm memory too low
@@ -95,10 +121,9 @@ public void tryAddCache(
         SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
         UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
         String key = currentUserIdentity.toString() + ":" + sql.trim();
-        if (analyzer.getCache() instanceof SqlCache
-                && (currentMissParseSqlFromSqlCache || sqlCache.getIfPresent(key) == null)) {
+        if ((currentMissParseSqlFromSqlCache || sqlCaches.getIfPresent(key) == null)
+                && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
             SqlCache cache = (SqlCache) analyzer.getCache();
-            sqlCacheContext.setCacheKeyMd5(cache.getOrComputeCacheMd5());
             sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
             sqlCacheContext.setLatestPartitionId(cache.getLatestId());
             sqlCacheContext.setLatestPartitionVersion(cache.getLatestVersion());
@@ -109,23 +134,16 @@ public void tryAddCache(
                 sqlCacheContext.addScanTable(scanTable);
             }
 
-            sqlCache.put(key, sqlCacheContext);
+            sqlCaches.put(key, sqlCacheContext);
         }
     }
 
-    /** invalidateCache */
-    public void invalidateCache(ConnectContext connectContext, String sql) {
-        UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
-        String key = currentUserIdentity.toString() + ":" + sql.trim();
-        sqlCache.invalidate(key);
-    }
-
     /** tryParseSql */
     public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
         UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
         Env env = connectContext.getEnv();
         String key = currentUserIdentity.toString() + ":" + sql.trim();
-        SqlCacheContext sqlCacheContext = sqlCache.getIfPresent(key);
+        SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
         if (sqlCacheContext == null) {
             return Optional.empty();
         }
@@ -161,16 +179,17 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
 
         try {
             Status status = new Status();
+            PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
             InternalService.PFetchCacheResult cacheData =
                     SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
-                            sqlCacheContext.getCacheKeyMd5(), sqlCacheContext.getLatestPartitionId(),
+                            cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
                             sqlCacheContext.getLatestPartitionVersion(), sqlCacheContext.getLatestPartitionTime(),
                             sqlCacheContext.getSumOfPartitionNum(), status);
 
             if (status.ok() && cacheData != null && cacheData.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
                 List<InternalService.PCacheValue> cacheValues = cacheData.getValuesList();
                 String cachedPlan = sqlCacheContext.getPhysicalPlan();
-                String backendAddress = SqlCache.findCacheBe(sqlCacheContext.getCacheKeyMd5()).getAddress();
+                String backendAddress = SqlCache.findCacheBe(cacheKeyMd5).getAddress();
 
                 MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
 
@@ -179,9 +198,9 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
                         sqlCacheContext.getResultExprs(), cacheValues, backendAddress, cachedPlan);
                 return Optional.of(logicalSqlCache);
             }
-            return Optional.empty();
+            return invalidateCache(key);
         } catch (Throwable t) {
-            return Optional.empty();
+            return invalidateCache(key);
         }
     }
 
@@ -309,7 +328,8 @@ private boolean nondeterministicFunctionChanged(
             return true;
         }
 
-        List<Pair<Expression, Expression>> nondeterministicFunctions = sqlCacheContext.getFoldNondeterministicPairs();
+        List<Pair<Expression, Expression>> nondeterministicFunctions
+                = sqlCacheContext.getFoldFullNondeterministicPairs();
         if (nondeterministicFunctions.isEmpty()) {
             return false;
         }
@@ -328,21 +348,8 @@ private boolean nondeterministicFunctionChanged(
         return false;
     }
 
-    private boolean isValidDbAndTable(TableIf tableIf, Env env) {
-        return getTableFromEnv(tableIf, env) != null;
-    }
-
-    private TableIf getTableFromEnv(TableIf tableIf, Env env) {
-        Optional<Database> db = env.getInternalCatalog().getDb(tableIf.getDatabase().getId());
-        if (!db.isPresent()) {
-            return null;
-        }
-        Optional<Table> table = db.get().getTable(tableIf.getId());
-        return table.orElse(null);
-    }
-
     private Optional<LogicalSqlCache> invalidateCache(String key) {
-        sqlCache.invalidate(key);
+        sqlCaches.invalidate(key);
         return Optional.empty();
     }
 
@@ -357,4 +364,15 @@ private TableIf findTableIf(Env env, FullTableName fullTableName) {
         }
         return db.get().getTable(fullTableName.table).orElse(null);
     }
+
+    // NOTE: used in Config.sql_cache_manage_num.callbackClassString and
+    // Config.cache_last_version_interval_second.callbackClassString,
+    // don't remove it!
+    public static class UpdateConfig extends DefaultConfHandler {
+        @Override
+        public void handle(Field field, String confVal) throws Exception {
+            super.handle(field, confVal);
+            NereidsSqlCacheManager.updateConfig();
+        }
+    }
 }
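`updateConfig()` takes a rebuild-and-migrate route: instead of adjusting the existing cache in place, it builds a fresh Caffeine cache with the new `maximumSize`/`expireAfterAccess` settings, copies the live entries over, and publishes the new instance through the `volatile` field, so readers racing with the swap see either the old or the new cache, never a half-configured one. A condensed sketch of the pattern (the wrapper class is hypothetical; the Caffeine calls mirror the diff):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;

public class ResizableCache<K, V> {
    private volatile Cache<K, V> cache;

    public ResizableCache(int maxSize, long expireSeconds) {
        cache = build(maxSize, expireSeconds);
    }

    public synchronized void resize(int maxSize, long expireSeconds) {
        Cache<K, V> newCache = build(maxSize, expireSeconds);
        newCache.putAll(cache.asMap()); // keep the live entries
        cache = newCache;               // single volatile write publishes the swap
    }

    private static <K, V> Cache<K, V> build(int maxSize, long expireSeconds) {
        // same defensive clamping as buildSqlCaches() above
        return Caffeine.newBuilder()
                .maximumSize(maxSize <= 0 ? 100 : maxSize)
                .expireAfterAccess(Duration.ofSeconds(expireSeconds <= 0 ? 30 : expireSeconds))
                .build();
    }

    public V getIfPresent(K key) {
        return cache.getIfPresent(key);
    }
}
```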
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index 6afda4e4fe96d6..6a7dbd2ed9de4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -42,6 +42,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -52,6 +53,7 @@ public class SqlCacheContext {
     private final TUniqueId queryId;
     // if contains udf/udaf/tableValuesFunction we can not process it and skip use sql cache
     private volatile boolean cannotProcessExpression;
+    private volatile String originSql;
     private volatile String physicalPlan;
     private volatile long latestPartitionId = -1;
    private volatile long latestPartitionTime = -1;
@@ -65,9 +67,13 @@ public class SqlCacheContext {
     private final Map<FullTableName, List<RowFilterPolicy>> rowPolicies = Maps.newLinkedHashMap();
     private final Map<FullColumnName, Optional<DataMaskPolicy>> dataMaskPolicies = Maps.newLinkedHashMap();
     private final Set<Variable> usedVariables = Sets.newLinkedHashSet();
-    // key: the expression which contains nondeterministic function, e.g. date(now())
-    // value: the expression which already try to fold nondeterministic function, e.g. '2024-01-01'
+    // key: the expression which **contains** a nondeterministic function, e.g. date_add(date_column, date(now()))
+    // value: the same expression after trying to fold the nondeterministic functions,
+    //        e.g. date_add(date_column, '2024-01-01')
     // note that value maybe contains nondeterministic function too, when fold failed
+    private final List<Pair<Expression, Expression>> foldFullNondeterministicPairs = Lists.newArrayList();
+    // key: the expression which **is** a nondeterministic function, e.g. now()
+    // value: the same function after trying to fold it, e.g. '2024-01-01 10:01:03'
     private final List<Pair<Expression, Expression>> foldNondeterministicPairs = Lists.newArrayList();
     private volatile boolean hasUnsupportedTables;
     private final List<ScanTable> scanTables = Lists.newArrayList();
@@ -206,6 +212,14 @@ public synchronized List<Variable> getUsedVariables() {
         return ImmutableList.copyOf(usedVariables);
     }
 
+    public synchronized void addFoldFullNondeterministicPair(Expression unfold, Expression fold) {
+        foldFullNondeterministicPairs.add(Pair.of(unfold, fold));
+    }
+
+    public synchronized List<Pair<Expression, Expression>> getFoldFullNondeterministicPairs() {
+        return ImmutableList.copyOf(foldFullNondeterministicPairs);
+    }
+
     public synchronized void addFoldNondeterministicPair(Expression unfold, Expression fold) {
         foldNondeterministicPairs.add(Pair.of(unfold, fold));
     }
@@ -278,10 +292,6 @@ public synchronized Map<FullTableName, List<RowFilterPolicy>> getRowPolicies() {
         return ImmutableMap.copyOf(rowPolicies);
     }
 
-    public boolean isHasUnsupportedTables() {
-        return hasUnsupportedTables;
-    }
-
     public synchronized void addScanTable(ScanTable scanTable) {
         this.scanTables.add(scanTable);
     }
@@ -310,12 +320,62 @@ public TUniqueId getQueryId() {
         return queryId;
     }
 
-    public PUniqueId getCacheKeyMd5() {
+    /** getOrComputeCacheKeyMd5 */
+    public PUniqueId getOrComputeCacheKeyMd5() {
+        if (cacheKeyMd5 == null && originSql != null) {
+            synchronized (this) {
+                if (cacheKeyMd5 != null) {
+                    return cacheKeyMd5;
+                }
+
+                StringBuilder cacheKey = new StringBuilder(originSql);
+                for (Entry<FullTableName, String> entry : usedViews.entrySet()) {
+                    cacheKey.append("|")
+                            .append(entry.getKey())
+                            .append("=")
+                            .append(entry.getValue());
+                }
+                for (Variable usedVariable : usedVariables) {
+                    cacheKey.append("|")
+                            .append(usedVariable.getType().name())
+                            .append(":")
+                            .append(usedVariable.getName())
+                            .append("=")
+                            .append(usedVariable.getRealExpression().toSql());
+                }
+                for (Pair<Expression, Expression> pair : foldNondeterministicPairs) {
+                    cacheKey.append("|")
+                            .append(pair.key().toSql())
+                            .append("=")
+                            .append(pair.value().toSql());
+                }
+                for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) {
+                    List<RowFilterPolicy> policy = entry.getValue();
+                    if (policy.isEmpty()) {
+                        continue;
+                    }
+                    cacheKey.append("|")
+                            .append(entry.getKey())
+                            .append("=")
+                            .append(policy);
+                }
+                for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) {
+                    if (!entry.getValue().isPresent()) {
+                        continue;
+                    }
+                    cacheKey.append("|")
+                            .append(entry.getKey())
+                            .append("=")
+                            .append(entry.getValue().map(Object::toString).orElse(""));
+                }
+                cacheKeyMd5 = CacheProxy.getMd5(cacheKey.toString());
+            }
+        }
         return cacheKeyMd5;
     }
 
-    public void setCacheKeyMd5(PUniqueId cacheKeyMd5) {
-        this.cacheKeyMd5 = cacheKeyMd5;
+    public void setOriginSql(String originSql) {
+        this.originSql = originSql.trim();
     }
 
     /** FullTableName */
@@ -325,6 +385,11 @@ public static class FullTableName {
         public final String catalog;
         public final String db;
         public final String table;
+
+        @Override
+        public String toString() {
+            return catalog + "." + db + "." + table;
+        }
     }
 
     /** FullColumnName */
@@ -335,6 +400,11 @@ public static class FullColumnName {
         public final String db;
         public final String table;
         public final String column;
+
+        @Override
+        public String toString() {
+            return catalog + "." + db + "." + table + "." + column;
+        }
     }
 
     /** ScanTable */
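For a sense of what `getOrComputeCacheKeyMd5()` actually hashes: the key is the original SQL plus every input that could legally change the result of two textually identical queries. The new `toString()` overrides on `FullTableName`/`FullColumnName` exist so those map keys print as `catalog.db.table` in the composed key. A hypothetical composed key (every concrete value below is invented; the `|`-separated sections follow the loops above):

```java
String cacheKey = "select * from db1.sales where dt < date(now())"  // originSql
        + "|internal.db1.sales_view=SELECT * FROM db1.sales"        // usedViews
        + "|SESSION:time_zone='UTC'"                                 // usedVariables
        + "|now()='2024-01-01 10:01:03'"                             // foldNondeterministicPairs
        + "|internal.db1.sales=[id = 4]";                            // rowPolicies
PUniqueId md5 = CacheProxy.getMd5(cacheKey);
```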
+ column; + } } /** ScanTable */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 5baa59a35dec1b..a468a0f0cb7f21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -112,7 +112,6 @@ public class StatementContext implements Closeable { private final Map rewrittenCteConsumer = new HashMap<>(); private final Set viewDdlSqlSet = Sets.newHashSet(); private final SqlCacheContext sqlCacheContext; - private Map> checkedPrivilegedTableAndUsedColumns = Maps.newLinkedHashMap(); // collect all hash join conditions to compute node connectivity in join graph private final List joinFilters = new ArrayList<>(); @@ -155,6 +154,9 @@ public StatementContext(ConnectContext connectContext, OriginStatement originSta && CacheAnalyzer.canUseSqlCache(connectContext.getSessionVariable())) { this.sqlCacheContext = new SqlCacheContext( connectContext.getCurrentUserIdentity(), connectContext.queryId()); + if (originStatement != null) { + this.sqlCacheContext.setOriginSql(originStatement.originStmt.trim()); + } } else { this.sqlCacheContext = null; } @@ -170,6 +172,9 @@ public ConnectContext getConnectContext() { public void setOriginStatement(OriginStatement originStatement) { this.originStatement = originStatement; + if (originStatement != null && sqlCacheContext != null) { + sqlCacheContext.setOriginSql(originStatement.originStmt.trim()); + } } public OriginStatement getOriginStatement() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java index cc85bc01323c75..691e4aca1a337d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java @@ -154,14 +154,18 @@ public Expression analyze(Expression expression, ExpressionRewriteContext contex @Override public Expression visitBoundFunction(BoundFunction boundFunction, ExpressionRewriteContext context) { Expression fold = super.visitBoundFunction(boundFunction, context); - if (fold instanceof Nondeterministic) { + boolean unfold = fold instanceof Nondeterministic; + if (unfold) { sqlCacheContext.setCannotProcessExpression(true); } + if (boundFunction instanceof Nondeterministic && !unfold) { + sqlCacheContext.addFoldNondeterministicPair(boundFunction, fold); + } return fold; } }.rewrite(analyzeResult, context); - sqlCacheContext.addFoldNondeterministicPair(analyzeResult, foldNondeterministic); + sqlCacheContext.addFoldFullNondeterministicPair(analyzeResult, foldNondeterministic); return foldNondeterministic; } return analyzeResult; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index 5c01fd4df9a87a..79462a9d8e7ebf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -141,11 +141,20 @@ public boolean isHashJoinCondition(EqualPredicate equal) { public static Pair, List> extractExpressionForHashTable(List leftSlots, List rightSlots, List onConditions) { JoinSlotCoverageChecker checker = new JoinSlotCoverageChecker(leftSlots, rightSlots); - Map> mapper = 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 5c01fd4df9a87a..79462a9d8e7ebf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -141,11 +141,20 @@ public boolean isHashJoinCondition(EqualPredicate equal) {
     public static Pair<List<Expression>, List<Expression>> extractExpressionForHashTable(List<Slot> leftSlots,
             List<Slot> rightSlots, List<Expression> onConditions) {
         JoinSlotCoverageChecker checker = new JoinSlotCoverageChecker(leftSlots, rightSlots);
-        Map<Boolean, List<Expression>> mapper = onConditions.stream().collect(Collectors.groupingBy(
-                expr -> (expr instanceof EqualPredicate) && checker.isHashJoinCondition((EqualPredicate) expr)));
+
+        ImmutableList.Builder<Expression> hashConditions = ImmutableList.builderWithExpectedSize(onConditions.size());
+        ImmutableList.Builder<Expression> otherConditions = ImmutableList.builderWithExpectedSize(onConditions.size());
+        for (Expression expr : onConditions) {
+            if (expr instanceof EqualPredicate && checker.isHashJoinCondition((EqualPredicate) expr)) {
+                hashConditions.add(expr);
+            } else {
+                otherConditions.add(expr);
+            }
+        }
+
         return Pair.of(
-                mapper.getOrDefault(true, ImmutableList.of()),
-                mapper.getOrDefault(false, ImmutableList.of())
+                hashConditions.build(),
+                otherConditions.build()
         );
     }
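The `JoinUtils` change replaces `Collectors.groupingBy` over boxed `Boolean` keys with a single pass into two `ImmutableList` builders, avoiding the intermediate `HashMap` on a planner hot path. The same shape as a generic helper (a sketch; assumes Doris's `org.apache.doris.common.Pair` and Guava's `ImmutableList`):

```java
import com.google.common.collect.ImmutableList;

import org.apache.doris.common.Pair;

import java.util.List;
import java.util.function.Predicate;

final class Partitions {
    /** Split input into (matching, non-matching) in one pass, preserving order. */
    static <T> Pair<List<T>, List<T>> partition(List<T> input, Predicate<T> test) {
        ImmutableList.Builder<T> matched = ImmutableList.builderWithExpectedSize(input.size());
        ImmutableList.Builder<T> rest = ImmutableList.builderWithExpectedSize(input.size());
        for (T item : input) {
            (test.test(item) ? matched : rest).add(item);
        }
        return Pair.of(matched.build(), rest.build());
    }
}
```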
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e5b2e1b2479d73..4c2c866578f5c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -62,6 +62,7 @@
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
+import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -999,7 +1000,9 @@ private void sendPipelineCtx() throws TException, RpcException, UserException {
         // 4. send and wait fragments rpc
         // 4.1 serialize fragment
         // unsetFields() must be called serially.
-        beToPipelineExecCtxs.values().stream().forEach(ctxs -> ctxs.unsetFields());
+        for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
+            ctxs.unsetFields();
+        }
         // serializeFragments() can be called in parallel.
         final AtomicLong compressedSize = new AtomicLong(0);
         beToPipelineExecCtxs.values().parallelStream().forEach(ctxs -> {
@@ -3809,10 +3812,15 @@ Map<TNetworkAddress, TPipelineFragmentParams> toTPipelineParams(int backendNum)
                 int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceExecParams.size());
                 memLimit = queryOptions.getMemLimit() / rate;
             }
-            Set<Integer> topnFilterSources = scanNodes.stream()
-                    .filter(scanNode -> scanNode instanceof OlapScanNode)
-                    .flatMap(scanNode -> ((OlapScanNode) scanNode).getTopnFilterSortNodes().stream())
-                    .map(sort -> sort.getId().asInt()).collect(Collectors.toSet());
+            Set<Integer> topnFilterSources = Sets.newLinkedHashSet();
+            for (ScanNode scanNode : scanNodes) {
+                if (scanNode instanceof OlapScanNode) {
+                    for (SortNode sortNode : ((OlapScanNode) scanNode).getTopnFilterSortNodes()) {
+                        topnFilterSources.add(sortNode.getId().asInt());
+                    }
+                }
+            }
+
             Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap<>();
             Map<TNetworkAddress, Integer> instanceIdx = new HashMap<>();
             TPlanFragment fragmentThrift = fragment.toThrift();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eb1d511f45e01e..064243c62692c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -42,6 +42,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -1639,16 +1640,33 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
     public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";
 
     public Set<Integer> getIgnoredRuntimeFilterIds() {
-        return Arrays.stream(ignoreRuntimeFilterIds.split(",[\\s]*"))
-                .map(v -> {
-                    int res = -1;
-                    try {
-                        res = Integer.valueOf(v);
-                    } catch (Exception e) {
-                        //ignore it
-                    }
-                    return res;
-                }).collect(ImmutableSet.toImmutableSet());
+        Set<Integer> ids = Sets.newLinkedHashSet();
+        if (ignoreRuntimeFilterIds.isEmpty()) {
+            return ImmutableSet.of();
+        }
+        for (String v : ignoreRuntimeFilterIds.split(",[\\s]*")) {
+            int res = -1;
+            if (!v.isEmpty()) {
+                boolean isNumber = true;
+                for (int i = 0; i < v.length(); ++i) {
+                    char c = v.charAt(i);
+                    if (c < '0' || c > '9') {
+                        isNumber = false;
+                        break;
+                    }
+                }
+                if (isNumber) {
+                    try {
+                        res = Integer.parseInt(v);
+                    } catch (Throwable t) {
+                        // ignore
+                    }
+                }
+            }
+            ids.add(res);
+        }
+        return ids;
+    }
 
     public void setIgnoreRuntimeFilterIds(String ignoreRuntimeFilterIds) {
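`getIgnoredRuntimeFilterIds()` now pre-checks that a token is all digits before parsing, so malformed input no longer drives control flow through exceptions; the inner try/catch survives because an all-digit token can still overflow `int` (e.g. `"99999999999"`). The rules, condensed into a standalone sketch (class and method names hypothetical):

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class RuntimeFilterIdParser {
    /** Digit-only tokens parse to their value; anything else collapses to -1. */
    static Set<Integer> parseIds(String csv) {
        Set<Integer> ids = new LinkedHashSet<>(); // preserves token order
        if (csv.isEmpty()) {
            return ids;
        }
        for (String v : csv.split(",[\\s]*")) {
            int res = -1;
            if (v.matches("[0-9]+")) {
                try {
                    res = Integer.parseInt(v);
                } catch (NumberFormatException e) {
                    // int overflow despite all-digit input; keep -1
                }
            }
            ids.add(res);
        }
        return ids;
    }
    // parseIds("1, 2, x, 10") -> [1, 2, -1, 10]
}
```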
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 19faa555999139..3b341ded8ed810 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1692,7 +1692,6 @@ private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel)
             }
         }
 
-        CacheMode mode = cacheAnalyzer.getCacheMode();
         Queriable queryStmt = (Queriable) parsedStmt;
         boolean isSendFields = false;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index f1c5d98f16095f..161661055a4981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -45,6 +45,8 @@
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.SqlCacheContext;
 import org.apache.doris.nereids.SqlCacheContext.FullTableName;
 import org.apache.doris.nereids.SqlCacheContext.ScanTable;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -70,6 +72,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -401,7 +404,7 @@ private CacheMode innerCheckCacheModeForNereids(long now) {
             }
             return CacheMode.NoNeed;
         }
-        if (!(parsedStmt instanceof LogicalPlanAdapter) || scanNodes.size() == 0) {
+        if (!(parsedStmt instanceof LogicalPlanAdapter) || scanNodes.isEmpty()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("not a select stmt or no scan node. queryid {}", DebugUtil.printId(queryId));
             }
@@ -442,13 +445,22 @@ private CacheMode innerCheckCacheModeForNereids(long now) {
                     Config.cache_last_version_interval_second * 1000);
         }
 
+        String originStmt = ((LogicalPlanAdapter) parsedStmt).getStatementContext()
+                .getOriginStatement().originStmt;
+        cache = new SqlCache(this.queryId, originStmt);
+        SqlCache sqlCache = (SqlCache) cache;
         PUniqueId existsMd5 = null;
-        if (cache instanceof SqlCache) {
-            existsMd5 = ((SqlCache) cache).getOrComputeCacheMd5();
+        if (planner instanceof NereidsPlanner) {
+            NereidsPlanner nereidsPlanner = (NereidsPlanner) planner;
+            Optional<SqlCacheContext> sqlCacheContext = nereidsPlanner
+                    .getCascadesContext()
+                    .getStatementContext()
+                    .getSqlCacheContext();
+            if (sqlCacheContext.isPresent()) {
+                existsMd5 = sqlCacheContext.get().getOrComputeCacheKeyMd5();
+            }
         }
-        cache = new SqlCache(this.queryId, ((LogicalPlanAdapter) parsedStmt).getStatementContext()
-                .getOriginStatement().originStmt);
-        SqlCache sqlCache = (SqlCache) cache;
+
         sqlCache.setCacheInfo(this.latestTable, allViewExpandStmtListStr);
         sqlCache.setCacheMd5(existsMd5);
         MetricRepo.COUNTER_CACHE_ADDED_SQL.increase(1L);
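Net effect in `CacheAnalyzer`: the `SqlCache` is now constructed up front, and the cache key MD5 is taken from the Nereids `SqlCacheContext` when the query was planned by `NereidsPlanner`; other queries leave `existsMd5` null, so `SqlCache` later falls back to its own lazy `getOrComputeCacheMd5()`. Roughly (a sketch of the control flow, not the full method):

```java
SqlCache sqlCache = new SqlCache(queryId, originStmt);
PUniqueId existsMd5 = planner instanceof NereidsPlanner
        ? ((NereidsPlanner) planner).getCascadesContext().getStatementContext()
                .getSqlCacheContext().map(SqlCacheContext::getOrComputeCacheKeyMd5).orElse(null)
        : null;
sqlCache.setCacheInfo(latestTable, allViewExpandStmtListStr);
sqlCache.setCacheMd5(existsMd5);
```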
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
index 20d6f5d0d02401..dd0e0ce6bcdb77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
@@ -60,7 +60,6 @@ public abstract InternalService.PFetchCacheResult fetchCache(InternalService.PFe
 
     public abstract void clearCache(InternalService.PClearCacheRequest clearRequest);
 
-
     public static Types.PUniqueId getMd5(String str) {
         MessageDigest msgDigest;
         final byte[] digest;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
index 5cab6f74bb3411..20d9f93efb494e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
@@ -21,6 +21,7 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.thrift.TResultBatch;
 
@@ -117,10 +118,10 @@ public void clear() {
     }
 
     public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(
-            String sql, long partitionKey, long lastVersion, long lastestTime, long partitionNum) {
+            PUniqueId cacheKeyMd5, long partitionKey, long lastVersion, long lastestTime, long partitionNum) {
         if (updateRequest == null) {
             updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
-                    .setSqlKey(CacheProxy.getMd5(sql))
+                    .setSqlKey(cacheKeyMd5)
                     .setCacheType(InternalService.CacheType.SQL_CACHE).build();
         }
         updateRequest = updateRequest.toBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
index a7351a306d2e2d..efe4ed5949f6d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
@@ -141,9 +141,12 @@ public void updateCache() {
             return;
         }
 
+        PUniqueId cacheKeyMd5 = getOrComputeCacheMd5();
         InternalService.PUpdateCacheRequest updateRequest =
-                rowBatchBuilder.buildSqlUpdateRequest(getSqlWithViewStmt(), latestTable.latestPartitionId,
-                        latestTable.latestPartitionVersion, latestTable.latestPartitionTime,
+                rowBatchBuilder.buildSqlUpdateRequest(cacheKeyMd5,
+                        latestTable.latestPartitionId,
+                        latestTable.latestPartitionVersion,
+                        latestTable.latestPartitionTime,
                         latestTable.sumOfPartitionNum
                 );
         if (updateRequest.getValuesCount() > 0) {
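With `buildSqlUpdateRequest` taking a precomputed `PUniqueId`, the key written to the BE result cache and the key later probed by `NereidsSqlCacheManager.tryParseSql` are the same MD5. Conceptually:

```java
// Before: the BE-side key was derived from the expanded SQL text inside
// RowBatchBuilder itself:
//   updateRequest.setSqlKey(CacheProxy.getMd5(sql)); // sql == getSqlWithViewStmt()
// After: the MD5 computed once (SQL, views, variables, folded nondeterministic
// values, row/mask policies, when it came from SqlCacheContext) is passed down:
//   updateRequest.setSqlKey(cacheKeyMd5);            // == getOrComputeCacheMd5()
```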
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 7613a2d284b91c..949b28491d6dba 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -367,9 +367,9 @@ public void testHitSqlCache() throws Exception {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache = (SqlCache) ca.getCache();
-        Assert.assertEquals(sqlCache.getLatestTime(), NOW);
+        Assert.assertEquals(NOW, sqlCache.getLatestTime());
     }
 
     @Test
@@ -383,11 +383,11 @@ public void testHitSqlCacheAfterPartitionUpdateTimeChanged() throws Exception {
 
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache1 = (SqlCache) ca.getCache();
 
         // latestTime is equals to the schema update time if not set partition update time
-        Assert.assertEquals(sqlCache1.getLatestTime(), tbl2.getSchemaUpdateTime());
+        Assert.assertEquals(tbl2.getSchemaUpdateTime(), sqlCache1.getLatestTime());
 
         // wait a second and set partition update time
         try {
@@ -401,7 +401,7 @@ public void testHitSqlCacheAfterPartitionUpdateTimeChanged() throws Exception {
         // check cache mode again
         ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
         SqlCache sqlCache2 = (SqlCache) ca.getCache();
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
 
         // the latest time will be changed and is equals to the partition update time
         Assert.assertEquals(later, sqlCache2.getLatestTime());
@@ -415,9 +415,9 @@ public void testHitSqlCacheByNereids() {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache = (SqlCache) ca.getCache();
-        Assert.assertEquals(sqlCache.getLatestTime(), NOW);
+        Assert.assertEquals(NOW, sqlCache.getLatestTime());
     }
 
     @Test
@@ -431,11 +431,11 @@ public void testHitSqlCacheByNereidsAfterPartitionUpdateTimeChanged() {
 
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache1 = (SqlCache) ca.getCache();
 
         // latestTime is equals to the schema update time if not set partition update time
-        Assert.assertEquals(sqlCache1.getLatestTime(), tbl2.getSchemaUpdateTime());
+        Assert.assertEquals(tbl2.getSchemaUpdateTime(), sqlCache1.getLatestTime());
 
         // wait a second and set partition update time
         try {
@@ -449,7 +449,7 @@ public void testHitSqlCacheByNereidsAfterPartitionUpdateTimeChanged() {
         // check cache mode again
         ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
         SqlCache sqlCache2 = (SqlCache) ca.getCache();
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
 
         // the latest time will be changed and is equals to the partition update time
         Assert.assertEquals(later, sqlCache2.getLatestTime());
@@ -463,9 +463,9 @@ public void testHitSqlCacheWithHiveView() throws Exception {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode2);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache = (SqlCache) ca.getCache();
-        Assert.assertEquals(sqlCache.getLatestTime(), NOW);
+        Assert.assertEquals(NOW, sqlCache.getLatestTime());
     }
 
     @Test
@@ -475,9 +475,9 @@ public void testHitSqlCacheWithHiveViewByNereids() {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode2);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache = (SqlCache) ca.getCache();
-        Assert.assertEquals(sqlCache.getLatestTime(), NOW);
+        Assert.assertEquals(NOW, sqlCache.getLatestTime());
     }
 
     @Test
@@ -487,13 +487,13 @@ public void testHitSqlCacheWithNestedHiveView() throws Exception {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode3);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache = (SqlCache) ca.getCache();
         String cacheKey = sqlCache.getSqlWithViewStmt();
         Assert.assertEquals(cacheKey, "SELECT `hms_ctl`.`hms_db`.`hms_view2`.`k1` AS `k1` "
                 + "FROM `hms_ctl`.`hms_db`.`hms_view2`"
                 + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1");
-        Assert.assertEquals(sqlCache.getLatestTime(), NOW);
+        Assert.assertEquals(NOW, sqlCache.getLatestTime());
     }
 
     @Test
@@ -503,12 +503,12 @@ public void testHitSqlCacheWithNestedHiveViewByNereids() {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode3);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode());
         SqlCache sqlCache = (SqlCache) ca.getCache();
         String cacheKey = sqlCache.getSqlWithViewStmt();
-        Assert.assertEquals(cacheKey, "select * from hms_ctl.hms_db.hms_view2"
-                + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1");
-        Assert.assertEquals(sqlCache.getLatestTime(), NOW);
+        Assert.assertEquals("select * from hms_ctl.hms_db.hms_view2"
+                + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1", cacheKey);
+        Assert.assertEquals(NOW, sqlCache.getLatestTime());
     }
 
     @Test
@@ -518,7 +518,7 @@ public void testNotHitSqlCache() throws Exception {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheMode(0);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.None, ca.getCacheMode());
     }
 
     @Test
@@ -528,7 +528,7 @@ public void testNotHitSqlCacheByNereids() {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheModeForNereids(0);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.None, ca.getCacheMode());
     }
 
     @Test
@@ -540,7 +540,7 @@ public void testNotHitSqlCacheWithFederatedQuery() throws Exception {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1, olapScanNode);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.None, ca.getCacheMode());
     }
 
     @Test
@@ -552,6 +552,6 @@ public void testNotHitSqlCacheWithFederatedQueryByNereids() {
         List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1, olapScanNode);
         CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
         ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
-        Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
+        Assert.assertEquals(CacheAnalyzer.CacheMode.None, ca.getCacheMode());
     }
 }
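On the assertion churn above: JUnit 4's `Assert.assertEquals(expected, actual)` derives its failure message from argument order, so the constant belongs first. The swapped calls still passed, but a failure would have read backwards:

```java
Assert.assertEquals(CacheAnalyzer.CacheMode.Sql, ca.getCacheMode()); // fixed order
// on failure -> java.lang.AssertionError: expected:<Sql> but was:<None>
```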
diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index 0a318218301dfc..e73498442ae48b 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import java.util.stream.Collectors
+
 suite("parse_sql_from_sql_cache") {
     def assertHasCache = { String sqlStr ->
         explain {
@@ -343,7 +345,7 @@ suite("parse_sql_from_sql_cache") {
                 CREATE ROW POLICY test_cache_row_policy_2
                 ON ${dbName}.test_use_plan_cache13
                 AS RESTRICTIVE TO test_cache_user2
-                USING (id = 'concat(id, "**")')"""
+                USING (id = 4)"""
 
             sql "set enable_nereids_planner=true"
             sql "sync"
@@ -380,7 +382,7 @@ suite("parse_sql_from_sql_cache") {
                 CREATE ROW POLICY test_cache_row_policy_3
                 ON ${dbName}.test_use_plan_cache14
                 AS RESTRICTIVE TO test_cache_user3
-                USING (id = 'concat(id, "**")')"""
+                USING (id = 4)"""
 
             sql "set enable_nereids_planner=true"
             sql "sync"
@@ -559,6 +561,63 @@ suite("parse_sql_from_sql_cache") {
             assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;"
             sql "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;"
             assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;"
+        }),
+        extraThread("testMultiFrontends", {
+            def aliveFrontends = sql_return_maparray("show frontends")
+                    .stream()
+                    .filter { it["Alive"].toString().equalsIgnoreCase("true") }
+                    .collect(Collectors.toList())
+
+            if (aliveFrontends.size() <= 1) {
+                return
+            }
+
+            def fe1 = aliveFrontends[0]["Host"] + ":" + aliveFrontends[0]["QueryPort"]
+            def fe2 = fe1
+            if (aliveFrontends.size() > 1) {
+                fe2 = aliveFrontends[1]["Host"] + ":" + aliveFrontends[1]["QueryPort"]
+            }
+
+            log.info("fe1: ${fe1}")
+            log.info("fe2: ${fe2}")
+
+            def dbName = context.config.getDbNameByFile(context.file)
+
+            log.info("connect to fe: ${fe1}")
+            connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = "jdbc:mysql://${fe1}") {
+                sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
+
+                sql "use ${dbName}"
+
+                createTestTable "test_use_plan_cache18"
+
+                sql "sync"
+
+                // after partition changed 10s, the sql cache can be used
+                sleep(10000)
+
+                sql "set enable_nereids_planner=true"
+                sql "set enable_fallback_to_original_planner=false"
+                sql "set enable_sql_cache=true"
+
+                assertNoCache "select * from test_use_plan_cache18"
+                sql "select * from test_use_plan_cache18"
+                assertHasCache "select * from test_use_plan_cache18"
+            }
+
+            log.info("connect to fe: ${fe2}")
+            connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = "jdbc:mysql://${fe2}") {
+                sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
+
+                sql "use ${dbName}"
+                sql "set enable_nereids_planner=true"
+                sql "set enable_fallback_to_original_planner=false"
+                sql "set enable_sql_cache=true"
+
+                assertNoCache "select * from test_use_plan_cache18"
+                sql "select * from test_use_plan_cache18"
+                assertHasCache "select * from test_use_plan_cache18"
+            }
         })
     ).get()
 }