Skip to content

Commit d4cedc5

Browse files
committed
Moves Java JdbcRDD test case to a separate test suite
1 parent ffcdf2e commit d4cedc5

File tree

2 files changed

+119
-83
lines changed

2 files changed

+119
-83
lines changed

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 3 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,13 @@
1818
package org.apache.spark;
1919

2020
import java.io.*;
21-
import java.net.URI;
22-
import java.nio.ByteBuffer;
2321
import java.nio.channels.FileChannel;
24-
import java.sql.Connection;
25-
import java.sql.DriverManager;
26-
import java.sql.PreparedStatement;
27-
import java.sql.ResultSet;
28-
import java.sql.SQLException;
29-
import java.sql.Statement;
22+
import java.nio.ByteBuffer;
23+
import java.net.URI;
3024
import java.util.*;
3125
import java.util.concurrent.*;
3226

27+
import org.apache.spark.input.PortableDataStream;
3328
import scala.Tuple2;
3429
import scala.Tuple3;
3530
import scala.Tuple4;
@@ -56,10 +51,8 @@
5651
import org.apache.spark.api.java.*;
5752
import org.apache.spark.api.java.function.*;
5853
import org.apache.spark.executor.TaskMetrics;
59-
import org.apache.spark.input.PortableDataStream;
6054
import org.apache.spark.partial.BoundedDouble;
6155
import org.apache.spark.partial.PartialResult;
62-
import org.apache.spark.rdd.JdbcRDD;
6356
import org.apache.spark.storage.StorageLevel;
6457
import org.apache.spark.util.StatCounter;
6558

@@ -1515,77 +1508,4 @@ public void testRegisterKryoClasses() {
15151508
conf.get("spark.kryo.classesToRegister"));
15161509
}
15171510

1518-
1519-
private void setUpJdbc() throws Exception {
1520-
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
1521-
Connection connection =
1522-
DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true");
1523-
1524-
try {
1525-
Statement create = connection.createStatement();
1526-
create.execute(
1527-
"CREATE TABLE FOO(" +
1528-
"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
1529-
"DATA INTEGER)");
1530-
create.close();
1531-
1532-
PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
1533-
for (int i = 1; i <= 100; i++) {
1534-
insert.setInt(i, i * 2);
1535-
insert.executeUpdate();
1536-
}
1537-
} catch (SQLException e) {
1538-
// If table doesn't exist...
1539-
if (e.getSQLState().compareTo("X0Y32") != 0) {
1540-
throw e;
1541-
}
1542-
} finally {
1543-
connection.close();
1544-
}
1545-
}
1546-
1547-
private void tearDownJdbc() throws SQLException {
1548-
try {
1549-
DriverManager.getConnection("jdbc:derby:;shutdown=true");
1550-
} catch(SQLException e) {
1551-
if (e.getSQLState().compareTo("XJ015") != 0) {
1552-
throw e;
1553-
}
1554-
}
1555-
}
1556-
1557-
@Test
1558-
public void testJavaJdbcRDD() throws Exception {
1559-
setUpJdbc();
1560-
1561-
try {
1562-
JavaRDD<Integer> rdd = JdbcRDD.create(
1563-
sc,
1564-
new JdbcRDD.ConnectionFactory() {
1565-
@Override
1566-
public Connection getConnection() throws SQLException {
1567-
return DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb");
1568-
}
1569-
},
1570-
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1571-
1, 100, 3,
1572-
new Function<ResultSet, Integer>() {
1573-
@Override
1574-
public Integer call(ResultSet r) throws Exception {
1575-
return r.getInt(1);
1576-
}
1577-
}
1578-
).cache();
1579-
1580-
Assert.assertEquals(rdd.count(), 100);
1581-
Assert.assertEquals(rdd.reduce(new Function2<Integer, Integer, Integer>() {
1582-
@Override
1583-
public Integer call(Integer i1, Integer i2) {
1584-
return i1 + i2;
1585-
}
1586-
}), Integer.valueOf(10100));
1587-
} finally {
1588-
tearDownJdbc();
1589-
}
1590-
}
15911511
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark;
18+
19+
import java.io.Serializable;
20+
import java.sql.Connection;
21+
import java.sql.DriverManager;
22+
import java.sql.PreparedStatement;
23+
import java.sql.ResultSet;
24+
import java.sql.SQLException;
25+
import java.sql.Statement;
26+
27+
import org.apache.spark.api.java.JavaRDD;
28+
import org.apache.spark.api.java.JavaSparkContext;
29+
import org.apache.spark.api.java.function.Function;
30+
import org.apache.spark.api.java.function.Function2;
31+
import org.apache.spark.rdd.JdbcRDD;
32+
import org.junit.After;
33+
import org.junit.Assert;
34+
import org.junit.Before;
35+
import org.junit.Test;
36+
37+
public class JavaJdbcRDDSuite implements Serializable {
38+
private transient JavaSparkContext sc;
39+
40+
@Before
41+
public void setUp() throws ClassNotFoundException, SQLException {
42+
sc = new JavaSparkContext("local", "JavaAPISuite");
43+
44+
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
45+
Connection connection =
46+
DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true");
47+
48+
try {
49+
Statement create = connection.createStatement();
50+
create.execute(
51+
"CREATE TABLE FOO(" +
52+
"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
53+
"DATA INTEGER)");
54+
create.close();
55+
56+
PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
57+
for (int i = 1; i <= 100; i++) {
58+
insert.setInt(1, i * 2);
59+
insert.executeUpdate();
60+
}
61+
insert.close();
62+
} catch (SQLException e) {
63+
// If table doesn't exist...
64+
if (e.getSQLState().compareTo("X0Y32") != 0) {
65+
throw e;
66+
}
67+
} finally {
68+
connection.close();
69+
}
70+
}
71+
72+
@After
73+
public void tearDown() throws SQLException {
74+
try {
75+
DriverManager.getConnection("jdbc:derby:;shutdown=true");
76+
} catch(SQLException e) {
77+
if (e.getSQLState().compareTo("XJ015") != 0) {
78+
throw e;
79+
}
80+
}
81+
82+
sc.stop();
83+
sc = null;
84+
}
85+
86+
@Test
87+
public void testJavaJdbcRDD() throws Exception {
88+
JavaRDD<Integer> rdd = JdbcRDD.create(
89+
sc,
90+
new JdbcRDD.ConnectionFactory() {
91+
@Override
92+
public Connection getConnection() throws SQLException {
93+
return DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb");
94+
}
95+
},
96+
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
97+
1, 100, 1,
98+
new Function<ResultSet, Integer>() {
99+
@Override
100+
public Integer call(ResultSet r) throws Exception {
101+
return r.getInt(1);
102+
}
103+
}
104+
).cache();
105+
106+
Assert.assertEquals(100, rdd.count());
107+
Assert.assertEquals(
108+
Integer.valueOf(10100),
109+
rdd.reduce(new Function2<Integer, Integer, Integer>() {
110+
@Override
111+
public Integer call(Integer i1, Integer i2) {
112+
return i1 + i2;
113+
}
114+
}));
115+
}
116+
}

0 commit comments

Comments
 (0)