Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* You can set the following JDBC-specific option(s) for storing JDBC:
* <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li>
*
* In case of failures, users should turn off `truncate` option to use `DROP TABLE` again. Also,
* due to the different behavior of `TRUNCATE TABLE` among DBMS, it's not always safe to use this.
* MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect supports this
* while PostgresDialect and default JDBCDirect doesn't. For unknown and unsupported JDBCDirect,
* the user option `truncate` is ignored.
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
Expand Down Expand Up @@ -423,8 +432,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

if (mode == SaveMode.Overwrite && tableExists) {
JdbcUtils.dropTable(conn, table)
tableExists = false
if (extraOptions.getOrElse("truncate", "false").toBoolean &&
JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
JdbcUtils.truncateTable(conn, table)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If truncateTable failed due to a non fatal exception, should we fall back to the previous way (i.e., drop and create)? This is a design decision. CC @srowen @rxin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say no, because user has explicitly specified truncate. They can turn if off themselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: ) Sure. Then, the next design question to @rxin and @srowen

Should we still truncate the table if the table schema does not match the schema of new table?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do whatever we do with drop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Then, the current implementation looks good to me.

@dongjoon-hyun Could you summarize the previous discussion and design decision we made? Document them in the PR description. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, I will ask one question.

Literally, we should not do whatever we do with drop, e.g., we should not drop INDEX, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I'll update the document and PR description more clearly.

Thank you for guidance, @rxin and @gatorsmile .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Drop, Create and Insert: Create and Insert could fail, but we still drop the table.
  • Truncate and Insert: Insert could fail, but we always truncate the table.

I think it is OK to raise an exception here, but check whether the exception message is meaningful or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, dropping index does not make sense here.

Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Jul 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current exception message is "Column xxx not found".

} else {
JdbcUtils.dropTable(conn, table)
tableExists = false
}
}

// Create the table if the table didn't exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ object JdbcUtils extends Logging {
}
}

/**
* Truncates a table from the JDBC database.
*/
def truncateTable(conn: Connection, table: String): Unit = {
val statement = conn.createStatement
try {
statement.executeUpdate(s"TRUNCATE TABLE $table")
} finally {
statement.close()
}
}

def isCascadingTruncateTable(url: String): Option[Boolean] = {
JdbcDialects.get(url).isCascadingTruncateTable()
}

/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ private object DB2Dialect extends JdbcDialect {
case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
case _ => None
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ abstract class JdbcDialect extends Serializable {
def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
}

/**
* Return Some[true] iff `TRUNCATE TABLE` causes cascading default.
* Some[true] : TRUNCATE TABLE causes cascading.
* Some[false] : TRUNCATE TABLE does not cause cascading.
* None: The behavior of TRUNCATE TABLE is unknown (default).
*/
def isCascadingTruncateTable(): Option[Boolean] = None
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ private object MsSqlServerDialect extends JdbcDialect {
case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
case _ => None
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ private case object MySQLDialect extends JdbcDialect {
override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1"
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ private case object OracleDialect extends JdbcDialect {
case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
case _ => None
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,6 @@ private object PostgresDialect extends JdbcDialect {
}

}

override def isCascadingTruncateTable(): Option[Boolean] = Some(true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
properties.setProperty("password", "testPass")
properties.setProperty("rowId", "false")

val testH2Dialect = new JdbcDialect {
override def canHandle(url: String) : Boolean = url.startsWith("jdbc:h2")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
Some(StringType)
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}

before {
Utils.classForName("org.h2.Driver")
conn = DriverManager.getConnection(url)
Expand Down Expand Up @@ -145,14 +153,25 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length)
}

test("CREATE then INSERT to truncate") {
test("Truncate") {
JdbcDialects.registerDialect(testH2Dialect)
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)

df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
df2.write.mode(SaveMode.Overwrite).option("truncate", true)
.jdbc(url1, "TEST.TRUNCATETEST", properties)
assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)

val m = intercept[SparkException] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check my understanding here, this overwrites the table with a different schema (new column seq). This shows the truncate fails because the schema has changed.

I guess it would be nice to test the case where the truncate works at least, though, we can't really test whether it truncates vs drops.

Could you for example just repeat the code on line 163-166 here to verify that overwriting just results in the same results?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that would be better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nevermind my last comment. You test the truncate succeeds path already.
OK, the assertion here makes sense though it highlights that if truncation can't succeed, then it only fails after truncating and the new dataframe can't be written. I suppose that's reasonable semantics, since it otherwise requires doing something like testing an insert.

df3.write.mode(SaveMode.Overwrite).option("truncate", true)
.jdbc(url1, "TEST.TRUNCATETEST", properties)
}.getMessage
assert(m.contains("Column \"seq\" not found"))
assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
JdbcDialects.unregisterDialect(testH2Dialect)
}

test("Incompatible INSERT to append") {
Expand Down