Commit e8c14d6

Address more comments
1 parent 48d7fbf commit e8c14d6

3 files changed: +61 -60 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

Lines changed: 22 additions & 22 deletions
@@ -60,20 +60,20 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
   /**
    * If we delete the old files after compaction at once, there is a race condition in S3: other
    * processes may see the old files are deleted but still cannot see the compaction file. The user
-   * should set a reasonable `fileExpiredTimeMS`. We will wait until then so that the compaction
+   * should set a reasonable `fileCleanupDelayMs`. We will wait until then so that the compaction
    * file is guaranteed to be visible for all readers
    */
-  private val fileExpiredTimeMs = sqlContext.getConf(SQLConf.FILE_STREAM_SINK_LOG_EXPIRED_TIME)
+  private val fileCleanupDelayMs = sqlContext.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
 
-  private val isDeletingExpiredLog = sqlContext.getConf(SQLConf.FILE_STREAM_SINK_LOG_DELETE)
+  private val isDeletingExpiredLog = sqlContext.getConf(SQLConf.FILE_SINK_LOG_DELETION)
 
-  private val compactLength = sqlContext.getConf(SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN)
-  require(compactLength > 0,
-    s"Please set ${SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key} (was $compactLength) " +
+  private val compactInterval = sqlContext.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+    s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")
 
   override def batchIdToPath(batchId: Long): Path = {
-    if (isCompactionBatch(batchId, compactLength)) {
+    if (isCompactionBatch(batchId, compactInterval)) {
       new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
     } else {
       new Path(metadataPath, batchId.toString)
@@ -110,7 +110,7 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
   }
 
   override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
-    if (isCompactionBatch(batchId, compactLength)) {
+    if (isCompactionBatch(batchId, compactInterval)) {
       compact(batchId, logs)
     } else {
       super.add(batchId, logs)
@@ -127,7 +127,7 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
     // race condition.
     while (true) {
       if (latestId >= 0) {
-        val startId = getAllValidBatches(latestId, compactLength)(0)
+        val startId = getAllValidBatches(latestId, compactInterval)(0)
         try {
           val logs = get(Some(startId), Some(latestId)).flatMap(_._2)
           return compactLogs(logs).toArray
@@ -155,7 +155,7 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
    * corresponding `batchId` file. It will delete expired files as well if enabled.
    */
   private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
-    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactLength)
+    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
     if (super.add(batchId, compactLogs(allLogs))) {
       if (isDeletingExpiredLog) {
@@ -172,10 +172,10 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
    * Since all logs before `compactionBatchId` are compacted and written into the
    * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
    * S3, the compaction file may not be seen by other processes at once. So we only delete files
-   * created `fileExpiredTimeMs` milliseconds ago.
+   * created `fileCleanupDelayMs` milliseconds ago.
    */
   private def deleteExpiredLog(compactionBatchId: Long): Unit = {
-    val expiredTime = System.currentTimeMillis() - fileExpiredTimeMs
+    val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
     fileManager.list(metadataPath, new PathFilter {
       override def accept(path: Path): Boolean = {
         try {
@@ -206,37 +206,37 @@ object FileStreamSinkLog {
 
   /**
    * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every
-   * `compactLength` commits.
+   * `compactInterval` commits.
    *
-   * E.g., if `compactLength` is 3, then 2, 5, 8, ... are all compaction batches.
+   * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
    */
-  def isCompactionBatch(batchId: Long, compactLength: Int): Boolean = {
-    (batchId + 1) % compactLength == 0
+  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
+    (batchId + 1) % compactInterval == 0
   }
 
   /**
    * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
   * need to do a new compaction.
   *
-   * E.g., if `compactLength` is 3 and `compactionBatchId` is 5, this method should returns
+   * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns
   * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
   */
   def getValidBatchesBeforeCompactionBatch(
       compactionBatchId: Long,
-      compactLength: Int): Seq[Long] = {
-    assert(isCompactionBatch(compactionBatchId, compactLength),
+      compactInterval: Int): Seq[Long] = {
+    assert(isCompactionBatch(compactionBatchId, compactInterval),
       s"$compactionBatchId is not a compaction batch")
-    (math.max(0, compactionBatchId - compactLength)) until compactionBatchId
+    (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
   }
 
   /**
    * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just
   * return itself. Otherwise, it will find the previous compaction batch and return all batches
   * between it and `batchId`.
   */
-  def getAllValidBatches(batchId: Long, compactLength: Long): Seq[Long] = {
+  def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
     assert(batchId >= 0)
-    val start = math.max(0, (batchId + 1) / compactLength * compactLength - 1)
+    val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
     start to batchId
   }
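
The batch-id arithmetic being renamed above is small enough to sketch standalone. Below is a minimal, illustrative Scala sketch, not the production class: the object name CompactionMath and the main method are invented for the example, while the three method bodies mirror the logic shown in the diff with the new compactInterval parameter name.

object CompactionMath {
  // A batch is a compaction batch when (batchId + 1) is a multiple of compactInterval.
  // E.g., with compactInterval = 3, the compaction batches are 2, 5, 8, ...
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Batches whose logs must be merged into `compactionBatchId`, including the
  // previous compaction batch (which already holds everything before it).
  def getValidBatchesBeforeCompactionBatch(
      compactionBatchId: Long,
      compactInterval: Int): Seq[Long] = {
    require(isCompactionBatch(compactionBatchId, compactInterval))
    math.max(0, compactionBatchId - compactInterval) until compactionBatchId
  }

  // All batches needed to reconstruct the file list up to `batchId`: the most
  // recent compaction batch at or before it, plus every batch after that.
  def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
    require(batchId >= 0)
    val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
    start to batchId
  }

  def main(args: Array[String]): Unit = {
    assert(isCompactionBatch(5, compactInterval = 3))
    assert(getValidBatchesBeforeCompactionBatch(5, compactInterval = 3) == Seq(2L, 3L, 4L))
    assert(getAllValidBatches(7, compactInterval = 3) == Seq(5L, 6L, 7L))
  }
}

With compactInterval = 3 this reproduces the expectations in FileStreamSinkLogSuite below: batch 5 compacts batches 2 through 4, and reading state at batch 7 only needs batches 5 through 7.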

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 7 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.internal
 
 import java.util.{NoSuchElementException, Properties}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable
@@ -443,24 +444,25 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val FILE_STREAM_SINK_LOG_DELETE = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
+  val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
     .booleanConf
     .createWithDefault(true)
 
-  val FILE_STREAM_SINK_LOG_COMPACT_LEN =
-    SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactLen")
+  val FILE_SINK_LOG_COMPACT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval")
     .internal()
-    .doc("Every how many log files is a compaction triggered.")
+    .doc("Number of log files after which all the previous files " +
+      "are compacted into the next log file.")
     .intConf
     .createWithDefault(10)
 
-  val FILE_STREAM_SINK_LOG_EXPIRED_TIME =
-    SQLConfigBuilder("spark.sql.streaming.fileSink.log.expired")
+  val FILE_SINK_LOG_CLEANUP_DELAY =
+    SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
     .internal()
     .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
-    .longConf
+    .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(3600 * 1000L) // 1 hour
 
   object Deprecated {
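
For context on what the renamed keys look like in use, here is a hedged sketch of overriding them through SQLContext.setConf. The helper name tuneFileSinkLog and the chosen values are illustrative; only the three key strings come from the diff above, and they remain internal settings whose defaults are usually fine.

import org.apache.spark.sql.SQLContext

def tuneFileSinkLog(sqlContext: SQLContext): Unit = {
  // Compact the sink metadata log every 5 batches instead of the default 10.
  sqlContext.setConf("spark.sql.streaming.fileSink.log.compactInterval", "5")
  // Wait 10 minutes (in ms) after compaction before deleting superseded log files;
  // since the conf is now a timeConf, a suffixed value such as "10m" should also parse.
  sqlContext.setConf("spark.sql.streaming.fileSink.log.cleanupDelay", "600000")
  // Or keep every log file by disabling deletion outright.
  sqlContext.setConf("spark.sql.streaming.fileSink.log.deletion", "false")
}

Note that the switch from longConf to timeConf(TimeUnit.MILLISECONDS) keeps the default of 3600 * 1000L, i.e. one hour.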

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 30 additions & 31 deletions
@@ -37,41 +37,41 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("isCompactionBatch") {
-    assert(false === isCompactionBatch(0, compactLength = 3))
-    assert(false === isCompactionBatch(1, compactLength = 3))
-    assert(true === isCompactionBatch(2, compactLength = 3))
-    assert(false === isCompactionBatch(3, compactLength = 3))
-    assert(false === isCompactionBatch(4, compactLength = 3))
-    assert(true === isCompactionBatch(5, compactLength = 3))
+    assert(false === isCompactionBatch(0, compactInterval = 3))
+    assert(false === isCompactionBatch(1, compactInterval = 3))
+    assert(true === isCompactionBatch(2, compactInterval = 3))
+    assert(false === isCompactionBatch(3, compactInterval = 3))
+    assert(false === isCompactionBatch(4, compactInterval = 3))
+    assert(true === isCompactionBatch(5, compactInterval = 3))
   }
 
   test("getValidBatchesBeforeCompactionBatch") {
     intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(0, compactLength = 3)
+      getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
     }
     intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(1, compactLength = 3)
+      getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
     }
-    assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactLength = 3))
+    assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
     intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(3, compactLength = 3)
+      getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
     }
     intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(4, compactLength = 3)
+      getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
     }
-    assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactLength = 3))
+    assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
   }
 
   test("getAllValidBatches") {
-    assert(Seq(0) === getAllValidBatches(0, compactLength = 3))
-    assert(Seq(0, 1) === getAllValidBatches(1, compactLength = 3))
-    assert(Seq(2) === getAllValidBatches(2, compactLength = 3))
-    assert(Seq(2, 3) === getAllValidBatches(3, compactLength = 3))
-    assert(Seq(2, 3, 4) === getAllValidBatches(4, compactLength = 3))
-    assert(Seq(5) === getAllValidBatches(5, compactLength = 3))
-    assert(Seq(5, 6) === getAllValidBatches(6, compactLength = 3))
-    assert(Seq(5, 6, 7) === getAllValidBatches(7, compactLength = 3))
-    assert(Seq(8) === getAllValidBatches(8, compactLength = 3))
+    assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
+    assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
+    assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
+    assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
+    assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
+    assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
+    assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
+    assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
+    assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
   }
 
   test("compactLogs") {
@@ -124,7 +124,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("batchIdToPath") {
-    withSQLConf(SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key -> "3") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         assert("0" === sinkLog.batchIdToPath(0).getName)
         assert("1" === sinkLog.batchIdToPath(1).getName)
@@ -137,32 +137,31 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("compact") {
-    withSQLConf(SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key -> "3") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
           sinkLog.add(
             batchId,
             Seq(SinkFileStatus("/a/b/" + batchId, 100L, FileStreamSinkLog.ADD_ACTION)))
-          assert(sinkLog.allFiles() === (0 to batchId).map {
+          val expectedFiles = (0 to batchId).map {
             id => SinkFileStatus("/a/b/" + id, 100L, FileStreamSinkLog.ADD_ACTION)
-          })
+          }
+          assert(sinkLog.allFiles() === expectedFiles)
           if (isCompactionBatch(batchId, 3)) {
             // Since batchId is a compaction batch, the batch log file should contain all logs
-            assert(sinkLog.get(batchId).getOrElse(Nil) === (0 to batchId).map {
-              id => SinkFileStatus("/a/b/" + id, 100L, FileStreamSinkLog.ADD_ACTION)
-            })
+            assert(sinkLog.get(batchId).getOrElse(Nil) === expectedFiles)
           }
         }
       }
     }
   }
 
   test("delete expired file") {
-    // Set FILE_STREAM_SINK_LOG_EXPIRED_TIME to 0 so that we can detect the deleting behaviour
+    // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically
     withSQLConf(
-      SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key -> "3",
-      SQLConf.FILE_STREAM_SINK_LOG_EXPIRED_TIME.key -> "0") {
+      SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
       withFileStreamSinkLog { sinkLog =>
         val metadataPath = new File(sinkLog.metadataPath.toUri.toString)
