16 changes: 8 additions & 8 deletions docs/mllib-data-types.md
@@ -104,7 +104,7 @@ dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape = (3, 1))
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
{% endhighlight %}

</div>
@@ -517,12 +517,12 @@ from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
@@ -731,15 +731,15 @@ from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])

# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)

# Get its size.
m = mat.numRows() # 6
n = mat.numCols() # 2
m = mat.numRows() # 6
n = mat.numCols() # 2

# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks
16 changes: 9 additions & 7 deletions docs/programming-guide.md
@@ -445,7 +445,7 @@ Similarly to text files, SequenceFiles can be saved and loaded by specifying the
classes can be specified, but for standard Writables this is not required.

{% highlight python %}
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
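>>> # Sketch, not part of this diff: for non-standard Writables the key and value
>>> # classes can be named explicitly when loading; the Writable class names below
>>> # are illustrative only, chosen to match the int/str pairs saved above.
>>> rdd = sc.sequenceFile("path/to/file",
...                       keyClass="org.apache.hadoop.io.IntWritable",
...                       valueClass="org.apache.hadoop.io.Text")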
@@ -459,10 +459,12 @@ Elasticsearch ESInputFormat:

{% highlight python %}
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
"org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
@@ -797,7 +799,6 @@ def increment_counter(x):
rdd.foreach(increment_counter)

print("Counter value: ", counter)

{% endhighlight %}
</div>

@@ -1455,13 +1456,14 @@ The code below shows an accumulator being used to add up the elements of an array

{% highlight python %}
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
>>> accum.value
10
{% endhighlight %}

6 changes: 3 additions & 3 deletions docs/quick-start.md
@@ -74,10 +74,10 @@ Spark's primary abstraction is a distributed collection of items called a Resili
RDDs have _[actions](programming-guide.html#actions)_, which return values, and _[transformations](programming-guide.html#transformations)_, which return pointers to new RDDs. Let's start with a few actions:

{% highlight python %}
>>> textFile.count() # Number of items in this RDD
>>> textFile.count() # Number of items in this RDD
126

>>> textFile.first() # First item in this RDD
>>> textFile.first() # First item in this RDD
u'# Apache Spark'
{% endhighlight %}

@@ -90,7 +90,7 @@ Now let's use a transformation. We will use the [`filter`](programming-guide.htm
We can chain together transformations and actions:

{% highlight python %}
>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?
>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?
15
{% endhighlight %}

4 changes: 2 additions & 2 deletions docs/streaming-kafka-0-8-integration.md
@@ -195,8 +195,8 @@ Next, we discuss how to use this approach in your streaming application.
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

directKafkaStream\
.transform(storeOffsetRanges)\
directKafkaStream \
.transform(storeOffsetRanges) \
.foreachRDD(printOffsetRanges)
</div>
</div>
33 changes: 16 additions & 17 deletions docs/streaming-programming-guide.md
@@ -930,7 +930,7 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
<div data-lang="python" markdown="1">

{% highlight python %}
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
@@ -1495,16 +1495,15 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

def getWordBlacklist(sparkContext):
if ('wordBlacklist' not in globals()):
globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
return globals()['wordBlacklist']
if ("wordBlacklist" not in globals()):
globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
return globals()["wordBlacklist"]

def getDroppedWordsCounter(sparkContext):
if ('droppedWordsCounter' not in globals()):
globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
return globals()['droppedWordsCounter']
if ("droppedWordsCounter" not in globals()):
globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
return globals()["droppedWordsCounter"]

def echo(time, rdd):
# Get or register the blacklist Broadcast
@@ -1626,12 +1625,12 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
return globals()["sparkSessionSingletonInstance"]

...

@@ -1829,11 +1828,11 @@ This behavior is made simple by using `StreamingContext.getOrCreate`. This is us
{% highlight python %}
# Function to create and setup a new StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = new StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
sc = SparkContext(...) # new context
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
return ssc

# Get StreamingContext from checkpoint data or create a new one
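# Sketch, not part of this diff: the call the comment above refers to would look
# roughly like the line below; checkpointDirectory is assumed to be defined earlier.
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)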
79 changes: 39 additions & 40 deletions docs/structured-streaming-programming-guide.md
@@ -59,9 +59,9 @@ from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession\
.builder()\
.appName("StructuredNetworkWordCount")\
spark = SparkSession \
.builder() \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
{% endhighlight %}

@@ -124,22 +124,22 @@ This `lines` DataFrame represents an unbounded table containing the streaming te

{% highlight python %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()

# Split the lines into words
words = lines.select(
explode(
split(lines.value, ' ')
).alias('word')
split(lines.value, " ")
).alias("word")
)

# Generate running word count
wordCounts = words.groupBy('word').count()
wordCounts = words.groupBy("word").count()
{% endhighlight %}

This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
@@ -180,10 +180,10 @@ query.awaitTermination();

{% highlight python %}
# Start running the query that prints the running counts to the console
query = wordCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()

query.awaitTermination()
@@ -488,7 +488,7 @@ spark = SparkSession. ...

# Read text from socket
socketDF = spark \
.readStream() \
.readStream() \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -504,7 +504,7 @@ csvDF = spark \
.readStream() \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}

</div>
@@ -596,8 +596,7 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
<div data-lang="python" markdown="1">

{% highlight python %}

df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
@@ -653,11 +652,11 @@ Dataset<Row> windowedCounts = words.groupBy(
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, '10 minutes', '5 minutes'),
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
{% endhighlight %}
@@ -704,7 +703,7 @@ streamingDf.join(staticDf, "type", "right_join"); // right outer join with a st
{% highlight python %}
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
{% endhighlight %}

@@ -907,25 +906,25 @@ spark.sql("select * from aggregates").show(); // interactively query in-memory
noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF\
.writeStream()\
.format("console")\
noAggDF \
.writeStream() \
.format("console") \
.start()

# Write new data to Parquet files
noAggDF\
.writeStream()\
.parquet("path/to/destination/directory")\
noAggDF \
.writeStream() \
.parquet("path/to/destination/directory") \
.start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF\
.writeStream()\
.outputMode("complete")\
.format("console")\
aggDF \
.writeStream() \
.outputMode("complete") \
.format("console") \
.start()

# Have all the aggregates in an in memory table. The query name will be the table name
@@ -1072,11 +1071,11 @@ spark.streams().awaitAnyTermination(); // block until any one of them terminat
{% highlight python %}
spark = ... # spark session

spark.streams().active # get the list of currently active streaming queries
spark.streams().active # get the list of currently active streaming queries

spark.streams().get(id) # get a query object by its unique id
spark.streams().get(id) # get a query object by its unique id

spark.streams().awaitAnyTermination() # block until any one of them terminates
spark.streams().awaitAnyTermination() # block until any one of them terminates
{% endhighlight %}

</div>
@@ -1116,11 +1115,11 @@ aggDF
<div data-lang="python" markdown="1">

{% highlight python %}
aggDF\
.writeStream()\
.outputMode("complete")\
.option("checkpointLocation", "path/to/HDFS/dir")\
.format("memory")\
aggDF \
.writeStream() \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
.start()
{% endhighlight %}
