Commit 98aa212

fix style
1 parent 6c49fa3 commit 98aa212

5 files changed, +13 -23 lines changed

core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 6 additions & 4 deletions

@@ -149,21 +149,23 @@ object FileCommitProtocol extends Logging {
       outputPath: String,
       dynamicPartitionOverwrite: Boolean = false,
       isPartitionOverwrite: Boolean = false,
-      staticPartitionKVS: Seq[(String, String)] = Seq.empty[(String, String)]):
+      staticPartitionKVs: Seq[(String, String)] = Seq.empty[(String, String)]):
     FileCommitProtocol = {

     logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
-      s" dynamic=$dynamicPartitionOverwrite")
+      s" dynamic=$dynamicPartitionOverwrite; isPartitionOverWrite=$isPartitionOverwrite;" +
+      s" staticPartitionKVS=$staticPartitionKVs")
     val clazz = Utils.classForName[FileCommitProtocol](className)
     // First try the constructor with arguments (jobId: String, outputPath: String,
-    // dynamicPartitionOverwrite: Boolean).
+    // dynamicPartitionOverwrite: Boolean, isPartitionOverwrite: Boolean,
+    // staticPartitionKVs: Seq[(String, String)]).
     // If that doesn't exist, try the one with (jobId: string, outputPath: String).
     try {
       val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean],
         classOf[Boolean], classOf[Seq[(String, String)]])
       logDebug("Using (String, String, Boolean, Boolean, Seq[(String, String)]) constructor")
       ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean],
-        isPartitionOverwrite.asInstanceOf[java.lang.Boolean], staticPartitionKVS)
+        isPartitionOverwrite.asInstanceOf[java.lang.Boolean], staticPartitionKVs)
     } catch {
       case _: NoSuchMethodException =>
         logDebug("Falling back to (String, String) constructor")

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 2 additions & 2 deletions

@@ -55,7 +55,7 @@ class HadoopMapReduceCommitProtocol(
     path: String,
     dynamicPartitionOverwrite: Boolean = false,
     isPartitionOverwrite: Boolean = false,
-    staticPartitionKVS: Seq[(String, String)] = Seq.empty[(String, String)])
+    staticPartitionKVs: Seq[(String, String)] = Seq.empty[(String, String)])
   extends FileCommitProtocol with Serializable with Logging {

   import FileCommitProtocol._

@@ -112,7 +112,7 @@ class HadoopMapReduceCommitProtocol(
    * Get the determinable base path of results according to specified partition key-value pairs.
    */
   private def getStaticPartitionPath(): String = {
-    staticPartitionKVS.map{kv =>
+    staticPartitionKVs.map{kv =>
       escapePathName(kv._1) + "=" + escapePathName(kv._2)
     }.mkString(File.separator)
   }
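getStaticPartitionPath, touched above, joins the static key-value pairs into a Hive-style relative path such as p1=1/p2=3; escaping each key and value keeps every pair to exactly one path segment. A standalone approximation, with a simplified stand-in for Spark's escapePathName (the real escaping covers many more characters):

import java.io.File

// Standalone approximation of getStaticPartitionPath; escape is a simplified
// stand-in for Spark's escapePathName and only handles a few special characters.
object StaticPartitionPathDemo {

  private def escape(s: String): String = s.flatMap {
    case c if "/:=%".contains(c) => "%%%02X".format(c.toInt)
    case c => c.toString
  }

  def staticPartitionPath(staticPartitionKVs: Seq[(String, String)]): String =
    staticPartitionKVs
      .map { case (k, v) => escape(k) + "=" + escape(v) }
      .mkString(File.separator)

  def main(args: Array[String]): Unit = {
    // Prints "p1=1/p2=a%2Fb" on POSIX systems: the slash in the value is
    // escaped so each pair maps to exactly one path segment.
    println(staticPartitionPath(Seq("p1" -> "1", "p2" -> "a/b")))
  }
}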

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 1 addition & 1 deletion

@@ -117,7 +117,7 @@ case class InsertIntoHadoopFsRelationCommand(
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite,
       isPartitionOverwrite = isPartitionOverwrite,
-      staticPartitionKVS = staticPartitionKVs)
+      staticPartitionKVs = staticPartitionKVs)

     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
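For context, this call site is what feeds the reflective lookup shown in the first file. A hedged sketch of the resulting invocation (job id and output path are placeholders, and this only compiles against this patched build):

val committer = FileCommitProtocol.instantiate(
  className = classOf[SQLHadoopMapReduceCommitProtocol].getName,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = "/tmp/warehouse/ta",
  dynamicPartitionOverwrite = false,
  isPartitionOverwrite = true,
  staticPartitionKVs = Seq("p1" -> "1"))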

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala

Lines changed: 2 additions & 2 deletions

@@ -34,9 +34,9 @@ class SQLHadoopMapReduceCommitProtocol(
     path: String,
     dynamicPartitionOverwrite: Boolean = false,
     isPartitionOverwrite: Boolean = false,
-    staticPartitionKVS: Seq[(String, String)] = Seq.empty[(String, String)])
+    staticPartitionKVs: Seq[(String, String)] = Seq.empty[(String, String)])
   extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite,
-    isPartitionOverwrite, staticPartitionKVS)
+    isPartitionOverwrite, staticPartitionKVs)
   with Serializable with Logging {

   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 2 additions & 14 deletions

@@ -3194,9 +3194,9 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
     }
   }

-  test("SPARK-29037: For non dynamic partition overwrite, set a unique staging dir") {
+  test("SPARK-29037: For static partition overwrite, set a unique staging dir") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
-      withTable("ta", "tb") {
+      withTable("ta") {
         sql("create table ta(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)")
         sql("insert overwrite table ta partition(p1=1,p2) select 1, 3")
         val df1 = sql("select * from ta order by p2")

@@ -3213,18 +3213,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
         sql("insert overwrite table ta select 9, 9, 9")
         val df5 = sql("select * from ta order by p2")
         checkAnswer(df5, Array(Row(9, 9, 9)))
-
-        // For non-partitioned table write.
-        sql("create table tb(id int, p1 int, p2 int) using parquet")
-        sql("insert overwrite table tb select 1, 2, 3")
-        val df6 = sql("select * from tb order by p2")
-        checkAnswer(df6, Array(Row(1, 2, 3)))
-        sql("insert overwrite table tb select 1, 2, 4")
-        val df7 = sql("select * from tb order by p2")
-        checkAnswer(df7, Array(Row(1, 2, 4)))
-        sql("insert into table tb select 1, 2, 5")
-        val df8 = sql("select * from tb order by p2")
-        checkAnswer(df8, Array(Row(1, 2, 4), Row(1, 2, 5)))
       }
     }
   }
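The renamed test pins down the behavior this series targets: with spark.sql.sources.partitionOverwriteMode=static, INSERT OVERWRITE with a static partition spec replaces every partition matching that spec. A hedged, minimal standalone reproduction against a local SparkSession (stock static-mode semantics; table and values mirror the test):

import org.apache.spark.sql.SparkSession

object StaticOverwriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.sources.partitionOverwriteMode", "static")
      .getOrCreate()

    spark.sql("create table ta(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)")
    spark.sql("insert overwrite table ta partition(p1=1, p2) select 1, 3")
    spark.sql("insert overwrite table ta partition(p1=1, p2) select 1, 4")
    // Static mode clears everything under p1=1 before the second write,
    // so only the row (1, 1, 4) should remain.
    spark.sql("select * from ta order by p2").show()
    spark.stop()
  }
}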
