Add support for optimize (file compaction) on Delta tables
This PR adds the ability to compact the small files in a Delta table into larger files via the OPTIMIZE SQL command. File compaction can improve the performance of read queries on Delta tables. The rewrite (removing small files and adding large files) doesn't change the data in the table, and the changes are committed transactionally to the Delta log.

**Syntax of the SQL command**
```
OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];
```
* The SQL command allows restricting the file compaction to a subset of partitions, as the sketch below illustrates.
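
For concreteness, a hedged sketch of the supported target forms, driven through `spark.sql` (the table and path names are illustrative, not from this PR):

```scala
// Assumes the Delta SQL parser (DeltaSparkSessionExtension) is registered
// on this SparkSession, so OPTIMIZE is recognized.
spark.sql("OPTIMIZE '/data/events'")                        // by file path
spark.sql("OPTIMIZE events")                                // by table name
spark.sql("OPTIMIZE events WHERE date = '2022-07-06'")      // selected partitions only
```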

**Configuration options**
- `optimize.minFileSize` - Files smaller than this threshold (in bytes) are grouped together and rewritten as larger files.
- `optimize.maxFileSize` - Target size (in bytes) of the files produced by the OPTIMIZE command.
- `optimize.maxThreads` - Maximum number of parallel jobs used by the OPTIMIZE command.
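
A minimal sketch of tuning these knobs, assuming they are surfaced under Delta's usual `spark.databricks.delta.` SQL-conf prefix (the full key names are an assumption; the values are arbitrary):

```scala
// Assumed conf keys: the option names from this PR prefixed the way other
// Delta SQL confs are. Values below are illustrative, not defaults.
spark.conf.set("spark.databricks.delta.optimize.minFileSize", 128L * 1024 * 1024)  // 128 MiB
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024L * 1024 * 1024) // 1 GiB
spark.conf.set("spark.databricks.delta.optimize.maxThreads", 10)
```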

New test suite `OptimizeCompactionSuite.scala`

Closes delta-io#934

GitOrigin-RevId: f818d49b0f13296768e61f9f06fadf33a7831056
vkorukanti authored and jbguerraz committed Jul 6, 2022
1 parent f74ca6b commit ddee78c
Showing 12 changed files with 1,193 additions and 11 deletions.
core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 (7 additions, 3 deletions)

```diff
@@ -84,6 +84,8 @@ statement
         constraint                                              #addTableConstraint
     | ALTER TABLE table=qualifiedName
         DROP CONSTRAINT (IF EXISTS)? name=identifier            #dropTableConstraint
+    | OPTIMIZE (path=STRING | table=qualifiedName)
+        (WHERE partitionPredicate = exprToken)?                 #optimizeTable
     | .*?                                                       #passThrough
     ;
```

```diff
@@ -124,12 +126,12 @@ number
     ;

 constraint
-    : CHECK '(' checkExprToken+ ')'                             #checkConstraint
+    : CHECK '(' exprToken+ ')'                                  #checkConstraint
     ;

 // We don't have an expression rule in our grammar here, so we just grab the tokens and defer
 // parsing them to later.
-checkExprToken
+exprToken
     : .+?
     ;
```

```diff
@@ -139,7 +141,7 @@ nonReserved
     : VACUUM | RETAIN | HOURS | DRY | RUN
     | CONVERT | TO | DELTA | PARTITIONED | BY
     | DESC | DESCRIBE | LIMIT | DETAIL
-    | GENERATE | FOR | TABLE | CHECK | EXISTS
+    | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
     ;

 // Define how the keywords above should appear in a user's SQL statement.
```
```diff
@@ -165,13 +167,15 @@ LIMIT: 'LIMIT';
 MINUS: '-';
 NOT: 'NOT' | '!';
 NULL: 'NULL';
+OPTIMIZE: 'OPTIMIZE';
 FOR: 'FOR';
 TABLE: 'TABLE';
 PARTITIONED: 'PARTITIONED';
 RETAIN: 'RETAIN';
 RUN: 'RUN';
 TO: 'TO';
 VACUUM: 'VACUUM';
+WHERE: 'WHERE';

 // Multi-character operator tokens need to be defined even though we don't explicitly reference
 // them so that they can be recognized as single tokens when parsing. If we split them up and
```
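
Note the design here: the grammar deliberately has no expression rule, so the `WHERE` predicate is captured as raw `exprToken` text and parsed later by Spark itself. A small sketch of that deferred step, using Spark's public `CatalystSqlParser` (the predicate string is illustrative):

```scala
// The grammar only captures "part = 25" as raw text; Spark's expression
// parser can turn it into an (unresolved) Expression when the command runs.
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val predicate = CatalystSqlParser.parseExpression("part = 25")
// => an unresolved Expression along the lines of ('part = 25)
```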
core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala (33 additions, 2 deletions)

```diff
@@ -156,6 +156,30 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
       ctx.RUN != null)
   }

+  /**
+   * Create a [[OptimizeTableCommand]] logical plan.
+   * Syntax:
+   * {{{
+   *   OPTIMIZE <table-identifier> [WHERE predicate-using-partition-columns]
+   * }}}
+   * Examples:
+   * {{{
+   *   OPTIMIZE '/path/to/delta/table';
+   *   OPTIMIZE delta_table_name;
+   *   OPTIMIZE delta.`/path/to/delta/table`;
+   *   OPTIMIZE delta_table_name WHERE partCol = 25;
+   * }}}
+   */
+  override def visitOptimizeTable(ctx: OptimizeTableContext): AnyRef = withOrigin(ctx) {
+    if (ctx.path == null && ctx.table == null) {
+      throw new ParseException("OPTIMIZE command requires a file path or table name.", ctx)
+    }
+    OptimizeTableCommand(
+      Option(ctx.path).map(string),
+      Option(ctx.table).map(visitTableIdentifier),
+      Option(ctx.partitionPredicate).map(extractRawText(_)))
+  }
+
   override def visitDescribeDeltaDetail(
       ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
     DescribeDeltaDetailCommand(
```
```diff
@@ -225,10 +249,17 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
   // space. This produces some strange spacing (e.g. `structCol . arr [ 0 ]`), but right now we
   // think that's preferable to the additional complexity involved in trying to produce cleaner
   // output.
-  private def buildCheckConstraintText(tokens: Seq[CheckExprTokenContext]): String = {
+  private def buildCheckConstraintText(tokens: Seq[ExprTokenContext]): String = {
     tokens.map(_.getText).mkString(" ")
   }

+  private def extractRawText(exprContext: ExprTokenContext): String = {
+    // Extract the raw expression which will be parsed later
+    exprContext.getStart.getInputStream.getText(new Interval(
+      exprContext.getStart.getStartIndex,
+      exprContext.getStop.getStopIndex))
+  }
+
   override def visitAddTableConstraint(
       ctx: AddTableConstraintContext): LogicalPlan = withOrigin(ctx) {
     val checkConstraint = ctx.constraint().asInstanceOf[CheckConstraintContext]
```
```diff
@@ -237,7 +268,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
       createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
         "ALTER TABLE ... ADD CONSTRAINT"),
       ctx.name.getText,
-      buildCheckConstraintText(checkConstraint.checkExprToken().asScala.toSeq))
+      buildCheckConstraintText(checkConstraint.exprToken().asScala.toSeq))
   }

   override def visitDropTableConstraint(
```
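
A hedged sketch of the resulting parser behavior, in the style of the repo's parser suites (constructing `DeltaSqlParser` with a `null` delegate works for Delta-only statements; the expected plan shapes follow the `visitOptimizeTable` code above):

```scala
import io.delta.sql.parser.DeltaSqlParser

// The delegate session parser is never consulted for OPTIMIZE, so null suffices here.
val parser = new DeltaSqlParser(null)

val plan = parser.parsePlan("OPTIMIZE delta_table_name WHERE partCol = 25")
// Expected: OptimizeTableCommand(None, Some(delta_table_name), Some("partCol = 25"))

val planByPath = parser.parsePlan("OPTIMIZE '/path/to/delta/table'")
// Expected: OptimizeTableCommand(Some(/path/to/delta/table), None, None)
```
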
```diff
@@ -975,6 +975,10 @@ object DeltaErrors
     new AnalysisException("Cannot describe the history of a view.")
   }

+  def viewNotSupported(operationName: String): Throwable = {
+    new AnalysisException(s"Operation $operationName can not be performed on a view")
+  }
+
   def copyIntoValidationRequireDeltaTableExists: Throwable = {
     new AnalysisException("COPY INTO validation failed. Target table does not exist.")
   }
```

```diff
@@ -336,6 +336,17 @@ object DeltaOperations {
     override def changesData: Boolean = true
   }

+  val OPTIMIZE_OPERATION_NAME = "OPTIMIZE"
+
+  /** Recorded when optimizing the table. */
+  case class Optimize(
+      predicate: Seq[String]
+  ) extends Operation(OPTIMIZE_OPERATION_NAME) {
+    override val parameters: Map[String, Any] = Map(
+      "predicate" -> JsonUtils.toJson(predicate)
+    )
+  }
+

   private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
     Map(
```
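
Since `Optimize` is a commit `Operation`, a completed run is visible in the table history. A sketch using the existing `DeltaTable.history` API (the table path is illustrative):

```scala
import io.delta.tables.DeltaTable

val table = DeltaTable.forPath(spark, "/path/to/delta/table")
table.history(1)
  .select("operation", "operationParameters")
  .show(truncate = false)
// The latest entry should report operation "OPTIMIZE", with the (possibly empty)
// partition predicate serialized as JSON in operationParameters.
```
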
```diff
@@ -29,12 +29,12 @@ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.FileSizeHistogram
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.FileNames.deltaFile

 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, NoSuchTableException, UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
```
```diff
@@ -392,11 +392,28 @@ trait DeltaCommand extends DeltaLogging {
       path: Option[String],
       tableIdentifier: Option[TableIdentifier],
       operationName: String): DeltaLog = {
-    val tablePath = tableIdentifier.map { ti =>
-      new Path(spark.sessionState.catalog.getTableMetadata(ti).location)
-    }.orElse(path.map(new Path(_))).getOrElse {
-      throw DeltaErrors.missingTableIdentifierException(operationName)
-    }
+    val tablePath =
+      if (path.nonEmpty) {
+        new Path(path.get)
+      } else if (tableIdentifier.nonEmpty) {
+        val sessionCatalog = spark.sessionState.catalog
+        lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get)
+
+        DeltaTableIdentifier(spark, tableIdentifier.get) match {
+          case Some(id) if id.path.nonEmpty =>
+            new Path(id.path.get)
+          case Some(id) if id.table.nonEmpty =>
+            new Path(metadata.location)
+          case _ =>
+            if (metadata.tableType == CatalogTableType.VIEW) {
+              throw DeltaErrors.viewNotSupported(operationName)
+            }
+            throw DeltaErrors.notADeltaTableException(operationName)
+        }
+      } else {
+        throw DeltaErrors.missingTableIdentifierException(operationName)
+      }
+
     val deltaLog = DeltaLog.forTable(spark, tablePath)
     if (deltaLog.snapshot.version < 0) {
       throw DeltaErrors.notADeltaTableException(
```
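
The branch order above makes the resolution rules explicit: an explicit path wins, then `DeltaTableIdentifier` decides whether the identifier is a `delta.`-style path reference or a catalog table, and anything else fails fast, with views getting the new dedicated error. A sketch of the observable behavior (ScalaTest style; the table names are hypothetical):

```scala
import org.apache.spark.sql.AnalysisException
import org.scalatest.Assertions.intercept

// Views are rejected via the new DeltaErrors.viewNotSupported:
spark.sql("CREATE OR REPLACE VIEW events_view AS SELECT * FROM events")
intercept[AnalysisException] {
  spark.sql("OPTIMIZE events_view")
}

// Non-Delta catalog tables still surface notADeltaTableException:
intercept[AnalysisException] {
  spark.sql("OPTIMIZE parquet_table")
}
```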