Skip to content

Commit 529db08

Browse files
committed
With SPARK-20236, FileCommitProtocol.instantiate() looks for a three argument constructor, passing in the dynamicPartitionOverwrite parameter. If there is no such constructor, it falls back to the classic two-arg one.
When InsertIntoHadoopFsRelationCommand passes down that dynamicPartitionOverwrite flag to FileCommitProtocol.instantiate(), it assumes that the instantiated protocol supports the specific requirements of dynamic partition overwrite. It does not notice when this does not hold, and so the output generated may be incorrect. This patch changes FileCommitProtocol.instantiate() so when dynamicPartitionOverwrite == true, it requires the protocol implementation to have a 3-arg constructor. Tests verify that * classes with only 2-arg constructor cannot be used with dynamic overwrite * classes with only 2-arg constructor can be used without dynamic overwrite * classes with 3 arg constructors can be used with both * the fallback to any two arg ctor takes place after the attempt to load the 3-arg ctor, * passing in invalid class types fail as expected (regression tests on expected behavior) Change-Id: I694868aecf865cfa552e031ea3f6dde8b600fa7b
1 parent 1098933 commit 529db08

File tree

2 files changed

+156
-1
lines changed

2 files changed

+156
-1
lines changed

core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
2020
import org.apache.hadoop.fs._
2121
import org.apache.hadoop.mapreduce._
2222

23+
import org.apache.spark.internal.Logging
2324
import org.apache.spark.util.Utils
2425

2526

@@ -132,7 +133,7 @@ abstract class FileCommitProtocol {
132133
}
133134

134135

135-
object FileCommitProtocol {
136+
object FileCommitProtocol extends Logging {
136137
class TaskCommitMessage(val obj: Any) extends Serializable
137138

138139
object EmptyTaskCommitMessage extends TaskCommitMessage(null)
@@ -145,15 +146,23 @@ object FileCommitProtocol {
145146
jobId: String,
146147
outputPath: String,
147148
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
149+
150+
logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
151+
s" dynamic=$dynamicPartitionOverwrite")
148152
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
149153
// First try the constructor with arguments (jobId: String, outputPath: String,
150154
// dynamicPartitionOverwrite: Boolean).
151155
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
152156
try {
153157
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
158+
logDebug("Using (String, String, Boolean) constructor")
154159
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
155160
} catch {
156161
case _: NoSuchMethodException =>
162+
logDebug("Falling back to (String, String) constructor")
163+
require(!dynamicPartitionOverwrite,
164+
"Dynamic Partition Overwrite is enabled but" +
165+
s" the committer ${className} does not have the appropriate constructor")
157166
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
158167
ctor.newInstance(jobId, outputPath)
159168
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.internal.io
19+
20+
import org.apache.spark.SparkFunSuite
21+
22+
/**
23+
* Unit tests for instantiation of FileCommitProtocol implementations.
24+
*/
25+
class FileCommitProtocolInstantiationSuite extends SparkFunSuite {
26+
27+
test("Dynamic partitions require appropriate constructor") {
28+
29+
// you cannot instantiate a two-arg client with dynamic partitions
30+
// enabled.
31+
val ex = intercept[IllegalArgumentException] {
32+
instantiateClassic(true)
33+
}
34+
// check the contents of the message and rethrow if unexpected
35+
if (!ex.toString.contains("Dynamic Partition Overwrite")) {
36+
throw ex
37+
}
38+
}
39+
40+
test("Standard partitions work with classic constructor") {
41+
instantiateClassic(false)
42+
}
43+
44+
test("Three arg constructors have priority") {
45+
assert(3 == instantiateNew(false).argCount,
46+
"Wrong constructor argument count")
47+
}
48+
49+
test("Three arg constructors have priority when dynamic") {
50+
assert(3 == instantiateNew(true).argCount,
51+
"Wrong constructor argument count")
52+
}
53+
54+
test("The protocol must be of the correct class") {
55+
intercept[ClassCastException] {
56+
FileCommitProtocol.instantiate(
57+
classOf[Other].getCanonicalName,
58+
"job",
59+
"path",
60+
false)
61+
}
62+
}
63+
64+
test("If there is no matching constructor, class hierarchy is irrelevant") {
65+
intercept[NoSuchMethodException] {
66+
FileCommitProtocol.instantiate(
67+
classOf[NoMatchingArgs].getCanonicalName,
68+
"job",
69+
"path",
70+
false)
71+
}
72+
}
73+
74+
/**
75+
* Create a classic two-arg protocol instance.
76+
* @param dynamic dyanmic partitioning mode
77+
* @return the instance
78+
*/
79+
private def instantiateClassic(dynamic: Boolean): ClassicConstructorCommitProtocol = {
80+
FileCommitProtocol.instantiate(
81+
classOf[ClassicConstructorCommitProtocol].getCanonicalName,
82+
"job",
83+
"path",
84+
dynamic).asInstanceOf[ClassicConstructorCommitProtocol]
85+
}
86+
87+
/**
88+
* Create a three-arg protocol instance.
89+
* @param dynamic dyanmic partitioning mode
90+
* @return the instance
91+
*/
92+
private def instantiateNew(
93+
dynamic: Boolean): FullConstructorCommitProtocol = {
94+
FileCommitProtocol.instantiate(
95+
classOf[FullConstructorCommitProtocol].getCanonicalName,
96+
"job",
97+
"path",
98+
dynamic).asInstanceOf[FullConstructorCommitProtocol]
99+
}
100+
101+
}
102+
103+
/**
104+
* This protocol implementation does not have the new three-arg
105+
* constructor.
106+
*/
107+
private class ClassicConstructorCommitProtocol(arg1: String, arg2: String)
108+
extends HadoopMapReduceCommitProtocol(arg1, arg2) {
109+
}
110+
111+
/**
112+
* This protocol implementation does have the new three-arg constructor
113+
* alongside the original, and a 4 arg one for completeness.
114+
* The final value of the real constructor is the number of arguments
115+
* used in the 2- and 3- constructor, for test assertions.
116+
*/
117+
private class FullConstructorCommitProtocol(
118+
arg1: String,
119+
arg2: String,
120+
b: Boolean,
121+
val argCount: Int)
122+
extends HadoopMapReduceCommitProtocol(arg1, arg2, b) {
123+
124+
def this(arg1: String, arg2: String)= {
125+
this(arg1, arg2, false, 2)
126+
}
127+
128+
def this(arg1: String, arg2: String, b: Boolean) = {
129+
this(arg1, arg2, false, 3)
130+
}
131+
}
132+
133+
/**
134+
* This has the 2-arity constructor, but isn't the right class.
135+
*/
136+
private class Other(arg1: String, arg2: String) {
137+
138+
}
139+
140+
/**
141+
* This has no matching arguments
142+
*/
143+
private class NoMatchingArgs() {
144+
145+
}
146+

0 commit comments

Comments
 (0)