From 8a50584ed6ea38b5fccc64e6da3fc18d4513c9c5 Mon Sep 17 00:00:00 2001
From: Davies Liu 
Date: Wed, 15 Oct 2014 17:02:16 -0700
Subject: [PATCH 1/9] Python API for mllib.feature

---
 .../mllib/api/python/PythonMLLibAPI.scala     |  50 ++-
 .../mllib/feature/VectorTransformer.scala     |  11 +
 .../apache/spark/mllib/feature/Word2Vec.scala |   4 +-
 python/pyspark/mllib/feature.py               | 401 ++++++++++++++----
 python/pyspark/mllib/linalg.py                |   2 +-
 5 files changed, 373 insertions(+), 95 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f7251e65e04f1..e76659d894a95 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -29,8 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.feature.Word2Vec
-import org.apache.spark.mllib.feature.Word2VecModel
+import org.apache.spark.mllib.feature._
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
@@ -289,6 +288,43 @@ class PythonMLLibAPI extends Serializable {
     ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)
   }
 
+  /**
+   * Java stub for Normalizer.transform()
+   */
+  def normalizeVector(p: Double, vector: Vector): Vector = {
+    new Normalizer(p).transform(vector)
+  }
+
+  /**
+   * Java stub for Normalizer.transform()
+   */
+  def normalizeVector(p: Double, rdd: JavaRDD[Vector]): JavaRDD[Vector] = {
+    new Normalizer(p).transform(rdd)
+  }
+
+  /**
+   * Java stub for StandardScaler.fit(). This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def fitStandardScaler(
+      withMean: Boolean,
+      withStd: Boolean,
+      data: JavaRDD[Vector]): StandardScalerModel = {
+    new StandardScaler(withMean, withStd).fit(data.rdd)
+  }
+
+  /**
+   * Java stub for IDF.fit(). This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def fitIDF(minDocFreq: Int, dataset: JavaRDD[Vector]): IDFModel = {
+    new IDF(minDocFreq).fit(dataset)
+  }
+
   /**
    * Java stub for Python mllib Word2Vec fit(). This stub returns a
    * handle to the Java object instead of the content of the Java object.
@@ -326,6 +362,16 @@ class PythonMLLibAPI extends Serializable { model.transform(word) } + /** + * TODO: model is not serializable + * Transforms an RDD of words to its vector representation + * @param rdd an RDD of words + * @return an RDD of vector representations of words + */ + def transform(rdd: JavaRDD[String]): JavaRDD[Vector] = { + rdd.rdd.map(model.transform(_)) + } + def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] = { val vec = transform(word) findSynonyms(vec, num) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala index 415a845332d45..fac413f8ee24a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.feature import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD +import org.apache.spark.api.java.JavaRDD /** * :: DeveloperApi :: @@ -48,4 +49,14 @@ trait VectorTransformer extends Serializable { data.map(x => this.transform(x)) } + /** + * Applies transformation on an JavaRDD[Vector]. + * + * @param data JavaRDD[Vector] to be transformed. + * @return transformed JavaRDD[Vector]. + */ + def transform(data: JavaRDD[Vector]): JavaRDD[Vector] = { + transform(data.rdd) + } + } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index d321994c2a651..f5f7ad613d4c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -432,7 +432,7 @@ class Word2VecModel private[mllib] ( throw new IllegalStateException(s"$word not in vocabulary") } } - + /** * Find synonyms of a word * @param word a word @@ -443,7 +443,7 @@ class Word2VecModel private[mllib] ( val vector = transform(word) findSynonyms(vector,num) } - + /** * Find synonyms of the vector representation of a word * @param vector vector representation of a word diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index f4cbf31b94fe2..b648047b12cce 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -18,60 +18,324 @@ """ Python package for feature in MLlib. 
""" +import warnings + +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer +from pyspark.mllib.linalg import Vectors + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', + 'Word2Vec', 'Word2VecModel'] + + +# TODO: move these helper functions into utils +_picklable_classes = [ + 'LinkedList', + 'SparseVector', + 'DenseVector', + 'DenseMatrix', + 'Rating', + 'LabeledPoint', +] + + +def _py2java(sc, a): + """ Convert Python object into Java """ + if isinstance(a, RDD): + a = a._to_java_object_rdd() + elif not isinstance(a, (int, long, float, bool, basestring)): + bytes = bytearray(PickleSerializer().dumps(a)) + a = sc._jvm.SerDe.loads(bytes) + return a + -from pyspark.mllib.linalg import _convert_to_vector +def _java2py(sc, r): + if isinstance(r, JavaObject): + clsName = r.getClass().getSimpleName() + if clsName in ("RDD", "JavaRDD"): + if clsName == "RDD": + r = r.toJavaRDD() + jrdd = sc._jvm.PythonRDD.javaToPython(r) + return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer())) -__all__ = ['Word2Vec', 'Word2VecModel'] + elif clsName in _picklable_classes: + r = sc._jvm.SerDe.dumps(r) + if isinstance(r, bytearray): + r = PickleSerializer().loads(str(r)) + return r -class Word2VecModel(object): + +def _callJavaFunc(sc, func, *args): + """ Call Java Function """ - class for Word2Vec model + args = [_py2java(sc, a) for a in args] + return _java2py(sc, func(*args)) + + +def _callAPI(sc, name, *args): + """ Call API in PythonMLLibAPI """ - def __init__(self, sc, java_model): + api = getattr(sc._jvm.PythonMLLibAPI(), name) + return _callJavaFunc(sc, api, *args) + + +class VectorTransformer(object): + """ + :: DeveloperApi :: + Base class for transformation of a vector or RDD of vector + """ + def transform(self, vector): + """ + Applies transformation on a vector. + + :param vector: vector to be transformed. """ - :param sc: Spark context - :param java_model: Handle to Java model object + raise NotImplementedError + + +class Normalizer(VectorTransformer): + """ + :: Experimental :: + Normalizes samples individually to unit L^p^ norm + + For any 1 <= p < Double.PositiveInfinity, normalizes samples using + sum(abs(vector).^p^)^(1/p)^ as norm. + + For p = Double.PositiveInfinity, max(abs(vector)) will be used as + norm for normalization. + + >>> v = Vectors.dense(range(3)) + >>> nor = Normalizer(1) + >>> nor.transform(v) + DenseVector([0.0, 0.3333333333333333, 0.6666666666666666]) + + >>> rdd = sc.parallelize([v]) + >>> nor.transform(rdd).collect() + [DenseVector([0.0, 0.3333333333333333, 0.6666666666666666])] + """ + def __init__(self, p=2): + """ + :param p: Normalization in L^p^ space, p = 2 by default. + """ + assert p >= 1.0, "p should be greater than 1.0" + self.p = float(p) + + def transform(self, vector): """ + Applies unit length normalization on a vector. + + :param vector: vector to be normalized. + :return: normalized vector. If the norm of the input is zero, it + will return the input vector. 
+ """ + sc = SparkContext._active_spark_context + assert sc is not None, "SparkContext should be initialized first" + return _callAPI(sc, "normalizeVector", self.p, vector) + + +class JavaModelWrapper(VectorTransformer): + """ + Wrapper for the model in JVM + """ + def __init__(self, sc, java_model): self._sc = sc self._java_model = java_model def __del__(self): self._sc._gateway.detach(self._java_model) - def transform(self, word): + def transform(self, dataset): + return _callJavaFunc(self._sc, self._java_model.transform, dataset) + + +class StandardScalerModel(JavaModelWrapper): + """ + :: Experimental :: + Represents a StandardScaler model that can transform vectors. + """ + def transform(self, vector): """ - :param word: a word - :return: vector representation of word + Applies standardization transformation on a vector. + + :param vector: Vector to be standardized. + :return: Standardized vector. If the variance of a column is zero, + it will return default `0.0` for the column with zero variance. + """ + JavaModelWrapper.transform(vector) + + +class StandardScaler(object): + """ + :: Experimental :: + Standardizes features by removing the mean and scaling to unit + variance using column summary statistics on the samples in the + training set. + + >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])] + >>> dataset = sc.parallelize(vs) + >>> standardizer = StandardScaler(True, True) + >>> model = standardizer.fit(dataset) + >>> result = model.transform(dataset) + >>> for r in result.collect(): r + DenseVector([-0.7071067811865475, 0.7071067811865475, -0.7071067811865476]) + DenseVector([0.7071067811865475, -0.7071067811865475, 0.7071067811865476]) + """ + def __init__(self, withMean=False, withStd=True): + """ + :param withMean: False by default. Centers the data with mean + before scaling. It will build a dense output, so this + does not work on sparse input and will raise an exception. + :param withStd: True by default. Scales the data to unit standard + deviation. + """ + if not (withMean or withStd): + warnings.warn("Both withMean and withStd are false. The model does nothing.") + self.withMean = withMean + self.withStd = withStd + + def fit(self, dataset): + """ + Computes the mean and variance and stores as a model to be used for later scaling. + + :param data: The data used to compute the mean and variance to build + the transformation model. + :return: a StandardScalarModel + """ + sc = dataset.context + jmodel = _callAPI(sc, "fitStandardScaler", self.withMean, self.withStd, dataset) + return StandardScalerModel(sc, jmodel) + + +class HashTF(object): + """ + :: Experimental :: + Maps a sequence of terms to their term frequencies using the hashing trick. + >>> htf = HashTF(100) + >>> doc = "a a b b c d".split(" ") + >>> htf.transform(doc) + SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0}) + """ + def __init__(self, numFeatures=1 << 20): + """ + :param numFeatures: number of features (default: 2^20) + """ + self.numFeatures = numFeatures + + def indexOf(self, term): + """ Returns the index of the input term. """ + return hash(term) % self.numFeatures + + def transform(self, document): + """ + Transforms the input document (list of terms) to term frequency vectors, + or transform the RDD of document to RDD of term frequency vectors. 
+ """ + if isinstance(document, RDD): + return document.map(self.transform) + + freq = {} + for term in document: + i = self.indexOf(term) + freq[i] = freq.get(i, 0) + 1.0 + return Vectors.sparse(self.numFeatures, freq.items()) + + +class IDFModel(JavaModelWrapper): + """ + Represents an IDF model that can transform term frequency vectors. + """ + def transform(self, dataset): + """ + Transforms term frequency (TF) vectors to TF-IDF vectors. + + If `minDocFreq` was set for the IDF calculation, + the terms which occur in fewer than `minDocFreq` + documents will have an entry of 0. + + :param dataset: an RDD of term frequency vectors + :return: an RDD of TF-IDF vectors + """ + JavaModelWrapper.transform(dataset) + + +class IDF(object): + """ + :: Experimental :: + Inverse document frequency (IDF). + + The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`, + where `m` is the total number of documents and `d(t)` is the number + of documents that contain term `t`. + + This implementation supports filtering out terms which do not appear + in a minimum number of documents (controlled by the variable `minDocFreq`). + For terms that are not in at least `minDocFreq` documents, the IDF is + found as 0, resulting in TF-IDFs of 0. + + >>> n = 4 + >>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)), + ... Vectors.dense([0.0, 1.0, 2.0, 3.0]), + ... Vectors.sparse(n, [1], [1.0])] + >>> data = sc.parallelize(freqs) + >>> idf = IDF() + >>> model = idf.fit(data) + >>> tfidf = model.transform(data) + >>> for r in tfidf.collect(): r + SparseVector(4, {1: 0.0, 3: 0.575364144904}) + DenseVector([0.0, 0.0, 1.3862943611198906, 0.8630462173553426]) + SparseVector(4, {1: 0.0}) + """ + def __init__(self, minDocFreq=0): + """ + :param minDocFreq: minimum of documents in which a term + should appear for filtering + """ + self.minDocFreq = minDocFreq + + def fit(self, dataset): + """ + Computes the inverse document frequency. 
+ + :param dataset: an RDD of term frequency vectors + """ + sc = dataset.context + jmodel = _callAPI(sc, "fitIDF", self.minDocFreq, dataset) + return IDFModel(sc, jmodel) + + +class Word2VecModel(JavaModelWrapper): + """ + class for Word2Vec model + """ + def transform(self, word): + """ Transforms a word to its vector representation - Note: local use only + :param word: a word + :return: vector representation of word(s) """ - # TODO: make transform usable in RDD operations from python side - result = self._java_model.transform(word) - return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result))) + try: + return _callJavaFunc(self._sc, self._java_model.transform, word) + except Py4JJavaError: + raise ValueError("%s not found" % word) - def findSynonyms(self, x, num): + def findSynonyms(self, word, num): """ - :param x: a word or a vector representation of word + Find synonyms of a word + + :param word: a word or a vector representation of word :param num: number of synonyms to find :return: array of (word, cosineSimilarity) - Find synonyms of a word - Note: local use only """ - # TODO: make findSynonyms usable in RDD operations from python side - ser = PickleSerializer() - if type(x) == str: - jlist = self._java_model.findSynonyms(x, num) - else: - bytes = bytearray(ser.dumps(_convert_to_vector(x))) - vec = self._sc._jvm.SerDe.loads(bytes) - jlist = self._java_model.findSynonyms(vec, num) - words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist))) + words, similarity = _callJavaFunc(self._sc, self._java_model.findSynonyms, word, num) return zip(words, similarity) @@ -86,6 +350,7 @@ class Word2Vec(object): We used skip-gram model in our implementation and hierarchical softmax method to train the model. The variable names in the implementation matches the original C implementation. + For original C implementation, see https://code.google.com/p/word2vec/ For research papers, see Efficient Estimation of Word Representations in Vector Space @@ -95,90 +360,46 @@ class Word2Vec(object): >>> sentence = "a b " * 100 + "a c " * 10 >>> localDoc = [sentence, sentence] >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) - >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc) + >>> model = Word2Vec(vectorSize=10).fit(doc) + >>> syms = model.findSynonyms("a", 2) - >>> str(syms[0][0]) - 'b' - >>> str(syms[1][0]) - 'c' - >>> len(syms) - 2 + >>> [s[0] for s in syms] + [u'b', u'c'] >>> vec = model.transform("a") - >>> len(vec) - 10 >>> syms = model.findSynonyms(vec, 2) - >>> str(syms[0][0]) - 'b' - >>> str(syms[1][0]) - 'c' - >>> len(syms) - 2 + >>> [s[0] for s in syms] + [u'b', u'c'] """ - def __init__(self): + def __init__(self, vectorSize=100, learningRate=0.025, numPartitions=1, + numIterations=1, seed=42L): """ Construct Word2Vec instance - """ - self.vectorSize = 100 - self.learningRate = 0.025 - self.numPartitions = 1 - self.numIterations = 1 - self.seed = 42L - def setVectorSize(self, vectorSize): - """ - Sets vector size (default: 100). + :param vectorSize: vector size (default: 100). + :param learningRate: initial learning rate (default: 0.025). + :param numPartitions: number of partitions (default: 1). Use + a small number for accuracy. + :param numIterations: number of iterations (default: 1), which should + be smaller than or equal to number of partitions. """ self.vectorSize = vectorSize - return self - - def setLearningRate(self, learningRate): - """ - Sets initial learning rate (default: 0.025). 
- """ self.learningRate = learningRate - return self - - def setNumPartitions(self, numPartitions): - """ - Sets number of partitions (default: 1). Use a small number for accuracy. - """ self.numPartitions = numPartitions - return self - - def setNumIterations(self, numIterations): - """ - Sets number of iterations (default: 1), which should be smaller than or equal to number of - partitions. - """ self.numIterations = numIterations - return self - - def setSeed(self, seed): - """ - Sets random seed. - """ self.seed = seed - return self def fit(self, data): """ Computes the vector representation of each word in vocabulary. :param data: training data. RDD of subtype of Iterable[String] - :return: python Word2VecModel instance + :return: Word2VecModel instance """ sc = data.context - ser = PickleSerializer() - vectorSize = self.vectorSize - learningRate = self.learningRate - numPartitions = self.numPartitions - numIterations = self.numIterations - seed = self.seed - - model = sc._jvm.PythonMLLibAPI().trainWord2Vec( - data._to_java_object_rdd(), vectorSize, - learningRate, numPartitions, numIterations, seed) - return Word2VecModel(sc, model) + jmodel = _callAPI(sc, "trainWord2Vec", data, int(self.vectorSize), + float(self.learningRate), int(self.numPartitions), + int(self.numIterations), long(self.seed)) + return Word2VecModel(sc, jmodel) def _test(): diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 24c5480b2f753..c6bacea15f095 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -215,7 +215,7 @@ def __str__(self): return "[" + ",".join([str(v) for v in self.array]) + "]" def __repr__(self): - return "DenseVector(%r)" % self.array + return "DenseVector(%r)" % list(self.array) def __eq__(self, other): return isinstance(other, DenseVector) and self.array == other.array From 486795f1d8792c15c9f97b22b1015b23fb7c8d81 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 15 Oct 2014 17:35:50 -0700 Subject: [PATCH 2/9] update programming guide, HashTF -> HashingTF --- docs/mllib-feature-extraction.md | 100 +++++++++++++++++++++++++++++++ python/pyspark/mllib/feature.py | 4 +- 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index 1511ae6dda4ed..4d56fa2ff1112 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -95,8 +95,50 @@ tf.cache() val idf = new IDF(minDocFreq = 2).fit(tf) val tfidf: RDD[Vector] = idf.transform(tf) {% endhighlight %} + +
+
+TF and IDF are implemented in [HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF)
+and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF).
+`HashingTF` takes an RDD of lists as input.
+Each record could be an iterable of strings or other types.
+
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.mllib.linalg import Vector
+from pyspark.mllib.feature import HashingTF
+
+sc = SparkContext()
+
+# Load documents (one per line).
+documents = sc.textFile("...").map(lambda line: line.split(" "))
+
+hashingTF = HashingTF()
+tf = hashingTF.transform(documents)
+{% endhighlight %}
+
+While applying `HashingTF` only needs a single pass over the data, applying `IDF` needs two passes:
+first to compute the IDF vector and second to scale the term frequencies by IDF.
+
+{% highlight python %}
+from pyspark.mllib.feature import IDF
+
+# ... continue from the previous example
+tf.cache()
+idf = IDF().fit(tf)
+tfidf = idf.transform(tf)
+{% endhighlight %}
+MLlib's IDF implementation provides an option for ignoring terms which occur in fewer than a
+minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
+can be used by passing the `minDocFreq` value to the IDF constructor.
+{% highlight python %}
+# ... continue from the previous example
+tf.cache()
+idf = IDF().fit(tf)
+tfidf = idf.transform(tf)
+{% endhighlight %}
</div>
@@ -162,6 +204,20 @@ for((synonym, cosineSimilarity) <- synonyms) { } {% endhighlight %} +
+{% highlight python %} +from pyspark.mllib.feature import Word2Vec + +input = sc.textFile("text8").map(lambda line: line.split(" ")) + +word2vec = Word2Vec() +model = word2vec.fit(input) + +synonyms = model.findSynonyms("china", 40) +for synonym, cosineSimilarity in synonyms: + print synonym, cosineSimilarity +{% endhighlight %} +
## StandardScaler @@ -223,6 +279,29 @@ val data1 = data.map(x => (x.label, scaler1.transform(x.features))) val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray)))) {% endhighlight %} + +
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))

# Without converting the features into dense vectors, transformation with zero mean will raise
# an exception on sparse vectors.
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
{% endhighlight %}
</div>
## Normalizer @@ -267,4 +346,25 @@ val data1 = data.map(x => (x.label, normalizer1.transform(x.features))) val data2 = data.map(x => (x.label, normalizer2.transform(x.features))) {% endhighlight %} + +
+{% highlight python %} +from pyspark.mllib.util import MLUtils +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.feature import Normalizer + +data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") +label = data.map(lambda x: x.label) +features = data.map(lambda x: x.features) + +normalizer1 = Normalizer() +normalizer2 = Normalizer(p=float("inf")) + +# Each sample in data1 will be normalized using $L^2$ norm. +data1 = label.zip(normalizer1.transform(features)) + +# Each sample in data2 will be normalized using $L^\infty$ norm. +data2 = label.zip(normalizer2.transform(features)) +{% endhighlight %} +
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index b648047b12cce..cb7fb236cea20 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -211,12 +211,12 @@ def fit(self, dataset): return StandardScalerModel(sc, jmodel) -class HashTF(object): +class HashingTF(object): """ :: Experimental :: Maps a sequence of terms to their term frequencies using the hashing trick. - >>> htf = HashTF(100) + >>> htf = HashingTF(100) >>> doc = "a a b b c d".split(" ") >>> htf.transform(doc) SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0}) From 7a1891abe6647a5f9dc82c21add907fe2d4b9aa8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 15 Oct 2014 20:35:41 -0700 Subject: [PATCH 3/9] fix tests --- python/pyspark/mllib/feature.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index cb7fb236cea20..5cfa8ecc1dec5 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -166,7 +166,7 @@ def transform(self, vector): :return: Standardized vector. If the variance of a column is zero, it will return default `0.0` for the column with zero variance. """ - JavaModelWrapper.transform(vector) + return JavaModelWrapper.transform(self, vector) class StandardScaler(object): @@ -261,7 +261,7 @@ def transform(self, dataset): :param dataset: an RDD of term frequency vectors :return: an RDD of TF-IDF vectors """ - JavaModelWrapper.transform(dataset) + return JavaModelWrapper.transform(self, dataset) class IDF(object): From a405ae7b967a1a9398e3cdbb812149be7314f29e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 15 Oct 2014 22:08:21 -0700 Subject: [PATCH 4/9] fix tests --- python/pyspark/mllib/feature.py | 12 ++++++------ python/pyspark/mllib/linalg.py | 14 +++++++++++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 5cfa8ecc1dec5..3fc0a3e49fcdb 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -112,11 +112,11 @@ class Normalizer(VectorTransformer): >>> v = Vectors.dense(range(3)) >>> nor = Normalizer(1) >>> nor.transform(v) - DenseVector([0.0, 0.3333333333333333, 0.6666666666666666]) + DenseVector([0.0, 0.3333, 0.6667]) >>> rdd = sc.parallelize([v]) >>> nor.transform(rdd).collect() - [DenseVector([0.0, 0.3333333333333333, 0.6666666666666666])] + [DenseVector([0.0, 0.3333, 0.6667])] """ def __init__(self, p=2): """ @@ -182,8 +182,8 @@ class StandardScaler(object): >>> model = standardizer.fit(dataset) >>> result = model.transform(dataset) >>> for r in result.collect(): r - DenseVector([-0.7071067811865475, 0.7071067811865475, -0.7071067811865476]) - DenseVector([0.7071067811865475, -0.7071067811865475, 0.7071067811865476]) + DenseVector([-0.7071, 0.7071, -0.7071]) + DenseVector([0.7071, -0.7071, 0.7071]) """ def __init__(self, withMean=False, withStd=True): """ @@ -287,8 +287,8 @@ class IDF(object): >>> model = idf.fit(data) >>> tfidf = model.transform(data) >>> for r in tfidf.collect(): r - SparseVector(4, {1: 0.0, 3: 0.575364144904}) - DenseVector([0.0, 0.0, 1.3862943611198906, 0.8630462173553426]) + SparseVector(4, {1: 0.0, 3: 0.5754}) + DenseVector([0.0, 0.0, 1.3863, 0.863]) SparseVector(4, {1: 0.0}) """ def __init__(self, minDocFreq=0): diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index c6bacea15f095..04f85a2b7a0d4 100644 --- a/python/pyspark/mllib/linalg.py +++ 
b/python/pyspark/mllib/linalg.py @@ -98,6 +98,13 @@ def _vector_size(v): raise TypeError("Cannot treat type %s as a vector" % type(v)) +def _format_float(f, digits=4): + s = str(round(f, 4)) + if '.' in s: + s = s[:s.index('.') + 1 + digits] + return s + + class Vector(object): """ Abstract class for DenseVector and SparseVector @@ -215,7 +222,7 @@ def __str__(self): return "[" + ",".join([str(v) for v in self.array]) + "]" def __repr__(self): - return "DenseVector(%r)" % list(self.array) + return "DenseVector([%s])" % (', '.join(_format_float(i) for i in self.array)) def __eq__(self, other): return isinstance(other, DenseVector) and self.array == other.array @@ -418,7 +425,8 @@ def __str__(self): def __repr__(self): inds = self.indices vals = self.values - entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) + entries = ", ".join(["{0}: {1}".format(inds[i], _format_float(vals[i])) + for i in xrange(len(inds))]) return "SparseVector({0}, {{{1}}})".format(self.size, entries) def __eq__(self, other): @@ -478,7 +486,7 @@ def dense(elements): returns a NumPy array. >>> Vectors.dense([1, 2, 3]) - DenseVector(array('d', [1.0, 2.0, 3.0])) + DenseVector([1.0, 2.0, 3.0]) """ return DenseVector(elements) From 3abb8c2da68633d3312c2c8c3bf1680bb0ee8edf Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 27 Oct 2014 13:36:04 -0700 Subject: [PATCH 5/9] address comments --- .../mllib/api/python/PythonMLLibAPI.scala | 3 +- python/pyspark/mllib/feature.py | 29 +++++++++++++++++-- python/pyspark/mllib/linalg.py | 4 +-- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d67baf78bc5f4..23d18704ceab7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -365,13 +365,12 @@ class PythonMLLibAPI extends Serializable { } /** - * TODO: model is not serializable * Transforms an RDD of words to its vector representation * @param rdd an RDD of words * @return an RDD of vector representations of words */ def transform(rdd: JavaRDD[String]): JavaRDD[Vector] = { - rdd.rdd.map(model.transform(_)) + rdd.rdd.map(model.transform) } def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] = { diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 3e1a3cdec6035..91fe733ba0e82 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -20,6 +20,7 @@ """ import warnings +import py4j.protocol from py4j.protocol import Py4JJavaError from py4j.java_gateway import JavaObject @@ -32,6 +33,25 @@ 'Word2Vec', 'Word2VecModel'] +# Hack for support float('inf') in Py4j +old_smart_decode = py4j.protocol.smart_decode + +float_str_mapping = { + u'nan': u'NaN', + u'inf': u'Infinity', + u'-inf': u'-Infinity', +} + + +def new_smart_decode(obj): + if isinstance(obj, float): + s = unicode(obj) + return float_str_mapping.get(s, s) + return old_smart_decode(obj) + +py4j.protocol.smart_decode = new_smart_decode + + # TODO: move these helper functions into utils _picklable_classes = [ 'LinkedList', @@ -103,11 +123,10 @@ class Normalizer(VectorTransformer): :: Experimental :: Normalizes samples individually to unit L^p^ norm - For any 1 <= p < Double.PositiveInfinity, normalizes samples using + For any 1 <= p <= float('inf'), normalizes samples using 
sum(abs(vector).^p^)^(1/p)^ as norm. - For p = Double.PositiveInfinity, max(abs(vector)) will be used as - norm for normalization. + For p = float('inf'), max(abs(vector)) will be used as norm for normalization. >>> v = Vectors.dense(range(3)) >>> nor = Normalizer(1) @@ -117,6 +136,10 @@ class Normalizer(VectorTransformer): >>> rdd = sc.parallelize([v]) >>> nor.transform(rdd).collect() [DenseVector([0.0, 0.3333, 0.6667])] + + >>> nor2 = Normalizer(float("inf")) + >>> nor2.transform(v) + DenseVector([0.0, 0.5, 1.0]) """ def __init__(self, p=2): """ diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index a35aece2bda7a..1b9bf596242df 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -112,7 +112,7 @@ def _vector_size(v): def _format_float(f, digits=4): - s = str(round(f, 4)) + s = str(round(f, digits)) if '.' in s: s = s[:s.index('.') + 1 + digits] return s @@ -423,7 +423,7 @@ def toArray(self): Returns a copy of this SparseVector as a 1-dimensional NumPy array. """ arr = np.zeros((self.size,), dtype=np.float64) - for i in xrange(self.indices.size): + for i in xrange(len(self.indices)): arr[self.indices[i]] = self.values[i] return arr From 806c7c24c7fbb1f10e8bbddcc804f4899d8f0b11 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 27 Oct 2014 14:09:09 -0700 Subject: [PATCH 6/9] address comments --- docs/mllib-feature-extraction.md | 2 +- python/pyspark/mllib/feature.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index 4d56fa2ff1112..dbda66128a4b4 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -136,7 +136,7 @@ can be used by passing the `minDocFreq` value to the IDF constructor. {% highlight python %} # ... continue from the previous example tf.cache() -idf = IDF().fit(tf) +idf = IDF(minDocFreq=2).fit(tf) tfidf = idf.transform(tf) {% endhighlight %} diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 91fe733ba0e82..d0e3bbdb33ae4 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -18,6 +18,7 @@ """ Python package for feature in MLlib. """ +import sys import warnings import py4j.protocol @@ -394,7 +395,7 @@ class Word2Vec(object): [u'b', u'c'] """ def __init__(self, vectorSize=100, learningRate=0.025, numPartitions=1, - numIterations=1, seed=42L): + numIterations=1, seed=None): """ Construct Word2Vec instance @@ -404,12 +405,15 @@ def __init__(self, vectorSize=100, learningRate=0.025, numPartitions=1, a small number for accuracy. :param numIterations: number of iterations (default: 1), which should be smaller than or equal to number of partitions. + :param seed: the seed used for randomness """ + import random # this can't be on the top because of mllib.random + self.vectorSize = vectorSize self.learningRate = learningRate self.numPartitions = numPartitions self.numIterations = numIterations - self.seed = seed + self.seed = random.randint(0, sys.maxint) if seed is None else seed def fit(self, data): """ @@ -436,4 +440,8 @@ def _test(): exit(-1) if __name__ == "__main__": + # remove current path from list of search paths to avoid importing mllib.random + # for C{import random}, which is done in an external dependency of pyspark during doctests. 
+ import sys + sys.path.pop(0) _test() From b6286939304da666a8158c71a47b6c95af28b639 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 27 Oct 2014 18:08:38 -0700 Subject: [PATCH 7/9] rollback changes in Word2Vec --- python/pyspark/mllib/feature.py | 51 +++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index d0e3bbdb33ae4..96df2ee55708d 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -384,7 +384,7 @@ class Word2Vec(object): >>> sentence = "a b " * 100 + "a c " * 10 >>> localDoc = [sentence, sentence] >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) - >>> model = Word2Vec(vectorSize=10).fit(doc) + >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc) >>> syms = model.findSynonyms("a", 2) >>> [s[0] for s in syms] @@ -394,26 +394,53 @@ class Word2Vec(object): >>> [s[0] for s in syms] [u'b', u'c'] """ - def __init__(self, vectorSize=100, learningRate=0.025, numPartitions=1, - numIterations=1, seed=None): + def __init__(self): """ Construct Word2Vec instance - - :param vectorSize: vector size (default: 100). - :param learningRate: initial learning rate (default: 0.025). - :param numPartitions: number of partitions (default: 1). Use - a small number for accuracy. - :param numIterations: number of iterations (default: 1), which should - be smaller than or equal to number of partitions. - :param seed: the seed used for randomness """ import random # this can't be on the top because of mllib.random + self.vectorSize = 100 + self.learningRate = 0.025 + self.numPartitions = 1 + self.numIterations = 1 + self.seed = random.randint(0, sys.maxint) + + def setVectorSize(self, vectorSize): + """ + Sets vector size (default: 100). + """ self.vectorSize = vectorSize + return self + + def setLearningRate(self, learningRate): + """ + Sets initial learning rate (default: 0.025). + """ self.learningRate = learningRate + return self + + def setNumPartitions(self, numPartitions): + """ + Sets number of partitions (default: 1). Use a small number for accuracy. + """ self.numPartitions = numPartitions + return self + + def setNumIterations(self, numIterations): + """ + Sets number of iterations (default: 1), which should be smaller than or equal to number of + partitions. + """ self.numIterations = numIterations - self.seed = random.randint(0, sys.maxint) if seed is None else seed + return self + + def setSeed(self, seed): + """ + Sets random seed. + """ + self.seed = seed + return self def fit(self, data): """ From 67f6d21e4d46b9d9ae41acdd26520f78eb64cf6d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 28 Oct 2014 00:02:27 -0700 Subject: [PATCH 8/9] address comments --- docs/mllib-feature-extraction.md | 21 ++---------- .../mllib/feature/VectorTransformer.scala | 2 +- python/pyspark/mllib/feature.py | 33 +++++++++++-------- 3 files changed, 24 insertions(+), 32 deletions(-) diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index d88fb3b737f42..886d71df474bc 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -105,7 +105,6 @@ Each record could be an iterable of strings or other types. {% highlight python %} from pyspark import SparkContext -from pyspark.mllib.linalg import Vector from pyspark.mllib.feature import HashingTF sc = SparkContext() @@ -204,20 +203,6 @@ for((synonym, cosineSimilarity) <- synonyms) { } {% endhighlight %} -
-{% highlight python %} -from pyspark.mllib.feature import Word2Vec - -input = sc.textFile("text8").map(lambda line: line.split(" ")) - -word2vec = Word2Vec() -model = word2vec.fit(input) - -synonyms = model.findSynonyms("china", 40) -for synonym, cosineSimilarity in synonyms: - print synonym, cosineSimilarity -{% endhighlight %} -
## StandardScaler @@ -354,17 +339,17 @@ from pyspark.mllib.linalg import Vectors from pyspark.mllib.feature import Normalizer data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") -label = data.map(lambda x: x.label) +labels = data.map(lambda x: x.label) features = data.map(lambda x: x.features) normalizer1 = Normalizer() normalizer2 = Normalizer(p=float("inf")) # Each sample in data1 will be normalized using $L^2$ norm. -data1 = label.zip(normalizer1.transform(features)) +data1 = labels.zip(normalizer1.transform(features)) # Each sample in data2 will be normalized using $L^\infty$ norm. -data2 = label.zip(normalizer2.transform(features)) +data2 = labels.zip(normalizer2.transform(features)) {% endhighlight %} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala index fac413f8ee24a..7358c1c84f79c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala @@ -18,9 +18,9 @@ package org.apache.spark.mllib.feature import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD -import org.apache.spark.api.java.JavaRDD /** * :: DeveloperApi :: diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 96df2ee55708d..b7e85e8de1998 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -30,27 +30,26 @@ from pyspark.mllib.linalg import Vectors, _to_java_object_rdd __all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', - 'HashTF', 'IDFModel', 'IDF', - 'Word2Vec', 'Word2VecModel'] + 'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel'] # Hack for support float('inf') in Py4j -old_smart_decode = py4j.protocol.smart_decode +_old_smart_decode = py4j.protocol.smart_decode -float_str_mapping = { +_float_str_mapping = { u'nan': u'NaN', u'inf': u'Infinity', u'-inf': u'-Infinity', } -def new_smart_decode(obj): +def _new_smart_decode(obj): if isinstance(obj, float): s = unicode(obj) - return float_str_mapping.get(s, s) - return old_smart_decode(obj) + return _float_str_mapping.get(s, s) + return _old_smart_decode(obj) -py4j.protocol.smart_decode = new_smart_decode +py4j.protocol.smart_decode = _new_smart_decode # TODO: move these helper functions into utils @@ -108,6 +107,7 @@ def _callAPI(sc, name, *args): class VectorTransformer(object): """ :: DeveloperApi :: + Base class for transformation of a vector or RDD of vector """ def transform(self, vector): @@ -122,12 +122,13 @@ def transform(self, vector): class Normalizer(VectorTransformer): """ :: Experimental :: - Normalizes samples individually to unit L^p^ norm - For any 1 <= p <= float('inf'), normalizes samples using - sum(abs(vector).^p^)^(1/p)^ as norm. + Normalizes samples individually to unit L\ :sup:`p`\ norm + + For any 1 <= `p` <= float('inf'), normalizes samples using + sum(abs(vector). :sup:`p`) :sup:`(1/p)` as norm. - For p = float('inf'), max(abs(vector)) will be used as norm for normalization. + For `p` = float('inf'), max(abs(vector)) will be used as norm for normalization. 
>>> v = Vectors.dense(range(3)) >>> nor = Normalizer(1) @@ -142,7 +143,7 @@ class Normalizer(VectorTransformer): >>> nor2.transform(v) DenseVector([0.0, 0.5, 1.0]) """ - def __init__(self, p=2): + def __init__(self, p=2.0): """ :param p: Normalization in L^p^ space, p = 2 by default. """ @@ -180,6 +181,7 @@ def transform(self, dataset): class StandardScalerModel(JavaModelWrapper): """ :: Experimental :: + Represents a StandardScaler model that can transform vectors. """ def transform(self, vector): @@ -196,6 +198,7 @@ def transform(self, vector): class StandardScaler(object): """ :: Experimental :: + Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the training set. @@ -238,6 +241,7 @@ def fit(self, dataset): class HashingTF(object): """ :: Experimental :: + Maps a sequence of terms to their term frequencies using the hashing trick. >>> htf = HashingTF(100) @@ -291,6 +295,7 @@ def transform(self, dataset): class IDF(object): """ :: Experimental :: + Inverse document frequency (IDF). The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`, @@ -341,6 +346,8 @@ def transform(self, word): """ Transforms a word to its vector representation + Note: local use only + :param word: a word :return: vector representation of word(s) """ From 4f48f48d0c013e50f1a96f1e6bb0af4d88bf366c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 28 Oct 2014 00:05:45 -0700 Subject: [PATCH 9/9] add a note for HashingTF --- python/pyspark/mllib/feature.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index b7e85e8de1998..324343443ebdb 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -244,6 +244,8 @@ class HashingTF(object): Maps a sequence of terms to their term frequencies using the hashing trick. + Note: the terms must be hashable (can not be dict/set/list...). + >>> htf = HashingTF(100) >>> doc = "a a b b c d".split(" ") >>> htf.transform(doc)