Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.5] Remove checkpoint folder when vacuuming index #629

Merged
merged 1 commit into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import scala.collection.JavaConverters._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
Expand Down Expand Up @@ -272,8 +271,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
val options = flintIndexMetadataService.getIndexMetadata(indexName).options.asScala
flintClient.deleteIndex(indexName)
flintIndexMetadataService.deleteIndexMetadata(indexName)

// Remove checkpoint folder if defined
val checkpoint = options
.get(CHECKPOINT_LOCATION.toString)
.map(path => new FlintSparkCheckpoint(spark, path.asInstanceOf[String]))
if (checkpoint.isDefined) {
checkpoint.get.delete()
}
true
})
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.util.UUID

import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods

/**
* Manages the checkpoint directory for Flint indexes.
*
* @param spark
* The SparkSession used for Hadoop configuration.
* @param checkpointLocation
* The path to the checkpoint directory.
*/
class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String) extends Logging {

/** Checkpoint root directory path */
private val checkpointRootDir = new Path(checkpointLocation)

/** Spark checkpoint manager */
private val checkpointManager =
CheckpointFileManager.create(checkpointRootDir, spark.sessionState.newHadoopConf())

/**
* Checks if the checkpoint directory exists.
*
* @return
* true if the checkpoint directory exists, false otherwise.
*/
def exists(): Boolean = checkpointManager.exists(checkpointRootDir)

/**
* Creates the checkpoint directory and all necessary parent directories if they do not already
* exist.
*
* @return
* The path to the created checkpoint directory.
*/
def createDirectory(): Path = {
checkpointManager.createCheckpointDirectory
}

/**
* Creates a temporary file in the checkpoint directory.
*
* @return
* An optional FSDataOutputStream for the created temporary file, or None if creation fails.
*/
def createTempFile(): Option[FSDataOutputStream] = {
checkpointManager match {
case manager: RenameHelperMethods =>
val tempFilePath =
new Path(createDirectory(), s"${UUID.randomUUID().toString}.tmp")
Some(manager.createTempFile(tempFilePath))
case _ =>
logInfo(s"Cannot create temp file at checkpoint location: ${checkpointManager.getClass}")
None
}
}

/**
* Deletes the checkpoint directory. This method attempts to delete the checkpoint directory and
* captures any exceptions that occur. Exceptions are logged but ignored so as not to disrupt
* the caller's workflow.
*/
def delete(): Unit = {
try {
checkpointManager.delete(checkpointRootDir)
logInfo(s"Checkpoint directory $checkpointRootDir deleted")
} catch {
case e: Exception =>
logError(s"Error deleting checkpoint directory $checkpointRootDir", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package org.opensearch.flint.spark

import java.util.UUID

import org.apache.hadoop.fs.Path
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
Expand All @@ -16,8 +13,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}

/**
Expand Down Expand Up @@ -72,35 +67,20 @@ trait FlintSparkValidationHelper extends Logging {
*/
def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = {
try {
val checkpointManager =
CheckpointFileManager.create(
new Path(checkpointLocation),
spark.sessionState.newHadoopConf())
val checkpoint = new FlintSparkCheckpoint(spark, checkpointLocation)

/*
* Read permission check: The primary intent here is to catch any exceptions
* during the accessibility check. The actual result is ignored, as the write
* permission check below will create any necessary sub-folders.
*/
checkpointManager.exists(new Path(checkpointLocation))
checkpoint.exists()

/*
* Write permission check: Attempt to create a temporary file to verify write access.
* The temporary file is left in place in case additional delete permissions required.
*/
checkpointManager match {
case manager: RenameHelperMethods =>
val tempFilePath =
new Path(
checkpointManager
.createCheckpointDirectory(), // create all parent folders if needed
s"${UUID.randomUUID().toString}.tmp")

manager.createTempFile(tempFilePath).close()
case _ =>
logInfo(
s"Bypass checkpoint location write permission check: ${checkpointManager.getClass}")
}
checkpoint.createTempFile().foreach(_.close())

true
} catch {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite

class FlintSparkCheckpointSuite extends FlintSuite with Matchers {

test("exists") {
withCheckpoint { checkpoint =>
checkpoint.exists() shouldBe false
checkpoint.createDirectory()
checkpoint.exists() shouldBe true
}
}

test("create directory") {
withTempPath { tempDir =>
val checkpointDir = new Path(tempDir.getAbsolutePath, "sub/subsub")
val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.toString)
checkpoint.createDirectory()

tempDir.exists() shouldBe true
}
}

test("create temp file") {
withCheckpoint { checkpoint =>
val tempFile = checkpoint.createTempFile()
tempFile shouldBe defined

// Close the stream to ensure the file is flushed
tempFile.get.close()

// Assert that there is a .tmp file
listFiles(checkpoint.checkpointLocation)
.exists(isTempFile) shouldBe true
}
}

test("delete") {
withCheckpoint { checkpoint =>
checkpoint.createDirectory()
checkpoint.delete()
checkpoint.exists() shouldBe false
}
}

private def withCheckpoint(block: FlintSparkCheckpoint => Unit): Unit = {
withTempPath { checkpointDir =>
val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.getAbsolutePath)
block(checkpoint)
}
}

private def listFiles(dir: String): Array[FileStatus] = {
val fs = FileSystem.get(spark.sessionState.newHadoopConf())
fs.listStatus(new Path(dir))
}

private def isTempFile(file: FileStatus): Boolean = {
file.isFile && file.getPath.getName.endsWith(".tmp")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,34 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
flint.describeIndex(testFlintIndex) shouldBe empty
}

test("vacuum covering index with checkpoint") {
withTempDir { checkpointDir =>
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testFlintIndex)
.create()
flint.refreshIndex(testFlintIndex)

val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testFlintIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(testFlintIndex) shouldBe empty
checkpointDir.exists() shouldBe false
}
}

private def awaitRefreshComplete(query: String): Unit = {
sql(query)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,5 +408,33 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
flint.describeIndex(testFlintIndex) shouldBe empty
}

test("vacuum materialized view with checkpoint") {
withTempDir { checkpointDir =>
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath,
"watermark_delay" -> "1 Second")),
testFlintIndex)
.create()
flint.refreshIndex(testFlintIndex)

val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testFlintIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
sql(s"VACUUM MATERIALIZED VIEW $testMvName")
flint.describeIndex(testFlintIndex) shouldBe empty
checkpointDir.exists() shouldBe false
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,33 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

test("vacuum skipping index with checkpoint") {
withTempDir { checkpointDir =>
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("address")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()
flint.refreshIndex(testIndex)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
flint.vacuumIndex(testIndex)
flint.describeIndex(testIndex) shouldBe None
checkpointDir.exists() shouldBe false
}
}

// Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
def useFlintSparkSkippingFileIndex(
subMatcher: Matcher[FlintSparkSkippingFileIndex]): Matcher[SparkPlan] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
flint.describeIndex(testIndex) shouldBe empty
}

test("vacuum skipping index with checkpoint") {
withTempDir { checkpointDir =>
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()
flint.refreshIndex(testIndex)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(testIndex) shouldBe empty
checkpointDir.exists() shouldBe false
}
}

test("analyze skipping index with for supported data types") {
val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")

Expand Down
Loading