4 changes: 2 additions & 2 deletions docs/structured-streaming-programming-guide.md
@@ -1,6 +1,6 @@
---
layout: global
displayTitle: Structured Streaming Programming Guide [Experimental]
displayTitle: Structured Streaming Programming Guide
title: Structured Streaming Programming Guide
---

@@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide
# Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
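To make the batch-like model described above concrete, here is a minimal PySpark sketch of the streaming word count the guide walks through next (the socket source on localhost:9999, the console sink, and complete output mode are illustrative assumptions, not part of this diff):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()

# Read a stream of lines from a socket source (host/port assumed for illustration)
lines = (spark.readStream.format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# Split each line into words and count them, exactly as in a batch query
words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

# Emit the full running counts to the console after every trigger
query = (word_counts.writeStream
         .outputMode("complete")
         .format("console")
         .start())
query.awaitTermination()
```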
Contributor: The main title still says Experimental :P

Contributor Author: haha, good catch

**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.

# Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
4 changes: 2 additions & 2 deletions python/pyspark/sql/context.py
@@ -474,7 +474,7 @@ def readStream(self):
Returns a :class:`DataStreamReader` that can be used to read data streams
as a streaming :class:`DataFrame`.

.. note:: Experimental.
.. note:: Evolving.

:return: :class:`DataStreamReader`

@@ -490,7 +490,7 @@ def streams(self):
"""Returns a :class:`StreamingQueryManager` that allows managing all the
:class:`StreamingQuery` StreamingQueries active on `this` context.

.. note:: Experimental.
.. note:: Evolving.
"""
from pyspark.sql.streaming import StreamingQueryManager
return StreamingQueryManager(self._ssql_ctx.streams())
6 changes: 3 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -209,7 +209,7 @@ def writeStream(self):
Interface for saving the content of the streaming :class:`DataFrame` out into external
storage.

.. note:: Experimental.
.. note:: Evolving.

:return: :class:`DataStreamWriter`
"""
@@ -285,7 +285,7 @@ def isStreaming(self):
:func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
source present.

.. note:: Experimental
.. note:: Evolving
"""
return self._jdf.isStreaming()
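A small illustration of the flag documented above; in PySpark it is exposed as a property, and the active `spark` session, socket source, and static range DataFrame here are assumptions for the example:

```python
static_df = spark.range(10)                       # batch DataFrame
streaming_df = (spark.readStream.format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load())

print(static_df.isStreaming)     # False
print(streaming_df.isStreaming)  # True: collect() on it would raise AnalysisException
```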

@@ -368,7 +368,7 @@ def withWatermark(self, eventTime, delayThreshold):
latest record that has been processed in the form of an interval
(e.g. "1 minute" or "5 hours").

.. note:: Experimental
.. note:: Evolving

>>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
DataFrame[name: string, time: timestamp]
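For context, a sketch of the usual pairing of a watermark with an event-time window aggregation (the `events` DataFrame, its columns, and the 10-minute/5-minute thresholds are illustrative assumptions):

```python
from pyspark.sql.functions import window

# Assumes `events` is a streaming DataFrame with columns `word` and `time` (timestamp)
windowed_counts = (events
                   .withWatermark("time", "10 minutes")        # allow data up to 10 minutes late
                   .groupBy(window(events.time, "5 minutes"),  # 5-minute event-time windows
                            events.word)
                   .count())
```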
4 changes: 2 additions & 2 deletions python/pyspark/sql/session.py
@@ -586,7 +586,7 @@ def readStream(self):
Returns a :class:`DataStreamReader` that can be used to read data streams
as a streaming :class:`DataFrame`.

.. note:: Experimental.
.. note:: Evolving.

:return: :class:`DataStreamReader`
"""
@@ -598,7 +598,7 @@ def streams(self):
"""Returns a :class:`StreamingQueryManager` that allows managing all the
:class:`StreamingQuery` StreamingQueries active on `this` context.

.. note:: Experimental.
.. note:: Evolving.

:return: :class:`StreamingQueryManager`
"""
42 changes: 21 additions & 21 deletions python/pyspark/sql/streaming.py
@@ -41,7 +41,7 @@ class StreamingQuery(object):
A handle to a query that is executing continuously in the background as new data arrives.
All these methods are thread-safe.

.. note:: Experimental
.. note:: Evolving

.. versionadded:: 2.0
"""
@@ -197,7 +197,7 @@ def exception(self):
class StreamingQueryManager(object):
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.

.. note:: Experimental
.. note:: Evolving

.. versionadded:: 2.0
"""
@@ -283,7 +283,7 @@ class DataStreamReader(OptionUtils):
(e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
to access this.

.. note:: Experimental.
.. note:: Evolving.

.. versionadded:: 2.0
"""
@@ -300,7 +300,7 @@ def _df(self, jdf):
def format(self, source):
"""Specifies the input data source format.

.. note:: Experimental.
.. note:: Evolving.

:param source: string, name of the data source, e.g. 'json', 'parquet'.

@@ -317,7 +317,7 @@ def schema(self, schema):
By specifying the schema here, the underlying data source can skip the schema
inference step, and thus speed up data loading.

.. note:: Experimental.
.. note:: Evolving.

:param schema: a :class:`pyspark.sql.types.StructType` object

@@ -340,7 +340,7 @@ def option(self, key, value):
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.

.. note:: Experimental.
.. note:: Evolving.

>>> s = spark.readStream.option("x", 1)
"""
@@ -356,7 +356,7 @@ def options(self, **options):
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.

.. note:: Experimental.
.. note:: Evolving.

>>> s = spark.readStream.options(x="1", y=2)
"""
@@ -368,7 +368,7 @@ def options(self, **options):
def load(self, path=None, format=None, schema=None, **options):
"""Loads a data stream from a data source and returns it as a :class`DataFrame`.

.. note:: Experimental.
.. note:: Evolving.

:param path: optional string for file-system backed data sources.
:param format: optional string for format of the data source. Default to 'parquet'.
@@ -411,7 +411,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.

.. note:: Experimental.
.. note:: Evolving.

:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
@@ -488,7 +488,7 @@ def parquet(self, path):
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
The default value is specified in ``spark.sql.parquet.mergeSchema``.

.. note:: Experimental.
.. note:: Evolving.

>>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
>>> parquet_sdf.isStreaming
@@ -511,7 +511,7 @@ def text(self, path):

Each line in the text file is a new row in the resulting DataFrame.

.. note:: Experimental.
.. note:: Evolving.

:param paths: string, or list of strings, for input path(s).

@@ -539,7 +539,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``inferSchema`` is enabled. To avoid going through the entire data once, disable
``inferSchema`` option or specify the schema explicitly using ``schema``.

.. note:: Experimental.
.. note:: Evolving.

:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
@@ -637,7 +637,7 @@ class DataStreamWriter(object):
(e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream`
to access this.

.. note:: Experimental.
.. note:: Evolving.

.. versionadded:: 2.0
"""
@@ -665,7 +665,7 @@ def outputMode(self, outputMode):
written to the sink every time there are some updates. If the query doesn't contain
aggregations, it will be equivalent to `append` mode.

.. note:: Experimental.
.. note:: Evolving.

>>> writer = sdf.writeStream.outputMode('append')
"""
@@ -678,7 +678,7 @@ def format(self, source):
def format(self, source):
"""Specifies the underlying output data source.

.. note:: Experimental.
.. note:: Evolving.

:param source: string, name of the data source, which for now can be 'parquet'.

@@ -696,7 +696,7 @@ def option(self, key, value):
timestamps in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.

.. note:: Experimental.
.. note:: Evolving.
"""
self._jwrite = self._jwrite.option(key, to_str(value))
return self
@@ -710,7 +710,7 @@ def options(self, **options):
timestamps in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.

.. note:: Experimental.
.. note:: Evolving.
"""
for k in options:
self._jwrite = self._jwrite.option(k, to_str(options[k]))
@@ -723,7 +723,7 @@ def partitionBy(self, *cols):
If specified, the output is laid out on the file system similar
to Hive's partitioning scheme.

.. note:: Experimental.
.. note:: Evolving.

:param cols: name of columns

@@ -739,7 +739,7 @@ def queryName(self, queryName):
:func:`start`. This name must be unique among all the currently active queries
in the associated SparkSession.

.. note:: Experimental.
.. note:: Evolving.

:param queryName: unique name for the query

@@ -756,7 +756,7 @@ def trigger(self, processingTime=None, once=None):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.

.. note:: Experimental.
.. note:: Evolving.

:param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.

@@ -794,7 +794,7 @@ def start(self, path=None, format=None, outputMode=None, partitionBy=None, query
If ``format`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.

.. note:: Experimental.
.. note:: Evolving.

:param path: the path in a Hadoop supported file system
:param format: the format used to save
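Pulling the last few hunks together, a hedged end-to-end sketch of configuring a trigger and starting a query (the query name, 5-second interval, parquet sink, and paths are assumptions for illustration):

```python
# Assumes `sdf` is a streaming DataFrame
query = (sdf.writeStream
         .queryName("assumed_query_name")                  # unique among active queries
         .trigger(processingTime="5 seconds")              # micro-batch every 5 seconds
         .format("parquet")
         .option("checkpointLocation", "/tmp/stream-chk")  # assumed checkpoint directory
         .outputMode("append")
         .start("/tmp/stream-output"))                     # assumed output path

query.awaitTermination()
```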
@@ -22,14 +22,11 @@
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes;

/**
* :: Experimental ::
*
* OutputMode is used to describe what data will be written to a streaming sink when there is
* new data available in a streaming DataFrame/Dataset.
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
public class OutputMode {

@@ -21,22 +21,18 @@

import scala.concurrent.duration.Duration;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;

/**
* :: Experimental ::
* Policy used to indicate how often results should be produced by a [[StreamingQuery]].
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
public class Trigger {

Contributor (@tdas, May 25, 2017): This file has more places with ":: Experimental ::" in the scala docs

/**
* :: Experimental ::
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `interval` is 0, the query will run as fast as possible.
*
@@ -47,7 +43,6 @@ public static Trigger ProcessingTime(long intervalMs) {
}

/**
* :: Experimental ::
* (Java-friendly)
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `interval` is 0, the query will run as fast as possible.
@@ -64,7 +59,6 @@ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
}

/**
* :: Experimental ::
* (Scala-friendly)
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `duration` is 0, the query will run as fast as possible.
@@ -80,7 +74,6 @@ public static Trigger ProcessingTime(Duration interval) {
}

/**
* :: Experimental ::
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `interval` is effectively 0, the query will run as fast as possible.
*
2 changes: 0 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2782,13 +2782,11 @@ class Dataset[T] private[sql](
}

/**
* :: Experimental ::
* Interface for saving the content of the streaming Dataset out into external storage.
*
* @group basic
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def writeStream: DataStreamWriter[T] = {
if (!isStreaming) {
@@ -17,10 +17,9 @@

package org.apache.spark.sql

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability

/**
* :: Experimental ::
* A class to consume data generated by a `StreamingQuery`. Typically this is used to send the
* generated data to external systems. Each partition will use a new deserialized instance, so you
* usually should do all the initialization (e.g. opening a connection or initiating a transaction)
@@ -66,7 +65,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
* }}}
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
abstract class ForeachWriter[T] extends Serializable {

2 changes: 0 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -505,7 +505,6 @@ class SQLContext private[sql](val sparkSession: SparkSession)


/**
* :: Experimental ::
* Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
* {{{
* sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
@@ -514,7 +513,6 @@ class SQLContext private[sql](val sparkSession: SparkSession)
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def readStream: DataStreamReader = sparkSession.readStream

@@ -636,7 +636,6 @@ class SparkSession private(
def read: DataFrameReader = new DataFrameReader(self)

/**
* :: Experimental ::
* Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
* {{{
* sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
@@ -645,7 +644,6 @@ class SparkSession private(
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def readStream: DataStreamReader = new DataStreamReader(self)
