From 3e7db155c2ca10128b9d81e17883d3376ebcb58f Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 13 Apr 2015 18:24:05 +0800
Subject: [PATCH 1/4] Fix wrong logic to generate WHERE clause for JDBC.

---
 .../apache/spark/sql/jdbc/JDBCRelation.scala  | 14 ++++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 4fa84dc076f7..b92cf26170e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -68,9 +68,19 @@ private[sql] object JDBCRelation {
     var currentValue: Long = partitioning.lowerBound
     var ans = new ArrayBuffer[Partition]()
     while (i < numPartitions) {
-      val lowerBound = if (i != 0) s"$column >= $currentValue" else null
+      val lowerBound =
+        if (i != 0) {
+          s"$column >= $currentValue"
+        } else {
+          s"$column >= ${partitioning.lowerBound}"
+        }
       currentValue += stride
-      val upperBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
+      val upperBound =
+        if (i != numPartitions - 1) {
+          s"$column < $currentValue"
+        } else {
+          s"$column < ${partitioning.upperBound}"
+        }
       val whereClause =
         if (upperBound == null) {
           lowerBound
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 3596b183d432..f0a75c17557d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -126,6 +126,16 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
         |OPTIONS (url '$url', dbtable 'TEST.FLTTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
 
+    // SPARK-6800
+    val stmt2 = conn.createStatement()
+    stmt2.execute("CREATE TABLE test.Person (person_id INT NOT NULL GENERATED ALWAYS AS IDENTITY CONSTRAINT person_pk PRIMARY KEY, name VARCHAR(50), age INT)")
+    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Armando Carvalho', 50)")
+    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Lurdes Pereira', 23)")
+    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Ana Rita Costa', 12)")
+    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Armando Pereira', 32)")
+    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Miguel Costa', 15)")
+    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Anabela Sintra', 13)")
+
     // Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
   }
 
@@ -133,6 +143,13 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     conn.close()
   }
 
+  test("SPARK-6800") {
+    val people = jdbc(url = urlWithUserAndPass, table = "test.Person",
+      columnName = "age", lowerBound = 0, upperBound = 40, numPartitions = 10)
+    assert(people.count() == 5)
+    assert(people.filter("AGE > 40").count() == 0)
+  }
+
   test("SELECT *") {
     assert(sql("SELECT * FROM foobar").collect().size === 3)
   }
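A sketch of the clause generation this first version implements (illustrative only, not part of the patch; it assumes the stride rule columnPartition uses elsewhere, stride = upperBound / numPartitions - lowerBound / numPartitions):

def clampedClauses(column: String, lower: Long, upper: Long, n: Int): Seq[String] = {
  // Same stride rule as columnPartition (integer division on each bound).
  val stride = upper / n - lower / n
  (0 until n).map { i =>
    val lo = lower + i * stride                      // i == 0 yields lowerBound itself
    val hi = if (i == n - 1) upper else lo + stride  // last partition clamped to upperBound
    s"$column >= $lo AND $column < $hi"
  }
}

clampedClauses("age", 0, 40, 10).foreach(println)
// age >= 0 AND age < 4
// age >= 4 AND age < 8
// ...
// age >= 36 AND age < 40

With both edge partitions clamped to the user-supplied bounds, rows outside [lowerBound, upperBound) match no partition's clause; the new test encodes that by expecting only 5 of the 6 inserted rows, since just one person has age outside [0, 40).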
From 3eb74d614a05d33a3071d586fedc20bc4f2e88d6 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 15 Apr 2015 10:48:50 +0800
Subject: [PATCH 2/4] Revert and modify doc.

---
 .../apache/spark/sql/jdbc/JDBCRelation.scala  | 20 ++++++--------------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 17 -----------------
 2 files changed, 6 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index b92cf26170e1..1c113b34a1ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -50,9 +50,11 @@ private[sql] object JDBCRelation {
    * Given a partitioning schematic (a column of integral type, a number of
    * partitions, and upper and lower bounds on the column's value), generate
    * WHERE clauses for each partition so that each row in the table appears
-   * exactly once.  The parameters minValue and maxValue are advisory in that
+   * exactly once. The parameters minValue and maxValue are advisory in that
    * incorrect values may cause the partitioning to be poor, but no data
-   * will fail to be represented.
+   * will fail to be represented. Note: the upper and lower bounds are just
+   * used to decide partition stride, not for filtering. So all rows in the
+   * table will be partitioned.
    */
   def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
     if (partitioning == null) return Array[Partition](JDBCPartition(null, 0))
@@ -68,19 +70,9 @@ private[sql] object JDBCRelation {
     var currentValue: Long = partitioning.lowerBound
     var ans = new ArrayBuffer[Partition]()
     while (i < numPartitions) {
-      val lowerBound =
-        if (i != 0) {
-          s"$column >= $currentValue"
-        } else {
-          s"$column >= ${partitioning.lowerBound}"
-        }
+      val lowerBound = if (i != 0) s"$column >= $currentValue" else null
       currentValue += stride
-      val upperBound =
-        if (i != numPartitions - 1) {
-          s"$column < $currentValue"
-        } else {
-          s"$column < ${partitioning.upperBound}"
-        }
+      val upperBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
       val whereClause =
         if (upperBound == null) {
           lowerBound
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f0a75c17557d..3596b183d432 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -126,16 +126,6 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
         |OPTIONS (url '$url', dbtable 'TEST.FLTTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
 
-    // SPARK-6800
-    val stmt2 = conn.createStatement()
-    stmt2.execute("CREATE TABLE test.Person (person_id INT NOT NULL GENERATED ALWAYS AS IDENTITY CONSTRAINT person_pk PRIMARY KEY, name VARCHAR(50), age INT)")
-    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Armando Carvalho', 50)")
-    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Lurdes Pereira', 23)")
-    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Ana Rita Costa', 12)")
-    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Armando Pereira', 32)")
-    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Miguel Costa', 15)")
-    stmt2.execute("INSERT INTO test.Person(name, age) VALUES('Anabela Sintra', 13)")
-
     // Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
   }
 
@@ -143,13 +133,6 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     conn.close()
   }
 
-  test("SPARK-6800") {
-    val people = jdbc(url = urlWithUserAndPass, table = "test.Person",
-      columnName = "age", lowerBound = 0, upperBound = 40, numPartitions = 10)
-    assert(people.count() == 5)
-    assert(people.filter("AGE > 40").count() == 0)
-  }
-
   test("SELECT *") {
     assert(sql("SELECT * FROM foobar").collect().size === 3)
   }
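After the revert, the first and last partitions are open-ended again, which is what the amended scaladoc means by all rows being partitioned. A comparable sketch of the restored behavior (same assumed stride rule; Option stands in for the nulls the actual code uses):

def openEndedClauses(column: String, lower: Long, upper: Long, n: Int): Seq[String] = {
  val stride = upper / n - lower / n
  (0 until n).map { i =>
    // Edge partitions drop their outer bound, so no row falls between the cracks.
    val lo = if (i != 0) Some(s"$column >= ${lower + i * stride}") else None
    val hi = if (i != n - 1) Some(s"$column < ${lower + (i + 1) * stride}") else None
    (lo ++ hi).mkString(" AND ")  // empty string when n == 1: no WHERE clause at all
  }
}

openEndedClauses("age", 0, 40, 10).foreach(println)
// age < 4
// age >= 4 AND age < 8
// ...
// age >= 36   <- the age-50 row from the removed test now lands here

Here lowerBound and upperBound only tune the stride; every row is read exactly once, into whichever partition its value falls.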
From 1dcc9294d0a5a6e9ac58536c0b39ccb433b89b1c Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 15 Apr 2015 13:39:41 +0800
Subject: [PATCH 3/4] Update document.

---
 docs/sql-programming-guide.md | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 332618edf0c5..03500867df70 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1371,7 +1371,10 @@ the Data Sources API. The following options are supported:
       These options must all be specified if any of them is specified. They describe how to
       partition the table when reading in parallel from multiple workers.
-      partitionColumn must be a numeric column from the table in question.
+      partitionColumn must be a numeric column from the table in question. Notice
+      that lowerBound and upperBound are just used to decide the
+      partition stride, not for filtering the rows in the table. So all rows in the
+      table will be partitioned and returned.

From 51386c86682636896278a0967860de4c4cf03992 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 16 Apr 2015 00:30:31 +0800
Subject: [PATCH 4/4] Update code comment.

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c25ef58e6f62..b237fe684cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -873,8 +873,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * passed to this function.
    *
    * @param columnName the name of a column of integral type that will be used for partitioning.
-   * @param lowerBound the minimum value of `columnName` to retrieve
-   * @param upperBound the maximum value of `columnName` to retrieve
+   * @param lowerBound the minimum value of `columnName` used to decide partition stride
+   * @param upperBound the maximum value of `columnName` used to decide partition stride
    * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
    * evenly into this many partitions
    *
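Finally, a usage sketch of the SQLContext.jdbc overload whose scaladoc patch 4 adjusts (the connection URL and table name are placeholders; the parameter names come from the test in patch 1 and the scaladoc above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("jdbc-bounds").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

// lowerBound/upperBound only fix the stride of the generated WHERE clauses;
// rows with age outside [0, 40) are still read, via the open-ended edge partitions.
val people = sqlContext.jdbc(
  url = "jdbc:h2:mem:testdb",  // placeholder connection string
  table = "test.Person",
  columnName = "age",
  lowerBound = 0L,
  upperBound = 40L,
  numPartitions = 10)

println(people.count())  // counts every row, including any with age > 40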