Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;

Expand Down Expand Up @@ -96,8 +97,8 @@ public static synchronized void updateConfig() {
}

private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCacheNum = sqlCacheNum <= 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds <= 0 ? 30 : cacheIntervalSeconds;
sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : cacheIntervalSeconds;

return Caffeine.newBuilder()
.maximumSize(sqlCacheNum)
Expand All @@ -107,6 +108,22 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
.build();
}

/** tryAddFeCache */
public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
}

SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
if ((sqlCaches.getIfPresent(key) == null) && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
}
}

/** tryAddCache */
public void tryAddCache(
ConnectContext connectContext, String sql,
Expand Down Expand Up @@ -178,6 +195,19 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}

try {
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();
if (resultSetInFe.isPresent()) {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);

String cachedPlan = sqlCacheContext.getPhysicalPlan();
LogicalSqlCache logicalSqlCache = new LogicalSqlCache(
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(),
sqlCacheContext.getResultExprs(), resultSetInFe, ImmutableList.of(),
"none", cachedPlan
);
return Optional.of(logicalSqlCache);
}

Status status = new Status();
PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
InternalService.PFetchCacheResult cacheData =
Expand All @@ -195,7 +225,9 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri

LogicalSqlCache logicalSqlCache = new LogicalSqlCache(
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(),
sqlCacheContext.getResultExprs(), cacheValues, backendAddress, cachedPlan);
sqlCacheContext.getResultExprs(), Optional.empty(),
cacheValues, backendAddress, cachedPlan
);
return Optional.of(logicalSqlCache);
}
return invalidateCache(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
Expand All @@ -66,8 +68,10 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.cache.CacheAnalyzer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -168,8 +172,9 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties,
LogicalSqlCache logicalSqlCache = (LogicalSqlCache) plan;
physicalPlan = new PhysicalSqlCache(
logicalSqlCache.getQueryId(), logicalSqlCache.getColumnLabels(),
logicalSqlCache.getResultExprs(), logicalSqlCache.getCacheValues(),
logicalSqlCache.getBackendAddress(), logicalSqlCache.getPlanBody()
logicalSqlCache.getResultExprs(), logicalSqlCache.getResultSetInFe(),
logicalSqlCache.getCacheValues(), logicalSqlCache.getBackendAddress(),
logicalSqlCache.getPlanBody()
);
return physicalPlan;
}
Expand Down Expand Up @@ -528,31 +533,66 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
if (!(parsedStmt instanceof LogicalPlanAdapter)) {
return Optional.empty();
}
if (!(physicalPlan instanceof PhysicalResultSink)) {
return Optional.empty();
if (physicalPlan instanceof PhysicalSqlCache
&& ((PhysicalSqlCache) physicalPlan).getResultSet().isPresent()) {
return Optional.of(((PhysicalSqlCache) physicalPlan).getResultSet().get());
}
if (!(((PhysicalResultSink<?>) physicalPlan).child() instanceof PhysicalOneRowRelation)) {
if (!(physicalPlan instanceof PhysicalResultSink)) {
return Optional.empty();
}
PhysicalOneRowRelation physicalOneRowRelation
= (PhysicalOneRowRelation) ((PhysicalResultSink<?>) physicalPlan).child();
List<Column> columns = Lists.newArrayList();
List<String> data = Lists.newArrayList();
for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) {
NamedExpression item = physicalOneRowRelation.getProjects().get(i);
NamedExpression output = physicalPlan.getOutput().get(i);
Expression expr = item.child(0);
if (expr instanceof Literal) {
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();

Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext();
boolean enableSqlCache
= CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
Plan child = physicalPlan.child(0);
if (child instanceof PhysicalOneRowRelation) {
PhysicalOneRowRelation physicalOneRowRelation = (PhysicalOneRowRelation) physicalPlan.child(0);
List<Column> columns = Lists.newArrayList();
List<String> data = Lists.newArrayList();
for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) {
NamedExpression item = physicalOneRowRelation.getProjects().get(i);
NamedExpression output = physicalPlan.getOutput().get(i);
Expression expr = item.child(0);
if (expr instanceof Literal) {
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
data.add(legacyExpr.getStringValueInFe());
} else {
return Optional.empty();
}
}

ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
if (sqlCacheContext.isPresent() && enableSqlCache) {
sqlCacheContext.get().setResultSetInFe(resultSet);
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
statementContext.getConnectContext(),
statementContext.getOriginStatement().originStmt
);
}
return Optional.of(resultSet);
} else if (child instanceof PhysicalEmptyRelation) {
List<Column> columns = Lists.newArrayList();
PhysicalEmptyRelation physicalEmptyRelation = (PhysicalEmptyRelation) physicalPlan.child(0);
for (int i = 0; i < physicalEmptyRelation.getProjects().size(); i++) {
NamedExpression output = physicalPlan.getOutput().get(i);
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
data.add(legacyExpr.getStringValueInFe());
} else {
return Optional.empty();
}

ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of());
if (sqlCacheContext.isPresent() && enableSqlCache) {
sqlCacheContext.get().setResultSetInFe(resultSet);
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
statementContext.getConnectContext(),
statementContext.getOriginStatement().originStmt
);
}
return Optional.of(resultSet);
} else {
return Optional.empty();
}
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
return Optional.of(resultSet);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.expressions.Variable;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.cache.CacheProxy;
import org.apache.doris.thrift.TUniqueId;

Expand Down Expand Up @@ -83,6 +84,7 @@ public class SqlCacheContext {
private volatile List<String> colLabels;

private volatile PUniqueId cacheKeyMd5;
private volatile ResultSet resultSetInFe;

public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity cannot be null");
Expand Down Expand Up @@ -378,6 +380,14 @@ public void setOriginSql(String originSql) {
this.originSql = originSql.trim();
}

public Optional<ResultSet> getResultSetInFe() {
return Optional.ofNullable(resultSetInFe);
}

public void setResultSetInFe(ResultSet resultSetInFe) {
this.resultSetInFe = resultSetInFe;
}

/** FullTableName */
@lombok.Data
@lombok.AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
Expand All @@ -46,18 +47,21 @@ public class LogicalSqlCache extends LogicalLeaf implements SqlCache, TreeString
private final TUniqueId queryId;
private final List<String> columnLabels;
private final List<Expr> resultExprs;
private final Optional<ResultSet> resultSetInFe;
private final List<InternalService.PCacheValue> cacheValues;
private final String backendAddress;
private final String planBody;

/** LogicalSqlCache */
public LogicalSqlCache(TUniqueId queryId,
List<String> columnLabels, List<Expr> resultExprs,
List<InternalService.PCacheValue> cacheValues, String backendAddress, String planBody) {
Optional<ResultSet> resultSetInFe, List<InternalService.PCacheValue> cacheValues,
String backendAddress, String planBody) {
super(PlanType.LOGICAL_SQL_CACHE, Optional.empty(), Optional.empty());
this.queryId = Objects.requireNonNull(queryId, "queryId can not be null");
this.columnLabels = Objects.requireNonNull(columnLabels, "columnLabels can not be null");
this.resultExprs = Objects.requireNonNull(resultExprs, "resultExprs can not be null");
this.resultSetInFe = Objects.requireNonNull(resultSetInFe, "resultSetInFe can not be null");
this.cacheValues = Objects.requireNonNull(cacheValues, "cacheValues can not be null");
this.backendAddress = Objects.requireNonNull(backendAddress, "backendAddress can not be null");
this.planBody = Objects.requireNonNull(planBody, "planBody can not be null");
Expand All @@ -67,6 +71,10 @@ public TUniqueId getQueryId() {
return queryId;
}

public Optional<ResultSet> getResultSetInFe() {
return resultSetInFe;
}

public List<InternalService.PCacheValue> getCacheValues() {
return cacheValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PCacheValue;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TUniqueId;

Expand All @@ -46,19 +48,22 @@ public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStri
private final TUniqueId queryId;
private final List<String> columnLabels;
private final List<Expr> resultExprs;
private final Optional<ResultSet> resultSet;
private final List<InternalService.PCacheValue> cacheValues;
private final String backendAddress;
private final String planBody;

/** PhysicalSqlCache */
public PhysicalSqlCache(TUniqueId queryId,
List<String> columnLabels, List<Expr> resultExprs,
List<InternalService.PCacheValue> cacheValues, String backendAddress, String planBody) {
Optional<ResultSet> resultSet, List<InternalService.PCacheValue> cacheValues,
String backendAddress, String planBody) {
super(PlanType.PHYSICAL_SQL_CACHE, Optional.empty(),
new LogicalProperties(() -> ImmutableList.of(), () -> FunctionalDependencies.EMPTY_FUNC_DEPS));
this.queryId = Objects.requireNonNull(queryId, "queryId can not be null");
this.columnLabels = Objects.requireNonNull(columnLabels, "colNames can not be null");
this.resultExprs = Objects.requireNonNull(resultExprs, "resultExprs can not be null");
this.resultSet = Objects.requireNonNull(resultSet, "resultSet can not be null");
this.cacheValues = Objects.requireNonNull(cacheValues, "cacheValues can not be null");
this.backendAddress = Objects.requireNonNull(backendAddress, "backendAddress can not be null");
this.planBody = Objects.requireNonNull(planBody, "planBody can not be null");
Expand All @@ -68,6 +73,10 @@ public TUniqueId getQueryId() {
return queryId;
}

public Optional<ResultSet> getResultSet() {
return resultSet;
}

public List<InternalService.PCacheValue> getCacheValues() {
return cacheValues;
}
Expand All @@ -90,9 +99,18 @@ public String getPlanBody() {

@Override
public String toString() {
long rowCount = 0;
if (resultSet.isPresent()) {
rowCount = resultSet.get().getResultRows().size();
} else {
for (PCacheValue cacheValue : cacheValues) {
rowCount += cacheValue.getRowsCount();
}
}
return Utils.toSqlString("PhysicalSqlCache[" + id.asInt() + "]",
"queryId", DebugUtil.printId(queryId),
"backend", backendAddress
"backend", backendAddress,
"rowCount", rowCount
);
}

Expand Down
12 changes: 6 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1661,8 +1661,7 @@ private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCac
/**
* Handle the SelectStmt via Cache.
*/
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel)
throws Exception {
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel) throws Exception {
InternalService.PFetchCacheResult cacheResult = null;
boolean wantToParseSqlForSqlCache = planner instanceof NereidsPlanner
&& CacheAnalyzer.canUseSqlCache(context.getSessionVariable());
Expand Down Expand Up @@ -1838,6 +1837,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
return;
}

boolean isDryRun = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery;
while (true) {
// register the fetch result time.
profile.getSummaryProfile().setTempStartTime();
Expand All @@ -1846,7 +1846,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable

// for outfile query, there will be only one empty batch send back with eos flag
// call `copyRowBatch()` first, because batch.getBatch() may be null, if result set is empty
if (cacheAnalyzer != null && !isOutfileQuery) {
if (cacheAnalyzer != null && !isOutfileQuery && !isDryRun) {
cacheAnalyzer.copyRowBatch(batch);
}
if (batch.getBatch() != null) {
Expand Down Expand Up @@ -1877,10 +1877,10 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
break;
}
}
if (cacheAnalyzer != null) {
if (cacheAnalyzer != null && !isDryRun) {
if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
isSendFields =
sendCachedValues(channel, cacheResult.getValuesList(), (Queriable) queryStmt, isSendFields,
sendCachedValues(channel, cacheResult.getValuesList(), queryStmt, isSendFields,
false);
}

Expand All @@ -1898,7 +1898,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
}
if (!isSendFields) {
if (!isOutfileQuery) {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
if (ConnectContext.get() != null && isDryRun) {
// Return a one row one column result set, with the real result number
List<String> data = Lists.newArrayList(batch.getQueryStatistics() == null ? "0"
: batch.getQueryStatistics().getReturnedRows() + "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public static boolean canUseSqlCache(SessionVariable sessionVariable) {
}

public static boolean commonCacheCondition(SessionVariable sessionVariable) {
return sessionVariable.getSqlSelectLimit() < 0 && sessionVariable.getDefaultOrderByLimit() < 0;
return sessionVariable.getSqlSelectLimit() < 0 && sessionVariable.getDefaultOrderByLimit() < 0
&& !sessionVariable.dryRunQuery;
}

/**
Expand Down
Loading