```diff
@@ -232,10 +232,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       parts: Array[Partition],
       connectionProperties: Properties): DataFrame = {
     // connectionProperties should override settings in extraOptions.
-    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
-    val options = new JDBCOptions(url, table, params)
-    val relation = JDBCRelation(parts, options)(sparkSession)
-    sparkSession.baseRelationToDataFrame(relation)
+    this.extraOptions = this.extraOptions ++ connectionProperties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions += ("url" -> url, "dbtable" -> table)
```
An inline review thread is attached to the last added line.

Member:
After this change, we lost the feature for parallel JDBC reading, right?

Member:
Let me revert this back.

Member Author:
I see. I thought this was all handled in

```scala
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {
  val jdbcOptions = new JDBCOptions(parameters)
  val partitionColumn = jdbcOptions.partitionColumn
  val lowerBound = jdbcOptions.lowerBound
  val upperBound = jdbcOptions.upperBound
  val numPartitions = jdbcOptions.numPartitions
  val partitionInfo = if (partitionColumn == null) {
    null
  } else {
    JDBCPartitioningInfo(
      partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
  }
  val parts = JDBCRelation.columnPartition(partitionInfo)
  JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession)
}
```

Would it be possible to add those into the options rather than reverting this?
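A minimal sketch of that suggestion: the partitioning parameters travel as plain options, so `createRelation` above can rebuild the partitions from `JDBCOptions`. The option keys match the fields read in the quoted code; the URL and table name are hypothetical.

```scala
// Assumes spark: SparkSession is in scope; URL and table are hypothetical.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")
  .option("dbtable", "events")
  .option("partitionColumn", "id")   // column used for range partitioning
  .option("lowerBound", "1")         // parsed with .toLong in createRelation
  .option("upperBound", "1000000")
  .option("numPartitions", "8")      // parsed with .toInt in createRelation
  .load()
```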

gatorsmile (Member), Nov 21, 2016:
We are using a different mechanism for table partitioning here, which is more flexible: users can do more advanced partitioning (e.g., using multiple columns).
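This presumably refers to the `jdbc` overload on `DataFrameReader` that accepts explicit `predicates`, one per partition; a sketch with a hypothetical schema:

```scala
import java.util.Properties

// Each predicate becomes the WHERE clause of one partition, so the
// partitioning can combine multiple columns (region and year are
// hypothetical column names).
val predicates = Array(
  "region = 'US' AND year = 2016",
  "region = 'EU' AND year = 2016",
  "region = 'US' AND year = 2015",
  "region = 'EU' AND year = 2015")

val props = new Properties()
props.setProperty("user", "sa")  // hypothetical credentials
val sales = spark.read.jdbc("jdbc:h2:mem:testdb", "sales", predicates, props)
```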

Member Author:
I see. I am sorry, it seems this was a careless mistake.

Member:
nvm

format("jdbc").load()
}

/**
Expand Down
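The net effect of the rewritten method is the two-step precedence spelled out in its comments: options already set via `option()` lose to `connectionProperties`, and both lose to the explicit `url` and `dbtable` arguments. A sketch (all values hypothetical):

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("fetchsize", "1000")   // wins over the option() below

val people = spark.read
  .option("fetchsize", "100")            // overridden by connectionProperties
  .option("url", "jdbc:h2:mem:ignored")  // overridden by the explicit url argument
  .jdbc("jdbc:h2:mem:testdb", "people", props)
```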
```diff
@@ -426,8 +426,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
-    // connectionProperties should override settings in extraOptions
-    this.extraOptions = this.extraOptions ++ (connectionProperties.asScala)
+    // connectionProperties should override settings in extraOptions.
+    this.extraOptions = this.extraOptions ++ connectionProperties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += ("url" -> url, "dbtable" -> table)
     format("jdbc").save()
```
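The writer side merges options the same way before delegating to `format("jdbc").save()`. A minimal usage sketch (table name and credentials hypothetical):

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "admin")       // hypothetical credentials; these
props.setProperty("password", "secret")  // override anything set via option()

df.write                                 // assumes df: DataFrame is in scope
  .mode("append")
  .jdbc("jdbc:postgresql://localhost/test", "events_copy", props)
```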