From 686fc6d216c07cdbb7829f690734bdb12309314b Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 19 Mar 2019 09:07:06 +0900 Subject: [PATCH 1/7] [SPARK-27188][SS] Introduce retention of output entities for FileStreamSink --- .../structured-streaming-programming-guide.md | 5 +- .../execution/streaming/FileStreamSink.scala | 5 +- .../streaming/FileStreamSinkLog.scala | 16 +++- .../streaming/FileStreamSinkLogSuite.scala | 73 +++++++++++-------- 4 files changed, 64 insertions(+), 35 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index a371f4f50f9f..cf2f9ebe10eb 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1870,7 +1870,10 @@ Here are the details of all the sinks in Spark. File Sink Append - path: path to the output directory, must be specified. + path: path to the output directory, must be specified.
+        outputRetentionMs: time to live (TTL) for output files. Output files whose batches were +        committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader queries that read +        the sink's output directory may not process them. By default it's disabled.

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/ 0, s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " + "to a positive value.") + + private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue) + + override def shouldRetain(log: SinkFileStatus): Boolean = { + val curTime = System.currentTimeMillis() + if (curTime - log.modificationTime > ttlMs) { + logDebug(s"${log.path} excluded by retention - current time: $curTime / " + + s"modification time: ${log.modificationTime} / TTL: $ttlMs.") + false + } else { + true + } + } } object FileStreamSinkLog { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index c53617b40e09..d3e3b9436727 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.util.Random -import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem} +import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, RawLocalFileSystem} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.internal.SQLConf @@ -130,6 +130,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { } } + private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = { + fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName => + try { + getBatchIdFromFileName(fileName) + true + } catch { + case _: NumberFormatException => false + } + }.toSet + } + test("delete expired file") { // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour // deterministically and one min batches to retain @@ -139,18 +150,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") { withFileStreamSinkLog { sinkLog => val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf()) - - def listBatchFiles(): Set[String] = { - fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName => - try { - getBatchIdFromFileName(fileName) - true - } catch { - case _: NumberFormatException => false - } - }.toSet - } - + def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog) sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION))) assert(Set("0") === listBatchFiles()) sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION))) @@ -174,18 +174,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") { withFileStreamSinkLog { sinkLog => val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf()) - - def listBatchFiles(): Set[String] = { - fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName => - try { - getBatchIdFromFileName(fileName) - true - } catch { - case _: NumberFormatException => false - } - }.toSet - } - + def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog) sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION))) assert(Set("0") === listBatchFiles()) sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION))) @@ 
-206,6 +195,22 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { } } + test("filter out outdated entries when compacting") { + val curTime = System.currentTimeMillis() + withFileStreamSinkLog(sinkLog => { + val logs = Seq( + newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime), + newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime), + newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime)) + logs.foreach { log => assert(sinkLog.shouldRetain(log)) } + + val logs2 = Seq( + newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000), + newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000)) + logs2.foreach { log => assert(!sinkLog.shouldRetain(log)) } + }, Some(60000)) + } + test("read Spark 2.1.0 log format") { assert(readFromResource("file-sink-log-version-2.1.0") === Seq( SinkFileStatus("/a/b/0", 1, false, 1, 1, 100, FileStreamSinkLog.ADD_ACTION), @@ -260,23 +265,29 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { } /** - * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields - * in SinkFileStatus. + * Create a fake SinkFileStatus using path and action, and optionally modification time. + * Most of tests don't care about other fields in SinkFileStatus. */ - private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = { + private def newFakeSinkFileStatus( + path: String, + action: String, + modificationTime: Long = Long.MaxValue): SinkFileStatus = { SinkFileStatus( path = path, size = 100L, isDir = false, - modificationTime = 100L, + modificationTime = modificationTime, blockReplication = 1, blockSize = 100L, action = action) } - private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = { + private def withFileStreamSinkLog( + f: FileStreamSinkLog => Unit, + ttl: Option[Long] = None): Unit = { withTempDir { file => - val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath) + val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath, + ttl) f(sinkLog) } } From 9bc07e36d1c43319dfdfdece4cd22d17d74ff242 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 30 Nov 2020 05:57:02 +0900 Subject: [PATCH 2/7] Introduce "retain context" to provide information which will apply to a batch of logs in shouldRetain --- .../streaming/CompactibleFileStreamLog.scala | 17 +++++++-- .../execution/streaming/FileStreamSink.scala | 6 ++-- .../streaming/FileStreamSinkLog.scala | 35 ++++++++++++++----- .../streaming/FileStreamSinkLogSuite.scala | 8 +++-- 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 427d44b6b155..1543547d5a39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -112,7 +112,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * Default implementation retains all log entries. Implementations should override the method * to change the behavior. 
*/ - def shouldRetain(log: T): Boolean = true + def shouldRetain(log: T, context: Map[String, Any]): Boolean = true + + /** + * Construct the context which will be passed to `shouldRetain`. This method will be called once + * for a batch of `shouldRetain` calls. + * + * Default implementation provides an empty Map. Implementations should override the method + * to provide additional information. + */ + def constructRetainContext(batchId: Long): Map[String, Any] = Map.empty[String, Any] override def batchIdToPath(batchId: Long): Path = { if (isCompactionBatch(batchId, compactInterval)) { @@ -218,8 +227,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * corresponding `batchId` file. It will delete expired files as well if enabled. */ private def compact(batchId: Long, logs: Array[T]): Boolean = { + val retainContext = constructRetainContext(batchId) def writeEntry(entry: T, output: OutputStream): Unit = { - if (shouldRetain(entry)) { + if (shouldRetain(entry, retainContext)) { output.write('\n') serializeEntry(entry, output) } @@ -258,7 +268,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( try { val logs = getAllValidBatches(latestId, compactInterval).flatMap { id => - filterInBatch(id)(shouldRetain).getOrElse { + val retainContext = constructRetainContext(id) + filterInBatch(id)(shouldRetain(_, retainContext)).getOrElse { throw new IllegalStateException( s"${batchIdToPath(id)} doesn't exist " + s"(latestId: $latestId, compactInterval: $compactInterval)") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 9ffba2068f01..e1c9b82ec2ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, Utils} object FileStreamSink extends Logging { // The name of the subdirectory that is used to store metadata about which files are valid. 
@@ -136,9 +136,9 @@ class FileStreamSink( private val basePath = new Path(path) private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath, sparkSession.sessionState.conf) - private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong) + private val retention = options.get("retention").map(Utils.timeStringAsMs) private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, - logPath.toString, outputTimeToLive) + logPath.toString, retention) private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = { val serializableHadoopConf = new SerializableConfiguration(hadoopConf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index b133b12cd9d3..2d0ae80cdc08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -82,7 +82,7 @@ class FileStreamSinkLog( metadataLogVersion: Int, sparkSession: SparkSession, path: String, - outputTimeToLiveMs: Option[Long] = None) + _retentionMs: Option[Long] = None) extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) { private implicit val formats = Serialization.formats(NoTypeHints) @@ -98,21 +98,40 @@ class FileStreamSinkLog( s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " + "to a positive value.") - private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue) + val retentionMs: Long = _retentionMs match { + case Some(retention) => + logInfo(s"Retention is set to ${_retentionMs.get} ms") + retention - override def shouldRetain(log: SinkFileStatus): Boolean = { - val curTime = System.currentTimeMillis() - if (curTime - log.modificationTime > ttlMs) { - logDebug(s"${log.path} excluded by retention - current time: $curTime / " + - s"modification time: ${log.modificationTime} / TTL: $ttlMs.") - false + case _ => Long.MaxValue + } + + override def shouldRetain(log: SinkFileStatus, context: Map[String, Any]): Boolean = { + if (retentionMs < Long.MaxValue) { + val curTime = context(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME).asInstanceOf[Long] + if (curTime - log.modificationTime > retentionMs) { + logInfo(s"${log.path} excluded by retention - current time: $curTime / " + + s"modification time: ${log.modificationTime} / retention: $retentionMs ms.") + false + } else { + true + } } else { true } } + + override def constructRetainContext(batchId: Long): Map[String, Any] = { + if (retentionMs < Long.MaxValue) { + Map(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME -> System.currentTimeMillis()) + } else { + Map.empty[String, Any] + } + } } object FileStreamSinkLog { val VERSION = 1 val ADD_ACTION = "add" + val CONTEXT_KEY_CURRENT_TIME = "currentTime" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index d3e3b9436727..49bae8b698ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -40,7 +40,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { test("shouldRetain") { withFileStreamSinkLog { sinkLog => val log 
= newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION) - assert(sinkLog.shouldRetain(log)) + assert(sinkLog.shouldRetain(log, sinkLog.constructRetainContext(1L))) } } @@ -202,12 +202,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime), newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime), newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime)) - logs.foreach { log => assert(sinkLog.shouldRetain(log)) } + logs.foreach { log => assert(sinkLog.shouldRetain(log, sinkLog.constructRetainContext(1L))) } val logs2 = Seq( newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000), newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000)) - logs2.foreach { log => assert(!sinkLog.shouldRetain(log)) } + logs2.foreach { log => + assert(!sinkLog.shouldRetain(log, sinkLog.constructRetainContext(1L))) + } }, Some(60000)) } From 61f9089983de45104d70c0c0d737a069ad4683df Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 30 Nov 2020 06:00:40 +0900 Subject: [PATCH 3/7] small fix --- .../spark/sql/execution/streaming/FileStreamSinkLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 2d0ae80cdc08..59885ba371e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -100,7 +100,7 @@ class FileStreamSinkLog( val retentionMs: Long = _retentionMs match { case Some(retention) => - logInfo(s"Retention is set to ${_retentionMs.get} ms") + logInfo(s"Retention is set to $retention ms") retention case _ => Long.MaxValue From 573959227e6a6b7785501cb5cdd18223b6579c39 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 30 Nov 2020 07:06:12 +0900 Subject: [PATCH 4/7] Fix doc --- docs/structured-streaming-programming-guide.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index cf2f9ebe10eb..0c923908bba3 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1871,9 +1871,10 @@ Here are the details of all the sinks in Spark. Append path: path to the output directory, must be specified.
-        outputRetentionMs: time to live (TTL) for output files. Output files whose batches were +        retention: time to live (TTL) for output files. Output files whose batches were committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader queries that read -        the sink's output directory may not process them. By default it's disabled. +        the sink's output directory may not process them. The value can be given as a time string, such as "12h" or "7d". +        By default it's disabled.

For file-format-specific options, see the related methods in DataFrameWriter (
Scala/Java/Python/ Date: Mon, 30 Nov 2020 08:05:34 +0900 Subject: [PATCH 5/7] Less bugging on log messages --- .../spark/sql/execution/streaming/FileStreamSinkLog.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 59885ba371e7..06580e0a6658 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -110,7 +110,7 @@ class FileStreamSinkLog( if (retentionMs < Long.MaxValue) { val curTime = context(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME).asInstanceOf[Long] if (curTime - log.modificationTime > retentionMs) { - logInfo(s"${log.path} excluded by retention - current time: $curTime / " + + logDebug(s"${log.path} excluded by retention - current time: $curTime / " + s"modification time: ${log.modificationTime} / retention: $retentionMs ms.") false } else { @@ -123,7 +123,9 @@ class FileStreamSinkLog( override def constructRetainContext(batchId: Long): Map[String, Any] = { if (retentionMs < Long.MaxValue) { - Map(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME -> System.currentTimeMillis()) + val curTime = System.currentTimeMillis() + logInfo(s"Retention will be checked based on current time $curTime.") + Map(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME -> curTime) } else { Map.empty[String, Any] } From e1834ea80108e9072a21fa147290bf5a3188b331 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 1 Dec 2020 07:22:54 +0900 Subject: [PATCH 6/7] Simplify the interface --- .../streaming/CompactibleFileStreamLog.scala | 19 +++++-------------- .../streaming/FileStreamSinkLog.scala | 18 +++--------------- .../streaming/FileStreamSinkLogSuite.scala | 6 +++--- 3 files changed, 11 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 1543547d5a39..74f0a2ba1767 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -112,16 +112,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * Default implementation retains all log entries. Implementations should override the method * to change the behavior. */ - def shouldRetain(log: T, context: Map[String, Any]): Boolean = true - - /** - * Construct the context which will be passed to `shouldRetain`. This method will be called once - * for a batch of `shouldRetain` calls. - * - * Default implementation provides an empty Map. Implementations should override the method - * to provide additional information. - */ - def constructRetainContext(batchId: Long): Map[String, Any] = Map.empty[String, Any] + def shouldRetain(log: T, currentTime: Long): Boolean = true override def batchIdToPath(batchId: Long): Path = { if (isCompactionBatch(batchId, compactInterval)) { @@ -227,9 +218,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * corresponding `batchId` file. It will delete expired files as well if enabled. 
*/ private def compact(batchId: Long, logs: Array[T]): Boolean = { - val retainContext = constructRetainContext(batchId) + val curTime = System.currentTimeMillis() def writeEntry(entry: T, output: OutputStream): Unit = { - if (shouldRetain(entry, retainContext)) { + if (shouldRetain(entry, curTime)) { output.write('\n') serializeEntry(entry, output) } @@ -268,8 +259,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( try { val logs = getAllValidBatches(latestId, compactInterval).flatMap { id => - val retainContext = constructRetainContext(id) - filterInBatch(id)(shouldRetain(_, retainContext)).getOrElse { + val curTime = System.currentTimeMillis() + filterInBatch(id)(shouldRetain(_, curTime)).getOrElse { throw new IllegalStateException( s"${batchIdToPath(id)} doesn't exist " + s"(latestId: $latestId, compactInterval: $compactInterval)") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 06580e0a6658..2d70d95c6850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -106,11 +106,10 @@ class FileStreamSinkLog( case _ => Long.MaxValue } - override def shouldRetain(log: SinkFileStatus, context: Map[String, Any]): Boolean = { + override def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean = { if (retentionMs < Long.MaxValue) { - val curTime = context(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME).asInstanceOf[Long] - if (curTime - log.modificationTime > retentionMs) { - logDebug(s"${log.path} excluded by retention - current time: $curTime / " + + if (currentTime - log.modificationTime > retentionMs) { + logDebug(s"${log.path} excluded by retention - current time: $currentTime / " + s"modification time: ${log.modificationTime} / retention: $retentionMs ms.") false } else { @@ -120,20 +119,9 @@ class FileStreamSinkLog( true } } - - override def constructRetainContext(batchId: Long): Map[String, Any] = { - if (retentionMs < Long.MaxValue) { - val curTime = System.currentTimeMillis() - logInfo(s"Retention will be checked based on current time $curTime.") - Map(FileStreamSinkLog.CONTEXT_KEY_CURRENT_TIME -> curTime) - } else { - Map.empty[String, Any] - } - } } object FileStreamSinkLog { val VERSION = 1 val ADD_ACTION = "add" - val CONTEXT_KEY_CURRENT_TIME = "currentTime" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 49bae8b698ef..bdd6bec8e782 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -40,7 +40,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { test("shouldRetain") { withFileStreamSinkLog { sinkLog => val log = newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION) - assert(sinkLog.shouldRetain(log, sinkLog.constructRetainContext(1L))) + assert(sinkLog.shouldRetain(log, System.currentTimeMillis())) } } @@ -202,13 +202,13 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime), newFakeSinkFileStatus("/a/b/y", 
FileStreamSinkLog.ADD_ACTION, curTime), newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime)) - logs.foreach { log => assert(sinkLog.shouldRetain(log, sinkLog.constructRetainContext(1L))) } + logs.foreach { log => assert(sinkLog.shouldRetain(log, curTime)) } val logs2 = Seq( newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000), newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000)) logs2.foreach { log => - assert(!sinkLog.shouldRetain(log, sinkLog.constructRetainContext(1L))) + assert(!sinkLog.shouldRetain(log, curTime)) } }, Some(60000)) } From 8aceb86a3a0df6250e54d4b207aa9dea7ec93fbd Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 1 Dec 2020 12:33:06 +0900 Subject: [PATCH 7/7] Move currentTimeMillis out of loop --- .../sql/execution/streaming/CompactibleFileStreamLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 74f0a2ba1767..3c76306f20cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -250,6 +250,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * Returns all files except the deleted ones. */ def allFiles(): Array[T] = { + val curTime = System.currentTimeMillis() var latestId = getLatestBatchId().getOrElse(-1L) // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex` // is calling this method. This loop will retry the reading to deal with the @@ -259,7 +260,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( try { val logs = getAllValidBatches(latestId, compactInterval).flatMap { id => - val curTime = System.currentTimeMillis() filterInBatch(id)(shouldRetain(_, curTime)).getOrElse { throw new IllegalStateException( s"${batchIdToPath(id)} doesn't exist " +
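
The patches above only filter the sink's metadata log: entries whose modification time falls outside the retention window are dropped at compaction, while the data files themselves are never deleted by the sink. The following is a minimal, hypothetical usage sketch, assuming the final shape of the series (option name "retention", parsed as a time string such as "12h" or "7d" via Utils.timeStringAsMs); the rate source, output paths, and application name are illustrative assumptions, not part of the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object FileSinkRetentionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("file-sink-retention-sketch")   // illustrative name
      .getOrCreate()

    // Toy source: emits (timestamp, value) rows at a fixed rate.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    val query = input.writeStream
      .format("parquet")
      .option("path", "/tmp/file-sink-retention/output")             // illustrative path
      .option("checkpointLocation", "/tmp/file-sink-retention/ckpt") // illustrative path
      // Sink log entries whose file modification time is more than 7 days older than the
      // compaction time are excluded from the compacted metadata log, so readers that go
      // through the sink's metadata eventually stop listing those files.
      .option("retention", "7d")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    query.awaitTermination()
  }
}

Because only the metadata log is filtered, readers that list the output directory directly, rather than through the sink's metadata, will keep seeing the old files until some external process removes them.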