From 7cdcb447c4078825552a92e0e7833961f68eb9d7 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Tue, 30 Nov 2021 11:34:47 -0800
Subject: [PATCH] [SPARK-37509][CORE] Improve Fallback Storage upload speed by
 avoiding S3 rate limiter

---
 .../apache/spark/storage/FallbackStorage.scala     | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 76137133227f0..d137099e73437 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -60,15 +61,17 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val indexFile = r.getIndexFile(shuffleId, mapId)
 
     if (indexFile.exists()) {
+      val hash = JavaUtils.nonNegativeHash(indexFile.getName)
       fallbackFileSystem.copyFromLocalFile(
         new Path(indexFile.getAbsolutePath),
-        new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}"))
+        new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}"))
 
       val dataFile = r.getDataFile(shuffleId, mapId)
       if (dataFile.exists()) {
+        val hash = JavaUtils.nonNegativeHash(dataFile.getName)
         fallbackFileSystem.copyFromLocalFile(
           new Path(dataFile.getAbsolutePath),
-          new Path(fallbackPath, s"$appId/$shuffleId/${dataFile.getName}"))
+          new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}"))
       }
 
       // Report block statuses
@@ -86,7 +89,8 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
   }
 
   def exists(shuffleId: Int, filename: String): Boolean = {
-    fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$filename"))
+    val hash = JavaUtils.nonNegativeHash(filename)
+    fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
   }
 }
 
@@ -168,7 +172,8 @@ private[spark] object FallbackStorage extends Logging {
     }
 
     val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
-    val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+    val hash = JavaUtils.nonNegativeHash(name)
+    val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
     val start = startReduceId * 8L
     val end = endReduceId * 8L
     Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
@@ -178,7 +183,8 @@
       index.skip(end - (start + 8L))
       val nextOffset = index.readLong()
       val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
-      val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+      val hash = JavaUtils.nonNegativeHash(name)
+      val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
       val f = fallbackFileSystem.open(dataFile)
       val size = nextOffset - offset
       logDebug(s"To byte array $size")