Commit 3969f5f

Re-enable guarding of commit coordination with spark.speculation setting.
1 parent ede7590

2 files changed (+6, -7 lines changed)

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 6 deletions
@@ -131,12 +131,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         // attempts, which should only occur if speculation is enabled
         val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
         // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        // sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
-
-        // TODO: revert this before merging the PR; this is enabled so this code path is exercised
-        // by more tests (even though it might not be _necessary_, it should be _safe_ to do the
-        // extra coordination)
-        true
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
       }
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
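For readers skimming the diff: the restored line means commit coordination now follows spark.speculation by default, with the undocumented setting acting as an override. A minimal sketch of that guard, shown against a standalone SparkConf rather than the writer's actual configuration plumbing:

import org.apache.spark.SparkConf

// Sketch only, not the committed code: the same fallback as the restored line above.
val sparkConf = new SparkConf().set("spark.speculation", "true")
val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
// The escape-hatch defaults to whatever spark.speculation says...
val shouldCoordinateWithDriver =
  sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
// ...so here it is true; explicitly setting the escape-hatch to "false" would
// skip coordination even with speculation enabled.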

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 5 additions & 1 deletion
@@ -72,7 +72,11 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
 
   before {
     tempDir = Utils.createTempDir()
-    sc = new SparkContext("local[4]", classOf[OutputCommitCoordinatorSuite].getSimpleName) {
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
+      .set("spark.speculation", "true")
+    sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
           conf: SparkConf,
           isLocal: Boolean,
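The test change opts into coordination by enabling speculation in the suite's SparkConf. A hedged variant, not part of this commit: given the escape-hatch read in the main change, a test or application could instead request coordination directly while leaving speculation off, along these lines:

import org.apache.spark.SparkConf

// Sketch, not from this commit: request commit coordination explicitly while
// leaving spark.speculation at its default (false). The app name is hypothetical.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("output-commit-coordination-demo")
  .set("spark.hadoop.outputCommitCoordination.enabled", "true")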
