Add support for optimize (file compaction) on Delta tables #934

Closed
wants to merge 3 commits
core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 (10 changes: 7 additions & 3 deletions)
@@ -84,6 +84,8 @@ statement
constraint #addTableConstraint
| ALTER TABLE table=qualifiedName
DROP CONSTRAINT (IF EXISTS)? name=identifier #dropTableConstraint
| OPTIMIZE (path=STRING | table=qualifiedName)
(WHERE partitionPredicate = exprToken)? #optimizeTable
| .*? #passThrough
;

@@ -124,12 +126,12 @@ number
;

constraint
: CHECK '(' checkExprToken+ ')' #checkConstraint
: CHECK '(' exprToken+ ')' #checkConstraint
;

// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
// parsing them to later.
checkExprToken
exprToken
: .+?
;

@@ -139,7 +141,7 @@ nonReserved
: VACUUM | RETAIN | HOURS | DRY | RUN
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
;

// Define how the keywords above should appear in a user's SQL statement.
@@ -165,13 +167,15 @@ LIMIT: 'LIMIT';
MINUS: '-';
NOT: 'NOT' | '!';
NULL: 'NULL';
OPTIMIZE: 'OPTIMIZE';
FOR: 'FOR';
TABLE: 'TABLE';
PARTITIONED: 'PARTITIONED';
RETAIN: 'RETAIN';
RUN: 'RUN';
TO: 'TO';
VACUUM: 'VACUUM';
WHERE: 'WHERE';

// Multi-character operator tokens need to be defined even though we don't explicitly reference
// them so that they can be recognized as single tokens when parsing. If we split them up and
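The new `optimizeTable` alternative accepts either a quoted path or a qualified table name, with an optional WHERE clause whose tokens are captured by the reused `exprToken` rule. A minimal sketch of the statements this grammar admits, assuming a Spark session with the Delta SQL extension registered; the table name, path, and predicate below are made up for illustration:

```scala
// Illustrative only: the table `events`, the path, and the predicate are assumed.
spark.sql("OPTIMIZE events")                             // catalog table name
spark.sql("OPTIMIZE '/data/events'")                     // quoted file path
spark.sql("OPTIMIZE delta.`/data/events`")               // delta.`path` identifier
spark.sql("OPTIMIZE events WHERE date = '2021-01-01'")   // optional partition predicate
```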
core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala (35 changes: 33 additions & 2 deletions)
@@ -156,6 +156,30 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
ctx.RUN != null)
}

/**
* Create an [[OptimizeTableCommand]] logical plan.
* Syntax:
* {{{
* OPTIMIZE <table-identifier> [WHERE predicate-using-partition-columns]
* }}}
* Examples:
* {{{
* OPTIMIZE '/path/to/delta/table';
* OPTIMIZE delta_table_name;
* OPTIMIZE delta.`/path/to/delta/table`;
* OPTIMIZE delta_table_name WHERE partCol = 25;
* }}}
*/
override def visitOptimizeTable(ctx: OptimizeTableContext): AnyRef = withOrigin(ctx) {
if (ctx.path == null && ctx.table == null) {
throw new ParseException("OPTIMIZE command requires a file path or table name.", ctx)
}
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)))
}

override def visitDescribeDeltaDetail(
ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
DescribeDeltaDetailCommand(
@@ -225,10 +249,17 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
// space. This produces some strange spacing (e.g. `structCol . arr [ 0 ]`), but right now we
// think that's preferable to the additional complexity involved in trying to produce cleaner
// output.
private def buildCheckConstraintText(tokens: Seq[CheckExprTokenContext]): String = {
private def buildCheckConstraintText(tokens: Seq[ExprTokenContext]): String = {
tokens.map(_.getText).mkString(" ")
}

private def extractRawText(exprContext: ExprTokenContext): String = {
// Extract the raw expression which will be parsed later
exprContext.getStart.getInputStream.getText(new Interval(
exprContext.getStart.getStartIndex,
exprContext.getStop.getStopIndex))
}

override def visitAddTableConstraint(
ctx: AddTableConstraintContext): LogicalPlan = withOrigin(ctx) {
val checkConstraint = ctx.constraint().asInstanceOf[CheckConstraintContext]
@@ -237,7 +268,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
"ALTER TABLE ... ADD CONSTRAINT"),
ctx.name.getText,
buildCheckConstraintText(checkConstraint.checkExprToken().asScala.toSeq))
buildCheckConstraintText(checkConstraint.exprToken().asScala.toSeq))
}

override def visitDropTableConstraint(
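Two details of this parser change are worth calling out. First, `checkExprToken` is renamed to `exprToken` so the same token-capturing rule serves both CHECK constraints and the new OPTIMIZE predicate. Second, `extractRawText` reads the predicate back out of the character stream via an ANTLR `Interval` rather than using `ctx.getText`, because `getText` concatenates token texts and drops the original whitespace, and the raw string is re-parsed as an expression later. A standalone sketch of that technique follows; the `rawText` name is chosen here for illustration, and the real file is assumed to import `org.antlr.v4.runtime.misc.Interval`:

```scala
import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.misc.Interval

// ctx.getText on a rule matching `partCol = 25` yields "partCol=25" (whitespace
// lives on the hidden channel), whereas reading the character interval from the
// underlying input stream returns the exact source text "partCol = 25".
def rawText(ctx: ParserRuleContext): String =
  ctx.getStart.getInputStream.getText(
    new Interval(ctx.getStart.getStartIndex, ctx.getStop.getStopIndex))
```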
@@ -975,6 +975,10 @@ object DeltaErrors
new AnalysisException("Cannot describe the history of a view.")
}

def viewNotSupported(operationName: String): Throwable = {
new AnalysisException(s"Operation $operationName can not be performed on a view")
}

def copyIntoValidationRequireDeltaTableExists: Throwable = {
new AnalysisException("COPY INTO validation failed. Target table does not exist.")
}
@@ -330,6 +330,17 @@ object DeltaOperations {
override def changesData: Boolean = true
}

val OPTIMIZE_OPERATION_NAME = "OPTIMIZE"

/** Recorded when optimizing the table. */
case class Optimize(
predicate: Seq[String]
) extends Operation(OPTIMIZE_OPERATION_NAME) {
override val parameters: Map[String, Any] = Map(
"predicate" -> JsonUtils.toJson(predicate)
)
}


private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
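The `Optimize` operation records the optional partition predicate in the commit's operation parameters, so it surfaces in DESCRIBE HISTORY. A hedged sketch of how the compaction transaction might record it; `txn` (an `OptimisticTransaction`) and `actions` (the AddFile/RemoveFile actions produced by the bin-packing job) are assumed here and are not part of this diff:

```scala
// Sketch only: txn and actions come from the OPTIMIZE execution path, which is
// not shown in this diff; the predicate value is illustrative.
val predicates: Seq[String] = Seq("date = '2021-01-01'")
txn.commit(actions, DeltaOperations.Optimize(predicate = predicates))
// DESCRIBE HISTORY on the table would then report operation = "OPTIMIZE",
// with the predicate JSON-encoded under operationParameters.
```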
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, NoSuchTableException, UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -392,11 +393,28 @@ trait DeltaCommand extends DeltaLogging {
path: Option[String],
tableIdentifier: Option[TableIdentifier],
operationName: String): DeltaLog = {
val tablePath = tableIdentifier.map { ti =>
new Path(spark.sessionState.catalog.getTableMetadata(ti).location)
}.orElse(path.map(new Path(_))).getOrElse {
throw DeltaErrors.missingTableIdentifierException(operationName)
}
val tablePath =
if (path.nonEmpty) {
new Path(path.get)
} else if (tableIdentifier.nonEmpty) {
val sessionCatalog = spark.sessionState.catalog
lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get)

DeltaTableIdentifier(spark, tableIdentifier.get) match {
case Some(id) if id.path.nonEmpty =>
new Path(id.path.get)
case Some(id) if id.table.nonEmpty =>
new Path(metadata.location)
case _ =>
if (metadata.tableType == CatalogTableType.VIEW) {
throw DeltaErrors.viewNotSupported(operationName)
}
throw DeltaErrors.notADeltaTableException(operationName)
}
} else {
throw DeltaErrors.missingTableIdentifierException(operationName)
}

val deltaLog = DeltaLog.forTable(spark, tablePath)
if (deltaLog.snapshot.version < 0) {
throw DeltaErrors.notADeltaTableException(
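The rewritten resolution prefers an explicit path, then consults `DeltaTableIdentifier` to distinguish delta.`/path` identifiers from catalog table names, and only falls back to catalog metadata for the latter; views are rejected with the new `viewNotSupported` error before the generic not-a-Delta-table error. A hedged, test-style sketch of the view case (names are illustrative, and it assumes a suite with `SharedSparkSession`/`SQLTestUtils` helpers and the OPTIMIZE command resolving its target through this helper):

```scala
// Illustrative sketch, not part of the PR: OPTIMIZE against a view should fail
// with the viewNotSupported error added above.
test("OPTIMIZE is rejected on views") {
  withTable("events") {
    withView("events_view") {
      spark.range(10).write.format("delta").saveAsTable("events")
      spark.sql("CREATE VIEW events_view AS SELECT * FROM events")
      val e = intercept[AnalysisException] {
        spark.sql("OPTIMIZE events_view")
      }
      assert(e.getMessage.contains("can not be performed on a view"))
    }
  }
}
```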