From 586492c124e44eb87183b6f4472853ffb13a2344 Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Sun, 10 Sep 2023 21:56:35 +0800 Subject: [PATCH] [Feature](multi-catalog) Support sql cache for hms catalog (#23391) **Support sql cache for hms catalog. Legacy planner and Nereids planner are all supported. Not support partition cache now, not support federated query now.** --- .../org/apache/doris/metric/MetricRepo.java | 4 + .../apache/doris/nereids/NereidsPlanner.java | 2 +- .../doris/nereids/StatementContext.java | 11 +- .../nereids/glue/LogicalPlanAdapter.java | 11 +- .../nereids/rules/analysis/BindRelation.java | 25 +- .../doris/planner/external/FileScanNode.java | 4 + .../apache/doris/qe/cache/CacheAnalyzer.java | 130 +++--- .../apache/doris/qe/cache/PartitionCache.java | 2 +- .../datasets/tpch/AnalyzeCheckTestBase.java | 2 +- .../apache/doris/qe/HmsQueryCacheTest.java | 406 ++++++++++++++++++ ...CacheTest.java => OlapQueryCacheTest.java} | 4 +- .../doris/utframe/TestWithFeService.java | 48 ++- 12 files changed, 559 insertions(+), 90 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java rename fe/fe-core/src/test/java/org/apache/doris/qe/{PartitionCacheTest.java => OlapQueryCacheTest.java} (99%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index bbc42f8d2fa918..c9ceb2c3702879 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -71,6 +71,7 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_QUERY_ERR; public static LongCounterMetric COUNTER_QUERY_TABLE; public static LongCounterMetric COUNTER_QUERY_OLAP_TABLE; + public static LongCounterMetric COUNTER_QUERY_HIVE_TABLE; public static AutoMappedMetric USER_COUNTER_QUERY_ALL; public static AutoMappedMetric USER_COUNTER_QUERY_ERR; public static Histogram 
HISTO_QUERY_LATENCY; @@ -287,6 +288,9 @@ public Long getValue() { COUNTER_QUERY_OLAP_TABLE = new LongCounterMetric("query_olap_table", MetricUnit.REQUESTS, "total query from olap table"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_OLAP_TABLE); + COUNTER_QUERY_HIVE_TABLE = new LongCounterMetric("query_hive_table", MetricUnit.REQUESTS, + "total query from hive table"); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_HIVE_TABLE); USER_COUNTER_QUERY_ALL = new AutoMappedMetric<>(name -> { LongCounterMetric userCountQueryAll = new LongCounterMetric("query_total", MetricUnit.REQUESTS, "total query for single user"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 9d3c83649674e1..8d3dca10687f97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -143,7 +143,7 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions ArrayList columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName) .collect(Collectors.toCollection(ArrayList::new)); logicalPlanAdapter.setColLabels(columnLabelList); - logicalPlanAdapter.setViews(statementContext.getViews()); + logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls()); } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index ceb2e22721b19e..13415c504f1ff4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; -import org.apache.doris.catalog.View; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; 
import org.apache.doris.nereids.hint.Hint; @@ -87,8 +86,8 @@ public class StatementContext { // Used to update consumer's stats private final Map, Group>>> cteIdToConsumerGroup = new HashMap<>(); private final Map rewrittenCtePlan = new HashMap<>(); - private final Set views = Sets.newHashSet(); private final Map hintMap = Maps.newLinkedHashMap(); + private final Set viewDdlSqlSet = Sets.newHashSet(); public StatementContext() { this.connectContext = ConnectContext.get(); @@ -235,11 +234,11 @@ public Map getRewrittenCtePlan() { return rewrittenCtePlan; } - public void addView(View view) { - this.views.add(view); + public void addViewDdlSql(String ddlSql) { + this.viewDdlSqlSet.add(ddlSql); } - public List getViews() { - return ImmutableList.copyOf(views); + public List getViewDdlSqls() { + return ImmutableList.copyOf(viewDdlSqlSet); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java index 3c9c8d1a1bf095..4d9b70c455efa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java @@ -23,7 +23,6 @@ import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StatementBase; -import org.apache.doris.catalog.View; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; @@ -44,7 +43,7 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable { private final LogicalPlan logicalPlan; private List resultExprs; private ArrayList colLabels; - private List views; + private List viewDdlSqls; public LogicalPlanAdapter(LogicalPlan logicalPlan, StatementContext statementContext) { this.logicalPlan = logicalPlan; @@ -81,8 +80,8 @@ public ArrayList 
getColLabels() { return colLabels; } - public List getViews() { - return views; + public List getViewDdlSqls() { + return viewDdlSqls; } @Override @@ -98,8 +97,8 @@ public void setColLabels(ArrayList colLabels) { this.colLabels = colLabels; } - public void setViews(List views) { - this.views = views; + public void setViewDdlSqls(List viewDdlSqls) { + this.viewDdlSqls = viewDdlSqls; } public StatementContext getStatementContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index f7cb4e7cdb35cf..0a5e5d2f4e41e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -63,7 +63,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import java.util.List; import java.util.Optional; @@ -220,16 +219,14 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio case OLAP: return makeOlapScan(table, unboundRelation, tableQualifier); case VIEW: - cascadesContext.getStatementContext().addView((View) table); Plan viewPlan = parseAndAnalyzeView(((View) table).getDdlSql(), cascadesContext); return new LogicalSubQueryAlias<>(tableQualifier, viewPlan); case HMS_EXTERNAL_TABLE: - if (Config.enable_query_hive_views) { - if (((HMSExternalTable) table).isView() - && StringUtils.isNotEmpty(((HMSExternalTable) table).getViewText())) { - Plan hiveViewPlan = parseAndAnalyzeHiveView(table, cascadesContext); - return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan); - } + if (Config.enable_query_hive_views && ((HMSExternalTable) table).isView()) { + String hiveCatalog = ((HMSExternalTable) table).getCatalog().getName(); + String ddlSql = ((HMSExternalTable) 
table).getViewText(); + Plan hiveViewPlan = parseAndAnalyzeHiveView(hiveCatalog, ddlSql, cascadesContext); + return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan); } return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier); case ICEBERG_EXTERNAL_TABLE: @@ -248,20 +245,20 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio } } - private Plan parseAndAnalyzeHiveView(TableIf table, CascadesContext cascadesContext) { - HMSExternalTable hiveTable = (HMSExternalTable) table; + private Plan parseAndAnalyzeHiveView(String hiveCatalog, String ddlSql, CascadesContext cascadesContext) { ConnectContext ctx = cascadesContext.getConnectContext(); String previousCatalog = ctx.getCurrentCatalog().getName(); String previousDb = ctx.getDatabase(); - ctx.changeDefaultCatalog(hiveTable.getCatalog().getName()); - Plan hiveViewPlan = parseAndAnalyzeView(hiveTable.getViewText(), cascadesContext); + ctx.changeDefaultCatalog(hiveCatalog); + Plan hiveViewPlan = parseAndAnalyzeView(ddlSql, cascadesContext); ctx.changeDefaultCatalog(previousCatalog); ctx.setDatabase(previousDb); return hiveViewPlan; } - private Plan parseAndAnalyzeView(String viewSql, CascadesContext parentContext) { - LogicalPlan parsedViewPlan = new NereidsParser().parseSingle(viewSql); + private Plan parseAndAnalyzeView(String ddlSql, CascadesContext parentContext) { + parentContext.getStatementContext().addViewDdlSql(ddlSql); + LogicalPlan parsedViewPlan = new NereidsParser().parseSingle(ddlSql); // TODO: use a good to do this, such as eliminate UnboundResultSink if (parsedViewPlan instanceof UnboundResultSink) { parsedViewPlan = (LogicalPlan) ((UnboundResultSink) parsedViewPlan).child(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 8e2c8ed3a4521e..fb214df5a6d843 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -270,4 +270,8 @@ protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { long fileLength = last.getOffset() + last.getLength() - 1L; throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); } + + public long getReadPartitionNum() { + return this.readPartitionNum; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index ab15096f0cc262..7ce85a5c25886c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.common.Config; import org.apache.doris.common.Status; @@ -43,12 +44,14 @@ import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.RowBatch; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,7 +62,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * Analyze which caching mode a SQL is suitable for @@ -89,7 +91,6 @@ public enum CacheMode { 
private StatementBase parsedStmt; private SelectStmt selectStmt; private List scanNodes; - private OlapTable olapTable; private RangePartitionInfo partitionInfo; private Column partColumn; private CompoundPredicate partitionPredicate; @@ -137,7 +138,7 @@ public CacheMode getCacheMode() { } public class CacheTable implements Comparable { - public OlapTable olapTable; + public TableIf table; public long latestPartitionId; public long latestVersion; public long latestTime; @@ -145,7 +146,7 @@ public class CacheTable implements Comparable { public long sumOfPartitionNum; public CacheTable() { - olapTable = null; + table = null; latestPartitionId = 0; latestVersion = 0; latestTime = 0; @@ -160,7 +161,7 @@ public int compareTo(CacheTable table) { public void debug() { LOG.debug("table {}, partition id {}, ver {}, time {}, partition num {}, sumOfPartitionNum: {}", - olapTable.getName(), latestPartitionId, latestVersion, latestTime, partitionNum, sumOfPartitionNum); + table.getName(), latestPartitionId, latestVersion, latestTime, partitionNum, sumOfPartitionNum); } } @@ -207,28 +208,12 @@ private CacheMode innerCheckCacheMode(long now) { LOG.debug("not a select stmt or no scan node. queryid {}", DebugUtil.printId(queryId)); return CacheMode.NoNeed; } - MetricRepo.COUNTER_QUERY_TABLE.increase(1L); - this.selectStmt = (SelectStmt) parsedStmt; - //Check the last version time of the table - List tblTimeList = Lists.newArrayList(); - for (int i = 0; i < scanNodes.size(); i++) { - ScanNode node = scanNodes.get(i); - if (!(node instanceof OlapScanNode)) { - LOG.debug("query contains non-olap table. 
queryid {}", DebugUtil.printId(queryId)); - return CacheMode.None; - } - if (enablePartitionCache() && ((OlapScanNode) node).getSelectedPartitionNum() > 1 - && selectStmt.hasGroupByClause()) { - LOG.debug("more than one partition scanned when query has agg, partition cache cannot use, queryid {}", - DebugUtil.printId(queryId)); - return CacheMode.None; - } - CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node); - tblTimeList.add(cTable); + + List tblTimeList = buildCacheTableList(); + if (CollectionUtils.isEmpty(tblTimeList)) { + return CacheMode.None; } - MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L); - Collections.sort(tblTimeList); latestTable = tblTimeList.get(0); latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum(); latestTable.debug(); @@ -251,6 +236,11 @@ private CacheMode innerCheckCacheMode(long now) { return CacheMode.Sql; } + // TODO:wxy support partition cache for hive table later + if (!(latestTable.table instanceof OlapTable)) { + LOG.debug("only support partition cache for olap table now. queryid {}", DebugUtil.printId(queryId)); + return CacheMode.None; + } if (!enablePartitionCache()) { LOG.debug("partition query cache is disabled. 
queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; @@ -265,7 +255,7 @@ private CacheMode innerCheckCacheMode(long now) { return CacheMode.None; } } - olapTable = latestTable.olapTable; + OlapTable olapTable = (OlapTable) latestTable.table; if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) { LOG.debug("the partition of OlapTable not RANGE type, queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; @@ -315,23 +305,12 @@ private CacheMode innerCheckCacheModeSetOperation(long now) { } return CacheMode.NoNeed; } - MetricRepo.COUNTER_QUERY_TABLE.increase(1L); //Check the last version time of the table - List tblTimeList = Lists.newArrayList(); - for (int i = 0; i < scanNodes.size(); i++) { - ScanNode node = scanNodes.get(i); - if (!(node instanceof OlapScanNode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId)); - } - return CacheMode.None; - } - CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node); - tblTimeList.add(cTable); + List tblTimeList = buildCacheTableList(); + if (CollectionUtils.isEmpty(tblTimeList)) { + return CacheMode.None; } - MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L); - Collections.sort(tblTimeList); latestTable = tblTimeList.get(0); latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum(); latestTable.debug(); @@ -370,23 +349,12 @@ private CacheMode innerCheckCacheModeForNereids(long now) { } return CacheMode.NoNeed; } - MetricRepo.COUNTER_QUERY_TABLE.increase(1L); //Check the last version time of the table - List tblTimeList = Lists.newArrayList(); - for (int i = 0; i < scanNodes.size(); i++) { - ScanNode node = scanNodes.get(i); - if (!(node instanceof OlapScanNode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("query contains non-olap table. 
queryid {}", DebugUtil.printId(queryId)); - } - return CacheMode.None; - } - CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node); - tblTimeList.add(cTable); + List tblTimeList = buildCacheTableList(); + if (CollectionUtils.isEmpty(tblTimeList)) { + return CacheMode.None; } - MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L); - Collections.sort(tblTimeList); latestTable = tblTimeList.get(0); latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum(); latestTable.debug(); @@ -395,8 +363,7 @@ private CacheMode innerCheckCacheModeForNereids(long now) { return CacheMode.NoNeed; } - allViewStmtSet.addAll(((LogicalPlanAdapter) parsedStmt).getViews() - .stream().map(view -> view.getDdlSql()).collect(Collectors.toSet())); + allViewStmtSet.addAll(((LogicalPlanAdapter) parsedStmt).getViewDdlSqls()); String allViewExpandStmtListStr = StringUtils.join(allViewStmtSet, "|"); if (now == 0) { @@ -417,6 +384,45 @@ private CacheMode innerCheckCacheModeForNereids(long now) { return CacheMode.None; } + private List buildCacheTableList() { + //Check the last version time of the table + MetricRepo.COUNTER_QUERY_TABLE.increase(1L); + long olapScanNodeSize = scanNodes.stream().filter(node -> node instanceof OlapScanNode).count(); + long hiveScanNodeSize = scanNodes.stream().filter(node -> node instanceof HiveScanNode).count(); + if (olapScanNodeSize > 0) { + MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L); + } + if (hiveScanNodeSize > 0) { + MetricRepo.COUNTER_QUERY_HIVE_TABLE.increase(1L); + } + + if (!(olapScanNodeSize == scanNodes.size() || hiveScanNodeSize == scanNodes.size())) { + LOG.debug("only support olap/hive table with non-federated query, other types are not supported now, " + + "queryId {}", DebugUtil.printId(queryId)); + return Collections.emptyList(); + } + + List tblTimeList = Lists.newArrayList(); + for (int i = 0; i < scanNodes.size(); i++) { + ScanNode node = scanNodes.get(i); + if (enablePartitionCache() + && 
(node instanceof OlapScanNode) + && ((OlapScanNode) node).getSelectedPartitionNum() > 1 + && selectStmt != null + && selectStmt.hasGroupByClause()) { + LOG.debug("more than one partition scanned when query has agg, partition cache cannot use, queryid {}", + DebugUtil.printId(queryId)); + return Collections.emptyList(); + } + CacheTable cTable = node instanceof OlapScanNode + ? buildCacheTableForOlapScanNode((OlapScanNode) node) + : buildCacheTableForHiveScanNode((HiveScanNode) node); + tblTimeList.add(cTable); + } + Collections.sort(tblTimeList); + return tblTimeList; + } + public InternalService.PFetchCacheResult getCacheData() { if (parsedStmt instanceof LogicalPlanAdapter) { cacheMode = innerCheckCacheModeForNereids(0); @@ -579,11 +585,11 @@ private void getAggInfoList(SelectStmt stmt, List aggInfoList) { } } - private CacheTable getSelectedPartitionLastUpdateTime(OlapScanNode node) { + private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) { CacheTable cacheTable = new CacheTable(); OlapTable olapTable = node.getOlapTable(); - cacheTable.olapTable = olapTable; cacheTable.partitionNum = node.getSelectedPartitionIds().size(); + cacheTable.table = olapTable; for (Long partitionId : node.getSelectedPartitionIds()) { Partition partition = olapTable.getPartition(partitionId); if (partition.getVisibleVersionTime() >= cacheTable.latestTime) { @@ -595,6 +601,14 @@ private CacheTable getSelectedPartitionLastUpdateTime(OlapScanNode node) { return cacheTable; } + private CacheTable buildCacheTableForHiveScanNode(HiveScanNode node) { + CacheTable cacheTable = new CacheTable(); + cacheTable.table = node.getTargetTable(); + cacheTable.partitionNum = node.getReadPartitionNum(); + cacheTable.latestTime = cacheTable.table.getLastUpdateTime(); + return cacheTable; + } + private void addAllViewStmt(List tblRefs) { for (TableRef tblRef : tblRefs) { if (tblRef instanceof InlineViewRef) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java index d65d67fdbd2f5b..0aedef7fc60136 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java @@ -72,7 +72,7 @@ public PartitionCache(TUniqueId queryId, SelectStmt selectStmt) { public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInfo partitionInfo, Column partColumn, CompoundPredicate partitionPredicate, String allViewExpandStmtListStr) { this.latestTable = latestTable; - this.olapTable = latestTable.olapTable; + this.olapTable = (OlapTable) latestTable.table; this.partitionInfo = partitionInfo; this.partColumn = partColumn; this.partitionPredicate = partitionPredicate; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/tpch/AnalyzeCheckTestBase.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/tpch/AnalyzeCheckTestBase.java index fb26d9670b632f..7f3fa892bfaf59 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/tpch/AnalyzeCheckTestBase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/tpch/AnalyzeCheckTestBase.java @@ -36,7 +36,7 @@ public void runBeforeEach() throws Exception { } protected void checkAnalyze(String sql) { - LogicalPlan analyzed = analyze(sql); + LogicalPlan analyzed = analyzeAndGetLogicalPlanByNereids(sql); Assertions.assertTrue(checkBound(analyzed)); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java new file mode 100644 index 00000000000000..bb8bc4c1be52b1 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -0,0 +1,406 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.analysis.CreateCatalogStmt; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.external.HMSExternalTable.DLAType; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.datasource.CatalogMgr; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.external.HiveScanNode; +import org.apache.doris.qe.cache.CacheAnalyzer; +import org.apache.doris.qe.cache.SqlCache; + 
+import com.google.common.collect.Lists; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +public class HmsQueryCacheTest extends AnalyzeCheckTestBase { + private static final String HMS_CATALOG = "hms_ctl"; + private static final long NOW = System.currentTimeMillis(); + private Env env; + private CatalogMgr mgr; + private OlapScanNode olapScanNode; + + @Mocked + private HMSExternalTable tbl; + @Mocked + private HMSExternalTable view1; + @Mocked + private HMSExternalTable view2; + @Mocked + private HiveScanNode hiveScanNode1; + @Mocked + private HiveScanNode hiveScanNode2; + @Mocked + private HiveScanNode hiveScanNode3; + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + Config.enable_query_hive_views = true; + Config.cache_enable_sql_mode = true; + Config.cache_enable_partition_mode = true; + connectContext.getSessionVariable().setEnableSqlCache(true); + + env = Env.getCurrentEnv(); + connectContext.setEnv(env); + mgr = env.getCatalogMgr(); + + // create hms catalog + CreateCatalogStmt hmsCatalogStmt = (CreateCatalogStmt) parseAndAnalyzeStmt( + "create catalog hms_ctl properties('type' = 'hms', 'hive.metastore.uris' = 'thrift://192.168.0.1:9083');", + connectContext); + mgr.createCatalog(hmsCatalogStmt); + + // create inner db and tbl for test + CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt("create database test", connectContext); + mgr.getInternalCatalog().createDb(createDbStmt); + + CreateTableStmt createTableStmt = (CreateTableStmt) parseAndAnalyzeStmt("create table test.tbl1(\n" + + "k1 int comment 'test column k1', " + + "k2 int comment 'test column k2') comment 'test table1' " + + "distributed by hash(k1) buckets 1\n" + + "properties(\"replication_num\" = \"1\");"); + 
mgr.getInternalCatalog().createTable(createTableStmt); + } + + private void init(HMSExternalCatalog hmsCatalog) { + Deencapsulation.setField(hmsCatalog, "initialized", true); + Deencapsulation.setField(hmsCatalog, "objectCreated", true); + + List schema = Lists.newArrayList(); + schema.add(new Column("k1", PrimitiveType.INT)); + + HMSExternalDatabase db = new HMSExternalDatabase(hmsCatalog, 10000, "hms_db"); + Deencapsulation.setField(db, "initialized", true); + + Deencapsulation.setField(tbl, "objectCreated", true); + Deencapsulation.setField(tbl, "rwLock", new ReentrantReadWriteLock(true)); + new Expectations(tbl) { + { + tbl.getId(); + minTimes = 0; + result = 10001; + + tbl.getName(); + minTimes = 0; + result = "hms_tbl"; + + tbl.getDbName(); + minTimes = 0; + result = "hms_db"; + + tbl.getFullSchema(); + minTimes = 0; + result = schema; + + tbl.isSupportedHmsTable(); + minTimes = 0; + result = true; + + tbl.isView(); + minTimes = 0; + result = false; + + tbl.getType(); + minTimes = 0; + result = TableIf.TableType.HMS_EXTERNAL_TABLE; + + tbl.getDlaType(); + minTimes = 0; + result = DLAType.HIVE; + + tbl.getLastUpdateTime(); + minTimes = 0; + result = NOW; + } + }; + + Deencapsulation.setField(view1, "objectCreated", true); + Deencapsulation.setField(view1, "rwLock", new ReentrantReadWriteLock(true)); + + new Expectations(view1) { + { + view1.getId(); + minTimes = 0; + result = 10002; + + view1.getName(); + minTimes = 0; + result = "hms_view1"; + + view1.getDbName(); + minTimes = 0; + result = "hms_db"; + + view1.isView(); + minTimes = 0; + result = true; + + view1.getCatalog(); + minTimes = 0; + result = hmsCatalog; + + view1.getType(); + minTimes = 0; + result = TableIf.TableType.HMS_EXTERNAL_TABLE; + + view1.getFullSchema(); + minTimes = 0; + result = schema; + + view1.getViewText(); + minTimes = 0; + result = "SELECT * FROM hms_db.hms_tbl"; + + view1.isSupportedHmsTable(); + minTimes = 0; + result = true; + + view1.getDlaType(); + minTimes = 0; + result = 
DLAType.HIVE; + + view1.getLastUpdateTime(); + minTimes = 0; + result = NOW; + } + }; + + Deencapsulation.setField(view2, "objectCreated", true); + Deencapsulation.setField(view2, "rwLock", new ReentrantReadWriteLock(true)); + new Expectations(view2) { + { + view2.getId(); + minTimes = 0; + result = 10003; + + view2.getName(); + minTimes = 0; + result = "hms_view2"; + + view2.getDbName(); + minTimes = 0; + result = "hms_db"; + + view2.isView(); + minTimes = 0; + result = true; + + view2.getCatalog(); + minTimes = 0; + result = hmsCatalog; + + view2.getType(); + minTimes = 0; + result = TableIf.TableType.HMS_EXTERNAL_TABLE; + + view2.getFullSchema(); + minTimes = 0; + result = schema; + + view2.getViewText(); + minTimes = 0; + result = "SELECT * FROM hms_db.hms_view1"; + + view2.isSupportedHmsTable(); + minTimes = 0; + result = true; + + view2.getDlaType(); + minTimes = 0; + result = DLAType.HIVE; + + view2.getLastUpdateTime(); + minTimes = 0; + result = NOW; + } + }; + + db.addTableForTest(tbl); + db.addTableForTest(view1); + db.addTableForTest(view2); + hmsCatalog.addDatabaseForTest(db); + + new Expectations(hiveScanNode1) { + { + hiveScanNode1.getTargetTable(); + minTimes = 0; + result = tbl; + } + }; + + new Expectations(hiveScanNode2) { + { + hiveScanNode2.getTargetTable(); + minTimes = 0; + result = view1; + } + }; + + new Expectations(hiveScanNode3) { + { + hiveScanNode3.getTargetTable(); + minTimes = 0; + result = view2; + } + }; + + TupleDescriptor desc = new TupleDescriptor(new TupleId(1)); + desc.setTable(mgr.getInternalCatalog().getDbNullable("default_cluster:test").getTableNullable("tbl1")); + olapScanNode = new OlapScanNode(new PlanNodeId(1), desc, "tb1ScanNode"); + } + + @Test + public void testHitSqlCache() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl", connectContext); + List scanNodes = Arrays.asList(hiveScanNode1); + CacheAnalyzer 
ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + } + + @Test + public void testHitSqlCacheByNereids() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl", connectContext); + List scanNodes = Arrays.asList(hiveScanNode1); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + } + + @Test + public void testHitSqlCacheWithHiveView() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_view1", connectContext); + List scanNodes = Arrays.asList(hiveScanNode2); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + } + + @Test + public void testHitSqlCacheWithHiveViewByNereids() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_view1", connectContext); + List scanNodes = Arrays.asList(hiveScanNode2); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + } + + @Test + public void testHitSqlCacheWithNestedHiveView() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = 
parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_view2", connectContext); + List scanNodes = Arrays.asList(hiveScanNode3); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache = (SqlCache) ca.getCache(); + String cacheKey = sqlCache.getSqlWithViewStmt(); + Assert.assertEquals(cacheKey, "SELECT `hms_ctl`.`default_cluster:hms_db`.`hms_view2`.`k1` AS `k1` " + + "FROM `hms_ctl`.`default_cluster:hms_db`.`hms_view2`" + + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1"); + } + + @Test + public void testHitSqlCacheWithNestedHiveViewByNereids() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_view2", connectContext); + List scanNodes = Arrays.asList(hiveScanNode3); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql); + SqlCache sqlCache = (SqlCache) ca.getCache(); + String cacheKey = sqlCache.getSqlWithViewStmt(); + Assert.assertEquals(cacheKey, "select * from hms_ctl.hms_db.hms_view2" + + "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1"); + } + + @Test + public void testNotHitSqlCache() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl", connectContext); + List scanNodes = Arrays.asList(hiveScanNode1); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheMode(NOW); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); + } + + @Test + public void testNotHitSqlCacheByNereids() 
throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl", connectContext); + List scanNodes = Arrays.asList(hiveScanNode1); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheModeForNereids(NOW); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); + } + + @Test + public void testNotHitSqlCacheWithFederatedQuery() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + // cache mode is None if this query is a federated query + StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl " + + "inner join internal.test.tbl1", connectContext); + List scanNodes = Arrays.asList(hiveScanNode1, olapScanNode); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); + } + + @Test + public void testNotHitSqlCacheWithFederatedQueryByNereids() throws Exception { + init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG)); + // cache mode is None if this query is a federated query + StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl " + + "inner join internal.test.tbl1", connectContext); + List scanNodes = Arrays.asList(hiveScanNode1, olapScanNode); + CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes); + ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2); + Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java similarity index 99% rename from fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java rename 
to fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java index 52ee9e009f5873..a0a3c9565212a7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java @@ -100,8 +100,8 @@ import java.util.List; import java.util.function.Function; -public class PartitionCacheTest { - private static final Logger LOG = LogManager.getLogger(PartitionCacheTest.class); +public class OlapQueryCacheTest { + private static final Logger LOG = LogManager.getLogger(OlapQueryCacheTest.class); public static String clusterName = "testCluster"; public static String dbName = "testDb"; public static String fullDbName = "testCluster:testDb"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index f14966de5a6482..0be332492bdca2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -58,6 +58,7 @@ import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -183,6 +184,12 @@ protected StatementContext createStatementCtx(String sql) { return statementContext; } + protected StatementContext createStatementCtx(String sql, ConnectContext ctx) { + StatementContext statementContext = new StatementContext(ctx, new OriginStatement(sql, 0)); + ctx.setStatementContext(statementContext); + return statementContext; + } + protected T createStmt(String showSql) throws Exception { return (T) parseAndAnalyzeStmt(showSql, connectContext); @@ -193,7 
+200,12 @@ protected CascadesContext createCascadesContext(String sql) { return MemoTestUtils.createCascadesContext(statementCtx, sql); } - public LogicalPlan analyze(String sql) { + protected CascadesContext createCascadesContext(String sql, ConnectContext ctx) { + StatementContext statementCtx = createStatementCtx(sql, ctx); + return MemoTestUtils.createCascadesContext(statementCtx, sql); + } + + public LogicalPlan analyzeAndGetLogicalPlanByNereids(String sql) { Set originDisableRules = connectContext.getSessionVariable().getDisableNereidsRuleNames(); Set disableRuleWithAuth = Sets.newHashSet(originDisableRules); disableRuleWithAuth.add(RuleType.RELATION_AUTHENTICATION.name()); @@ -205,6 +217,40 @@ public LogicalPlan analyze(String sql) { return (LogicalPlan) cascadesContext.getRewritePlan(); } + public LogicalPlan analyzeAndGetLogicalPlanByNereids(String sql, ConnectContext ctx) { + Set originDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames(); + Set disableRuleWithAuth = Sets.newHashSet(originDisableRules); + disableRuleWithAuth.add(RuleType.RELATION_AUTHENTICATION.name()); + ctx.getSessionVariable().setDisableNereidsRules(String.join(",", disableRuleWithAuth)); + CascadesContext cascadesContext = createCascadesContext(sql, ctx); + cascadesContext.newAnalyzer().analyze(); + ctx.getSessionVariable().setDisableNereidsRules(String.join(",", originDisableRules)); + cascadesContext.toMemo(); + return (LogicalPlan) cascadesContext.getRewritePlan(); + } + + // Parse an origin stmt and analyze it by nereids. Return a StatementBase instance. + public StatementBase analyzeAndGetStmtByNereids(String sql) { + return analyzeAndGetStmtByNereids(sql, connectContext); + } + + // Parse an origin stmt and analyze it by nereids. Return a StatementBase instance. 
+ public StatementBase analyzeAndGetStmtByNereids(String sql, ConnectContext ctx) { + Set originDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames(); + Set disableRuleWithAuth = Sets.newHashSet(originDisableRules); + disableRuleWithAuth.add(RuleType.RELATION_AUTHENTICATION.name()); + ctx.getSessionVariable().setDisableNereidsRules(String.join(",", disableRuleWithAuth)); + CascadesContext cascadesContext = createCascadesContext(sql, ctx); + cascadesContext.newAnalyzer().analyze(); + ctx.getSessionVariable().setDisableNereidsRules(String.join(",", originDisableRules)); + cascadesContext.toMemo(); + LogicalPlan plan = (LogicalPlan) cascadesContext.getRewritePlan(); + LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan, cascadesContext.getStatementContext()); + adapter.setViewDdlSqls(cascadesContext.getStatementContext().getViewDdlSqls()); + cascadesContext.getStatementContext().setParsedStatement(adapter); + return adapter; + } + protected ConnectContext createCtx(UserIdentity user, String host) throws IOException { ConnectContext ctx = new ConnectContext(); ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);