From 091659983ae7aede137650a58eb41ffe71136bbd Mon Sep 17 00:00:00 2001
From: "jianran.tfh"
Date: Fri, 19 Aug 2016 15:49:15 +0800
Subject: [PATCH 1/3] paged jdbcRDD for mysql limit start,pageSize

---
 .../scala/org/apache/spark/rdd/JdbcRDD.scala  | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 0970b9807167..c880a8a2e64e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -133,6 +133,25 @@ class JdbcRDD[T: ClassTag](
   }
 }

+class PagedJdbcRDD[T: ClassTag](
+    sc: SparkContext,
+    getConnection: () => Connection,
+    sql: String,
+    lowerBound: Long,
+    pageSize: Long,
+    numPartitions: Int,
+    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
+  extends JdbcRDD[T](sc, getConnection, sql, lowerBound, pageSize, numPartitions, mapRow) {
+
+  override def getPartitions: Array[Partition] = {
+    (0 until numPartitions).map { i =>
+      val start = lowerBound + i * pageSize
+      val end = pageSize
+      new JdbcPartition(i, start.toLong, end.toLong)
+    }.toArray
+  }
+}
+
 object JdbcRDD {
   def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))

From 981e13652ed9b2131be95b8c1e3dc26d85f487fb Mon Sep 17 00:00:00 2001
From: "jianran.tfh"
Date: Fri, 19 Aug 2016 16:28:24 +0800
Subject: [PATCH 2/3] limit

---
 .../main/scala/org/apache/spark/rdd/JdbcRDD.scala  |  2 +-
 .../scala/org/apache/spark/rdd/JdbcRDDSuite.scala  | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index c880a8a2e64e..0728be922384 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -133,7 +133,7 @@ class JdbcRDD[T: ClassTag](
   }
 }

-class PagedJdbcRDD[T: ClassTag](
+class LimitJdbcRDD[T: ClassTag](
     sc: SparkContext,
     getConnection: () => Connection,
     sql: String,
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 05013fbc49b8..2ea8c5172546 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -84,6 +84,19 @@ class JdbcRDDSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkCont
     assert(rdd.reduce(_ + _) === 10100)
   }

+  test("limit functionality") {
+    sc = new SparkContext("local", "test")
+    val rdd = new LimitJdbcRDD(
+      sc,
+      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
+      "SELECT DATA FROM FOO ORDER BY ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY",
+      1, 10, 10,
+      (r: ResultSet) => { r.getInt(1) } ).cache()
+
+    assert(rdd.count === 100)
+    assert(rdd.reduce(_ + _) === 10100)
+  }
+
   test("large id overflow") {
     sc = new SparkContext("local", "test")
     val rdd = new JdbcRDD(

From c786cfa744e6e52dc9f8afc5300abd6b07cc01ae Mon Sep 17 00:00:00 2001
From: "jianran.tfh"
Date: Fri, 19 Aug 2016 17:06:08 +0800
Subject: [PATCH 3/3] start from 0

---
 core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 2ea8c5172546..5c8b89d82ac6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -90,7 +90,7 @@ class JdbcRDDSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkCont
       sc,
       () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
       "SELECT DATA FROM FOO ORDER BY ID OFFSET ? ROWS FETCH NEXT ? ROWS ONLY",
-      1, 10, 10,
+      0, 10, 10,
      (r: ResultSet) => { r.getInt(1) } ).cache()

     assert(rdd.count === 100)