DataFrameWriter.scala

@@ -330,8 +330,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def getBucketSpec: Option[BucketSpec] = {
-    if (sortColumnNames.isDefined) {
-      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
+    if (sortColumnNames.isDefined && numBuckets.isEmpty) {
+      throw new AnalysisException("sortBy must be used together with bucketBy")
     }
 
     numBuckets.map { n =>
@@ -340,8 +340,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def assertNotBucketed(operation: String): Unit = {
-    if (numBuckets.isDefined || sortColumnNames.isDefined) {
-      throw new AnalysisException(s"'$operation' does not support bucketing right now")
+    if (getBucketSpec.isDefined) {
+      if (sortColumnNames.isEmpty) {
+        throw new AnalysisException(s"'$operation' does not support bucketBy right now")
+      } else {
+        throw new AnalysisException(s"'$operation' does not support bucketBy and sortBy right now")
+      }
     }
   }
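For reference, a minimal sketch of how the reworked checks surface to callers (assuming a local SparkSession named spark; the table name and output path below are illustrative):

    import org.apache.spark.sql.AnalysisException

    val df = spark.range(10).selectExpr("id as i", "cast(id as string) as j")

    // sortBy without bucketBy now fails inside getBucketSpec with an
    // AnalysisException instead of the old IllegalArgumentException from require():
    try df.write.sortBy("j").saveAsTable("tt")
    catch { case e: AnalysisException => println(e.getMessage) }
    // prints: sortBy must be used together with bucketBy;

    // bucketBy on a path-based save() reaches assertNotBucketed and now
    // gets the more specific, operation-prefixed message:
    try df.write.bucketBy(2, "i").parquet("/tmp/path")
    catch { case e: AnalysisException => println(e.getMessage) }
    // prints: 'save' does not support bucketBy right now;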
BucketedWriteSuite.scala

@@ -60,7 +60,10 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
 
   test("specify sorting columns without bucketing columns") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
+    val e = intercept[AnalysisException] {
+      df.write.sortBy("j").saveAsTable("tt")
+    }
+    assert(e.getMessage == "sortBy must be used together with bucketBy;")
   }
 
   test("sorting by non-orderable column") {
@@ -74,7 +77,16 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").parquet("/tmp/path")
     }
-    assert(e.getMessage == "'save' does not support bucketing right now;")
+    assert(e.getMessage == "'save' does not support bucketBy right now;")
+  }
+
+  test("write bucketed and sorted data using save()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[AnalysisException] {
+      df.write.bucketBy(2, "i").sortBy("i").parquet("/tmp/path")
+    }
+    assert(e.getMessage == "'save' does not support bucketBy and sortBy right now;")
   }
 
   test("write bucketed data using insertInto()") {
@@ -83,7 +95,16 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").insertInto("tt")
     }
-    assert(e.getMessage == "'insertInto' does not support bucketing right now;")
+    assert(e.getMessage == "'insertInto' does not support bucketBy right now;")
+  }
+
+  test("write bucketed and sorted data using insertInto()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[AnalysisException] {
+      df.write.bucketBy(2, "i").sortBy("i").insertInto("tt")
+    }
+    assert(e.getMessage == "'insertInto' does not support bucketBy and sortBy right now;")
   }
 
   private lazy val df = {
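These tests pin down only the error paths; the supported path is unchanged. As a hedged illustration (assuming a SparkSession named spark and a metastore-backed session; the table name is hypothetical), a bucketed and sorted write still goes through saveAsTable:

    import spark.implicits._   // assumes a SparkSession named spark

    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

    // bucketBy, with or without sortBy, remains supported for table-based
    // writes; only save(), insertInto() and jdbc() reject it.
    df.write
      .bucketBy(2, "i")
      .sortBy("j")
      .saveAsTable("bucketed_tt")   // hypothetical table name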
DataFrameReaderWriterSuite.scala

@@ -276,7 +276,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("doubleOpt") == "6.7")
   }
 
-  test("check jdbc() does not support partitioning or bucketing") {
+  test("check jdbc() does not support partitioning, bucketBy or sortBy") {
     val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath)
 
     var w = df.write.partitionBy("value")
@@ -287,7 +287,19 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
 
     w = df.write.bucketBy(2, "value")
     e = intercept[AnalysisException](w.jdbc(null, null, null))
-    Seq("jdbc", "bucketing").foreach { s =>
+    Seq("jdbc", "does not support bucketBy right now").foreach { s =>
       assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
     }
+
+    w = df.write.sortBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("sortBy must be used together with bucketBy").foreach { s =>
+      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
+
+    w = df.write.bucketBy(2, "value").sortBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "does not support bucketBy and sortBy right now").foreach { s =>
+      assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
   }
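A subtlety this test relies on: with sortBy alone, the exception is thrown by getBucketSpec itself, so its message carries no operation name, while any combination involving bucketBy reaches assertNotBucketed and gets the 'jdbc'-prefixed message. A simplified model of that decision flow, with illustrative stand-ins for the private DataFrameWriter fields:

    // Simplified model only; the real checks live in the private
    // getBucketSpec and assertNotBucketed methods of DataFrameWriter.
    def validationMessage(
        operation: String,
        numBuckets: Option[Int],
        sortColumnNames: Option[Seq[String]]): Option[String] =
      (numBuckets, sortColumnNames) match {
        case (None, Some(_)) =>        // getBucketSpec's own check
          Some("sortBy must be used together with bucketBy")
        case (Some(_), None) =>        // assertNotBucketed, bucketBy only
          Some(s"'$operation' does not support bucketBy right now")
        case (Some(_), Some(_)) =>     // assertNotBucketed, both specified
          Some(s"'$operation' does not support bucketBy and sortBy right now")
        case (None, None) => None      // nothing to reject; operation proceeds
      }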