diff --git a/README.md b/README.md index 95e1c82f2a..5ec89f35e0 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,10 @@ After apply the patch and rebuild spark, add following configuration in spark co spark.shuffle.service.enabled false spark.dynamicAllocation.enabled true ``` +For Spark 3.5 or above, just add one more configuration: + ``` + spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo + ``` ### Deploy MapReduce Client diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java new file mode 100644 index 0000000000..edbe25c9f9 --- /dev/null +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle; + +import org.apache.spark.SparkConf; +import org.apache.spark.shuffle.api.ShuffleDataIO; +import org.apache.spark.shuffle.api.ShuffleDriverComponents; +import org.apache.spark.shuffle.api.ShuffleExecutorComponents; +import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents; + +public class RssShuffleDataIo implements ShuffleDataIO { + private final SparkConf sparkConf; + + public RssShuffleDataIo(SparkConf sparkConf) { + this.sparkConf = sparkConf; + } + + /** Compatible with SortShuffleManager when DelegationRssShuffleManager fallback */ + @Override + public ShuffleExecutorComponents executor() { + return new LocalDiskShuffleExecutorComponents(sparkConf); + } + + @Override + public ShuffleDriverComponents driver() { + return new RssShuffleDriverComponents(sparkConf); + } +} diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java new file mode 100644 index 0000000000..893b2e8f4d --- /dev/null +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle; + +import org.apache.spark.SparkConf; +import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents; + +public class RssShuffleDriverComponents extends LocalDiskShuffleDriverComponents { + + private final SparkConf sparkConf; + + public RssShuffleDriverComponents(SparkConf sparkConf) { + this.sparkConf = sparkConf; + } + + /** + * Omitting @Override annotation to avoid compile error before Spark 3.5.0 + * + *
This method is called after DelegationRssShuffleManager initializes, so + * RssSparkConfig.RSS_ENABLED must already be set + */ + public boolean supportsReliableStorage() { + return sparkConf.get(RssSparkConfig.RSS_ENABLED) + || RssShuffleManager.class + .getCanonicalName() + .equals(sparkConf.get("spark.shuffle.manager")); + } +} diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java index ac37f03ceb..6d48b901c7 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java @@ -101,6 +101,8 @@ protected SparkConf createSparkConf() { public void updateSparkConfWithRss(SparkConf sparkConf) { sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set( + "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "4m"); sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "32m"); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java index 1b92530ec4..1aaf7eed98 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -69,6 +69,8 @@ public class GetReaderTest extends IntegrationTestBase { public void test() throws Exception { SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set( + "spark.shuffle.sort.io.plugin.class", 
"org.apache.spark.shuffle.RssShuffleDataIo"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.setMaster("local[4]");