Skip to content

Commit 6a702d2

Browse files
committed
[SPARK-53934][CONNECT] Initial implement Connect JDBC driver
1 parent 96093bd commit 6a702d2

File tree

12 files changed

+2340
-10
lines changed

12 files changed

+2340
-10
lines changed

sql/connect/client/jdbc/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@
111111
<classifier>tests</classifier>
112112
<scope>test</scope>
113113
</dependency>
114+
<dependency>
115+
<groupId>org.apache.spark</groupId>
116+
<artifactId>spark-connect-client-jvm_${scala.binary.version}</artifactId>
117+
<version>${project.version}</version>
118+
<classifier>tests</classifier>
119+
<scope>test</scope>
120+
</dependency>
114121
<!-- Use mima to perform the compatibility check -->
115122
<dependency>
116123
<groupId>com.typesafe</groupId>

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/NonRegisteringSparkConnectDriver.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.connect.client.jdbc
1919

20-
import java.sql.{Connection, Driver, DriverPropertyInfo, SQLFeatureNotSupportedException}
20+
import java.sql.{Connection, Driver, DriverPropertyInfo, SQLException, SQLFeatureNotSupportedException}
2121
import java.util.Properties
2222
import java.util.logging.Logger
2323

@@ -29,7 +29,11 @@ class NonRegisteringSparkConnectDriver extends Driver {
2929
override def acceptsURL(url: String): Boolean = url.startsWith("jdbc:sc://")
3030

3131
override def connect(url: String, info: Properties): Connection = {
32-
throw new UnsupportedOperationException("TODO(SPARK-53934)")
32+
if (url == null) {
33+
throw new SQLException("url must not be null")
34+
}
35+
36+
if (this.acceptsURL(url)) new SparkConnectConnection(url, info) else null
3337
}
3438

3539
override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] =
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connect.client.jdbc
19+
20+
import java.sql.{Array => JdbcArray, _}
21+
import java.util
22+
import java.util.Properties
23+
import java.util.concurrent.Executor
24+
25+
import org.apache.spark.sql.connect.SparkSession
26+
import org.apache.spark.sql.connect.client.SparkConnectClient
27+
import org.apache.spark.sql.connect.client.jdbc.util.JdbcErrorUtils._
28+
29+
class SparkConnectConnection(val url: String, val info: Properties) extends Connection {
30+
31+
private[jdbc] val client = SparkConnectClient
32+
.builder()
33+
.loadFromEnvironment()
34+
.userAgent("Spark Connect JDBC")
35+
.connectionString(url.stripPrefix("jdbc:"))
36+
.build()
37+
38+
private[jdbc] val spark = SparkSession.builder().client(client).create()
39+
40+
@volatile private var closed: Boolean = false
41+
42+
override def isClosed: Boolean = closed
43+
44+
override def close(): Unit = synchronized {
45+
if (!closed) {
46+
spark.close()
47+
closed = true
48+
}
49+
}
50+
51+
private[jdbc] def checkOpen(): Unit = {
52+
if (closed) {
53+
throw new SQLException("JDBC Connection is closed.")
54+
}
55+
if (!client.isSessionValid) {
56+
throw new SQLException(s"Spark Connect Session ${client.sessionId} is invalid.")
57+
}
58+
}
59+
60+
override def isValid(timeout: Int): Boolean = !closed && client.isSessionValid
61+
62+
override def setCatalog(catalog: String): Unit = {
63+
checkOpen()
64+
spark.catalog.setCurrentCatalog(catalog)
65+
}
66+
67+
override def getCatalog: String = {
68+
checkOpen()
69+
spark.catalog.currentCatalog()
70+
}
71+
72+
override def setSchema(schema: String): Unit = {
73+
checkOpen()
74+
spark.catalog.setCurrentDatabase(schema)
75+
}
76+
77+
override def getSchema: String = {
78+
checkOpen()
79+
spark.catalog.currentDatabase
80+
}
81+
82+
override def getMetaData: DatabaseMetaData = {
83+
checkOpen()
84+
new SparkConnectDatabaseMetaData(this)
85+
}
86+
87+
override def createStatement(): Statement = {
88+
checkOpen()
89+
new SparkConnectStatement(this)
90+
}
91+
92+
override def prepareStatement(sql: String): PreparedStatement =
93+
throw new SQLFeatureNotSupportedException
94+
95+
override def prepareCall(sql: String): CallableStatement =
96+
throw new SQLFeatureNotSupportedException
97+
98+
override def createStatement(
99+
resultSetType: Int,
100+
resultSetConcurrency: Int,
101+
resultSetHoldability: Int): Statement =
102+
throw new SQLFeatureNotSupportedException
103+
104+
override def prepareStatement(
105+
sql: String,
106+
resultSetType: Int,
107+
resultSetConcurrency: Int,
108+
resultSetHoldability: Int): PreparedStatement =
109+
throw new SQLFeatureNotSupportedException
110+
111+
override def prepareCall(
112+
sql: String,
113+
resultSetType: Int,
114+
resultSetConcurrency: Int,
115+
resultSetHoldability: Int): CallableStatement =
116+
throw new SQLFeatureNotSupportedException
117+
118+
override def prepareStatement(
119+
sql: String, autoGeneratedKeys: Int): PreparedStatement =
120+
throw new SQLFeatureNotSupportedException
121+
122+
override def prepareStatement(
123+
sql: String, columnIndexes: Array[Int]): PreparedStatement =
124+
throw new SQLFeatureNotSupportedException
125+
126+
override def prepareStatement(
127+
sql: String, columnNames: Array[String]): PreparedStatement =
128+
throw new SQLFeatureNotSupportedException
129+
130+
override def createStatement(
131+
resultSetType: Int, resultSetConcurrency: Int): Statement =
132+
throw new SQLFeatureNotSupportedException
133+
134+
override def prepareStatement(
135+
sql: String,
136+
resultSetType: Int,
137+
resultSetConcurrency: Int): PreparedStatement =
138+
throw new SQLFeatureNotSupportedException
139+
140+
override def prepareCall(
141+
sql: String,
142+
resultSetType: Int,
143+
resultSetConcurrency: Int): CallableStatement =
144+
throw new SQLFeatureNotSupportedException
145+
146+
override def nativeSQL(sql: String): String =
147+
throw new SQLFeatureNotSupportedException
148+
149+
override def setAutoCommit(autoCommit: Boolean): Unit = {
150+
checkOpen()
151+
if (!autoCommit) {
152+
throw new SQLFeatureNotSupportedException("Only auto-commit mode is supported")
153+
}
154+
}
155+
156+
override def getAutoCommit: Boolean = {
157+
checkOpen()
158+
true
159+
}
160+
161+
override def commit(): Unit = {
162+
checkOpen()
163+
throw new SQLException("Connection is in auto-commit mode")
164+
}
165+
166+
override def rollback(): Unit = {
167+
checkOpen()
168+
throw new SQLException("Connection is in auto-commit mode")
169+
}
170+
171+
override def setReadOnly(readOnly: Boolean): Unit = {
172+
checkOpen()
173+
if (readOnly) {
174+
throw new SQLFeatureNotSupportedException("Read-only mode is not supported")
175+
}
176+
}
177+
178+
override def isReadOnly: Boolean = {
179+
checkOpen()
180+
false
181+
}
182+
183+
override def setTransactionIsolation(level: Int): Unit = {
184+
checkOpen()
185+
if (level != Connection.TRANSACTION_NONE) {
186+
throw new SQLFeatureNotSupportedException(
187+
"Requested transaction isolation level " +
188+
s"${stringfiyTransactionIsolationLevel(level)} is not supported")
189+
}
190+
}
191+
192+
override def getTransactionIsolation: Int = {
193+
checkOpen()
194+
Connection.TRANSACTION_NONE
195+
}
196+
197+
override def getWarnings: SQLWarning = null
198+
199+
override def clearWarnings(): Unit = {}
200+
201+
override def getTypeMap: util.Map[String, Class[_]] =
202+
throw new SQLFeatureNotSupportedException
203+
204+
override def setTypeMap(map: util.Map[String, Class[_]]): Unit =
205+
throw new SQLFeatureNotSupportedException
206+
207+
override def setHoldability(holdability: Int): Unit = {
208+
if (holdability != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
209+
throw new SQLFeatureNotSupportedException(
210+
s"Holdability ${stringfiyHoldability(holdability)} is not supported")
211+
}
212+
}
213+
214+
override def getHoldability: Int = ResultSet.HOLD_CURSORS_OVER_COMMIT
215+
216+
override def setSavepoint(): Savepoint =
217+
throw new SQLFeatureNotSupportedException
218+
219+
override def setSavepoint(name: String): Savepoint =
220+
throw new SQLFeatureNotSupportedException
221+
222+
override def rollback(savepoint: Savepoint): Unit =
223+
throw new SQLFeatureNotSupportedException
224+
225+
override def releaseSavepoint(savepoint: Savepoint): Unit =
226+
throw new SQLFeatureNotSupportedException
227+
228+
override def createClob(): Clob =
229+
throw new SQLFeatureNotSupportedException
230+
231+
override def createBlob(): Blob =
232+
throw new SQLFeatureNotSupportedException
233+
234+
override def createNClob(): NClob =
235+
throw new SQLFeatureNotSupportedException
236+
237+
override def createSQLXML(): SQLXML =
238+
throw new SQLFeatureNotSupportedException
239+
240+
override def setClientInfo(name: String, value: String): Unit =
241+
throw new SQLFeatureNotSupportedException
242+
243+
override def setClientInfo(properties: Properties): Unit =
244+
throw new SQLFeatureNotSupportedException
245+
246+
override def getClientInfo(name: String): String =
247+
throw new SQLFeatureNotSupportedException
248+
249+
override def getClientInfo: Properties =
250+
throw new SQLFeatureNotSupportedException
251+
252+
override def createArrayOf(typeName: String, elements: Array[AnyRef]): JdbcArray =
253+
throw new SQLFeatureNotSupportedException
254+
255+
override def createStruct(typeName: String, attributes: Array[AnyRef]): Struct =
256+
throw new SQLFeatureNotSupportedException
257+
258+
override def abort(executor: Executor): Unit = {
259+
if (executor == null) {
260+
throw new SQLException("executor can not be null")
261+
}
262+
if (!closed) {
263+
executor.execute { () => this.close() }
264+
}
265+
}
266+
267+
override def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit =
268+
throw new SQLFeatureNotSupportedException
269+
270+
override def getNetworkTimeout: Int =
271+
throw new SQLFeatureNotSupportedException
272+
273+
override def unwrap[T](iface: Class[T]): T = if (isWrapperFor(iface)) {
274+
iface.asInstanceOf[T]
275+
} else {
276+
throw new SQLException(s"${this.getClass.getName} not unwrappable from ${iface.getName}")
277+
}
278+
279+
override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
280+
}

0 commit comments

Comments
 (0)