From 0cd8cda68e5371720521feb9d4adcdefc32b8aed Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 21 Feb 2020 17:09:23 +0900
Subject: [PATCH 1/7] [SPARK-30915][SS] FileStreamSinkLog: Avoid reading the metadata log file when finding the latest batch ID

---
 .../streaming/CompactibleFileStreamLog.scala  | 20 +++++
 .../execution/streaming/FileStreamSink.scala  |  2 +-
 .../streaming/FileStreamSinkLogSuite.scala    | 75 +++++++++++++++++++
 3 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 905bce4d614e..19047a290d1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -162,6 +162,26 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     batchAdded
   }
 
+  /**
+   * Return the latest batch Id.
+   *
+   * This method is a complement of getLatest() - while metadata log file per batch tends to be
+   * small, it doesn't apply to the compacted log file. This method only checks for existence of
+   * file to avoid huge cost on reading and deserializing compacted log file.
+   */
+  def getLatestBatchId(): Option[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath))
+      .sorted(Ordering.Long.reverse)
+    for (batchId <- batchIds) {
+      val batchMetadataFile = batchIdToPath(batchId)
+      if (fileManager.exists(batchMetadataFile)) {
+        return Some(batchId)
+      }
+    }
+    None
+  }
+
   /**
    * CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal
    * state, specifically which latest compaction batch is purged.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b679f163fc56..32245470d8f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -142,7 +142,7 @@ class FileStreamSink(
   }
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
+    if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
       val committer = FileCommitProtocol.instantiate(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index f95daafdfe19..d3e638473a6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -18,7 +18,14 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.internal.SQLConf
@@ -240,6 +247,40 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     ))
   }
 
+  test("getLatestBatchId") {
+    withCountOpenLocalFileSystemAsLocalFileSystem {
+      val scheme = CountOpenLocalFileSystem.scheme
+      withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+        withTempDir { file =>
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            s"$scheme:///${file.getCanonicalPath}")
+          for (batchId <- 0 to 2) {
+            sinkLog.add(
+              batchId,
+              Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+          }
+
+          def getCountForOpenOnMetadataFile(batchId: Long): Long = {
+            val path = sinkLog.batchIdToPath(batchId).toUri.getPath
+            CountOpenLocalFileSystem.pathToNumOpenCalled
+              .get(path).map(_.get()).getOrElse(0)
+          }
+
+          val curCount = getCountForOpenOnMetadataFile(2)
+
+          assert(sinkLog.getLatestBatchId() === Some(2))
+          // getLatestBatchId doesn't open the latest metadata log file
+          assert(getCountForOpenOnMetadataFile(2L) === curCount)
+
+          assert(sinkLog.getLatest().map(_._1).getOrElse(-1L) === 2L)
+          // getLatest opens the latest metadata log file, which explains the needs on
+          // having "getLatestBatchId".
+          assert(getCountForOpenOnMetadataFile(2L) === curCount + 1)
+        }
+      }
+    }
+  }
+
   /**
    * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
    * in SinkFileStatus.
@@ -267,4 +308,38 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
     log.allFiles()
   }
+
+  private def withCountOpenLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountOpenLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountOpenLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+}
+
+class CountOpenLocalFileSystem extends RawLocalFileSystem {
+  import CountOpenLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def open(f: Path, bufferSize: Int): FSDataInputStream = {
+    val path = f.toUri.getPath
+    val curVal = pathToNumOpenCalled.getOrElseUpdate(path, new AtomicLong(0))
+    curVal.incrementAndGet()
+    super.open(f, bufferSize)
+  }
+}
+
+object CountOpenLocalFileSystem {
+  val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
+  val pathToNumOpenCalled = new mutable.HashMap[String, AtomicLong]
 }

From d270961519a4af5a9f8fa390125c567f56c07700 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 3 Apr 2020 12:33:43 +0900
Subject: [PATCH 2/7] Reflect review comments

---
 .../streaming/FileStreamSinkLogSuite.scala    | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index d3e638473a6f..468ed0e95fbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -251,9 +251,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     withCountOpenLocalFileSystemAsLocalFileSystem {
       val scheme = CountOpenLocalFileSystem.scheme
       withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
-        withTempDir { file =>
+        withTempDir { dir =>
           val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
-            s"$scheme:///${file.getCanonicalPath}")
+            s"$scheme:///${dir.getCanonicalPath}")
           for (batchId <- 0 to 2) {
             sinkLog.add(
               batchId,
@@ -262,20 +262,24 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
 
           def getCountForOpenOnMetadataFile(batchId: Long): Long = {
             val path = sinkLog.batchIdToPath(batchId).toUri.getPath
-            CountOpenLocalFileSystem.pathToNumOpenCalled
-              .get(path).map(_.get()).getOrElse(0)
+            CountOpenLocalFileSystem.pathToNumOpenCalled.get(path).map(_.get()).getOrElse(0)
           }
 
-          val curCount = getCountForOpenOnMetadataFile(2)
+          CountOpenLocalFileSystem.resetCount()
 
-          assert(sinkLog.getLatestBatchId() === Some(2))
+          assert(sinkLog.getLatestBatchId() === Some(2L))
           // getLatestBatchId doesn't open the latest metadata log file
-          assert(getCountForOpenOnMetadataFile(2L) === curCount)
+          (0L to 2L).foreach { batchId =>
+            assert(getCountForOpenOnMetadataFile(batchId) === 0)
+          }
 
           assert(sinkLog.getLatest().map(_._1).getOrElse(-1L) === 2L)
+          (0L to 1L).foreach { batchId =>
+            assert(getCountForOpenOnMetadataFile(batchId) === 0)
+          }
           // getLatest opens the latest metadata log file, which explains the needs on
           // having "getLatestBatchId".
-          assert(getCountForOpenOnMetadataFile(2L) === curCount + 1)
+          assert(getCountForOpenOnMetadataFile(2L) === 1)
         }
       }
     }
@@ -342,4 +346,6 @@ class CountOpenLocalFileSystem extends RawLocalFileSystem {
 object CountOpenLocalFileSystem {
   val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
   val pathToNumOpenCalled = new mutable.HashMap[String, AtomicLong]
+
+  def resetCount(): Unit = pathToNumOpenCalled.clear()
 }

From 30338fb49dbe039fc264f4f5a867d5b7bbd7f711 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 3 Apr 2020 20:35:26 +0900
Subject: [PATCH 3/7] add 'L' where the constant should be Long

---
 .../execution/streaming/FileStreamSinkLogSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 468ed0e95fbd..c2e635f0b636 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -254,7 +254,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         withTempDir { dir =>
           val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
             s"$scheme:///${dir.getCanonicalPath}")
-          for (batchId <- 0 to 2) {
+          for (batchId <- 0L to 2L) {
             sinkLog.add(
               batchId,
               Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
@@ -262,7 +262,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
 
           def getCountForOpenOnMetadataFile(batchId: Long): Long = {
             val path = sinkLog.batchIdToPath(batchId).toUri.getPath
-            CountOpenLocalFileSystem.pathToNumOpenCalled.get(path).map(_.get()).getOrElse(0)
+            CountOpenLocalFileSystem.pathToNumOpenCalled.get(path).map(_.get()).getOrElse(0L)
           }
 
           CountOpenLocalFileSystem.resetCount()
@@ -270,16 +270,16 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
           assert(sinkLog.getLatestBatchId() === Some(2L))
           // getLatestBatchId doesn't open the latest metadata log file
           (0L to 2L).foreach { batchId =>
-            assert(getCountForOpenOnMetadataFile(batchId) === 0)
+            assert(getCountForOpenOnMetadataFile(batchId) === 0L)
           }
 
           assert(sinkLog.getLatest().map(_._1).getOrElse(-1L) === 2L)
           (0L to 1L).foreach { batchId =>
-            assert(getCountForOpenOnMetadataFile(batchId) === 0)
+            assert(getCountForOpenOnMetadataFile(batchId) === 0L)
           }
           // getLatest opens the latest metadata log file, which explains the needs on
           // having "getLatestBatchId".
-          assert(getCountForOpenOnMetadataFile(2L) === 1)
+          assert(getCountForOpenOnMetadataFile(2L) === 1L)
         }
       }
     }

From ff9078e11b86311ba4cda4e63c46920057028200 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Sat, 18 Apr 2020 00:07:49 +0900
Subject: [PATCH 4/7] Address review comment (apply to more places)

---
 .../sql/execution/streaming/CompactibleFileStreamLog.scala  | 2 +-
 .../spark/sql/execution/streaming/FileStreamSourceLog.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 19047a290d1e..c6a00168881c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -213,7 +213,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    * Returns all files except the deleted ones.
    */
   def allFiles(): Array[T] = {
-    var latestId = getLatest().map(_._1).getOrElse(-1L)
+    var latestId = getLatestBatchId().getOrElse(-1L)
     // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
     // is calling this method. This loop will retry the reading to deal with the
     // race condition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 7b2ea9627a98..c43887774c13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -96,7 +96,7 @@ class FileStreamSourceLog(
     val searchKeys = removedBatches.map(_._1)
     val retrievedBatches = if (searchKeys.nonEmpty) {
       logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!")
-      val latestBatchId = getLatest().map(_._1).getOrElse(-1L)
+      val latestBatchId = getLatestBatchId().getOrElse(-1L)
       if (latestBatchId < 0) {
         Map.empty[Long, Option[Array[FileEntry]]]
       } else {

From f6078bb82c449994037071d8a241cca80c187b75 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 29 Apr 2020 11:46:17 +0900
Subject: [PATCH 5/7] Deduplicate code

---
 .../streaming/CompactibleFileStreamLog.scala  | 20 ----------------
 .../execution/streaming/HDFSMetadataLog.scala | 23 +++++++++++++++----
 2 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index c6a00168881c..35ee685a52a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -162,26 +162,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     batchAdded
   }
 
-  /**
-   * Return the latest batch Id.
-   *
-   * This method is a complement of getLatest() - while metadata log file per batch tends to be
-   * small, it doesn't apply to the compacted log file. This method only checks for existence of
-   * file to avoid huge cost on reading and deserializing compacted log file.
-   */
-  def getLatestBatchId(): Option[Long] = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
-      .map(f => pathToBatchId(f.getPath))
-      .sorted(Ordering.Long.reverse)
-    for (batchId <- batchIds) {
-      val batchMetadataFile = batchIdToPath(batchId)
-      if (fileManager.exists(batchMetadataFile)) {
-        return Some(batchId)
-      }
-    }
-    None
-  }
-
   /**
    * CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal
    * state, specifically which latest compaction batch is purged.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index ed0c44da08c5..3c340230ab77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -182,19 +182,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  override def getLatest(): Option[(Long, T)] = {
+  /**
+   * Return the latest batch Id without reading the file. This method only checks for existence of
+   * file to avoid cost on reading and deserializing log file.
+   */
+  def getLatestBatchId(): Option[Long] = {
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
       .map(f => pathToBatchId(f.getPath))
       .sorted(Ordering.Long.reverse)
     for (batchId <- batchIds) {
-      val batch = get(batchId)
-      if (batch.isDefined) {
-        return Some((batchId, batch.get))
+      val batchMetadataFile = batchIdToPath(batchId)
+      if (fileManager.exists(batchMetadataFile)) {
+        return Some(batchId)
       }
     }
     None
   }
 
+  override def getLatest(): Option[(Long, T)] = {
+    getLatestBatchId().map { batchId =>
+      val content = get(batchId).getOrElse {
+        // This only happens in odd case where the file exists when getLatestBatchId() is called,
+        // but get() doesn't find it.
+        throw new IllegalStateException(s"failed to read log file for batch $batchId")
+      }
+      (batchId, content)
+    }.orElse(None)
+  }
+
   /**
    * Get an array of [FileStatus] referencing batch files.
    * The array is sorted by most recent batch file first to

From 83451c1fe28e40b62f40618429889f072971f866 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 21 May 2020 22:56:15 +0900
Subject: [PATCH 6/7] Update comment to make clearer

---
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 3c340230ab77..f034cf1f4344 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -202,8 +202,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   override def getLatest(): Option[(Long, T)] = {
     getLatestBatchId().map { batchId =>
       val content = get(batchId).getOrElse {
-        // This only happens in odd case where the file exists when getLatestBatchId() is called,
-        // but get() doesn't find it.
+        // If we find the last batch file, we must read that file, other than failing back to
+        // old batches.
         throw new IllegalStateException(s"failed to read log file for batch $batchId")
       }
       (batchId, content)

From 4a5679e800a10a74a78236c5be9814e1a61c6ec7 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Sat, 23 May 2020 00:48:35 +0900
Subject: [PATCH 7/7] Reflect review comments

---
 .../sql/execution/streaming/HDFSMetadataLog.scala    | 12 +++---------
 .../execution/streaming/FileStreamSinkLogSuite.scala | 12 +++++++-----
 2 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f034cf1f4344..5c86f8a50dda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -187,16 +187,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * file to avoid cost on reading and deserializing log file.
    */
   def getLatestBatchId(): Option[Long] = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+    fileManager.list(metadataPath, batchFilesFilter)
       .map(f => pathToBatchId(f.getPath))
       .sorted(Ordering.Long.reverse)
-    for (batchId <- batchIds) {
-      val batchMetadataFile = batchIdToPath(batchId)
-      if (fileManager.exists(batchMetadataFile)) {
-        return Some(batchId)
-      }
-    }
-    None
+      .headOption
   }
 
   override def getLatest(): Option[(Long, T)] = {
@@ -207,7 +201,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         throw new IllegalStateException(s"failed to read log file for batch $batchId")
       }
       (batchId, content)
-    }.orElse(None)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index c2e635f0b636..6d615b5ef044 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.lang.{Long => JLong}
 import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
@@ -262,7 +263,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
 
           def getCountForOpenOnMetadataFile(batchId: Long): Long = {
             val path = sinkLog.batchIdToPath(batchId).toUri.getPath
-            CountOpenLocalFileSystem.pathToNumOpenCalled.get(path).map(_.get()).getOrElse(0L)
+            CountOpenLocalFileSystem.pathToNumOpenCalled.getOrDefault(path, 0L)
           }
 
           CountOpenLocalFileSystem.resetCount()
@@ -337,15 +338,16 @@ class CountOpenLocalFileSystem extends RawLocalFileSystem {
 
   override def open(f: Path, bufferSize: Int): FSDataInputStream = {
     val path = f.toUri.getPath
-    val curVal = pathToNumOpenCalled.getOrElseUpdate(path, new AtomicLong(0))
-    curVal.incrementAndGet()
+    pathToNumOpenCalled.compute(path, (_, v) => {
+      if (v == null) 1L else v + 1
+    })
     super.open(f, bufferSize)
   }
 }
 
 object CountOpenLocalFileSystem {
   val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
-  val pathToNumOpenCalled = new mutable.HashMap[String, AtomicLong]
+  val pathToNumOpenCalled = new ConcurrentHashMap[String, JLong]
 
   def resetCount(): Unit = pathToNumOpenCalled.clear()
 }