Skip to content

Commit fc00a0c

Browse files
committed
Deduplicate code
1 parent ff9078e commit fc00a0c

File tree

2 files changed

+20
-24
lines changed

2 files changed

+20
-24
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -162,26 +162,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
162162
batchAdded
163163
}
164164

165-
/**
166-
* Return the latest batch Id.
167-
*
168-
* This method is a complement of getLatest() - while metadata log file per batch tends to be
169-
* small, it doesn't apply to the compacted log file. This method only checks for existence of
170-
* file to avoid huge cost on reading and deserializing compacted log file.
171-
*/
172-
def getLatestBatchId(): Option[Long] = {
173-
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
174-
.map(f => pathToBatchId(f.getPath))
175-
.sorted(Ordering.Long.reverse)
176-
for (batchId <- batchIds) {
177-
val batchMetadataFile = batchIdToPath(batchId)
178-
if (fileManager.exists(batchMetadataFile)) {
179-
return Some(batchId)
180-
}
181-
}
182-
None
183-
}
184-
185165
/**
186166
* CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal
187167
* state, specifically which latest compaction batch is purged.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,19 +182,35 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
182182
}
183183
}
184184

185-
override def getLatest(): Option[(Long, T)] = {
185+
186+
/**
187+
* Return the latest batch Id without reading the file. This method only checks for existence of
188+
* file to avoid cost on reading and deserializing log file.
189+
*/
190+
def getLatestBatchId(): Option[Long] = {
186191
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
187192
.map(f => pathToBatchId(f.getPath))
188193
.sorted(Ordering.Long.reverse)
189194
for (batchId <- batchIds) {
190-
val batch = get(batchId)
191-
if (batch.isDefined) {
192-
return Some((batchId, batch.get))
195+
val batchMetadataFile = batchIdToPath(batchId)
196+
if (fileManager.exists(batchMetadataFile)) {
197+
return Some(batchId)
193198
}
194199
}
195200
None
196201
}
197202

203+
override def getLatest(): Option[(Long, T)] = {
204+
getLatestBatchId().map { batchId =>
205+
val content = get(batchId).getOrElse {
206+
// This only happens in odd case where the file exists when getLatestBatchId() is called,
207+
// but get() doesn't find it.
208+
throw new IllegalStateException(s"failed to read log file for batch $batchId")
209+
}
210+
(batchId, content)
211+
}.orElse(None)
212+
}
213+
198214
/**
199215
* Get an array of [FileStatus] referencing batch files.
200216
* The array is sorted by most recent batch file first to

0 commit comments

Comments
 (0)