[MINOR] Use Literal constants #638
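This change replaces ad-hoc boolean and null literals (Literal(true), Literal(false), Literal.create(null, NullType)) with the predefined constants on Catalyst's Literal companion object. A minimal sketch of the equivalences, assuming Spark's org.apache.spark.sql.catalyst.expressions.Literal API (the object name LiteralConstantsSketch is illustrative, not part of the diff):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.NullType

object LiteralConstantsSketch {
  // Named constants and factory replacing the ad-hoc forms used throughout this diff:
  val alwaysTrue  = Literal.TrueLiteral        // same value as Literal(true) / Literal(true, BooleanType)
  val alwaysFalse = Literal.FalseLiteral       // same value as Literal(false)
  val nullOfNull  = Literal.default(NullType)  // same value as Literal.create(null, NullType)
}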

src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala (3 changes: 2 additions & 1 deletion)
@@ -516,7 +516,8 @@ object DeltaLog extends DeltaLogging {
files.sparkSession.sessionState.conf.resolver,
partitionFilters,
partitionColumnPrefixes)
-val columnFilter = new Column(rewrittenFilters.reduceLeftOption(And).getOrElse(Literal(true)))
+val expr = rewrittenFilters.reduceLeftOption(And).getOrElse(Literal.TrueLiteral)
+val columnFilter = new Column(expr)
files.filter(columnFilter)
}

@@ -338,22 +338,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
}

/** Returns files matching the given predicates. */
-def filterFiles(): Seq[AddFile] = filterFiles(Seq(Literal.apply(true)))
+def filterFiles(): Seq[AddFile] = filterFiles(Seq(Literal.TrueLiteral))

/** Returns files matching the given predicates. */
def filterFiles(filters: Seq[Expression]): Seq[AddFile] = {
val scan = snapshot.filesForScan(Nil, filters)
val partitionFilters = filters.filter { f =>
DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark)
}
-readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
+readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal.TrueLiteral)
readFiles ++= scan.files
scan.files
}

/** Mark the entire table as tainted by this transaction. */
def readWholeTable(): Unit = {
-readPredicates += Literal(true)
+readPredicates += Literal.TrueLiteral
readTheWholeTable = true
}

@@ -162,7 +162,7 @@ case class DeleteCommand(
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)

val targetDF = Dataset.ofRows(sparkSession, newTarget)
-val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
val updatedDF = targetDF.filter(new Column(filterCond))

val rewrittenFiles = withStatusCode(
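
Note on the filter above (reviewer reading, not part of the diff): EqualNullSafe (<=>) never evaluates to null, so Not(EqualNullSafe(cond, Literal.TrueLiteral)) keeps rows where the delete condition is false or null, and only rows where it is strictly true are rewritten away. A minimal sketch of that behavior, assuming Spark's Column API (the column name cond is illustrative):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, not}

object DeleteKeepPredicateSketch {
  // Rows to keep after DELETE WHERE <cond>:
  // not(col("cond") <=> true) is true when cond is false OR null, whereas
  // not(col("cond")) is null when cond is null, and a null predicate drops the row.
  val keepCondition: Column = not(col("cond") <=> true)
}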

@@ -413,7 +413,7 @@ case class MergeIntoCommand(
// source DataFrame
val sourceDF = Dataset.ofRows(spark, source)
.filter(new Column(incrSourceRowCountExpr))
-.filter(new Column(notMatchedClauses.head.condition.getOrElse(Literal(true))))
+.filter(new Column(notMatchedClauses.head.condition.getOrElse(Literal.TrueLiteral)))

// Skip data based on the merge condition
val conjunctivePredicates = splitConjunctivePredicates(condition)

@@ -506,22 +506,22 @@ case class MergeIntoCommand(
val exprs = clause match {
case u: DeltaMergeIntoUpdateClause =>
// Generate update expressions and set ROW_DELETED_COL = false
-u.resolvedActions.map(_.expr) :+ Literal(false) :+ incrUpdatedCountExpr
+u.resolvedActions.map(_.expr) :+ Literal.FalseLiteral :+ incrUpdatedCountExpr
case _: DeltaMergeIntoDeleteClause =>
// Generate expressions to set the ROW_DELETED_COL = true
-targetOutputCols :+ Literal(true) :+ incrDeletedCountExpr
+targetOutputCols :+ Literal.TrueLiteral :+ incrDeletedCountExpr
}
resolveOnJoinedPlan(exprs)
}

def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Expression] = {
-val exprs = clause.resolvedActions.map(_.expr) :+ Literal(false) :+ incrInsertedCountExpr
-resolveOnJoinedPlan(exprs)
+resolveOnJoinedPlan(
+clause.resolvedActions.map(_.expr) :+ Literal.FalseLiteral :+ incrInsertedCountExpr)
}

def clauseCondition(clause: DeltaMergeIntoClause): Expression = {
// if condition is None, then expression always evaluates to true
-val condExpr = clause.condition.getOrElse(Literal(true))
+val condExpr = clause.condition.getOrElse(Literal.TrueLiteral)
resolveOnJoinedPlan(Seq(condExpr)).head
}

@@ -536,9 +536,9 @@ case class MergeIntoCommand(
notMatchedConditions = notMatchedClauses.map(clauseCondition),
notMatchedOutputs = notMatchedClauses.map(notMatchedClauseOutput),
noopCopyOutput =
-resolveOnJoinedPlan(targetOutputCols :+ Literal(false) :+ incrNoopCountExpr),
+resolveOnJoinedPlan(targetOutputCols :+ Literal.FalseLiteral :+ incrNoopCountExpr),
deleteRowOutput =
-resolveOnJoinedPlan(targetOutputCols :+ Literal(true) :+ Literal(true)),
+resolveOnJoinedPlan(targetOutputCols :+ Literal.TrueLiteral :+ Literal.TrueLiteral),
joinedAttributes = joinedPlan.output,
joinedRowEncoder = joinedRowEncoder,
outputRowEncoder = outputRowEncoder)

@@ -88,7 +88,7 @@ case class UpdateCommand(
val startTime = System.nanoTime()
val numFilesTotal = deltaLog.snapshot.numOfFiles

-val updateCondition = condition.getOrElse(Literal(true, BooleanType))
+val updateCondition = condition.getOrElse(Literal.TrueLiteral)
val (metadataPredicates, dataPredicates) =
DeltaTableUtils.splitMetadataAndDataPredicates(
updateCondition, txn.metadata.partitionColumns, sparkSession)

@@ -576,15 +576,15 @@ private[delta] object PartitionUtils {
.orElse(dateTry)
// Then falls back to string
.getOrElse {
-if (raw == DEFAULT_PARTITION_NAME) {
-Literal.create(null, NullType)
-} else {
-Literal.create(unescapePathName(raw), StringType)
+if (raw == DEFAULT_PARTITION_NAME) {
+Literal.default(NullType)
+} else {
+Literal.create(unescapePathName(raw), StringType)
+}
}
}
} else {
if (raw == DEFAULT_PARTITION_NAME) {
-Literal.create(null, NullType)
+Literal.default(NullType)
} else {
Literal.create(unescapePathName(raw), StringType)
}
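
Behavior-preservation note (reviewer reading, not part of the diff): for the default-partition case, Literal.default(NullType) produces the same null literal as the Literal.create(null, NullType) it replaces. A minimal check, assuming Spark's Catalyst Literal API (the object name is illustrative):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.NullType

object NullLiteralEquivalenceSketch {
  // Both forms construct Literal(null, NullType), so the rewrite does not change semantics.
  val same: Boolean = Literal.default(NullType) == Literal.create(null, NullType)
}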