Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ class CheckpointWriter(
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
private var _fs: FileSystem = _

@volatile private[this] var fs: FileSystem = null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this even need to be a member? it looks like it's used entirely in one method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually referenced by inner classes. I think the idea here is that we'll avoid calling .getFileSystem too much by caching the result, but will have the ability to clear the cache in case the cached FileSystem seems to be in a bad state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: fs and latestCheckpointTime don't need to be volatile as we use Executors.newFixedThreadPool(1) here. We should also give the thread a name for better debugging.

@volatile private var latestCheckpointTime: Time = null

class CheckpointWriteHandler(
Expand All @@ -196,6 +195,9 @@ class CheckpointWriter(
if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
latestCheckpointTime = checkpointTime
}
if (fs == null) {
fs = new Path(checkpointDir).getFileSystem(hadoopConf)
}
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
Expand Down Expand Up @@ -263,7 +265,7 @@ class CheckpointWriter(
case ioe: IOException =>
logWarning("Error in attempt " + attempts + " of writing checkpoint to "
+ checkpointFile, ioe)
reset()
fs = null
}
}
logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
Expand Down Expand Up @@ -297,15 +299,6 @@ class CheckpointWriter(
", waited for " + (endTime - startTime) + " ms.")
stopped = true
}

private def fs = synchronized {
if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
_fs
}

private def reset() = synchronized {
_fs = null
}
}


Expand Down