Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null

@volatile private var closed = false

// A volatile variable to remember which DeserializationStream is using. Need to set it when we
// open a DeserializationStream. But we should use `deserializeStream` rather than
// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
// reduce the performance. It must be volatile so that we can see its correct value in the
// `finalize` method, which could run in any thread.
@volatile private var deserializeStreamToBeClosed: DeserializationStream = null

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
Expand All @@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStreamToBeClosed = null
deserializeStream.close()
fileStream.close()
deserializeStream = null
Expand All @@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](

val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
ser.deserializeStream(compressedStream)
// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
// during reading the (K, C) pairs.
deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
deserializeStreamToBeClosed
} else {
// No more batches left
cleanup()
Expand Down Expand Up @@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
item
}

// TODO: Ensure this gets called even if the iterator isn't drained.
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
fileStream = null
ds.close()
file.delete()
// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
// future, we need some mechanism to ensure this gets called once the resources are not used.
private def cleanup(): Unit = {
if (!closed) {
closed = true
batchIndex = batchOffsets.length // Prevent reading any other batch
fileStream = null
try {
val ds = deserializeStreamToBeClosed
deserializeStreamToBeClosed = null
deserializeStream = null
if (ds != null) {
ds.close()
}
} finally {
if (file.exists()) {
file.delete()
}
}
}
}

override def finalize(): Unit = {
try {
cleanup()
} finally {
super.finalize()
}
}
}

Expand Down