diff --git a/README.md b/README.md index 2454e0a4f1..f419f8d501 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,9 @@ rss-xxx.tgz will be generated for deployment 2. Update Spark conf to enable Uniffle, e.g., ``` + # Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of + # serialized object should be used. + spark.serializer org.apache.spark.serializer.KryoSerializer # this could also be in the spark-defaults.conf spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager spark.rss.coordinator.quorum :19999,:19999 # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE. diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index cabccb3b6f..0c1bcc6968 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -152,6 +153,13 @@ private synchronized void putBlockId( public RssShuffleManager(SparkConf conf, boolean isDriver) { this.sparkConf = conf; + boolean supportsRelocation = Optional.ofNullable(SparkEnv.get()) + .map(env -> env.serializer().supportsRelocationOfSerializedObjects()) + .orElse(true); + if (!supportsRelocation) { + LOG.warn("RSSShuffleManager requires a serializer which supports relocations of serialized object. 
Please set " + "spark.serializer to org.apache.spark.serializer.KryoSerializer instead"); + } this.user = sparkConf.get("spark.rss.quota.user", "user"); this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis())); // set & check replica config diff --git a/docs/client_guide.md b/docs/client_guide.md index c2884fa0e2..12c99c9175 100644 --- a/docs/client_guide.md +++ b/docs/client_guide.md @@ -38,6 +38,9 @@ This document will introduce how to deploy Uniffle client plugins with Spark and 2. Update Spark conf to enable Uniffle, eg, ``` + # Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of + # serialized object should be used. + spark.serializer org.apache.spark.serializer.KryoSerializer # this could also be in the spark-defaults.conf spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager spark.rss.coordinator.quorum :19999,:19999 # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.