diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java index 41687636d4..bbabeb50f4 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java @@ -22,11 +22,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import java.util.stream.Collectors; import scala.reflect.ClassTag$; import scala.reflect.ManifestFactory$; @@ -324,27 +321,7 @@ public List buildBlockEvents(List shuffleBlockI @Override public long spill(long size, MemoryConsumer trigger) { - List events = buildBlockEvents(clear()); - List> futures = events.stream().map(x -> spillFunc.apply(x)).collect(Collectors.toList()); - CompletableFuture allOfFutures = - CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); - try { - allOfFutures.get(memorySpillTimeoutSec, TimeUnit.SECONDS); - } catch (TimeoutException timeoutException) { - // A best effort strategy to wait. - // If timeout exception occurs, the underlying tasks won't be cancelled. - } finally { - long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x -> { - try { - return x.get(); - } catch (Exception e) { - return 0; - } - }).sum(); - LOG.info("[taskId: {}] Spill triggered by memory consumer of {}, released memory size: {}", - taskId, trigger.getClass().getSimpleName(), releasedSize); - return releasedSize; - } + return 0L; } @VisibleForTesting diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java index 8758054142..5e5e6dfb8b 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java +++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java @@ -221,7 +221,6 @@ public void buildBlockEventsTest() { assertEquals(3, events.size()); } - @Test public void spillTest() { SparkConf conf = getConf(); conf.set("spark.rss.client.send.size.limit", "1000");