
Commit 8d1e555

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check

2 parents: 60c72d9 + 9306b8c

5 files changed: +116 −30 lines


core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 6 additions & 0 deletions
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         stageData.shuffleReadBytes += shuffleReadDelta
         execSummary.shuffleRead += shuffleReadDelta
 
+        val inputBytesDelta =
+          (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
+          - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+        stageData.inputBytes += inputBytesDelta
+        execSummary.inputBytes += inputBytesDelta
+
         val diskSpillDelta =
           taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
         stageData.diskBytesSpilled += diskSpillDelta
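The new input-bytes accounting mirrors the existing shuffle-read and spill deltas: both the latest and the previously recorded task metrics are optional, so each side falls back to 0L before subtracting, and only the increment is added to the stage and executor totals. A minimal standalone sketch of that Option handling, using hypothetical simplified stand-ins rather than Spark's real InputMetrics/TaskMetrics classes:

// Hypothetical, simplified stand-ins for Spark's metrics classes; they only
// exist to illustrate the Option handling used in the diff above.
case class InputMetrics(bytesRead: Long)
case class TaskMetrics(inputMetrics: Option[InputMetrics])

// Latest reading minus the previously recorded one; a missing value on
// either side contributes 0 bytes, so absent metrics never throw.
def inputBytesDelta(current: TaskMetrics, old: Option[TaskMetrics]): Long =
  current.inputMetrics.map(_.bytesRead).getOrElse(0L) -
    old.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)

// A task that previously reported 100 bytes and now reports 250 adds only
// the 150-byte increment to the stage and executor totals.
assert(inputBytesDelta(TaskMetrics(Some(InputMetrics(250L))),
  Some(TaskMetrics(Some(InputMetrics(100L))))) == 150L)
assert(inputBytesDelta(TaskMetrics(None), None) == 0L)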

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 8 additions & 1 deletion
@@ -22,7 +22,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark._
 import org.apache.spark.{LocalSparkContext, SparkConf, Success}
-import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskMetrics.executorRunTime = base + 4
     taskMetrics.diskBytesSpilled = base + 5
     taskMetrics.memoryBytesSpilled = base + 6
+    val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+    taskMetrics.inputMetrics = Some(inputMetrics)
+    inputMetrics.bytesRead = base + 7
     taskMetrics
   }
 
@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 205)
     assert(stage0Data.memoryBytesSpilled == 112)
     assert(stage1Data.memoryBytesSpilled == 206)
+    assert(stage0Data.inputBytes == 114)
+    assert(stage1Data.inputBytes == 207)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 2)
     assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 610)
     assert(stage0Data.memoryBytesSpilled == 412)
     assert(stage1Data.memoryBytesSpilled == 612)
+    assert(stage0Data.inputBytes == 414)
+    assert(stage1Data.inputBytes == 614)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 302)
     assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get

dev/create-release/create-release.sh

Lines changed: 5 additions & 4 deletions
@@ -117,12 +117,13 @@ make_binary_release() {
     spark-$RELEASE_VERSION-bin-$NAME.tgz.sha
 }
 
-make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4"
-make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
+make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" &
+make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" &
 make_binary_release "hadoop2" \
-  "-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
+  "-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" &
 make_binary_release "hadoop2-without-hive" \
-  "-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
+  "-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" &
+wait
 
 # Copy data
 echo "Copying release tarballs"

docs/mllib-feature-extraction.md

Lines changed: 62 additions & 1 deletion
@@ -9,4 +9,65 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Feature Extraction
 
 ## Word2Vec
 
-## TFIDF
+Word2Vec computes distributed vector representations of words. The main advantage of the distributed
+representations is that similar words are close in the vector space, which makes generalization to
+novel patterns easier and model estimation more robust. Distributed vector representations have been
+shown to be useful in many natural language processing applications such as named entity
+recognition, disambiguation, parsing, tagging and machine translation.
+
+### Model
+
+In our implementation of Word2Vec, we use the skip-gram model. The training objective of skip-gram is
+to learn word vector representations that are good at predicting a word's context in the same sentence.
+Mathematically, given a sequence of training words `$w_1, w_2, \dots, w_T$`, the objective of the
+skip-gram model is to maximize the average log-likelihood
+`\[
+\frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t)
+\]`
+where $k$ is the size of the training window.
+
+In the skip-gram model, every word $w$ is associated with two vectors $u_w$ and $v_w$, which are the
+vector representations of $w$ as word and context respectively. The probability of correctly
+predicting word $w_i$ given word $w_j$ is determined by the softmax model, which is
+`\[
+p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})}
+\]`
+where $V$ is the vocabulary size.
+
+The skip-gram model with softmax is expensive because the cost of computing $\log p(w_i | w_j)$
+is proportional to $V$, which can easily be in the order of millions. To speed up the training of
+Word2Vec, we use hierarchical softmax, which reduces the complexity of computing $\log p(w_i | w_j)$
+to $O(\log(V))$.
+
+### Example
+
+The example below demonstrates how to load a text file, parse it as an RDD of `Seq[String]`,
+construct a `Word2Vec` instance and then fit a `Word2VecModel` with the input data. Finally,
+we display the top 40 synonyms of the specified word. To run the example, first download
+the [text8](http://mattmahoney.net/dc/text8.zip) data and extract it to your preferred directory.
+Here we assume the extracted file is `text8` and that it is in the same directory as where you run the Spark shell.
+
+<div class="codetabs">
+<div data-lang="scala">
+{% highlight scala %}
+import org.apache.spark._
+import org.apache.spark.rdd._
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.feature.Word2Vec
+
+val input = sc.textFile("text8").map(line => line.split(" ").toSeq)
+
+val word2vec = new Word2Vec()
+
+val model = word2vec.fit(input)
+
+val synonyms = model.findSynonyms("china", 40)
+
+for ((synonym, cosineSimilarity) <- synonyms) {
+  println(s"$synonym $cosineSimilarity")
+}
+{% endhighlight %}
+</div>
+</div>
+
+## TFIDF
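As background for the $O(\log(V))$ claim in the added doc text above: hierarchical softmax places the vocabulary at the leaves of a binary tree and replaces the flat softmax with a product over the inner nodes on the path from the root to the target word. A sketch of the standard factorization, with notation introduced here only for illustration ($n(w,i)$ is the $i$-th node on the path to $w$, $L(w)$ the path length, $s(w,i) \in \{+1,-1\}$ encodes the left/right branch taken, and $\sigma$ is the logistic function):

`\[
p(w \mid w_j) = \prod_{i=1}^{L(w)-1} \sigma\big( s(w, i) \, u_{n(w,i)}^{\top} v_{w_j} \big)
\]`

Each factor costs a single dot product, and for a balanced (e.g. Huffman) tree $L(w)$ is on the order of $\log_2 V$, which is where the per-word $O(\log(V))$ cost comes from.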

mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala

Lines changed: 35 additions & 24 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.rdd._
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
 
 /**
  * Entry in vocabulary
@@ -287,11 +288,12 @@ class Word2Vec extends Serializable with Logging {
     var syn0Global =
       Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
     var syn1Global = new Array[Float](vocabSize * vectorSize)
-
     var alpha = startingAlpha
     for (k <- 1 to numIterations) {
       val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
         val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
+        val syn0Modify = new Array[Int](vocabSize)
+        val syn1Modify = new Array[Int](vocabSize)
         val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) {
           case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
             var lwc = lastWordCount
@@ -321,7 +323,8 @@ class Word2Vec extends Serializable with Logging {
                 // Hierarchical softmax
                 var d = 0
                 while (d < bcVocab.value(word).codeLen) {
-                  val l2 = bcVocab.value(word).point(d) * vectorSize
+                  val inner = bcVocab.value(word).point(d)
+                  val l2 = inner * vectorSize
                   // Propagate hidden -> output
                   var f = blas.sdot(vectorSize, syn0, l1, 1, syn1, l2, 1)
                   if (f > -MAX_EXP && f < MAX_EXP) {
@@ -330,10 +333,12 @@ class Word2Vec extends Serializable with Logging {
                     val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
                     blas.saxpy(vectorSize, g, syn1, l2, 1, neu1e, 0, 1)
                     blas.saxpy(vectorSize, g, syn0, l1, 1, syn1, l2, 1)
+                    syn1Modify(inner) += 1
                   }
                   d += 1
                 }
                 blas.saxpy(vectorSize, 1.0f, neu1e, 0, 1, syn0, l1, 1)
+                syn0Modify(lastWord) += 1
               }
             }
             a += 1
@@ -342,21 +347,36 @@ class Word2Vec extends Serializable with Logging {
             }
             (syn0, syn1, lwc, wc)
         }
-        Iterator(model)
+        val syn0Local = model._1
+        val syn1Local = model._2
+        val synOut = new PrimitiveKeyOpenHashMap[Int, Array[Float]](vocabSize * 2)
+        var index = 0
+        while(index < vocabSize) {
+          if (syn0Modify(index) != 0) {
+            synOut.update(index, syn0Local.slice(index * vectorSize, (index + 1) * vectorSize))
+          }
+          if (syn1Modify(index) != 0) {
+            synOut.update(index + vocabSize,
+              syn1Local.slice(index * vectorSize, (index + 1) * vectorSize))
+          }
+          index += 1
+        }
+        Iterator(synOut)
       }
-      val (aggSyn0, aggSyn1, _, _) =
-        partial.treeReduce { case ((syn0_1, syn1_1, lwc_1, wc_1), (syn0_2, syn1_2, lwc_2, wc_2)) =>
-          val n = syn0_1.length
-          val weight1 = 1.0f * wc_1 / (wc_1 + wc_2)
-          val weight2 = 1.0f * wc_2 / (wc_1 + wc_2)
-          blas.sscal(n, weight1, syn0_1, 1)
-          blas.sscal(n, weight1, syn1_1, 1)
-          blas.saxpy(n, weight2, syn0_2, 1, syn0_1, 1)
-          blas.saxpy(n, weight2, syn1_2, 1, syn1_1, 1)
-          (syn0_1, syn1_1, lwc_1 + lwc_2, wc_1 + wc_2)
+      val synAgg = partial.flatMap(x => x).reduceByKey { case (v1, v2) =>
+        blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1)
+        v1
+      }.collect()
+      var i = 0
+      while (i < synAgg.length) {
+        val index = synAgg(i)._1
+        if (index < vocabSize) {
+          Array.copy(synAgg(i)._2, 0, syn0Global, index * vectorSize, vectorSize)
+        } else {
+          Array.copy(synAgg(i)._2, 0, syn1Global, (index - vocabSize) * vectorSize, vectorSize)
         }
-      syn0Global = aggSyn0
-      syn1Global = aggSyn1
+        i += 1
+      }
     }
     newSentences.unpersist()
 
@@ -414,15 +434,6 @@ class Word2VecModel private[mllib] (
     }
   }
 
-  /**
-   * Transforms an RDD to its vector representation
-   * @param dataset a an RDD of words
-   * @return RDD of vector representation
-   */
-  def transform(dataset: RDD[String]): RDD[Vector] = {
-    dataset.map(word => transform(word))
-  }
-
   /**
    * Find synonyms of a word
   * @param word a word
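The core of the Word2Vec change above is replacing the treeReduce that blended full copies of syn0/syn1 across partitions with a sparse aggregation: each partition records which vocabulary rows it actually modified, emits only those rows (syn1 rows offset by vocabSize), and the rows are summed per index with reduceByKey before being copied back into the global arrays. Below is a minimal sketch of that idea with plain RDD operations; vocabSize, vectorSize and the row data are made up for illustration, and a plain element-wise loop stands in for the blas.saxpy and PrimitiveKeyOpenHashMap used in the patch.

// Assumes a running spark-shell (`sc` in scope), like the doc example above.
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val vocabSize = 4   // toy sizes, illustration only
val vectorSize = 3

// Each partition emits only the (rowIndex, rowValues) pairs it actually touched,
// instead of shipping its whole weight matrix back to the driver.
val partitionUpdates: RDD[(Int, Array[Float])] = sc.parallelize(Seq(
  Seq(0 -> Array(1f, 0f, 0f), 2 -> Array(0f, 1f, 0f)),  // rows touched by partition 1
  Seq(2 -> Array(0f, 0f, 1f), 3 -> Array(1f, 1f, 1f))   // rows touched by partition 2
), 2).flatMap(identity)

// Sum contributions row by row (the patch does this with blas.saxpy).
val aggregated = partitionUpdates.reduceByKey { (v1, v2) =>
  var i = 0
  while (i < v1.length) { v1(i) += v2(i); i += 1 }
  v1
}.collect()

// Copy only the aggregated rows back into the flat global weight array.
val global = new Array[Float](vocabSize * vectorSize)
for ((row, values) <- aggregated) {
  Array.copy(values, 0, global, row * vectorSize, vectorSize)
}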
