71 changes: 66 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,13 +25,13 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode, UnresolvedRelation, Update}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* :: Experimental ::
@@ -114,6 +114,55 @@ final class DataFrameWriter private[sql](df: DataFrame) {
this
}

/**
* Specifies the behavior when writing a DataFrame or Dataset to a sink. Options include:
* - `Append`: append the new data.
* - `Update`: update the data in place.
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: OutputMode): DataFrameWriter = {
// mode() is used for non-continuous queries
// outputMode() is used for continuous queries
assertStreaming("outputMode() can only be called on continuous queries")
this.outputMode = outputMode
this
}

/**
* Specifies the behavior when writing a DataFrame or Dataset to a sink. Options include:
* - `Append`: append the new data.
* - `Update`: update the data in place.
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: String): DataFrameWriter = {
// mode() is used for non-continuous queries
// outputMode() is used for continuous queries
assertStreaming("outputMode() can only be called on continuous queries")
this.outputMode = outputMode.toLowerCase match {
case "append" => Append
case "update" => Update
case _ => throw new IllegalArgumentException(s"Unknown outputMode: $outputMode. " +
"Accepted modes are 'append' and 'update'.")
}
this
}
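
A minimal usage sketch of the new setter, mirroring the test suite further down in this diff; the streaming test format is the one used by that suite, and `checkpointDir` is a placeholder path, not something defined in this PR:

// Usage sketch, not part of this diff. Assumes the streaming test
// source/sink and imports used by DataFrameReaderWriterSuite below;
// checkpointDir is a placeholder path.
val df = sqlContext.read
  .format("org.apache.spark.sql.streaming.test")
  .stream("/test")

val query = df.write
  .format("org.apache.spark.sql.streaming.test")
  .outputMode("update")                         // equivalently: .outputMode(Update)
  .option("checkpointLocation", checkpointDir)
  .trigger(ProcessingTime("10 seconds"))
  .startStream()

query.stop()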

/**
* Specifies a user-defined Clock to be used as the trigger clock. The default is SystemClock.
*
* @since 2.0.0
*/
@Experimental
def triggerClock(triggerClock: Clock): DataFrameWriter = {
assertStreaming("triggerClock() can only be called on continuous queries")
this.triggerClock = triggerClock
this
}
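
A corresponding sketch for triggerClock. ManualClock and SystemClock live in org.apache.spark.util and are Spark-internal (private[spark]), so this pattern is mainly useful from Spark's own tests; advance() is assumed from ManualClock's existing API:

// Test-style sketch, not part of this diff. Reuses df and checkpointDir
// from the sketch above; outside Spark's own tests, callers will normally
// keep the default SystemClock.
val clock = new ManualClock(System.currentTimeMillis())

val q = df.write
  .format("org.apache.spark.sql.streaming.test")
  .triggerClock(clock)
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", checkpointDir)
  .startStream()

clock.advance(10 * 1000)   // deterministically move past the 10s trigger interval
q.stop()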

/**
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
*
@@ -314,7 +363,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
} else {
checkpointPath.toUri.toString
}

val outputMode = this.outputMode
val triggerClock: Clock = this.triggerClock
val sink = new MemorySink(df.schema)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
resultDf.registerTempTable(queryName)
@@ -323,7 +373,10 @@
checkpointLocation,
df,
sink,
trigger)
trigger,
triggerClock,
outputMode
)
continuousQuery
} else {
val dataSource =
@@ -333,6 +386,8 @@
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))

val outputMode = this.outputMode
val triggerClock: Clock = this.triggerClock
val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
val checkpointLocation = extraOptions.getOrElse("checkpointLocation",
new Path(df.sparkSession.sessionState.conf.checkpointLocation.get, queryName).toUri.toString
@@ -343,7 +398,9 @@
checkpointLocation,
df,
dataSource.createSink(),
trigger)
trigger,
triggerClock,
outputMode)
}
}

@@ -663,6 +720,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {

private var mode: SaveMode = SaveMode.ErrorIfExists

private var outputMode: OutputMode = Append

private var triggerClock: Clock = new SystemClock()

private var trigger: Trigger = ProcessingTime(0L)

private var extraOptions = new scala.collection.mutable.HashMap[String, String]
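
For reference, these defaults mean a query that never calls the new setters keeps the old behavior; a rough sketch of what the second half of the triggerClock test below verifies (same assumed df and placeholder checkpointDir as in the sketches above):

// Sketch of the defaults, mirroring the triggerClock test below: without
// calling outputMode()/triggerClock(), a query gets Append and a SystemClock.
val q = df.write
  .format("org.apache.spark.sql.streaming.test")
  .option("checkpointLocation", checkpointDir)
  .startStream()
q.stop()

assert(q.asInstanceOf[StreamExecution].outputMode == Append)
assert(q.asInstanceOf[StreamExecution].triggerClock.isInstanceOf[SystemClock])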
@@ -25,11 +25,12 @@ import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Append, Update}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ManualClock, SystemClock, Utils}

object LastOptions {

@@ -334,6 +335,57 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
}

test("outputMode") {
val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
.stream("/test")

var q = df.write
.format("org.apache.spark.sql.streaming.test")
.outputMode("update")
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime(10.seconds))
.startStream()
q.stop()

assert(q.asInstanceOf[StreamExecution].outputMode == Update)

q = df.write
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.outputMode("append")
.trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
.startStream()
q.stop()

assert(q.asInstanceOf[StreamExecution].outputMode == Append)
}

test("triggerClock") {
val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
.stream("/test")
var q = df.write
.format("org.apache.spark.sql.streaming.test")
.triggerClock(new ManualClock(System.currentTimeMillis()))
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime(10.seconds))
.startStream()
q.stop()

assert(q.asInstanceOf[StreamExecution].triggerClock.isInstanceOf[ManualClock])

q = df.write
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.outputMode("append")
.trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
.startStream()
q.stop()

assert(q.asInstanceOf[StreamExecution].triggerClock.isInstanceOf[SystemClock])
}

test("source metadataPath") {
LastOptions.clear()

@@ -371,6 +423,18 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B

private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath

test("check outputMode() can only be called on continuous queries") {
val df = sqlContext.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.outputMode(Append))
assert(e.getMessage == "outputMode() can only be called on continuous queries;")
}
test("check triggerClock() can only be called on continuous queries") {
val df = sqlContext.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.triggerClock(new SystemClock))
assert(e.getMessage == "triggerClock() can only be called on continuous queries;")
}
test("check trigger() can only be called on continuous queries") {
val df = sqlContext.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)