
Commit 6ea582e

[SPARK-24181][SQL] Better error message for writing sorted data
## What changes were proposed in this pull request?

The exception message should clearly distinguish sorting and bucketing in `save` and `jdbc` writes. When a user tries to write sorted data using `save` or `insertInto`, it throws an exception whose message says `s"'$operation' does not support bucketing right now"`. We should throw `s"'$operation' does not support sortBy right now"` instead.

## How was this patch tested?

More tests in `DataFrameReaderWriterSuite.scala`

Author: DB Tsai <d_tsai@apple.com>

Closes #21235 from dbtsai/fixException.
1 parent cac9b1d · commit 6ea582e

File tree: 3 files changed (+46, −9 lines)
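
Before the per-file diffs, a minimal runnable sketch of the user-facing change, assuming a local SparkSession (the object name and master URL are mine, not from the commit); the writes and expected messages mirror the new test cases below:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object SortByMessageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._
    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

    // sortBy without bucketBy: now an AnalysisException (previously an
    // IllegalArgumentException from require):
    //   "sortBy must be used together with bucketBy"
    try df.write.sortBy("j").saveAsTable("tt")
    catch { case e: AnalysisException => println(e.getMessage) }

    // bucketBy plus sortBy with a path-based save now names both operations:
    //   "'save' does not support bucketBy and sortBy right now"
    try df.write.bucketBy(2, "i").sortBy("i").parquet("/tmp/path")
    catch { case e: AnalysisException => println(e.getMessage) }

    spark.stop()
  }
}
```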

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (8 additions, 4 deletions)

@@ -330,8 +330,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def getBucketSpec: Option[BucketSpec] = {
-    if (sortColumnNames.isDefined) {
-      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
+    if (sortColumnNames.isDefined && numBuckets.isEmpty) {
+      throw new AnalysisException("sortBy must be used together with bucketBy")
     }
 
     numBuckets.map { n =>
@@ -340,8 +340,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def assertNotBucketed(operation: String): Unit = {
-    if (numBuckets.isDefined || sortColumnNames.isDefined) {
-      throw new AnalysisException(s"'$operation' does not support bucketing right now")
+    if (getBucketSpec.isDefined) {
+      if (sortColumnNames.isEmpty) {
+        throw new AnalysisException(s"'$operation' does not support bucketBy right now")
+      } else {
+        throw new AnalysisException(s"'$operation' does not support bucketBy and sortBy right now")
+      }
     }
   }
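
Read together, the two hunks route every invalid combination to a precise message. A standalone sketch of the new control flow, with `BucketSpec` and `AnalysisException` as simplified stand-ins and the writer state reduced to three plain Options (Spark's real fields are normalized internally):

```scala
case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String])
class AnalysisException(message: String) extends Exception(message)

class WriterState(
    numBuckets: Option[Int],
    bucketColumnNames: Option[Seq[String]],
    sortColumnNames: Option[Seq[String]]) {

  // sortBy without bucketBy is rejected here, so every caller of getBucketSpec
  // (including assertNotBucketed below) surfaces the sortBy-specific message.
  def getBucketSpec: Option[BucketSpec] = {
    if (sortColumnNames.isDefined && numBuckets.isEmpty) {
      throw new AnalysisException("sortBy must be used together with bucketBy")
    }
    numBuckets.map { n =>
      BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil))
    }
  }

  // Only reached when bucketing is actually configured; the message then names
  // bucketBy alone or bucketBy and sortBy, matching what the user set.
  def assertNotBucketed(operation: String): Unit = {
    if (getBucketSpec.isDefined) {
      if (sortColumnNames.isEmpty) {
        throw new AnalysisException(s"'$operation' does not support bucketBy right now")
      } else {
        throw new AnalysisException(s"'$operation' does not support bucketBy and sortBy right now")
      }
    }
  }
}
```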

sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala (24 additions, 3 deletions)

@@ -60,7 +60,10 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
 
   test("specify sorting columns without bucketing columns") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
+    val e = intercept[AnalysisException] {
+      df.write.sortBy("j").saveAsTable("tt")
+    }
+    assert(e.getMessage == "sortBy must be used together with bucketBy;")
   }
 
   test("sorting by non-orderable column") {
@@ -74,7 +77,16 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").parquet("/tmp/path")
     }
-    assert(e.getMessage == "'save' does not support bucketing right now;")
+    assert(e.getMessage == "'save' does not support bucketBy right now;")
+  }
+
+  test("write bucketed and sorted data using save()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[AnalysisException] {
+      df.write.bucketBy(2, "i").sortBy("i").parquet("/tmp/path")
+    }
+    assert(e.getMessage == "'save' does not support bucketBy and sortBy right now;")
   }
 
   test("write bucketed data using insertInto()") {
@@ -83,7 +95,16 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").insertInto("tt")
     }
-    assert(e.getMessage == "'insertInto' does not support bucketing right now;")
+    assert(e.getMessage == "'insertInto' does not support bucketBy right now;")
+  }
+
+  test("write bucketed and sorted data using insertInto()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[AnalysisException] {
+      df.write.bucketBy(2, "i").sortBy("i").insertInto("tt")
+    }
+    assert(e.getMessage == "'insertInto' does not support bucketBy and sortBy right now;")
   }
 
   private lazy val df = {
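
All of the new suite cases share one intercept-and-compare shape. A hypothetical helper capturing it (the name `assertWriteFails` is mine, not from the suite), assuming ScalaTest's `Assertions` are on the classpath:

```scala
import org.apache.spark.sql.AnalysisException
import org.scalatest.Assertions._

object WriteFailureAssertions {
  // Run a write action and compare the full AnalysisException message.
  // Note the trailing ';' in the expected strings above: AnalysisException
  // appends it when formatting its message, ahead of any line/position info.
  def assertWriteFails(expectedMessage: String)(action: => Unit): Unit = {
    val e = intercept[AnalysisException](action)
    assert(e.getMessage == expectedMessage)
  }
}

// Usage, mirroring one of the tests above:
//   assertWriteFails("'save' does not support bucketBy and sortBy right now;") {
//     df.write.bucketBy(2, "i").sortBy("i").parquet("/tmp/path")
//   }
```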

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala (14 additions, 2 deletions)

@@ -276,7 +276,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("doubleOpt") == "6.7")
   }
 
-  test("check jdbc() does not support partitioning or bucketing") {
+  test("check jdbc() does not support partitioning, bucketBy or sortBy") {
     val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath)
 
     var w = df.write.partitionBy("value")
@@ -287,7 +287,19 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
 
     w = df.write.bucketBy(2, "value")
     e = intercept[AnalysisException](w.jdbc(null, null, null))
-    Seq("jdbc", "bucketing").foreach { s =>
+    Seq("jdbc", "does not support bucketBy right now").foreach { s =>
+      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
+
+    w = df.write.sortBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("sortBy must be used together with bucketBy").foreach { s =>
+      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
+
+    w = df.write.bucketBy(2, "value").sortBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "does not support bucketBy and sortBy right now").foreach { s =>
       assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
     }
   }
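
The jdbc() test compares substrings case-insensitively. A small self-contained sketch of that comparison and of why `Locale.ROOT` matters (the Turkish-locale caveat is illustrative, not from the suite):

```scala
import java.util.Locale

object ContainsIgnoreCase {
  // Same comparison the test performs: lowercase both sides with Locale.ROOT
  // so the check does not depend on the JVM's default locale. Under tr-TR,
  // "I".toLowerCase yields the dotless 'ı', which would defeat a naive check.
  def containsIgnoreCase(message: String, fragment: String): Boolean =
    message.toLowerCase(Locale.ROOT).contains(fragment.toLowerCase(Locale.ROOT))

  def main(args: Array[String]): Unit = {
    assert(containsIgnoreCase(
      "'jdbc' does not support bucketBy and sortBy right now;",
      "does not support bucketBy and sortBy right now"))
    println("ok")
  }
}
```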
