
Commit c5f2b30

Merge branch 'master' of git://git.apache.org/spark into SPARK-3854
2 parents: d80d71a + 69c67ab

File tree: 23 files changed (+2152, -22 lines)

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 4 additions & 6 deletions
@@ -25,8 +25,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
-import scala.reflect.ClassTag
-import scala.util.{Try, Success, Failure}
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
@@ -42,7 +40,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
 private[spark] class PythonRDD(
-    parent: RDD[_],
+    @transient parent: RDD[_],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -55,9 +53,9 @@ private[spark] class PythonRDD(
   val bufferSize = conf.getInt("spark.buffer.size", 65536)
   val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)
 
-  override def getPartitions = parent.partitions
+  override def getPartitions = firstParent.partitions
 
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
+  override val partitioner = if (preservePartitoning) firstParent.partitioner else None
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
@@ -234,7 +232,7 @@ private[spark] class PythonRDD(
         dataOut.writeInt(command.length)
         dataOut.write(command)
         // Data values
-        PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
+        PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
         dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
         dataOut.flush()
       } catch {

examples/src/main/python/streaming/hdfs_wordcount.py

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in new text files created in the given directory
+ Usage: hdfs_wordcount.py <directory>
+   <directory> is the directory that Spark Streaming will use to find and read new text files.
+
+ To run this on your local machine on directory `localdir`, run this example
+    $ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
+
+ Then create a text file in `localdir` and the words in the file will get counted.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    lines = ssc.textFileStream(sys.argv[1])
+    counts = lines.flatMap(lambda line: line.split(" "))\
+        .map(lambda x: (x, 1))\
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

examples/src/main/python/streaming/network_wordcount.py

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Usage: network_wordcount.py <hostname> <port>
+   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
+        exit(-1)
+    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
+    counts = lines.flatMap(lambda line: line.split(" "))\
+        .map(lambda word: (word, 1))\
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()
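
Each of the streaming examples applies the same flatMap / map / reduceByKey word-count pipeline to every one-second batch. As a point of reference, here is a minimal non-streaming sketch of the same pipeline on a static RDD; it is illustrative only and not part of this commit:

    from pyspark import SparkContext

    # Word count over a one-off RDD; each streaming batch above is processed
    # the same way, just repeatedly.
    sc = SparkContext(appName="BatchWordCountSketch")
    lines = sc.parallelize(["to be or not to be"])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    print(counts.collect())  # e.g. [('or', 1), ('not', 1), ('to', 2), ('be', 2)]
    sc.stop()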

examples/src/main/python/streaming/stateful_network_wordcount.py

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the
+ network every second.
+
+ Usage: stateful_network_wordcount.py <hostname> <port>
+   <hostname> and <port> describe the TCP server that Spark Streaming
+   would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
+        localhost 9999`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: stateful_network_wordcount.py <hostname> <port>"
+        exit(-1)
+    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
+    ssc = StreamingContext(sc, 1)
+    ssc.checkpoint("checkpoint")
+
+    def updateFunc(new_values, last_sum):
+        return sum(new_values) + (last_sum or 0)
+
+    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
+    running_counts = lines.flatMap(lambda line: line.split(" "))\
+        .map(lambda word: (word, 1))\
+        .updateStateByKey(updateFunc)
+
+    running_counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()
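
In the stateful example, updateStateByKey invokes updateFunc once per key per batch, passing the counts observed in that batch and the running total carried over from earlier batches (None for a key seen for the first time). A quick standalone illustration of the function, using made-up batch values:

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)

    # Batch 1: "spark" appears three times; no prior state for the key yet.
    print(updateFunc([1, 1, 1], None))  # 3
    # Batch 2: "spark" appears once more; the carried-over total was 3.
    print(updateFunc([1], 3))           # 4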

python/docs/conf.py

Lines changed: 1 addition & 1 deletion
@@ -131,7 +131,7 @@
 # Add any paths that contain custom static files (such as style sheets) here,
 # relative to this directory. They are copied after the builtin static files,
 # so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
+#html_static_path = ['_static']
 
 # Add any extra paths that contain custom files (such as robots.txt or
 # .htaccess) here, relative to this directory. These files are copied

python/docs/epytext.py

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
     (r"L{([\w.()]+)}", r":class:`\1`"),
     (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
     (r"C{([\w.()]+)}", r":class:`\1`"),
-    (r"[IBCM]{(.+)}", r"`\1`"),
+    (r"[IBCM]{([^}]+)}", r"`\1`"),
     ('pyspark.rdd.RDD', 'RDD'),
 )
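
The epytext change swaps the greedy `(.+)` group for `([^}]+)` so that two inline markup spans on one line are converted separately rather than being captured as a single greedy match. A small standalone check of the two patterns (the sample sentence is made up):

    import re

    text = "Pass a C{list} or a C{dict} to the method."

    greedy = re.sub(r"[IBCM]{(.+)}", r"`\1`", text)
    fixed = re.sub(r"[IBCM]{([^}]+)}", r"`\1`", text)

    print(greedy)  # Pass a `list} or a C{dict` to the method.
    print(fixed)   # Pass a `list` or a `dict` to the method.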

python/docs/index.rst

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ Contents:
 
    pyspark
    pyspark.sql
+   pyspark.streaming
    pyspark.mllib
 
 

python/docs/pyspark.rst

Lines changed: 2 additions & 1 deletion
@@ -7,8 +7,9 @@ Subpackages
 .. toctree::
     :maxdepth: 1
 
-    pyspark.mllib
     pyspark.sql
+    pyspark.streaming
+    pyspark.mllib
 
 Contents
 --------

python/pyspark/context.py

Lines changed: 4 additions & 4 deletions
@@ -68,7 +68,7 @@ class SparkContext(object):
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
-                 gateway=None):
+                 gateway=None, jsc=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
         either through the named parameters here or through C{conf}.
@@ -104,14 +104,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         SparkContext._ensure_initialized(self, gateway=gateway)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
-                          conf)
+                          conf, jsc)
         except:
             # If an error occurs, clean up in order to allow future SparkContext creation:
             self.stop()
             raise
 
     def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
-                 conf):
+                 conf, jsc):
         self.environment = environment or {}
         self._conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
@@ -154,7 +154,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                 self.environment[varName] = v
 
         # Create the Java SparkContext through Py4J
-        self._jsc = self._initialize_context(self._conf._jconf)
+        self._jsc = jsc or self._initialize_context(self._conf._jconf)
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server
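
The new `jsc` keyword lets a Python SparkContext wrap a JavaSparkContext that already exists on the JVM side instead of creating a fresh one, presumably so that pyspark.streaming can rebuild a SparkContext around a JVM-side context. A minimal sketch of the idea; the helper name and the `existing_jsc` handle are illustrative, not part of this commit:

    from pyspark import SparkContext

    def wrap_jvm_context(existing_jsc, gateway=None):
        # `existing_jsc` is assumed to be a py4j handle to a live JavaSparkContext.
        # With `jsc` supplied, _do_init reuses it instead of calling
        # _initialize_context(self._conf._jconf); with jsc=None the old
        # behaviour is unchanged.
        return SparkContext(master="local[2]", appName="PythonStreamingApp",
                            gateway=gateway, jsc=existing_jsc)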

python/pyspark/mllib/feature.py

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,7 @@ def transform(self, word):
         """
         :param word: a word
         :return: vector representation of word
+
         Transforms a word to its vector representation
 
         Note: local use only
@@ -57,6 +58,7 @@ def findSynonyms(self, x, num):
         :param x: a word or a vector representation of word
         :param num: number of synonyms to find
         :return: array of (word, cosineSimilarity)
+
         Find synonyms of a word
 
         Note: local use only
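
For context, the two docstrings touched above belong to Word2VecModel.transform and Word2VecModel.findSynonyms. A short usage sketch with a toy corpus; the data and parameter values are illustrative only:

    from pyspark import SparkContext
    from pyspark.mllib.feature import Word2Vec

    sc = SparkContext(appName="Word2VecSketch")
    # Toy corpus: an RDD of tokenized sentences, repeated so each word clears
    # Word2Vec's minimum-count threshold.
    sentences = sc.parallelize([["hello", "spark"], ["hello", "streaming"]] * 100)

    model = Word2Vec().setVectorSize(10).fit(sentences)
    vec = model.transform("hello")             # vector representation of "hello"
    synonyms = model.findSynonyms("hello", 2)  # [(word, cosineSimilarity), ...]
    sc.stop()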
