Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,26 @@ def mode(self, saveMode):
self._jwrite = self._jwrite.mode(saveMode)
return self

@since(2.0)
def outputMode(self, outputMode):
"""Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add .. note:: Experimental.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Options include:

* `append`:Only the new rows in the streaming DataFrame/Dataset will be written to
the sink
* `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
every time these is some updates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

each time the trigger fires?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to write something that makes sense generally, without understanding trigger and all. As is, since the trigger is optional, one does not need to know about triggers at all to start running stuff in structured streaming.


.. note:: Experimental.

>>> writer = sdf.write.outputMode('append')
"""
if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0:
raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode)
self._jwrite = self._jwrite.outputMode(outputMode)
return self

@since(1.4)
def format(self, source):
"""Specifies the underlying output data source.
Expand Down
7 changes: 4 additions & 3 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ def test_stream_save_options(self):
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
.format('parquet').option('path', out).startStream()
.format('parquet').outputMode('append').option('path', out).startStream()
try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)
Expand All @@ -952,8 +952,9 @@ def test_stream_save_options_overwrite(self):
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
.queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
checkpointLocation=chk)
.queryName('fake_query').outputMode('append') \
.startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)

try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)
Expand Down
54 changes: 54 additions & 0 deletions sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql;

import org.apache.spark.annotation.Experimental;

/**
* :: Experimental ::
*
* OutputMode is used to what data will be written to a streaming sink when there is
* new data available in a streaming DataFrame/Dataset.
*
* @since 2.0.0
*/
@Experimental
public class OutputMode {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't have to be in java, see Encoders.scala

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Just wanted to be extra-cautious for java-safety.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there are no tests written in java :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one downside of writing this in java is that it doesn't show up in scaladocs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need a trait OutputMode that InternalOutputModes.Append will extend. Should that be a Scala trait or Java interface?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe they are equivalent when there are no implemented methods.

Copy link
Contributor Author

@tdas tdas May 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, writing it in Scala does not work because we need define static methods on OutputMode ('Append()', etc.) as well as the OutputMode interface that singleton object Append will extend. To do this in Scala we have to do

trait OutputMode
object OutputMode {
   def Append(): OutputMode = ???
}

But this makes the OutputMode static object unusable from Java, as you will have to call OutputMode$.MODULE$.Append().

So doing this in Java is the cleanest way for it to be usable in both Java and Scala. I am including a JavaOutputModeSuite as well.


/**
* OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
* written to the sink. This output mode can be only be used in queries that do not
* contain any aggregation.
*
* @since 2.0.0
*/
public static OutputMode Append() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #13464 -- this fails Java lint. Can this be append() as would be conventional in Java? I don't see that it's there to implement some interface

return InternalOutputModes.Append$.MODULE$;
}

/**
* OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
* to the sink every time these is some updates. This output mode can only be used in queries
* that contain aggregations.
*
* @since 2.0.0
*/
public static OutputMode Complete() {
return InternalOutputModes.Complete$.MODULE$;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

/**
* Internal helper class to generate objects representing various [[OutputMode]]s,
*/
private[sql] object InternalOutputModes {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is going to be internal we should probably just move it to execution.streaming

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are in catalyst. Still move it to execution.streaming?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, mostly I don't think they need to be in org.apache.spark.sql they could live in catalyst too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving them to org.apache.spark.streaming in catalyst.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on offline discussion with @rxin, will do this move in a later PR once we decide globally which classes should be moved to what package.


/**
* OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
* written to the sink. This output mode can be only be used in queries that do not
* contain any aggregation.
*/
case object Append extends OutputMode

/**
* OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
* to the sink every time these is some updates. This output mode can only be used in queries
* that contain aggregations.
*/
case object Complete extends OutputMode

/**
* OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
* written to the sink every time these is some updates. This output mode can only be used in
* queries that contain aggregations.
*/
case object Update extends OutputMode
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._

Expand All @@ -29,8 +29,7 @@ object UnsupportedOperationChecker {
def checkForBatch(plan: LogicalPlan): Unit = {
plan.foreachUp {
case p if p.isStreaming =>
throwError(
"Queries with streaming sources must be executed with write.startStream()")(p)
throwError("Queries with streaming sources must be executed with write.startStream()")(p)

case _ =>
}
Expand All @@ -43,10 +42,10 @@ object UnsupportedOperationChecker {
"Queries without streaming sources cannot be executed with write.startStream()")(plan)
}

plan.foreachUp { implicit plan =>
plan.foreachUp { implicit subPlan =>

// Operations that cannot exists anywhere in a streaming plan
plan match {
subPlan match {

case _: Command =>
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
Expand All @@ -55,21 +54,6 @@ object UnsupportedOperationChecker {
case _: InsertIntoTable =>
throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")

case Aggregate(_, _, child) if child.isStreaming =>
if (outputMode == Append) {
throwError(
"Aggregations are not supported on streaming DataFrames/Datasets in " +
"Append output mode. Consider changing output mode to Update.")
}
val moreStreamingAggregates = child.find {
case Aggregate(_, _, grandchild) if grandchild.isStreaming => true
case _ => false
}
if (moreStreamingAggregates.nonEmpty) {
throwError("Multiple streaming aggregations are not supported with " +
"streaming DataFrames/Datasets")
}

case Join(left, right, joinType, _) =>

joinType match {
Expand Down Expand Up @@ -119,10 +103,10 @@ object UnsupportedOperationChecker {
case GroupingSets(_, _, child, _) if child.isStreaming =>
throwError("GroupingSets is not supported on streaming DataFrames/Datasets")

case GlobalLimit(_, _) | LocalLimit(_, _) if plan.children.forall(_.isStreaming) =>
case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
throwError("Limits are not supported on streaming DataFrames/Datasets")

case Sort(_, _, _) | SortPartitions(_, _) if plan.children.forall(_.isStreaming) =>
case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets")

case Sample(_, _, _, _, child) if child.isStreaming =>
Expand All @@ -138,6 +122,27 @@ object UnsupportedOperationChecker {
case _ =>
}
}

// Checks related to aggregations
val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
outputMode match {
case InternalOutputModes.Append if aggregates.nonEmpty =>
throwError(
s"$outputMode output mode not supported when there are streaming aggregations on " +
s"streaming DataFrames/DataSets")(plan)

case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
throwError(
s"$outputMode output mode not supported when there are no streaming aggregations on " +
s"streaming DataFrames/Datasets")(plan)

case _ =>
}
if (aggregates.size > 1) {
throwError(
"Multiple streaming aggregations are not supported with " +
"streaming DataFrames/Datasets")(plan)
}
}

private def throwErrorIf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis
package org.apache.spark.sql;

sealed trait OutputMode
import org.junit.Test;

case object Append extends OutputMode
case object Update extends OutputMode
public class JavaOutputModeSuite {

@Test
public void testOutputModes() {
OutputMode o1 = OutputMode.Append();
assert(o1.toString().toLowerCase().contains("append"));
OutputMode o2 = OutputMode.Complete();
assert (o2.toString().toLowerCase().contains("complete"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.{AnalysisException, OutputMode}
import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
Expand Down Expand Up @@ -79,35 +80,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = "commands" :: Nil)

// Aggregates: Not supported on streams in Append mode
assertSupportedInStreamingPlan(
"aggregate - batch with update output mode",
batchRelation.groupBy("a")("count(*)"),
outputMode = Update)

assertSupportedInStreamingPlan(
"aggregate - batch with append output mode",
batchRelation.groupBy("a")("count(*)"),
outputMode = Append)

assertSupportedInStreamingPlan(
"aggregate - stream with update output mode",
streamRelation.groupBy("a")("count(*)"),
outputMode = Update)

assertNotSupportedInStreamingPlan(
"aggregate - stream with append output mode",
streamRelation.groupBy("a")("count(*)"),
outputMode = Append,
Seq("aggregation", "append output mode"))

// Multiple streaming aggregations not supported
def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))

assertSupportedInStreamingPlan(
"aggregate - multiple batch aggregations",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)),
Update)
Append)

assertSupportedInStreamingPlan(
"aggregate - multiple aggregations but only one streaming aggregation",
Expand Down Expand Up @@ -209,7 +188,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_),
streamStreamSupported = false)


// Unary operations
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
Expand All @@ -218,6 +196,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
testUnaryOperatorInStreamingPlan(
"window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")

// Output modes with aggregation and non-aggregation plans
testOutputMode(Append, shouldSupportAggregation = false)
testOutputMode(Update, shouldSupportAggregation = true)
testOutputMode(Complete, shouldSupportAggregation = true)

/*
=======================================================================================
Expand Down Expand Up @@ -316,6 +298,37 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode)
}

def testOutputMode(
outputMode: OutputMode,
shouldSupportAggregation: Boolean): Unit = {

// aggregation
if (shouldSupportAggregation) {
assertNotSupportedInStreamingPlan(
s"$outputMode output mode - no aggregation",
streamRelation.where($"a" > 1),
outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode"))

assertSupportedInStreamingPlan(
s"$outputMode output mode - aggregation",
streamRelation.groupBy("a")("count(*)"),
outputMode = outputMode)

} else {
assertSupportedInStreamingPlan(
s"$outputMode output mode - no aggregation",
streamRelation.where($"a" > 1),
outputMode = outputMode)

assertNotSupportedInStreamingPlan(
s"$outputMode output mode - aggregation",
streamRelation.groupBy("a")("count(*)"),
outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode"))
}
}

/**
* Assert that the logical plan is supported as subplan insider a streaming plan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.collection.mutable

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.{Append, OutputMode, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock}
* @since 2.0.0
*/
@Experimental
class ContinuousQueryManager(sparkSession: SparkSession) {
class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {

private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
Expand Down Expand Up @@ -175,9 +175,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
checkpointLocation: String,
df: DataFrame,
sink: Sink,
outputMode: OutputMode,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock(),
outputMode: OutputMode = Append): ContinuousQuery = {
triggerClock: Clock = new SystemClock()): ContinuousQuery = {
activeQueriesLock.synchronized {
if (activeQueries.contains(name)) {
throw new IllegalArgumentException(
Expand Down
Loading