JDBCRDD.scala
@@ -224,6 +224,7 @@ private[sql] object JDBCRDD extends Logging {
       quotedColumns,
       filters,
       parts,
+      url,
       properties)
   }
 }
@@ -241,6 +242,7 @@ private[sql] class JDBCRDD(
     columns: Array[String],
     filters: Array[Filter],
     partitions: Array[Partition],
+    url: String,
     properties: Properties)
   extends RDD[InternalRow](sc, Nil) {

@@ -361,6 +363,9 @@ private[sql] class JDBCRDD(
     context.addTaskCompletionListener{ context => close() }
     val part = thePart.asInstanceOf[JDBCPartition]
     val conn = getConnection()
+    val dialect = JdbcDialects.get(url)
+    import scala.collection.JavaConverters._
+    dialect.beforeFetch(conn, properties.asScala.toMap)

     // H2's JDBC driver does not support the setSchema() method. We pass a
     // fully-qualified table name in the SELECT statement. I don't know how to
@@ -489,6 +494,13 @@ private[sql] class JDBCRDD(
       }
       try {
         if (null != conn) {
+          if (!conn.getAutoCommit && !conn.isClosed) {
+            try {
+              conn.commit()
+            } catch {
+              case e: Throwable => logWarning("Exception committing transaction", e)
+            }
+          }
           conn.close()
         }
         logInfo("closed connection")

Review comment (Contributor), on the `case e: Throwable` line above:
This should catch NonFatal; I will fix it when merging.
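A minimal sketch of the NonFatal variant the reviewer suggests (a hypothetical
helper, not part of this diff; `scala.util.control.NonFatal` treats errors such
as OutOfMemoryError and InterruptedException as fatal and lets them propagate
rather than being swallowed by the blanket `case e: Throwable`):

import java.sql.Connection

import scala.util.control.NonFatal

// Commit pending work before closing a connection whose autocommit was
// disabled (e.g. by PostgresDialect.beforeFetch), logging only non-fatal
// commit failures. `commitQuietly` is an illustrative name, not Spark API.
def commitQuietly(conn: Connection, logWarning: (String, Throwable) => Unit): Unit = {
  if (!conn.getAutoCommit && !conn.isClosed) {
    try {
      conn.commit()
    } catch {
      case NonFatal(e) => logWarning("Exception committing transaction", e)
    }
  }
}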
JdbcDialects.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.jdbc

+import java.sql.Connection
+
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.DeveloperApi

@@ -97,6 +99,15 @@ abstract class JdbcDialect extends Serializable {
     s"SELECT * FROM $table WHERE 1=0"
   }

+  /**
+   * A hook run on the connection before a select is issued, so that dialects
+   * that need special treatment can tune connection-specific behavior.
+   * @param connection The connection object
+   * @param properties The connection properties, passed through from the relation.
+   */
+  def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
+  }
+
 }

 /**
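As an illustration of the new hook (a sketch, not part of this diff), a
third-party dialect can override `beforeFetch` to adjust the session before
Spark issues its SELECT. `MyDialect`, the `jdbc:mydb` URL prefix, the
`queryTimeout` property, and the SET statement are all hypothetical:

import java.sql.Connection

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect that applies a session-level setting before each fetch.
object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    // Forward an (assumed) "queryTimeout" relation property to the server.
    properties.get("queryTimeout").foreach { timeout =>
      val stmt = connection.createStatement()
      try stmt.execute(s"SET statement_timeout = $timeout") finally stmt.close()
    }
  }
}

// After registration, JdbcDialects.get(url) returns MyDialect for matching
// URLs, which is how JDBCRDD.compute() looks the dialect up above.
JdbcDialects.registerDialect(MyDialect)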
PostgresDialect.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.jdbc

-import java.sql.Types
+import java.sql.{Connection, Types}

 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types._
@@ -70,4 +70,19 @@ private object PostgresDialect extends JdbcDialect {
   override def getTableExistsQuery(table: String): String = {
     s"SELECT 1 FROM $table LIMIT 1"
   }
+
+  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
+    super.beforeFetch(connection, properties)
+
+    // Per the Postgres JDBC documentation, autocommit must be disabled for a
+    // non-zero fetchsize to take effect; with autocommit on, the driver ignores
+    // fetchsize and caches every row of the result set on the client.
+    //
+    // See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
+    //
+    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
+      connection.setAutoCommit(false)
+    }
+
+  }
 }
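For context, a sketch of the read path this change targets (Spark 1.x API;
the URL, table name, and credentials are placeholders): passing a positive
"fetchsize" now routes through PostgresDialect.beforeFetch, which disables
autocommit so the Postgres driver streams rows through a cursor instead of
materializing the whole result set on the client.

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("pg-fetchsize-demo"))
val sqlContext = new SQLContext(sc)

val props = new Properties()
props.setProperty("user", "spark")       // placeholder credentials
props.setProperty("password", "secret")
// A positive fetchsize triggers beforeFetch's setAutoCommit(false), so the
// driver fetches rows in batches of 1000 rather than all at once.
props.setProperty("fetchsize", "1000")

val df = sqlContext.read.jdbc("jdbc:postgresql://db:5432/mydb", "big_table", props)
println(df.count())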