From fe03241d56899af9fa9cc4e63f4f97061bbb8b74 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 22 May 2017 12:55:31 -0700 Subject: [PATCH 1/3] [SPARK-20844] Remove experimental from Structured Streaming APIs --- docs/structured-streaming-programming-guide.md | 2 +- .../org/apache/spark/sql/streaming/Trigger.java | 3 --- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 -- .../scala/org/apache/spark/sql/ForeachWriter.scala | 4 +--- .../scala/org/apache/spark/sql/functions.scala | 8 +------- .../spark/sql/streaming/DataStreamReader.scala | 1 - .../spark/sql/streaming/DataStreamWriter.scala | 4 +--- .../spark/sql/streaming/StreamingQuery.scala | 4 +--- .../sql/streaming/StreamingQueryException.scala | 4 +--- .../sql/streaming/StreamingQueryListener.scala | 14 +------------- .../sql/streaming/StreamingQueryManager.scala | 6 ++---- .../spark/sql/streaming/StreamingQueryStatus.scala | 4 +--- .../org/apache/spark/sql/streaming/progress.scala | 10 +--------- 13 files changed, 11 insertions(+), 55 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index bd01be944460..da29aa010cee 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide # Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.* -**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. +In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. # Quick Example Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. 
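For orientation (illustrative only, not part of this patch), a minimal Scala sketch of that streaming word count might look like the following, assuming a text source on a local TCP socket at port 9999:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
    import spark.implicits._

    // Read lines streamed from a TCP socket (host/port here are placeholders).
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split each line into words and keep a running count per word.
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    // Print the running counts to the console; "complete" mode emits the full result table on each trigger.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()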
You can see the full code in diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java index 3e3997fa9bfe..be1d03fe1f33 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java @@ -21,17 +21,14 @@ import scala.concurrent.duration.Duration; -import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; /** - * :: Experimental :: * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving public class Trigger { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 53773f18ce55..8c5bd4142656 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2782,13 +2782,11 @@ class Dataset[T] private[sql]( } /** - * :: Experimental :: * Interface for saving the content of the streaming Dataset out into external storage. * * @group basic * @since 2.0.0 */ - @Experimental @InterfaceStability.Evolving def writeStream: DataStreamWriter[T] = { if (!isStreaming) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala index 372ec262f576..86e02e98c01f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability /** - * :: Experimental :: * A class to consume data generated by a `StreamingQuery`. Typically this is used to send the * generated data to external systems. Each partition will use a new deserialized instance, so you * usually should do all the initialization (e.g. 
opening a connection or initiating a transaction) @@ -66,7 +65,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * }}} * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving abstract class ForeachWriter[T] extends Serializable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 5edf03666ac2..714af86d070c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -23,7 +23,7 @@ import scala.reflect.runtime.universe.{typeTag, TypeTag} import scala.util.Try import scala.util.control.NonFatal -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -2800,8 +2800,6 @@ object functions { * @group datetime_funcs * @since 2.0.0 */ - @Experimental - @InterfaceStability.Evolving def window( timeColumn: Column, windowDuration: String, @@ -2854,8 +2852,6 @@ object functions { * @group datetime_funcs * @since 2.0.0 */ - @Experimental - @InterfaceStability.Evolving def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = { window(timeColumn, windowDuration, slideDuration, "0 second") } @@ -2893,8 +2889,6 @@ object functions { * @group datetime_funcs * @since 2.0.0 */ - @Experimental - @InterfaceStability.Evolving def window(timeColumn: Column, windowDuration: String): Column = { window(timeColumn, windowDuration, windowDuration, "0 second") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 746b2a94f102..4c5a13e0fb0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.types.StructType * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging { /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 0d2611f9bbcc..14e7df672cc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -21,7 +21,7 @@ import java.util.Locale import scala.collection.JavaConverters._ -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.DDLUtils @@ -29,13 +29,11 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink} /** - * :: Experimental :: * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems, * key-value stores, etc). Use `Dataset.writeStream` to access this. 
* * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 12a1bb1db577..f2dfbe42260d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -19,16 +19,14 @@ package org.apache.spark.sql.streaming import java.util.UUID -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.SparkSession /** - * :: Experimental :: * A handle to a query that is executing continuously in the background as new data arrives. * All these methods are thread-safe. * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving trait StreamingQuery { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 234a1166a195..03aeb14de502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.streaming -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability /** - * :: Experimental :: * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception * that caused the failure. * @param message Message of this exception @@ -29,7 +28,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * @param endOffset Ending offset in json of the range of data in exception occurred * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving class StreamingQueryException private[sql]( private val queryDebugString: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index c376913516ef..6aa82b89ede8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -19,17 +19,15 @@ package org.apache.spark.sql.streaming import java.util.UUID -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.scheduler.SparkListenerEvent /** - * :: Experimental :: * Interface for listening to events related to [[StreamingQuery StreamingQueries]]. * @note The methods are not thread-safe as they may be called from different threads. * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving abstract class StreamingQueryListener { @@ -66,32 +64,26 @@ abstract class StreamingQueryListener { /** - * :: Experimental :: * Companion object of [[StreamingQueryListener]] that defines the listener events. 
* @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving object StreamingQueryListener { /** - * :: Experimental :: * Base type of [[StreamingQueryListener]] events * @since 2.0.0 */ - @Experimental @InterfaceStability.Evolving trait Event extends SparkListenerEvent /** - * :: Experimental :: * Event representing the start of a query * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. * @since 2.1.0 */ - @Experimental @InterfaceStability.Evolving class QueryStartedEvent private[sql]( val id: UUID, @@ -99,17 +91,14 @@ object StreamingQueryListener { val name: String) extends Event /** - * :: Experimental :: * Event representing any progress updates in a query. * @param progress The query progress updates. * @since 2.1.0 */ - @Experimental @InterfaceStability.Evolving class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event /** - * :: Experimental :: * Event representing that termination of a query. * * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. @@ -118,7 +107,6 @@ object StreamingQueryListener { * with an exception. Otherwise, it will be `None`. * @since 2.1.0 */ - @Experimental @InterfaceStability.Evolving class QueryTerminatedEvent private[sql]( val id: UUID, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 7810d9f6e964..002c45413b4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker @@ -34,12 +34,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{Clock, SystemClock, Utils} /** - * :: Experimental :: - * A class to manage all the [[StreamingQuery]] active on a `SparkSession`. + * A class to manage all the [[StreamingQuery]] active in a `SparkSession`. * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index 687b1267825f..a0c9bcc8929e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -22,10 +22,9 @@ import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability /** - * :: Experimental :: * Reports information about the instantaneous status of a streaming query. * * @param message A human readable description of what the stream is currently doing. 
@@ -35,7 +34,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} * * @since 2.1.0 */ -@Experimental @InterfaceStability.Evolving class StreamingQueryStatus protected[sql]( val message: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 35fe6b8605fa..fb590e7df996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -29,13 +29,11 @@ import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability /** - * :: Experimental :: * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. */ -@Experimental @InterfaceStability.Evolving class StateOperatorProgress private[sql]( val numRowsTotal: Long, @@ -54,7 +52,6 @@ class StateOperatorProgress private[sql]( } /** - * :: Experimental :: * Information about progress made in the execution of a [[StreamingQuery]] during * a trigger. Each event relates to processing done for a single trigger of the streaming * query. Events are emitted even when no new data is available to be processed. @@ -80,7 +77,6 @@ class StateOperatorProgress private[sql]( * @param sources detailed statistics on data being read from each of the streaming sources. * @since 2.1.0 */ -@Experimental @InterfaceStability.Evolving class StreamingQueryProgress private[sql]( val id: UUID, @@ -139,7 +135,6 @@ class StreamingQueryProgress private[sql]( } /** - * :: Experimental :: * Information about progress made for a source in the execution of a [[StreamingQuery]] * during a trigger. See [[StreamingQueryProgress]] for more information. * @@ -152,7 +147,6 @@ class StreamingQueryProgress private[sql]( * Spark. * @since 2.1.0 */ -@Experimental @InterfaceStability.Evolving class SourceProgress protected[sql]( val description: String, @@ -191,14 +185,12 @@ class SourceProgress protected[sql]( } /** - * :: Experimental :: * Information about progress made for a sink in the execution of a [[StreamingQuery]] * during a trigger. See [[StreamingQueryProgress]] for more information. * * @param description Description of the source corresponding to this status. 
* @since 2.1.0 */ -@Experimental @InterfaceStability.Evolving class SinkProgress protected[sql]( val description: String) extends Serializable { From 8221c01c0d182ce27ee1051b0bb1d8581d7fce23 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 24 May 2017 16:52:13 -0700 Subject: [PATCH 2/3] fix title and python --- .../structured-streaming-programming-guide.md | 2 +- python/pyspark/sql/dataframe.py | 6 +-- python/pyspark/sql/streaming.py | 42 +++++++++---------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index da29aa010cee..6a25c9939c26 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1,6 +1,6 @@ --- layout: global -displayTitle: Structured Streaming Programming Guide [Experimental] +displayTitle: Structured Streaming Programming Guide title: Structured Streaming Programming Guide --- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 7b67985f2b32..fbe66f18a361 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -209,7 +209,7 @@ def writeStream(self): Interface for saving the content of the streaming :class:`DataFrame` out into external storage. - .. note:: Experimental. + .. note:: Evolving. :return: :class:`DataStreamWriter` """ @@ -285,7 +285,7 @@ def isStreaming(self): :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming source present. - .. note:: Experimental + .. note:: Evolving """ return self._jdf.isStreaming() @@ -368,7 +368,7 @@ def withWatermark(self, eventTime, delayThreshold): latest record that has been processed in the form of an interval (e.g. "1 minute" or "5 hours"). - .. note:: Experimental + .. note:: Evolving >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes') DataFrame[name: string, time: timestamp] diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 65b59d480da3..76e8c4f47d8a 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -41,7 +41,7 @@ class StreamingQuery(object): A handle to a query that is executing continuously in the background as new data arrives. All these methods are thread-safe. - .. note:: Experimental + .. note:: Evolving .. versionadded:: 2.0 """ @@ -197,7 +197,7 @@ def exception(self): class StreamingQueryManager(object): """A class to manage all the :class:`StreamingQuery` StreamingQueries active. - .. note:: Experimental + .. note:: Evolving .. versionadded:: 2.0 """ @@ -283,7 +283,7 @@ class DataStreamReader(OptionUtils): (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream` to access this. - .. note:: Experimental. + .. note:: Evolving. .. versionadded:: 2.0 """ @@ -300,7 +300,7 @@ def _df(self, jdf): def format(self, source): """Specifies the input data source format. - .. note:: Experimental. + .. note:: Evolving. :param source: string, name of the data source, e.g. 'json', 'parquet'. @@ -317,7 +317,7 @@ def schema(self, schema): By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. - .. note:: Experimental. + .. note:: Evolving. :param schema: a :class:`pyspark.sql.types.StructType` object @@ -340,7 +340,7 @@ def option(self, key, value): in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - .. 
note:: Experimental. + .. note:: Evolving. >>> s = spark.readStream.option("x", 1) """ @@ -356,7 +356,7 @@ def options(self, **options): in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - .. note:: Experimental. + .. note:: Evolving. >>> s = spark.readStream.options(x="1", y=2) """ @@ -368,7 +368,7 @@ def options(self, **options): def load(self, path=None, format=None, schema=None, **options): """Loads a data stream from a data source and returns it as a :class`DataFrame`. - .. note:: Experimental. + .. note:: Evolving. :param path: optional string for file-system backed data sources. :param format: optional string for format of the data source. Default to 'parquet'. @@ -411,7 +411,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, If the ``schema`` parameter is not specified, this function goes through the input once to determine the input schema. - .. note:: Experimental. + .. note:: Evolving. :param path: string represents path to the JSON dataset, or RDD of Strings storing JSON objects. @@ -488,7 +488,7 @@ def parquet(self, path): Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \ The default value is specified in ``spark.sql.parquet.mergeSchema``. - .. note:: Experimental. + .. note:: Evolving. >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp()) >>> parquet_sdf.isStreaming @@ -511,7 +511,7 @@ def text(self, path): Each line in the text file is a new row in the resulting DataFrame. - .. note:: Experimental. + .. note:: Evolving. :param paths: string, or list of strings, for input path(s). @@ -539,7 +539,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non ``inferSchema`` is enabled. To avoid going through the entire data once, disable ``inferSchema`` option or specify the schema explicitly using ``schema``. - .. note:: Experimental. + .. note:: Evolving. :param path: string, or list of strings, for input path(s). :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema. @@ -637,7 +637,7 @@ class DataStreamWriter(object): (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream` to access this. - .. note:: Experimental. + .. note:: Evolving. .. versionadded:: 2.0 """ @@ -665,7 +665,7 @@ def outputMode(self, outputMode): written to the sink every time there are some updates. If the query doesn't contain aggregations, it will be equivalent to `append` mode. - .. note:: Experimental. + .. note:: Evolving. >>> writer = sdf.writeStream.outputMode('append') """ @@ -678,7 +678,7 @@ def outputMode(self, outputMode): def format(self, source): """Specifies the underlying output data source. - .. note:: Experimental. + .. note:: Evolving. :param source: string, name of the data source, which for now can be 'parquet'. @@ -696,7 +696,7 @@ def option(self, key, value): timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - .. note:: Experimental. + .. note:: Evolving. """ self._jwrite = self._jwrite.option(key, to_str(value)) return self @@ -710,7 +710,7 @@ def options(self, **options): timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - .. note:: Experimental. + .. note:: Evolving. 
""" for k in options: self._jwrite = self._jwrite.option(k, to_str(options[k])) @@ -723,7 +723,7 @@ def partitionBy(self, *cols): If specified, the output is laid out on the file system similar to Hive's partitioning scheme. - .. note:: Experimental. + .. note:: Evolving. :param cols: name of columns @@ -739,7 +739,7 @@ def queryName(self, queryName): :func:`start`. This name must be unique among all the currently active queries in the associated SparkSession. - .. note:: Experimental. + .. note:: Evolving. :param queryName: unique name for the query @@ -756,7 +756,7 @@ def trigger(self, processingTime=None, once=None): """Set the trigger for the stream query. If this is not set it will run the query as fast as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. - .. note:: Experimental. + .. note:: Evolving. :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'. @@ -794,7 +794,7 @@ def start(self, path=None, format=None, outputMode=None, partitionBy=None, query If ``format`` is not specified, the default data source configured by ``spark.sql.sources.default`` will be used. - .. note:: Experimental. + .. note:: Evolving. :param path: the path in a Hadoop supported file system :param format: the format used to save From 98e8326e3f338ce6a6de4697fbc4344d57668140 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 26 May 2017 12:50:33 -0500 Subject: [PATCH 3/3] more fixes --- python/pyspark/sql/context.py | 4 ++-- python/pyspark/sql/session.py | 4 ++-- .../java/org/apache/spark/sql/streaming/OutputMode.java | 3 --- .../main/java/org/apache/spark/sql/streaming/Trigger.java | 4 ---- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 -- .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 -- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 2 +- .../org/apache/spark/sql/streaming/ProcessingTime.scala | 6 +----- 8 files changed, 6 insertions(+), 21 deletions(-) diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 5197a9e00461..426f07cd9410 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -474,7 +474,7 @@ def readStream(self): Returns a :class:`DataStreamReader` that can be used to read data streams as a streaming :class:`DataFrame`. - .. note:: Experimental. + .. note:: Evolving. :return: :class:`DataStreamReader` @@ -490,7 +490,7 @@ def streams(self): """Returns a :class:`StreamingQueryManager` that allows managing all the :class:`StreamingQuery` StreamingQueries active on `this` context. - .. note:: Experimental. + .. note:: Evolving. """ from pyspark.sql.streaming import StreamingQueryManager return StreamingQueryManager(self._ssql_ctx.streams()) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index c1bf2bd76fb7..e3bf0f35ea15 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -586,7 +586,7 @@ def readStream(self): Returns a :class:`DataStreamReader` that can be used to read data streams as a streaming :class:`DataFrame`. - .. note:: Experimental. + .. note:: Evolving. :return: :class:`DataStreamReader` """ @@ -598,7 +598,7 @@ def streams(self): """Returns a :class:`StreamingQueryManager` that allows managing all the :class:`StreamingQuery` StreamingQueries active on `this` context. - .. note:: Experimental. + .. note:: Evolving. 
:return: :class:`StreamingQueryManager` """ diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index 3f7cdb293e0f..8410abd14fd5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -22,14 +22,11 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes; /** - * :: Experimental :: - * * OutputMode is used to what data will be written to a streaming sink when there is * new data available in a streaming DataFrame/Dataset. * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving public class OutputMode { diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java index be1d03fe1f33..d31790a28568 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java @@ -33,7 +33,6 @@ public class Trigger { /** - * :: Experimental :: * A trigger policy that runs a query periodically based on an interval in processing time. * If `interval` is 0, the query will run as fast as possible. * @@ -44,7 +43,6 @@ public static Trigger ProcessingTime(long intervalMs) { } /** - * :: Experimental :: * (Java-friendly) * A trigger policy that runs a query periodically based on an interval in processing time. * If `interval` is 0, the query will run as fast as possible. @@ -61,7 +59,6 @@ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { } /** - * :: Experimental :: * (Scala-friendly) * A trigger policy that runs a query periodically based on an interval in processing time. * If `duration` is 0, the query will run as fast as possible. @@ -77,7 +74,6 @@ public static Trigger ProcessingTime(Duration interval) { } /** - * :: Experimental :: * A trigger policy that runs a query periodically based on an interval in processing time. * If `interval` is effectively 0, the query will run as fast as possible. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index cc2983987eb9..7fde6e9469e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -505,7 +505,6 @@ class SQLContext private[sql](val sparkSession: SparkSession) /** - * :: Experimental :: * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`. * {{{ * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files") @@ -514,7 +513,6 @@ class SQLContext private[sql](val sparkSession: SparkSession) * * @since 2.0.0 */ - @Experimental @InterfaceStability.Evolving def readStream: DataStreamReader = sparkSession.readStream diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a519492ed8f4..d2bf35071193 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -636,7 +636,6 @@ class SparkSession private( def read: DataFrameReader = new DataFrameReader(self) /** - * :: Experimental :: * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`. 
* {{{ * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files") @@ -645,7 +644,6 @@ class SparkSession private( * * @since 2.0.0 */ - @Experimental @InterfaceStability.Evolving def readStream: DataStreamReader = new DataStreamReader(self) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 4c5a13e0fb0d..766776230257 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -21,7 +21,7 @@ import java.util.Locale import scala.collection.JavaConverters._ -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession} import org.apache.spark.sql.execution.command.DDLUtils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index 9ba1fc01cbd3..a033575d3d38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -23,11 +23,10 @@ import scala.concurrent.duration.Duration import org.apache.commons.lang3.StringUtils -import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.annotation.InterfaceStability import org.apache.spark.unsafe.types.CalendarInterval /** - * :: Experimental :: * A trigger that runs a query periodically based on the processing time. If `interval` is 0, * the query will run as fast as possible. * @@ -49,7 +48,6 @@ import org.apache.spark.unsafe.types.CalendarInterval * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving @deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0") case class ProcessingTime(intervalMs: Long) extends Trigger { @@ -57,12 +55,10 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { } /** - * :: Experimental :: * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s. * * @since 2.0.0 */ -@Experimental @InterfaceStability.Evolving @deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0") object ProcessingTime {
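For reference, the deprecation notes above point to Trigger.ProcessingTime as the replacement for the ProcessingTime case class. A minimal sketch of that usage (the socket source and console sink are placeholders chosen only to keep the example self-contained):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder.appName("TriggerSketch").getOrCreate()

    // Any streaming source works here; host/port are hypothetical.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Trigger.ProcessingTime("10 seconds") is the non-deprecated way to run the query every 10 seconds.
    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()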