docs/mllib-feature-extraction.md (85 additions, 0 deletions)

@@ -95,8 +95,49 @@ tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">

TF and IDF are implemented in [HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF)
and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF).
`HashingTF` takes an RDD of lists as input.
Each record can be an iterable of strings or other types.

{% highlight python %}
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF

sc = SparkContext()

# Load documents (one per line).
documents = sc.textFile("...").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)
{% endhighlight %}

While applying `HashingTF` only needs a single pass over the data, applying `IDF` needs two passes:
the first to compute the IDF vector and the second to scale the term frequencies by IDF.

{% highlight python %}
from pyspark.mllib.feature import IDF

# ... continue from the previous example
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
{% endhighlight %}

MLlib's IDF implementation provides an option for ignoring terms which occur in fewer than a
minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
can be used by passing the `minDocFreq` value to the IDF constructor.

{% highlight python %}
# ... continue from the previous example
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
{% endhighlight %}
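
To see the effect of `minDocFreq` on a concrete corpus, here is a small sketch continuing from the example above. The three-document corpus is hypothetical and chosen only for illustration; it assumes the same `sc`, `HashingTF`, and `IDF` from the previous snippets.

{% highlight python %}
# ... continue from the previous example
# Hypothetical corpus: "a" appears in two documents; "b", "c" and "d" in one each.
docs = sc.parallelize([["a", "b"], ["a", "c"], ["d"]])
tf = HashingTF().transform(docs)
tf.cache()
tfidf = IDF(minDocFreq=2).fit(tf).transform(tf)

# With minDocFreq=2, the IDF of "b", "c" and "d" is set to 0, so their tf-idf
# weights are 0; only the hash bucket for "a" (document frequency 2) keeps a
# non-zero weight, barring hash collisions.
print(tfidf.collect())
{% endhighlight %}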
</div>
</div>

@@ -223,6 +264,29 @@ val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

# data1 will have unit variance.
data1 = label.zip(scaler1.transform(features))

# Without converting the features into dense vectors, transformation with zero mean will raise an
# exception on sparse vectors.
# data2 will have unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
{% endhighlight %}
</div>
</div>

> **Review comment (Member)**, on the `data2` line above: Does this run for you? It fails for me after calling `data2.collect()`. I think the bug is in linalg.py:426:
>
>     for i in xrange(self.indices.size):
>
> where `self.indices` does not have method `size`. I figure it should be `len(self.indices)`. (This bug must have been from a previous PR.)
>
> **Reply (Contributor, author):** fixed, thx!
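
For reference, a minimal standalone sketch of the container behaviour behind the suggested fix (plain Python; the actual `pyspark.mllib.linalg` code is not reproduced here): `array.array`, unlike a NumPy array, has no `size` attribute, while `len()` works for both.

{% highlight python %}
from array import array
import numpy as np

idx_arr = array('i', [0, 2, 5])   # array.array: no .size attribute
idx_np = np.array([0, 2, 5])      # NumPy array: idx_np.size == 3

# idx_arr.size would raise AttributeError; len() works for both containers,
# hence the suggested len(self.indices) fix.
for i in range(len(idx_arr)):
    print(idx_arr[i])
{% endhighlight %}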

## Normalizer
@@ -267,4 +331,25 @@ val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import Normalizer

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))

# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))

# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
{% endhighlight %}
</div>
</div>
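
To make the two norms concrete, here is a small sketch continuing from the Python example above (the input vector is an assumed toy value; `sc`, `Vectors`, `normalizer1`, and `normalizer2` come from the previous snippet).

{% highlight python %}
# ... continue from the previous Python example
v = sc.parallelize([Vectors.dense([3.0, 4.0])])

# L^2 norm: divide by sqrt(3^2 + 4^2) = 5.0, giving roughly [0.6, 0.8].
print(normalizer1.transform(v).collect())

# L^inf norm: divide by max(|3.0|, |4.0|) = 4.0, giving roughly [0.75, 1.0].
print(normalizer2.transform(v).collect())
{% endhighlight %}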
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -31,8 +31,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.mllib.feature.Word2VecModel
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
@@ -291,6 +290,43 @@ class PythonMLLibAPI extends Serializable {
ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)
}

/**
* Java stub for Normalizer.transform()
*/
def normalizeVector(p: Double, vector: Vector): Vector = {
new Normalizer(p).transform(vector)
}

/**
* Java stub for Normalizer.transform()
*/
def normalizeVector(p: Double, rdd: JavaRDD[Vector]): JavaRDD[Vector] = {
new Normalizer(p).transform(rdd)
}

/**
* Java stub for StandardScaler.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def fitStandardScaler(
withMean: Boolean,
withStd: Boolean,
data: JavaRDD[Vector]): StandardScalerModel = {
new StandardScaler(withMean, withStd).fit(data.rdd)
}

/**
* Java stub for IDF.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def fitIDF(minDocFreq: Int, dataset: JavaRDD[Vector]): IDFModel = {
new IDF(minDocFreq).fit(dataset)
}

/**
* Java stub for Python mllib Word2Vec fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
@@ -328,6 +364,15 @@ class PythonMLLibAPI extends Serializable {
model.transform(word)
}

/**
* Transforms an RDD of words into their vector representations
* @param rdd an RDD of words
* @return an RDD of vector representations of words
*/
def transform(rdd: JavaRDD[String]): JavaRDD[Vector] = {
rdd.rdd.map(model.transform)
}

def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] = {
val vec = transform(word)
findSynonyms(vec, num)
mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.feature

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

@@ -48,4 +49,14 @@ trait VectorTransformer extends Serializable {
data.map(x => this.transform(x))
}

/**
* Applies transformation on a JavaRDD[Vector].
*
* @param data JavaRDD[Vector] to be transformed.
* @return transformed JavaRDD[Vector].
*/
def transform(data: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(data.rdd)
}

}
mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -432,7 +432,7 @@ class Word2VecModel private[mllib] (
throw new IllegalStateException(s"$word not in vocabulary")
}
}

/**
* Find synonyms of a word
* @param word a word
@@ -443,7 +443,7 @@
val vector = transform(word)
findSynonyms(vector,num)
}

/**
* Find synonyms of the vector representation of a word
* @param vector vector representation of a word