Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,19 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
// pass that ShuffleHandle to executors (getWriter/getReader).
@Override
public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {

//Spark has three kinds of serializers:
//org.apache.spark.serializer.JavaSerializer
//org.apache.spark.sql.execution.UnsafeRowSerializer
//org.apache.spark.serializer.KryoSerializer
//Only org.apache.spark.serializer.JavaSerializer does not support relocation of serialized objects.
//So when the configured serializer is org.apache.spark.serializer.JavaSerializer, we should throw an exception.
if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the"
+ " serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object "
+ "relocation.");
}

// If yarn enable retry ApplicationMaster, appId will be not unique and shuffle data will be incorrect,
// appId + timestamp can avoid such problem,
// can't get appId in construct because SparkEnv is not created yet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,18 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
@Override
public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<K, V, C> dependency) {

//Spark has three kinds of serializers:
//org.apache.spark.serializer.JavaSerializer
//org.apache.spark.sql.execution.UnsafeRowSerializer
//org.apache.spark.serializer.KryoSerializer
//Only org.apache.spark.serializer.JavaSerializer does not support relocation of serialized objects.
//So when the configured serializer is org.apache.spark.serializer.JavaSerializer, we should throw an exception.
if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the"
+ " serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object "
+ "relocation.");
}

if (id.get() == null) {
id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + System.currentTimeMillis());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class GetReaderTest extends IntegrationTestBase {
public void test() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
sparkConf.setMaster("local[4]");
final String remoteStorage1 = "hdfs://h1/p1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class GetReaderTest extends IntegrationTestBase {
public void test() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
sparkConf.setMaster("local[4]");
final String remoteStorage1 = "hdfs://h1/p1";
Expand Down