diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index dcbe772dbe5cd..23973b26a265d 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -615,7 +615,9 @@ Below is a list of all the keywords in Spark SQL. |MATCHED|non-reserved|non-reserved|non-reserved| |MATERIALIZED|non-reserved|non-reserved|non-reserved| |MAX|non-reserved|non-reserved|non-reserved| +|MEASURE|non-reserved|non-reserved|non-reserved| |MERGE|non-reserved|non-reserved|non-reserved| +|METRICS|non-reserved|non-reserved|non-reserved| |MICROSECOND|non-reserved|non-reserved|non-reserved| |MICROSECONDS|non-reserved|non-reserved|non-reserved| |MILLISECOND|non-reserved|non-reserved|non-reserved| diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6168ad4cf0836..041fe78d5e547 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -367,6 +367,7 @@ object SparkBuild extends PomBuild { "org.apache.spark.kafka010", "org.apache.spark.network", "org.apache.spark.sql.avro", + "org.apache.spark.sql.metricview", "org.apache.spark.sql.pipelines", "org.apache.spark.sql.scripting", "org.apache.spark.types.variant", @@ -1532,6 +1533,7 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/classic/"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/metricview"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/pipelines"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/scripting"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/ml"))) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 400461d2d4974..f12e033e5c325 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -16,6 +16,11 @@ lexer grammar SqlBaseLexer; +@header { +import java.util.ArrayDeque; +import java.util.Deque; +} + @members { /** * When true, parser should throw ParseException for unclosed bracketed comment. @@ -70,6 +75,11 @@ lexer grammar SqlBaseLexer; has_unclosed_bracketed_comment = true; } + /** + * This field stores the tags which are used to detect the end of a dollar quoted string literal. + */ + private final Deque tags = new ArrayDeque(); + /** * When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT type. */ @@ -324,7 +334,9 @@ MAP: 'MAP' {incComplexTypeLevelCounter();}; MATCHED: 'MATCHED'; MATERIALIZED: 'MATERIALIZED'; MAX: 'MAX'; +MEASURE: 'MEASURE'; MERGE: 'MERGE'; +METRICS: 'METRICS'; MICROSECOND: 'MICROSECOND'; MICROSECONDS: 'MICROSECONDS'; MILLISECOND: 'MILLISECOND'; @@ -557,6 +569,10 @@ STRING_LITERAL | 'R"'(~'"')* '"' ; +BEGIN_DOLLAR_QUOTED_STRING + : DOLLAR_QUOTED_TAG {tags.push(getText());} -> pushMode(DOLLAR_QUOTED_STRING_MODE) + ; + DOUBLEQUOTED_STRING :'"' ( ~('"'|'\\') | '""' | ('\\' .) )* '"' ; @@ -634,6 +650,10 @@ fragment LETTER : [A-Z] ; +fragment DOLLAR_QUOTED_TAG + : '$' LETTER* '$' + ; + fragment UNICODE_LETTER : [\p{L}] ; @@ -656,3 +676,13 @@ WS UNRECOGNIZED : . 
; + +mode DOLLAR_QUOTED_STRING_MODE; +DOLLAR_QUOTED_STRING_BODY + : ~'$'+ + | '$' ~'$'* + ; + +END_DOLLAR_QUOTED_STRING + : DOLLAR_QUOTED_TAG {getText().equals(tags.peek())}? {tags.pop();} -> popMode + ; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 132ced820e9a9..9b7eaece945b9 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -323,6 +323,14 @@ statement (PARTITIONED ON identifierList) | (TBLPROPERTIES propertyList))* AS query #createView + | CREATE (OR REPLACE)? + VIEW (IF errorCapturingNot EXISTS)? identifierReference + identifierCommentList? + ((WITH METRICS) | + routineLanguage | + commentSpec | + (TBLPROPERTIES propertyList))* + AS codeLiteral #createMetricView | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider (OPTIONS propertyList)? #createTempViewUsing @@ -1523,6 +1531,17 @@ complexColType : errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? commentSpec? ; +// The code literal is defined as a dollar quoted string. +// A dollar quoted string consists of +// - a begin tag which contains a dollar sign, an optional tag, and another dollar sign, +// - a string literal that is made up of an arbitrary sequence of characters, and +// - an end tag which has to be exactly the same as the begin tag. +// As the string literal can contain dollar signs, we add + to DOLLAR_QUOTED_STRING_BODY to avoid +// the parser eagerly matching END_DOLLAR_QUOTED_STRING when seeing a dollar sign. +codeLiteral + : BEGIN_DOLLAR_QUOTED_STRING DOLLAR_QUOTED_STRING_BODY+ END_DOLLAR_QUOTED_STRING + ; + routineCharacteristics : (routineLanguage | specificName @@ -1997,7 +2016,9 @@ ansiNonReserved | MATCHED | MATERIALIZED | MAX + | MEASURE | MERGE + | METRICS | MICROSECOND | MICROSECONDS | MILLISECOND @@ -2387,7 +2408,9 @@ nonReserved | MATCHED | MATERIALIZED | MAX + | MEASURE | MERGE + | METRICS | MICROSECOND | MICROSECONDS | MILLISECOND diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index 553161ea2db0a..696ef78a1a972 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -618,7 +618,7 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { ctx) } - def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): Throwable = { + def createViewWithBothIfNotExistsAndReplaceError(ctx: ParserRuleContext): Throwable = { new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx) } @@ -774,6 +774,15 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { ctx) } + def missingClausesForOperation( + ctx: ParserRuleContext, + clauses: String, + operation: String): Throwable = + new ParseException( + errorClass = "MISSING_CLAUSES_FOR_OPERATION", + messageParameters = Map("clauses" -> clauses, "operation" -> operation), + ctx) + def invalidDatetimeUnitError( ctx: ParserRuleContext, functionName: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 
199306104d858..0d5f30bd2d78c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -825,6 +825,7 @@ object FunctionRegistry { expression[ThetaDifference]("theta_difference"), expression[ThetaIntersection]("theta_intersection"), expression[ApproxTopKEstimate]("approx_top_k_estimate"), + expression[Measure]("measure"), // grouping sets expression[Grouping]("grouping"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala index ada47bd3a40cd..25273c73eb7f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.metricview.logical.ResolvedMetricView /** * This file defines view types and analysis rules related to views. @@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule object EliminateView extends Rule[LogicalPlan] with CastSupport { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case v: View => v.child + case rmv: ResolvedMetricView => rmv.child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index be90c7ad3656c..191e2091c40d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE +import org.apache.spark.sql.metricview.util.MetricViewPlanner import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils} import org.apache.spark.util.ArrayImplicits._ @@ -943,7 +944,9 @@ class SessionCatalog( val table = qualifiedIdent.table val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table) - if (metadata.tableType == CatalogTableType.VIEW) { + if (CatalogTable.isMetricView(metadata)) { + parseMetricViewDefinition(metadata) + } else if (metadata.tableType == CatalogTableType.VIEW) { // The relation is a view, so we wrap the relation by: // 1. Add a [[View]] operator over the relation to keep track of the view desc; // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. 
@@ -953,6 +956,27 @@ class SessionCatalog( } } + private def parseMetricViewDefinition(metadata: CatalogTable): LogicalPlan = { + val viewDefinition = metadata.viewText.getOrElse { + throw SparkException.internalError("Invalid view without text.") + } + val viewConfigs = metadata.viewSQLConfigs + val origin = CurrentOrigin.get.copy( + objectType = Some("METRIC VIEW"), + objectName = Some(metadata.qualifiedName) + ) + SQLConf.withExistingConf( + View.effectiveSQLConf( + configs = viewConfigs, + isTempView = false + ) + ) { + CurrentOrigin.withOrigin(origin) { + MetricViewPlanner.planRead(metadata, viewDefinition, parser, metadata.schema) + } + } + } + private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): Option[String] = { if (isTempView) { None diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index eab99a96f4c3e..01153d516e5cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -722,6 +722,18 @@ object CatalogTable { val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts" val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + "catalogAndNamespace.part." + + // Property to indicate that a VIEW is actually a METRIC VIEW + val VIEW_WITH_METRICS = VIEW_PREFIX + "viewWithMetrics" + + /** + * Check if a CatalogTable is a metric view by looking at its properties. + */ + def isMetricView(table: CatalogTable): Boolean = { + table.tableType == CatalogTableType.VIEW && + table.properties.get(VIEW_WITH_METRICS).contains("true") + } + // Convert the current catalog and namespace to properties. def catalogAndNamespaceToProps( currentCatalog: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala new file mode 100644 index 0000000000000..e2d85cafa52f2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, UnevaluableAggregateFunc} +import org.apache.spark.sql.catalyst.trees.TreePattern.{MEASURE, TreePattern} +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType} + +// This function serves as an annotation to tell the analyzer to calculate +// the measures defined in metric views. It cannot be evaluated in the execution phase; +// instead, it is replaced with the actual aggregate functions defined by +// the measure (its input argument). +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(expr) - this function can only be used to calculate a measure defined in a metric view.", + examples = """ + Examples: + > SELECT dimension_col, _FUNC_(measure_col) + FROM test_metric_view + GROUP BY dimension_col; + dim_1, 100 + dim_2, 200 + """, + group = "agg_funcs", + since = "4.2.0") +// scalastyle:on line.size.limit +case class Measure(child: Expression) + extends UnevaluableAggregateFunc with ExpectsInputTypes + with UnaryLike[Expression] { + + override protected def withNewChildInternal(newChild: Expression): Measure = + copy(child = newChild) + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType) + + override def dataType: DataType = child.dataType + + override def prettyName: String = getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("measure") + + override def nullable: Boolean = child.nullable + + override val nodePatterns: Seq[TreePattern] = Seq(MEASURE) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f918232c42ac9..563eacab244d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3710,6 +3710,16 @@ class AstBuilder extends DataTypeAstBuilder Literal.create(createString(ctx), StringType) } + /** + * Create a String from a dollar-quoted string literal (e.g., $$text$$). + * This is used for code literals in features like metric views where the content + * may contain special characters that would be difficult to escape in regular strings. + */ + override def visitCodeLiteral(ctx: CodeLiteralContext): String = { + assert(ctx != null) + dollarQuotedString(ctx.DOLLAR_QUOTED_STRING_BODY()) + } + /** * Create a String from a string literal context. 
This supports: * - Consecutive string literals: `'hello' 'world'` becomes `'helloworld'` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala index 336db1382f898..0f7b6be765ee3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala @@ -24,7 +24,7 @@ import scala.util.matching.Regex import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.misc.Interval -import org.antlr.v4.runtime.tree.{ParseTree, TerminalNodeImpl} +import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode, TerminalNodeImpl} import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier @@ -88,6 +88,18 @@ object ParserUtils extends SparkParserUtils { node.getText.slice(1, node.getText.length - 1) } + /** + * Obtain the string literal provided as a dollar quoted string. + * A dollar quoted string is defined as {{{$[tag]$$[tag]$}}}, + * where the string literal is parsed as a list of body sections. + * This helper method concatenates all body sections and restores the string literal back. + */ + def dollarQuotedString(sections: util.List[TerminalNode]): String = { + val sb = new StringBuilder() + sections forEach (body => sb.append(body.getText)) + sb.toString() + } + /** Collect the entries if any. */ def entry(key: String, value: Token): Seq[(String, String)] = { Option(value).toSeq.map(x => key -> string(x)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 5ea93e74c5d70..a650eb8ed5368 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -70,6 +70,7 @@ object TreePattern extends Enumeration { val MAP_FROM_ARRAYS: Value = Value val MAP_FROM_ENTRIES: Value = Value val MAP_OBJECTS: Value = Value + val MEASURE: Value = Value val MULTI_ALIAS: Value = Value val NEW_INSTANCE: Value = Value val NOT: Value = Value @@ -149,6 +150,7 @@ object TreePattern extends Enumeration { val LIMIT: Value = Value val LOCAL_RELATION: Value = Value val LOGICAL_QUERY_STAGE: Value = Value + val METRIC_VIEW_PLACEHOLDER: Value = Value val NATURAL_LIKE_JOIN: Value = Value val NO_GROUPING_AGGREGATE_REFERENCE: Value = Value val OFFSET: Value = Value @@ -162,6 +164,7 @@ object TreePattern extends Enumeration { val RELATION_TIME_TRAVEL: Value = Value val REPARTITION_OPERATION: Value = Value val REBALANCE_PARTITIONS: Value = Value + val RESOLVED_METRIC_VIEW: Value = Value val SERIALIZE_FROM_OBJECT: Value = Value val SORT: Value = Value val SQL_TABLE_FUNCTION: Value = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala new file mode 100644 index 0000000000000..a7fa037a4b33f --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.logical + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} +import org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, RESOLVED_METRIC_VIEW, TreePattern} +import org.apache.spark.sql.metricview.serde.MetricView + +case class MetricViewPlaceholder( + metadata: CatalogTable, + desc: MetricView, + outputMetrics: Seq[Attribute], + child: LogicalPlan, + isCreate: Boolean = false) extends UnaryNode { + final override val nodePatterns: Seq[TreePattern] = Seq(METRIC_VIEW_PLACEHOLDER) + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { + copy(child = newChild) + } + override def output: Seq[Attribute] = outputMetrics + override lazy val resolved: Boolean = child.resolved + override def simpleString(maxFields: Int): String = + s"$nodeName ${output.mkString("[", ", ", "]")}".trim + + override def producedAttributes: AttributeSet = AttributeSet(outputMetrics) +} + +case class ResolvedMetricView( + identifier: TableIdentifier, + child: LogicalPlan) extends UnaryNode { + override def output: scala.Seq[Attribute] = child.output + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(child = newChild) + override lazy val resolved: Boolean = child.resolved + final override val nodePatterns: Seq[TreePattern] = Seq(RESOLVED_METRIC_VIEW) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala new file mode 100644 index 0000000000000..121d908eda90b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.metricview.util + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} +import org.apache.spark.sql.catalyst.types.DataTypeUtils +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.metricview.logical.MetricViewPlaceholder +import org.apache.spark.sql.metricview.serde.{AssetSource, MetricView, MetricViewFactory, MetricViewValidationException, MetricViewYAMLParsingException, SQLSource} +import org.apache.spark.sql.types.StructType + +object MetricViewPlanner { + + def planWrite( + metadata: CatalogTable, + yaml: String, + sqlParser: ParserInterface): MetricViewPlaceholder = { + val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser) + MetricViewPlaceholder( + metadata, + metricView, + Seq.empty, + dataModelPlan, + isCreate = true + ) + } + + def planRead( + metadata: CatalogTable, + yaml: String, + sqlParser: ParserInterface, + expectedSchema: StructType): MetricViewPlaceholder = { + val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser) + MetricViewPlaceholder( + metadata, + metricView, + DataTypeUtils.toAttributes(expectedSchema), + dataModelPlan + ) + } + + private def parseYAML( + yaml: String, + sqlParser: ParserInterface): (MetricView, LogicalPlan) = { + val metricView = try { + MetricViewFactory.fromYAML(yaml) + } catch { + case e: MetricViewValidationException => + throw QueryCompilationErrors.invalidLiteralForWindowDurationError() + case e: MetricViewYAMLParsingException => + throw QueryCompilationErrors.invalidLiteralForWindowDurationError() + } + val source = metricView.from match { + case asset: AssetSource => UnresolvedRelation(sqlParser.parseMultipartIdentifier(asset.name)) + case sqlSource: SQLSource => sqlParser.parsePlan(sqlSource.sql) + case _ => throw SparkException.internalError("Either SQLSource or AssetSource") + } + // Compute filter here because all necessary information is available. 
+ val parsedPlan = metricView.where.map { cond => + Filter(sqlParser.parseExpression(cond), source) + }.getOrElse(source) + (metricView, parsedPlan) + } +} diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala index 4d66392109e70..1a8ad1b622b82 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala @@ -209,7 +209,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark withConnection { conn => val metadata = conn.getMetaData // scalastyle:off line.size.limit - assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXCHANGE,EXCLUDE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,X,YEARS,ZONE") + assert(metadata.getSQLKeywords === 
"ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXCHANGE,EXCLUDE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,X,YEARS,ZONE") // scalastyle:on line.size.limit } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala new file mode 100644 index 0000000000000..a7763c2799a6e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.collection.mutable + +import org.apache.spark.SparkException +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Measure} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.METRIC_VIEW_PLACEHOLDER +import org.apache.spark.sql.metricview.logical.{MetricViewPlaceholder, ResolvedMetricView} +import org.apache.spark.sql.metricview.serde.{Column => CanonicalColumn, DimensionExpression, JsonUtils, MeasureExpression, MetricView => CanonicalMetricView} +import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder} + +/** + * Analysis rule for resolving metric view operations (CREATE and SELECT). + * + * == Background == + * A metric view is a special type of view that defines a semantic layer over raw data by + * declaring dimensions (grouping columns) and measures (pre-aggregated metrics). Users can + * query metric views using the MEASURE() function to access pre-defined aggregations without + * needing to know the underlying aggregation logic. + * + * == Metric View Definition (YAML) == + * A metric view is defined using YAML syntax that specifies: + * - source: The underlying table or SQL query + * - where: Optional filter condition applied to the source + * - select: List of columns, each being either a dimension or measure + * - Dimensions: Expressions used for grouping (e.g., "region", "upper(region)") + * - Measures: Aggregate expressions (e.g., "sum(count)", "avg(price)") + * + * Example YAML definition: + * {{{ + * version: "0.1" + * source: + * asset: "sales_table" + * where: "product = 'product_1'" + * select: + * - name: region + * expression: dimension(region) + * - name: region_upper + * expression: dimension(upper(region)) + * - name: total_sales + * expression: measure(sum(amount)) + * - name: avg_price + * expression: measure(avg(price)) + * }}} + * + * This rule handles two distinct workflows: + * + * == Workflow 1: CREATE METRIC VIEW == + * Purpose: Analyze the metric view definition and derive the output schema for catalog storage. + * + * SQL Example: + * {{{ + * CREATE VIEW sales_metrics + * WITH METRICS + * LANGUAGE YAML + * AS $$$$ + * }}} + * + * Processing steps: + * 1. Detect [[MetricViewPlaceholder]] nodes marked for creation (isCreate = true) + * 2. Parse the YAML definition to extract dimensions and measures + * 3. Build an [[Aggregate]] logical plan: + * {{{ + * Aggregate( + * groupingExpressions = [region, upper(region)], // all dimensions + * aggregateExpressions = [ + * region, // dimensions become output columns + * upper(region) AS region_upper, + * sum(amount) AS total_sales, // measures with their aggregations + * avg(price) AS avg_price + * ], + * child = Filter(product = 'product_1', sales_table) + * ) + * }}} + * 4. The analyzer resolves this plan to derive column data types + * 5. The resolved schema (with metadata about dimensions/measures) is stored in the catalog + * + * Key insight: We construct an Aggregate node even though it won't be executed. This allows + * the analyzer to infer proper data types for measures (e.g., sum(int) -> long). + * + * == Workflow 2: SELECT FROM METRIC VIEW == + * Purpose: Rewrite user queries to replace MEASURE() function calls with actual aggregations. 
+ * + * SQL Example: + * {{{ + * SELECT region, MEASURE(total_sales), MEASURE(avg_price) + * FROM sales_metrics + * WHERE region_upper = 'REGION_1' + * GROUP BY region + * }}} + * + * Processing steps: + * 1. Detect queries against metric views (identified by [[MetricViewReadOperation]]) + * 2. Load and parse the stored metric view definition from catalog metadata + * 3. Build a [[Project]] node that: + * - Projects dimension expressions: [region, upper(region) AS region_upper] + * - Includes non-conflicting source columns for measure aggregate functions to reference + * - Result: The metric view now exposes dimensions as queryable columns + * 4. Locate [[Aggregate]] nodes containing MEASURE() function calls + * 5. Substitute each MEASURE() call with its corresponding aggregate expression: + * {{{ + * Before substitution: + * Aggregate( + * groupingExpressions = [region], + * aggregateExpressions = [region, MEASURE(total_sales), MEASURE(avg_price)], + * child = Filter(region_upper = 'REGION_1', sales_metrics) + * ) + * + * After substitution: + * Aggregate( + * groupingExpressions = [region], + * aggregateExpressions = [region, sum(amount), avg(price)], + * child = Filter(region_upper = 'REGION_1', + * Project([upper(region) AS region_upper, region, amount, price], + * Filter(product = 'product_1', sales_table))) + * ) + * }}} + * 6. Return the rewritten plan for further optimization and execution + * + * Key behaviors: + * - Dimensions can be used directly in SELECT, WHERE, GROUP BY, ORDER BY + * - Measures must be accessed via MEASURE() function and can only appear in aggregate context + * - The WHERE clause from the metric view definition is automatically applied + * - Source table columns are hidden from the metric view + * + * Example query patterns: + * {{{ + * -- Dimension only (no aggregation needed) + * SELECT region_upper FROM sales_metrics GROUP BY 1 + * => SELECT upper(region) FROM sales_table WHERE product = 'product_1' GROUP BY 1 + * + * -- Measure only (aggregates entire dataset) + * SELECT MEASURE(total_sales) FROM sales_metrics + * => SELECT sum(amount) FROM sales_table WHERE product = 'product_1' + * + * -- Dimensions + Measures (group by dimensions) + * SELECT region, MEASURE(total_sales) FROM sales_metrics GROUP BY region + * => SELECT region, sum(amount) FROM sales_table + * WHERE product = 'product_1' GROUP BY region + * }}} + * + * The rule operates on unresolved plans and transforms [[MetricViewPlaceholder]] nodes + * into resolved logical plans that can be further optimized and executed. + */ +case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] { + private def parser: ParserInterface = session.sessionState.sqlParser + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!plan.containsPattern(METRIC_VIEW_PLACEHOLDER)) { + return plan + } + plan.resolveOperatorsUp { + // CREATE PATH: to create a metric view, we need to analyze the metric view + // definition and get the output schema (with column metadata). Since the measures + // are aggregate functions, we need to use an Aggregate node and group by all + // dimensions to get the output schema. 
+ case mvp: MetricViewPlaceholder if mvp.isCreate && mvp.child.resolved => + val (dimensions, measures) = buildMetricViewOutput(mvp.desc) + Aggregate( + // group by all dimensions + dimensions.map(_.toAttribute).toSeq, + // select all dimensions and measures to get the final output (mostly data types) + (dimensions ++ measures).toSeq, + mvp.child + ) + + // SELECT PATH: to read a metric view, users use the `MEASURE` aggregate function + // to read the measures, which leads to an Aggregate node. This way, we only need to + // resolve the Aggregate node based on the metric view output and then replace + // the AttributeReferences of the metric view output with the actual expressions. + case node @ MetricViewReadOperation(metricView) => + // step 1: parse the metric view definition + val (dimensions, measures) = + parseMetricViewColumns(metricView.outputMetrics, metricView.desc.select) + + // step 2: build the Project node containing the dimensions + val dimensionExprs = dimensions.map(_.namedExpr) + // Drop the source columns if they conflict with dimensions + val sourceOutput = metricView.child.output + // 1. hide the columns that conflict with dimensions + // 2. add an alias to each source column so they are stable with DeduplicateRelation + // 3. metric view output should use the same exprId + val dimensionAttrs = metricView.outputMetrics.filter(a => + dimensions.exists(_.exprId == a.exprId) + ) + val sourceProjList = sourceOutput.filterNot { attr => + // conflicts with dimensions + dimensionAttrs + .resolve(Seq(attr.name), session.sessionState.conf.resolver) + .nonEmpty + }.map { attr => + // add an alias to the source column so it is stable with DeduplicateRelation + Alias(attr, attr.name)() + } + val withDimensions = node.transformDownWithPruning( + _.containsPattern(METRIC_VIEW_PLACEHOLDER)) { + case mv: MetricViewPlaceholder + if mv.metadata.identifier == metricView.metadata.identifier => + ResolvedMetricView( + mv.metadata.identifier, + Project(sourceProjList ++ dimensionExprs, mv.child) + ) + } + + // step 3: resolve the measure references in the Aggregate node + withDimensions match { + case aggregate: Aggregate => transformAggregateWithMeasures( + aggregate, + measures + ) + case other => + throw SparkException.internalError("ran into unexpected node: " + other) + } + } + } + + private def buildMetricViewOutput(metricView: CanonicalMetricView) + : (Seq[NamedExpression], Seq[NamedExpression]) = { + val dimensions = new mutable.ArrayBuffer[NamedExpression]() + val measures = new mutable.ArrayBuffer[NamedExpression]() + metricView.select.foreach { col => + val metadata = new MetadataBuilder() + .withMetadata(Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata))) + .build() + col.expression match { + case DimensionExpression(expr) => + dimensions.append( + Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata))) + case MeasureExpression(expr) => + measures.append( + Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata))) + } + } + (dimensions.toSeq, measures.toSeq) + } + + private def parseMetricViewColumns( + metricViewOutput: Seq[Attribute], + columns: Seq[CanonicalColumn] + ): (Seq[MetricViewDimension], Seq[MetricViewMeasure]) = { + val dimensions = new mutable.ArrayBuffer[MetricViewDimension]() + val measures = new mutable.ArrayBuffer[MetricViewMeasure]() + metricViewOutput.zip(columns).foreach { case (attr, column) => + column.expression match { + case DimensionExpression(expr) => + dimensions.append( + MetricViewDimension( + 
attr.name, + parser.parseExpression(expr), + attr.exprId, + attr.dataType) + ) + case MeasureExpression(expr) => + measures.append( + MetricViewMeasure( + attr.name, + parser.parseExpression(expr), + attr.exprId, + attr.dataType) + ) + } + } + (dimensions.toSeq, measures.toSeq) + } + + private def transformAggregateWithMeasures( + aggregate: Aggregate, + measures: Seq[MetricViewMeasure]): LogicalPlan = { + val measuresMap = measures.map(m => m.exprId -> m).toMap + val newAggExprs = aggregate.aggregateExpressions.map { expr => + expr.transform { + case AggregateExpression(Measure(a: AttributeReference), _, _, _, _) => + measuresMap(a.exprId).expr + }.asInstanceOf[NamedExpression] + } + aggregate.copy(aggregateExpressions = newAggExprs) + } +} + +object MetricViewReadOperation { + def unapply(plan: LogicalPlan): Option[MetricViewPlaceholder] = { + plan match { + case a: Aggregate if a.resolved && a.containsPattern(METRIC_VIEW_PLACEHOLDER) => + collectMetricViewNode(a.child) + case _ => + None + } + } + + @scala.annotation.tailrec + private def collectMetricViewNode(plan: LogicalPlan): Option[MetricViewPlaceholder] = { + plan match { + case f: Filter => collectMetricViewNode(f.child) + case s: Expand => collectMetricViewNode(s.child) + case s: Project => collectMetricViewNode(s.child) + case s: SubqueryAlias => collectMetricViewNode(s.child) + case m: MetricViewPlaceholder => Some(m) + case _ => None + } + } +} + +sealed trait MetricViewColumn { + def name: String + def expr: Expression + def exprId: ExprId + def dataType: DataType + def namedExpr: NamedExpression = { + Alias(UpCast(expr, dataType), name)(exprId = exprId) + } +} + +case class MetricViewDimension( + name: String, + expr: Expression, + exprId: ExprId, + dataType: DataType) extends MetricViewColumn + +case class MetricViewMeasure( + name: String, + expr: Expression, + exprId: ExprId, + dataType: DataType) extends MetricViewColumn diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f8f6e31be1bcc..2885d215ee34f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -724,6 +724,56 @@ class SparkSqlAstBuilder extends AstBuilder { } } + override def visitCreateMetricView(ctx: CreateMetricViewContext): LogicalPlan = withOrigin(ctx) { + checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.routineLanguage(), "LANGUAGE", ctx) + checkDuplicateClauses(ctx.METRICS(), "WITH METRICS", ctx) + val userSpecifiedColumns = Option(ctx.identifierCommentList).toSeq.flatMap { icl => + icl.identifierComment.asScala.map { ic => + ic.identifier.getText -> Option(ic.commentSpec()).map(visitCommentSpec) + } + } + + if (ctx.EXISTS != null && ctx.REPLACE != null) { + throw QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx) + } + + if (ctx.METRICS(0) == null) { + throw QueryParsingErrors.missingClausesForOperation( + ctx, "WITH METRICS", "METRIC VIEW CREATION") + } + + if (ctx.routineLanguage(0) == null) { + throw QueryParsingErrors.missingClausesForOperation( + ctx, "LANGUAGE", "METRIC VIEW CREATION") + } + + val languageCtx = ctx.routineLanguage(0) + if (languageCtx.SQL() != null) { + operationNotAllowed("Unsupported language for metric view: SQL", languageCtx) + } + val name: String = 
languageCtx.IDENTIFIER().getText + if (!name.equalsIgnoreCase("YAML")) { + operationNotAllowed(s"Unsupported language for metric view: $name", languageCtx) + } + + val properties = ctx.propertyList.asScala.headOption + .map(visitPropertyKeyValues) + .getOrElse(Map.empty) + val codeLiteral = visitCodeLiteral(ctx.codeLiteral()) + + CreateMetricViewCommand( + withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)), + userSpecifiedColumns, + visitCommentSpecList(ctx.commentSpec()), + properties, + codeLiteral, + allowExisting = ctx.EXISTS != null, + replace = ctx.REPLACE != null + ) + } + /** * Create a [[CreateFunctionCommand]]. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala new file mode 100644 index 0000000000000..5fb36b429c4ae --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.SparkException +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, SchemaUnsupported} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.metricview.util.MetricViewPlanner +import org.apache.spark.sql.types.StructType + +case class CreateMetricViewCommand( + child: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + originalText: String, + allowExisting: Boolean, + replace: Boolean) extends UnaryRunnableCommand with IgnoreCachedData { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val name = child match { + case v: ResolvedIdentifier => + v.identifier.asTableIdentifier + case _ => throw SparkException.internalError( + s"Failed to resolve identifier for creating metric view") + } + val analyzed = MetricViewHelper.analyzeMetricViewText(sparkSession, name, originalText) + + if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzed.output.length) { + throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + name, userSpecifiedColumns.map(_._1), analyzed) + } else if (userSpecifiedColumns.length < analyzed.output.length) { + throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + name, userSpecifiedColumns.map(_._1), analyzed) + } + } + catalog.createTable( + ViewHelper.prepareTable( + sparkSession, name, Some(originalText), analyzed, userSpecifiedColumns, + properties, SchemaUnsupported, comment, + None, isMetricView = true), + ignoreIfExists = allowExisting) + Seq.empty + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { + copy(child = newChild) + } +} + +case class AlterMetricViewCommand(child: LogicalPlan, originalText: String) + +object MetricViewHelper { + def analyzeMetricViewText( + session: SparkSession, + name: TableIdentifier, + viewText: String): LogicalPlan = { + val analyzer = session.sessionState.analyzer + // this metadata is used for analysis check, it'll be replaced during create/update with + // more accurate information + val tableMeta = CatalogTable( + identifier = name, + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = new StructType(), + viewOriginalText = Some(viewText), + viewText = Some(viewText)) + val metricViewNode = MetricViewPlanner.planWrite( + tableMeta, viewText, session.sessionState.sqlParser) + val analyzed = analyzer.executeAndCheck(metricViewNode, new QueryPlanningTracker) + ViewHelper.verifyTemporaryObjectsNotExists(isTemporary = false, name, analyzed, Seq.empty) + analyzed + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 11ec17ca57fd9..95d76c72d2951 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -133,7 +133,7 @@ case class 
CreateViewCommand( SchemaUtils.checkIndeterminateCollationInSchema(plan.schema) if (viewType == LocalTempView) { - val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) + val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, userSpecifiedColumns) val tableDefinition = createTemporaryViewRelation( name, sparkSession, @@ -148,7 +148,7 @@ case class CreateViewCommand( } else if (viewType == GlobalTempView) { val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) val viewIdent = TableIdentifier(name.table, Option(db)) - val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) + val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, userSpecifiedColumns) val tableDefinition = createTemporaryViewRelation( viewIdent, sparkSession, @@ -178,7 +178,10 @@ case class CreateViewCommand( // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` // Nothing we need to retain from the old view, so just drop and create a new one catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false) - catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) + catalog.createTable( + prepareTable( + sparkSession, name, originalText, analyzedPlan, userSpecifiedColumns, properties, + viewSchemaMode, comment, collation), ignoreIfExists = false) } else { // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already // exists. @@ -186,56 +189,14 @@ case class CreateViewCommand( } } else { // Create the view if it doesn't exist. - catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = allowExisting) + catalog.createTable( + prepareTable( + sparkSession, name, originalText, analyzedPlan, userSpecifiedColumns, properties, + viewSchemaMode, comment, collation), ignoreIfExists = allowExisting) } Seq.empty[Row] } - /** - * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, - * else return the analyzed plan directly. - */ - private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = { - if (userSpecifiedColumns.isEmpty) { - analyzedPlan - } else { - val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { - case (attr, (colName, None)) => Alias(attr, colName)() - case (attr, (colName, Some(colComment))) => - val meta = new MetadataBuilder().putString("comment", colComment).build() - Alias(attr, colName)(explicitMetadata = Some(meta)) - } - session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed - } - } - - /** - * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific - * properties(e.g. view default database, view query output column names) and store them as - * properties in the CatalogTable, and also creates the proper schema for the view. 
- */ - private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = { - if (originalText.isEmpty) { - throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError() - } - val aliasedSchema = CharVarcharUtils.getRawSchema( - aliasPlan(session, analyzedPlan).schema, session.sessionState.conf) - val newProperties = generateViewProperties( - properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode) - - CatalogTable( - identifier = name, - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = aliasedSchema, - properties = newProperties, - viewOriginalText = originalText, - viewText = originalText, - comment = comment, - collation = collation - ) - } - override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { copy(plan = WithCTE(plan, cteDefs)) } @@ -812,4 +773,70 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig { properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")), collation = collation) } + + + /** + * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific + * properties(e.g. view default database, view query output column names) and store them as + * properties in the CatalogTable, and also creates the proper schema for the view. + */ + def prepareTable( + session: SparkSession, + name: TableIdentifier, + originalText: Option[String], + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + properties: Map[String, String], + viewSchemaMode: ViewSchemaMode, + comment: Option[String], + collation: Option[String], + isMetricView: Boolean = false): CatalogTable = { + if (originalText.isEmpty) { + throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError() + } + val aliasedSchema = CharVarcharUtils.getRawSchema( + aliasPlan(session, analyzedPlan, userSpecifiedColumns).schema, session.sessionState.conf) + val newProperties = generateViewProperties( + properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode) + + // Add property to indicate if this is a metric view + val finalProperties = if (isMetricView) { + newProperties + (CatalogTable.VIEW_WITH_METRICS -> "true") + } else { + newProperties + } + + CatalogTable( + identifier = name, + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = aliasedSchema, + properties = finalProperties, + viewOriginalText = originalText, + viewText = originalText, + comment = comment, + collation = collation + ) + } + + /** + * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, + * else return the analyzed plan directly. 
+ */ + def aliasPlan( + session: SparkSession, + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = { + if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, None)) => Alias(attr, colName)() + case (attr, (colName, Some(colComment))) => + val meta = new MetadataBuilder().putString("comment", colComment).build() + Alias(attr, colName)(explicitMetadata = Some(meta)) + } + session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 00c9a26cb5bf3..24bf618ee8610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.internal import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} import org.apache.spark.sql.artifact.ArtifactManager -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -245,6 +245,7 @@ abstract class BaseSessionStateBuilder( ResolveWriteToStream +: new EvalSubqueriesForTimeTravel +: new ResolveTranspose(session) +: + ResolveMetricView(session) +: new InvokeProcedures(session) +: ResolveExecuteImmediate(session, this.catalogManager) +: ExtractSemiStructuredFields +: diff --git a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out index b8443e417cafc..c363cf2f69852 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out @@ -210,7 +210,9 @@ MAP false MATCHED false MATERIALIZED false MAX false +MEASURE false MERGE false +METRICS false MICROSECOND false MICROSECONDS false MILLISECOND false diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index 00baa0c7e7254..ddc5b05b35506 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -210,7 +210,9 @@ MAP false MATCHED false MATERIALIZED false MAX false +MEASURE false MERGE false +METRICS false MICROSECOND false MICROSECONDS false MILLISECOND false diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out index 00baa0c7e7254..ddc5b05b35506 100644 --- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out @@ -210,7 +210,9 @@ MAP false MATCHED false MATERIALIZED false MAX false +MEASURE false MERGE false +METRICS false MICROSECOND false MICROSECONDS false MILLISECOND false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala new file mode 100644 index 0000000000000..5e6033aeaa75d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.metricview.serde.{AssetSource, Column, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, SQLSource} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} + +class SimpleMetricViewSuite extends MetricViewSuite with SharedSparkSession + +/** + * A suite for testing metric view related functionality. 
+ */ +abstract class MetricViewSuite extends QueryTest with SQLTestUtils { + import testImplicits._ + + protected val testMetricViewName = "test_metric_view" + protected val testTableName = "test_table" + protected val testTableData = Seq( + ("region_1", "product_1", 80, 5.0), + ("region_1", "product_2", 70, 10.0), + ("REGION_1", "product_3", 60, 15.0), + ("REGION_1", "product_4", 50, 20.0), + ("region_2", "product_1", 40, 25.0), + ("region_2", "product_2", 30, 30.0), + ("REGION_2", "product_3", 20, 35.0), + ("REGION_2", "product_4", 10, 40.0) + ) + protected val testMetricViewColumns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("product", DimensionExpression("product"), 1), + Column("region_upper", DimensionExpression("upper(region)"), 2), + Column("count_sum", MeasureExpression("sum(count)"), 3), + Column("price_avg", MeasureExpression("avg(price)"), 4) + ) + + override protected def beforeAll(): Unit = { + super.beforeAll() + testTableData + .toDF("region", "product", "count", "price") + .write + .saveAsTable(testTableName) + } + + protected def createMetricView( + metricViewName: String, + metricViewDefinition: MetricView): Unit = { + val yaml = MetricViewFactory.toYAML(metricViewDefinition) + sql(s""" + |CREATE VIEW $metricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$ + |""".stripMargin) + } + + protected def withMetricView( + viewName: String, + metricViewDefinition: MetricView)(body: => Unit): Unit = { + createMetricView(viewName, metricViewDefinition) + withView(viewName) { + body + } + } + + test("test source type") { + val sources = Seq( + AssetSource(testTableName), + SQLSource("SELECT * FROM test_table") + ) + sources.foreach { source => + val metricView = MetricView("0.1", source, None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) FROM test_metric_view"), + sql("SELECT sum(count), avg(price) FROM test_table") + ) + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) " + + "FROM test_metric_view WHERE region_upper = 'REGION_1'"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE upper(region) = 'REGION_1'") + ) + } + } + } + + test("test where clause") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), + Some("product = 'product_1'"), testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) FROM test_metric_view"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE product = 'product_1'") + ) + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) " + + "FROM test_metric_view WHERE region_upper = 'REGION_1'"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE " + + "product = 'product_1' AND upper(region) = 'REGION_1'") + ) + } + } + + test("test dimensions and measures") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + // dimension and measure + checkAnswer( + sql("SELECT region, product, measure(count_sum), measure(price_avg) " + + "FROM test_metric_view GROUP BY region, product"), + sql("SELECT region, product, sum(count), avg(price) " + + "FROM test_table GROUP BY region, product") + ) + // dimension only + checkAnswer( + sql("SELECT region_upper FROM test_metric_view GROUP BY 1"), + sql("SELECT upper(region) FROM test_table GROUP BY 1") + ) + // measure only + checkAnswer( 
+ sql("SELECT measure(count_sum) FROM test_metric_view"), + sql("SELECT sum(count) FROM test_table") + ) + } + } + + test("column from source cannot be used when query metric view") { + val metricView = MetricView("0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkError( + exception = intercept[AnalysisException] { + sql("SELECT sum(count) FROM test_metric_view").collect() + }, + condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map( + "objectName" -> "`count`", + "proposal" -> "`count_sum`, `product`, `region`, `price_avg`, `region_upper`" + ), + queryContext = Array(ExpectedContext( + fragment = "count", + start = 11, + stop = 15 + )) + ) + } + } + + test("test ORDER BY and LIMIT clauses") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region, measure(count_sum) " + + "FROM test_metric_view GROUP BY region ORDER BY 2 DESC"), + sql("SELECT region, sum(count) " + + "FROM test_table GROUP BY region ORDER BY 2 DESC") + ) + checkAnswer( + sql("SELECT product, measure(price_avg) " + + "FROM test_metric_view GROUP BY product ORDER BY 2 ASC LIMIT 2"), + sql("SELECT product, avg(price) " + + "FROM test_table GROUP BY product ORDER BY 2 ASC LIMIT 2") + ) + } + } + + test("test complex WHERE conditions with dimensions") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region, product, measure(count_sum) " + + "FROM test_metric_view WHERE region_upper = 'REGION_1' " + + "AND product IN ('product_1', 'product_2') " + + "GROUP BY region, product"), + sql("SELECT region, product, sum(count) " + + "FROM test_table WHERE upper(region) = 'REGION_1' " + + "AND product IN ('product_1', 'product_2') " + + "GROUP BY region, product") + ) + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) " + + "FROM test_metric_view WHERE region_upper LIKE 'REGION_%' AND product <> 'product_4'"), + sql("SELECT sum(count), avg(price) " + + "FROM test_table WHERE upper(region) LIKE 'REGION_%' AND product <> 'product_4'") + ) + } + } + + test("test metric view with where clause and additional query filters") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), + Some("product IN ('product_1', 'product_2')"), testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region, measure(count_sum) " + + "FROM test_metric_view WHERE region_upper = 'REGION_1' GROUP BY region"), + sql("SELECT region, sum(count) " + + "FROM test_table WHERE product IN ('product_1', 'product_2') " + + "AND upper(region) = 'REGION_1' GROUP BY region") + ) + } + } + + test("test multiple measures with different aggregations") { + val columns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1), + Column("count_avg", MeasureExpression("avg(count)"), 2), + Column("count_max", MeasureExpression("max(count)"), 3), + Column("count_min", MeasureExpression("min(count)"), 4), + Column("price_sum", MeasureExpression("sum(price)"), 5) + ) + val metricView = MetricView("0.1", AssetSource(testTableName), None, columns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(count_avg), measure(count_max), " + + "measure(count_min), 
measure(price_sum) FROM test_metric_view"), + sql("SELECT sum(count), avg(count), max(count), min(count), sum(price) FROM test_table") + ) + checkAnswer( + sql("SELECT region, measure(count_sum), measure(count_max), measure(price_sum) " + + "FROM test_metric_view GROUP BY region"), + sql("SELECT region, sum(count), max(count), sum(price) FROM test_table GROUP BY region") + ) + } + } + + test("test dimension expressions with case statements") { + val columns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("region_category", DimensionExpression( + "CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group B' END"), 1), + Column("count_sum", MeasureExpression("sum(count)"), 2) + ) + val metricView = MetricView("0.1", AssetSource(testTableName), None, columns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region_category, measure(count_sum) " + + "FROM test_metric_view GROUP BY region_category"), + sql("SELECT CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group B' END, " + + "sum(count) FROM test_table " + + "GROUP BY CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group B' END") + ) + } + } + + test("test measure expressions with arithmetic operations") { + val columns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("total_revenue", MeasureExpression("sum(count * price)"), 1), + Column("avg_revenue", MeasureExpression("avg(count * price)"), 2) + ) + val metricView = MetricView("0.1", AssetSource(testTableName), None, columns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(total_revenue), measure(avg_revenue) FROM test_metric_view"), + sql("SELECT sum(count * price), avg(count * price) FROM test_table") + ) + checkAnswer( + sql("SELECT region, measure(total_revenue) " + + "FROM test_metric_view GROUP BY region ORDER BY 2 DESC"), + sql("SELECT region, sum(count * price) " + + "FROM test_table GROUP BY region ORDER BY 2 DESC") + ) + } + } + + test("test dimensions with aggregate functions in GROUP BY") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region_upper, product, measure(count_sum) " + + "FROM test_metric_view GROUP BY region_upper, product ORDER BY region_upper, product"), + sql("SELECT upper(region), product, sum(count) " + + "FROM test_table GROUP BY upper(region), product ORDER BY upper(region), product") + ) + } + } + + test("test WHERE clause with OR conditions") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) " + + "FROM test_metric_view WHERE region = 'region_1' OR product = 'product_1'"), + sql("SELECT sum(count), avg(price) " + + "FROM test_table WHERE region = 'region_1' OR product = 'product_1'") + ) + } + } + + test("test dimension-only query with multiple dimensions") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region_upper, product " + + "FROM test_metric_view GROUP BY region_upper, product"), + sql("SELECT upper(region), product FROM test_table GROUP BY upper(region), product") + ) + } + } + + test("test query with SELECT * should fail") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, 
testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + intercept[Exception] { + sql("SELECT * FROM test_metric_view").collect() + } + } + } + + test("test SQLSource with complex query") { + val sqlSource = SQLSource( + "SELECT region, product, count, price FROM test_table WHERE count > 20") + val metricView = MetricView("0.1", sqlSource, None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) FROM test_metric_view"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE count > 20") + ) + checkAnswer( + sql("SELECT region, measure(count_sum) FROM test_metric_view GROUP BY region"), + sql("SELECT region, sum(count) FROM test_table WHERE count > 20 GROUP BY region") + ) + } + } + + test("test measure function without GROUP BY") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum) FROM test_metric_view"), + sql("SELECT sum(count) FROM test_table") + ) + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) FROM test_metric_view"), + sql("SELECT sum(count), avg(price) FROM test_table") + ) + } + } + + test("test combining multiple dimension expressions in WHERE") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT product, measure(count_sum) " + + "FROM test_metric_view WHERE region = 'region_1' AND region_upper = 'REGION_1' " + + "GROUP BY product"), + sql("SELECT product, sum(count) " + + "FROM test_table WHERE region = 'region_1' AND upper(region) = 'REGION_1' " + + "GROUP BY product") + ) + } + } + + test("test measure with COUNT DISTINCT") { + val columns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("product_count", MeasureExpression("count(distinct product)"), 1), + Column("count_sum", MeasureExpression("sum(count)"), 2) + ) + val metricView = MetricView("0.1", AssetSource(testTableName), None, columns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT region, measure(product_count), measure(count_sum) " + + "FROM test_metric_view GROUP BY region"), + sql("SELECT region, count(distinct product), sum(count) " + + "FROM test_table GROUP BY region") + ) + } + } + + test("test union of same aggregated metric view dataframe") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + // Create a DataFrame with aggregation and groupBy from metric view + val df = sql( + s"""SELECT region, measure(count_sum) as total_count, measure(price_avg) as avg_price + |FROM $testMetricViewName + |GROUP BY region + |""".stripMargin) + + // Union the same DataFrame with itself - tests DeduplicateRelations + val unionDf = df.union(df) + + // Expected result: each region should appear twice with identical values + val expectedDf = sql( + """ + |SELECT region, sum(count) as total_count, avg(price) as avg_price + |FROM test_table + |GROUP BY region + |UNION ALL + |SELECT region, sum(count) as total_count, avg(price) as avg_price + |FROM test_table + |GROUP BY region + |""".stripMargin) + + checkAnswer(unionDf, expectedDf) + + // Verify the result has duplicate rows + assert(unionDf.count() == df.count() * 2, + "Union should double the row count") + + // Verify that 
distinct values are the same as the original + checkAnswer(unionDf.distinct(), df) + } + } +} diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py index b49124ece0866..9cc478e5cadf5 100644 --- a/sql/gen-sql-functions-docs.py +++ b/sql/gen-sql-functions-docs.py @@ -175,7 +175,7 @@ def _make_pretty_examples(jspark, infos): pretty_output = "" for info in infos: if (info.examples.startswith("\n Examples:") and info.name.lower() not in - ("from_avro", "to_avro", "from_protobuf", "to_protobuf")): + ("from_avro", "to_avro", "from_protobuf", "to_protobuf", "measure")): output = [] output.append("-- %s" % info.name) query_examples = filter(lambda x: x.startswith(" > "), info.examples.split("\n")) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index d69f99a1e42fe..a95eeee032217 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXCEPT,EXCHANGE,EXCLUDE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MERGE,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RANGE,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REC
URSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") + assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXCEPT,EXCHANGE,EXCLUDE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RANGE,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TER
MINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") // scalastyle:on line.size.limit } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index dec947651dd6e..9f5566407e386 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, ResolveTranspose} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, InvalidUDFClassException} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -132,6 +132,7 @@ class HiveSessionStateBuilder( new EvalSubqueriesForTimeTravel +: new DetermineTableStats(session) +: new ResolveTranspose(session) +: + ResolveMetricView(session) +: new InvokeProcedures(session) +: ResolveExecuteImmediate(session, catalogManager) +: ExtractSemiStructuredFields +: diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala new file mode 100644 index 0000000000000..bccc2f629a158 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.execution.MetricViewSuite +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.tags.SlowHiveTest + +/** + * A test suite for Hive metric view related functionality. 
+ */ +@SlowHiveTest +class HiveMetricViewSuite extends MetricViewSuite with TestHiveSingleton
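For reference, the end-to-end flow exercised by MetricViewSuite can be condensed into the minimal sketch below. It uses only APIs visible in this patch (the org.apache.spark.sql.metricview.serde classes and the CREATE VIEW ... WITH METRICS LANGUAGE YAML syntax with a dollar-quoted body); the table name sales, its columns region and amount, and the active SparkSession named spark are assumptions made for illustration, not part of the change.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.metricview.serde.{AssetSource, Column, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory}

// Assumed setup: `spark` is an active SparkSession and a table sales(region, amount) exists.
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Define one dimension and one measure over the source table.
val definition = MetricView(
  "0.1",
  AssetSource("sales"),
  None,
  Seq(
    Column("region", DimensionExpression("region"), 0),
    Column("amount_sum", MeasureExpression("sum(amount)"), 1)))

// Serialize the definition to YAML and embed it in a dollar-quoted string,
// mirroring the createMetricView helper in MetricViewSuite above.
val yaml = MetricViewFactory.toYAML(definition)
spark.sql(
  s"""CREATE VIEW sales_metrics
     |WITH METRICS
     |LANGUAGE YAML
     |AS
     |$$$$
     |$yaml
     |$$$$
     |""".stripMargin)

// Measures must be wrapped in measure(); dimensions can be selected and grouped on directly.
spark.sql("SELECT region, measure(amount_sum) FROM sales_metrics GROUP BY region").show()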