[CARMEL-2696][FOLLOWUP] Cancel the auto vacuum after converting back to parquet (delta-io#83)
LantaoJin authored and GitHub Enterprise committed Jun 17, 2020
1 parent e105dde commit 74df399
Showing 3 changed files with 42 additions and 5 deletions.
@@ -23,8 +23,9 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.delta.services.{ConvertToParquetEvent, DeltaTableMetadata}
 import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, OptimisticTransaction}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableAddRangePartitionCommand, CommandUtils}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableAddRangePartitionCommand, CommandUtils, DDLUtils}
 import org.apache.spark.sql.types.StructType

 case class ConvertBackCommand(
@@ -119,6 +120,18 @@ case class ConvertBackCommand(
     spark.sessionState.catalog.invalidateCachedTable(qualified)
     spark.catalog.refreshTable(newTable.identifier.quotedString)
     CommandUtils.updateTableStats(spark, newTable)
+
+    removeFromMetaTable(spark, convertProperties)
     Seq.empty[Row]
   }
+
+  private def removeFromMetaTable(
+      spark: SparkSession, convertProperties: ConvertProperties): Unit = {
+    convertProperties.catalogTable.foreach { table =>
+      val searchCondition = DeltaTableMetadata.buildSearchCondition(
+        table.identifier.database.getOrElse(""), table.identifier.table)
+      val isTemp = DDLUtils.isTemporaryTable(table)
+      spark.sharedState.externalCatalog.postToAll(ConvertToParquetEvent(searchCondition, isTemp))
+    }
+  }
 }
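The new removeFromMetaTable hook is what drives the vacuum cancellation: after the table has been converted back to parquet, the command posts a ConvertToParquetEvent on the shared external catalog so that DeltaTableListener (next file) can drop the table from the delta metadata table and stop its auto-vacuum task. A minimal usage sketch, not part of the commit, assuming buildSearchCondition(db, table) returns the DeltaTableMetadata key the listener uses and borrowing the "default"/"delta3" names from the test suite below:

// Sketch only: how the posted event is assumed to be built and delivered.
val searchCondition: DeltaTableMetadata =
  DeltaTableMetadata.buildSearchCondition("default", "delta3")
// Posting to the shared external catalog fans the event out to every
// registered listener, including DeltaTableListener.
spark.sharedState.externalCatalog.postToAll(
  ConvertToParquetEvent(searchCondition, isTemporaryTable = false))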
@@ -41,6 +41,14 @@ class DeltaTableListener(validate: ValidateTask) extends SparkListener with Logg
             DeltaTableMetadata.insertIntoMetadataTable(spark, e.metadata)
           }
         })
+      case c @ ConvertToParquetEvent(_, isTemporaryTable) if isTemporaryTable => // do nothing
+      case e: ConvertToParquetEvent =>
+        validate.disableVacuum(e.metadata)
+        metaHandlerThread.execute(new Runnable {
+          override def run(): Unit = {
+            DeltaTableMetadata.deleteFromMetadataTable(spark, e.metadata)
+          }
+        })
       case e: UpdateDeltaEvent =>
         if (validate.deltaTableToVacuumTask.contains(e.metadata)) {
           validate.deltaTableToVacuumTask(e.metadata).foreach(_.cancel(true))
@@ -65,8 +73,7 @@
         val searchCondition = DeltaTableMetadata.buildSearchCondition(e.database, e.name)
         if (validate.deltaTableToVacuumTask.contains(searchCondition)) {
           val old = validate.deltaTableToVacuumTask.keySet.find(_.equals(searchCondition)).get
-          validate.deltaTableToVacuumTask(searchCondition).foreach(_.cancel(true))
-          validate.deltaTableToVacuumTask.remove(searchCondition)
+          validate.disableVacuum(old)
           val newTableIdent = TableIdentifier(e.newName, Some(e.database))
           val newTable = spark.sessionState.catalog.getTableMetadata(newTableIdent)
           val newMetadata = DeltaTableMetadata(e.database, e.newName,
@@ -102,7 +109,6 @@
         }
         metaHandlerThread.execute(new Runnable {
           override def run(): Unit = {
-            // todo this has performance problem till we changed the underlay storage
             DeltaTableMetadata.deleteFromMetadataTable(spark, searchCondition)
           }
         })
@@ -123,6 +129,12 @@ case class ConvertToDeltaEvent(
   override val database: String = metadata.db
 }

+case class ConvertToParquetEvent(
+    metadata: DeltaTableMetadata, isTemporaryTable: Boolean) extends DeltaMetaEvent {
+  override val name: String = metadata.tbl
+  override val database: String = metadata.db
+}
+
 case class UpdateDeltaEvent(metadata: DeltaTableMetadata) extends DeltaMetaEvent {
   override val name: String = metadata.tbl
   override val database: String = metadata.db
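validate.disableVacuum itself is not shown in this diff: the rename handler above replaces its previous inline two-liner (cancel(true) on the scheduled task, then removing the map entry) with a single call, and the new ConvertToParquetEvent branch reuses it. A minimal sketch of what the method is assumed to do, reconstructed from those replaced lines; the stand-in types exist only to make the sketch self-contained and are not the real classes:

import java.util.concurrent.Future
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in; the real DeltaTableMetadata carries more fields (path, vacuum flag, retention).
case class DeltaTableMetadata(db: String, tbl: String)

class ValidateTaskSketch {
  // Assumed shape: each registered delta table maps to its scheduled vacuum task(s).
  val deltaTableToVacuumTask = TrieMap.empty[DeltaTableMetadata, Seq[Future[_]]]

  // Reconstructed from the two lines the commit replaces with disableVacuum(old):
  // cancel any scheduled or running vacuum for the table, then drop the entry
  // so the background validator does not reschedule it.
  def disableVacuum(metadata: DeltaTableMetadata): Unit = {
    deltaTableToVacuumTask.get(metadata).foreach(_.foreach(_.cancel(true)))
    deltaTableToVacuumTask.remove(metadata)
  }
}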
@@ -264,7 +264,12 @@ class AutoVacuumSuite extends QueryTest
     try {
       try {
         val create = conn.createStatement
-        create.execute(DELTA_META_TABLE_DROP_SQL)
+        try {
+          create.execute(DELTA_META_TABLE_DROP_SQL)
+        } catch {
+          case e: SQLException =>
+            // table not exists
+        }
         create.execute(DELTA_META_TABLE_CREATION_SQL2)
         create.close()
       } catch {
@@ -326,6 +331,13 @@ class AutoVacuumSuite extends QueryTest
        sql("SHOW DELTAS"),
        Row("default", "delta3", "",
          s"${getTableLocation("delta3")}", true, 2L) :: Nil)
+
+      sql(
+        """
+          |CONVERT TO PARQUET delta3
+          |""".stripMargin)
+      Thread.sleep(7000)
+      checkAnswer(sql("SHOW DELTAS"), Nil)
     }
   }
 }
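The new assertion needs Thread.sleep(7000) because the listener deletes the metadata row on metaHandlerThread, i.e. asynchronously with respect to the CONVERT TO PARQUET statement. As a sketch only, an equivalent polling wait could look like the following, assuming the suite were able to use scalatest's Eventually (which this commit does not do):

import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.scalatest.time.SpanSugar._

// Poll SHOW DELTAS until the listener has removed the row, instead of a fixed sleep.
eventually(timeout(10.seconds), interval(500.milliseconds)) {
  checkAnswer(sql("SHOW DELTAS"), Nil)
}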
