-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* Add checkpoint abstraction and IT * Add UT and more doc * Add more IT --------- (cherry picked from commit 88ad15f) Signed-off-by: Chen Dai <daichen@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
1f2472d
commit 0524ae9
Showing
8 changed files
with
278 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
85 changes: 85 additions & 0 deletions
85
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark | ||
|
||
import java.util.UUID | ||
|
||
import org.apache.hadoop.fs.{FSDataOutputStream, Path} | ||
|
||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.SparkSession | ||
import org.apache.spark.sql.execution.streaming.CheckpointFileManager | ||
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods | ||
|
||
/** | ||
* Manages the checkpoint directory for Flint indexes. | ||
* | ||
* @param spark | ||
* The SparkSession used for Hadoop configuration. | ||
* @param checkpointLocation | ||
* The path to the checkpoint directory. | ||
*/ | ||
class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String) extends Logging { | ||
|
||
/** Checkpoint root directory path */ | ||
private val checkpointRootDir = new Path(checkpointLocation) | ||
|
||
/** Spark checkpoint manager */ | ||
private val checkpointManager = | ||
CheckpointFileManager.create(checkpointRootDir, spark.sessionState.newHadoopConf()) | ||
|
||
/** | ||
* Checks if the checkpoint directory exists. | ||
* | ||
* @return | ||
* true if the checkpoint directory exists, false otherwise. | ||
*/ | ||
def exists(): Boolean = checkpointManager.exists(checkpointRootDir) | ||
|
||
/** | ||
* Creates the checkpoint directory and all necessary parent directories if they do not already | ||
* exist. | ||
* | ||
* @return | ||
* The path to the created checkpoint directory. | ||
*/ | ||
def createDirectory(): Path = { | ||
checkpointManager.createCheckpointDirectory | ||
} | ||
|
||
/** | ||
* Creates a temporary file in the checkpoint directory. | ||
* | ||
* @return | ||
* An optional FSDataOutputStream for the created temporary file, or None if creation fails. | ||
*/ | ||
def createTempFile(): Option[FSDataOutputStream] = { | ||
checkpointManager match { | ||
case manager: RenameHelperMethods => | ||
val tempFilePath = | ||
new Path(createDirectory(), s"${UUID.randomUUID().toString}.tmp") | ||
Some(manager.createTempFile(tempFilePath)) | ||
case _ => | ||
logInfo(s"Cannot create temp file at checkpoint location: ${checkpointManager.getClass}") | ||
None | ||
} | ||
} | ||
|
||
/** | ||
* Deletes the checkpoint directory. This method attempts to delete the checkpoint directory and | ||
* captures any exceptions that occur. Exceptions are logged but ignored so as not to disrupt | ||
* the caller's workflow. | ||
*/ | ||
def delete(): Unit = { | ||
try { | ||
checkpointManager.delete(checkpointRootDir) | ||
logInfo(s"Checkpoint directory $checkpointRootDir deleted") | ||
} catch { | ||
case e: Exception => | ||
logError(s"Error deleting checkpoint directory $checkpointRootDir", e) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
70 changes: 70 additions & 0 deletions
70
...ark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark | ||
|
||
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
import org.apache.spark.FlintSuite | ||
|
||
class FlintSparkCheckpointSuite extends FlintSuite with Matchers { | ||
|
||
test("exists") { | ||
withCheckpoint { checkpoint => | ||
checkpoint.exists() shouldBe false | ||
checkpoint.createDirectory() | ||
checkpoint.exists() shouldBe true | ||
} | ||
} | ||
|
||
test("create directory") { | ||
withTempPath { tempDir => | ||
val checkpointDir = new Path(tempDir.getAbsolutePath, "sub/subsub") | ||
val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.toString) | ||
checkpoint.createDirectory() | ||
|
||
tempDir.exists() shouldBe true | ||
} | ||
} | ||
|
||
test("create temp file") { | ||
withCheckpoint { checkpoint => | ||
val tempFile = checkpoint.createTempFile() | ||
tempFile shouldBe defined | ||
|
||
// Close the stream to ensure the file is flushed | ||
tempFile.get.close() | ||
|
||
// Assert that there is a .tmp file | ||
listFiles(checkpoint.checkpointLocation) | ||
.exists(isTempFile) shouldBe true | ||
} | ||
} | ||
|
||
test("delete") { | ||
withCheckpoint { checkpoint => | ||
checkpoint.createDirectory() | ||
checkpoint.delete() | ||
checkpoint.exists() shouldBe false | ||
} | ||
} | ||
|
||
private def withCheckpoint(block: FlintSparkCheckpoint => Unit): Unit = { | ||
withTempPath { checkpointDir => | ||
val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.getAbsolutePath) | ||
block(checkpoint) | ||
} | ||
} | ||
|
||
private def listFiles(dir: String): Array[FileStatus] = { | ||
val fs = FileSystem.get(spark.sessionState.newHadoopConf()) | ||
fs.listStatus(new Path(dir)) | ||
} | ||
|
||
private def isTempFile(file: FileStatus): Boolean = { | ||
file.isFile && file.getPath.getName.endsWith(".tmp") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters