diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index ecdbdaac4f..c045f39979 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -203,6 +203,19 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) { // pass that ShuffleHandle to executors (getWriter/getReader). @Override public ShuffleHandle registerShuffle(int shuffleId, int numMaps, ShuffleDependency dependency) { + + // Spark has three kinds of serializers: + // org.apache.spark.serializer.JavaSerializer + // org.apache.spark.sql.execution.UnsafeRowSerializer + // org.apache.spark.serializer.KryoSerializer + // Only org.apache.spark.serializer.JavaSerializer does not support relocation of serialized objects. + // So when we find that org.apache.spark.serializer.JavaSerializer is configured, we should throw an exception. + if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) { + throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the" + + " serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object " + + "relocation."); + } + // If yarn enable retry ApplicationMaster, appId will be not unique and shuffle data will be incorrect, // appId + timestamp can avoid such problem, // can't get appId in construct because SparkEnv is not created yet, diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index 36ecbc8c14..1691dfe452 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -265,6 +265,18 
@@ public RssShuffleManager(SparkConf conf, boolean isDriver) { @Override public ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency dependency) { + // Spark has three kinds of serializers: + // org.apache.spark.serializer.JavaSerializer + // org.apache.spark.sql.execution.UnsafeRowSerializer + // org.apache.spark.serializer.KryoSerializer + // Only org.apache.spark.serializer.JavaSerializer does not support relocation of serialized objects. + // So when we find that org.apache.spark.serializer.JavaSerializer is configured, we should throw an exception. + if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) { + throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the" + + " serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object " + + "relocation."); + } + if (id.get() == null) { id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + System.currentTimeMillis()); } diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java index b83c6279ed..50f47cb55e 100644 --- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -55,6 +55,7 @@ public class GetReaderTest extends IntegrationTestBase { public void test() throws Exception { SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.setMaster("local[4]"); final String remoteStorage1 = "hdfs://h1/p1"; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java index 0d1271d831..814566c479 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -68,6 +68,7 @@ public class GetReaderTest extends IntegrationTestBase { public void test() throws Exception { SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.setMaster("local[4]"); final String remoteStorage1 = "hdfs://h1/p1";