diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0cc037b157e0..d747e1534e3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1441,7 +1441,7 @@ class Dataset[T] private[sql]( if (sqlContext.conf.supportQuotedRegexColumnName) { colRegex(colName) } else { - Column(addDataFrameIdToCol(resolve(colName))) + createColumnWithPlanId(colName) } } @@ -1457,25 +1457,6 @@ class Dataset[T] private[sql]( def metadataColumn(colName: String): Column = Column(queryExecution.analyzed.getMetadataAttributeByName(colName)) - // Attach the dataset id and column position to the column reference, so that we can detect - // ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`. - // This must be called before we return a `Column` that contains `AttributeReference`. - // Note that, the metadata added here are only available in the analyzer, as the analyzer rule - // `DetectAmbiguousSelfJoin` will remove it. - private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = { - val newExpr = expr transform { - case a: AttributeReference - if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) => - val metadata = new MetadataBuilder() - .withMetadata(a.metadata) - .putLong(Dataset.DATASET_ID_KEY, id) - .putLong(Dataset.COL_POS_KEY, logicalPlan.output.indexWhere(a.semanticEquals)) - .build() - a.withMetadata(metadata) - } - newExpr.asInstanceOf[NamedExpression] - } - /** * Selects column based on the column name specified as a regex and returns it as [[Column]]. * @group untypedrel @@ -1489,8 +1470,18 @@ class Dataset[T] private[sql]( case ParserUtils.qualifiedEscapedIdentifier(nameParts, columnNameRegex) => Column(UnresolvedRegex(columnNameRegex, Some(nameParts), caseSensitive)) case _ => - Column(addDataFrameIdToCol(resolve(colName))) + createColumnWithPlanId(colName) + } + } + + private def createColumnWithPlanId(colName: String) = { + val expr = resolve(colName) match { + case attr: AttributeReference => UnresolvedAttribute(Seq(attr.name)) + case _ => UnresolvedAttribute.quotedString(colName) } + // reuse existing DATASET_ID_KEY as the PLAN_ID + expr.setTagValue(LogicalPlan.PLAN_ID_TAG, id) + Column(expr) } /**