Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions patch/spark/spark-3.2.1_dynamic_allocation_support.patch
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/mai
index 5f37a1abb19..af4bee1e1bb 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
@@ -580,6 +580,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
}

+ /**
+ * Return true if remote shuffle service is enabled.
+ */
+ def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
+ def isRssEnable(): Boolean = {
+ val shuffleMgr = get("spark.shuffle.manager", "sort")
+ shuffleMgr.contains("RssShuffleManager") || shuffleMgr.contains("UniffleShuffleManager")
+ }
}

private[spark] object SparkConf extends Logging {
Expand Down
9 changes: 6 additions & 3 deletions patch/spark/spark-3.3.1_dynamic_allocation_support.patch
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/mai
index 5f37a1abb19..af4bee1e1bb 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
@@ -580,6 +580,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
}

+ /**
+ * Return true if remote shuffle service is enabled.
+ */
+ def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
+ def isRssEnable(): Boolean = {
+ val shuffleMgr = get("spark.shuffle.manager", "sort")
+ shuffleMgr.contains("RssShuffleManager") || shuffleMgr.contains("UniffleShuffleManager")
+ }
}

private[spark] object SparkConf extends Logging {
Expand Down
7 changes: 5 additions & 2 deletions patch/spark/spark-3.4.1_dynamic_allocation_support.patch
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/mai
index 08344d8e547..ff3bab6710d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
@@ -580,6 +580,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
}

+ /**
+ * Return true if remote shuffle service is enabled.
+ */
+ def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
+ def isRssEnable(): Boolean = {
+ val shuffleMgr = get("spark.shuffle.manager", "sort")
+ shuffleMgr.contains("RssShuffleManager") || shuffleMgr.contains("UniffleShuffleManager")
+ }
}

private[spark] object SparkConf extends Logging {
Expand Down