diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 333a8c364a88..f132272a9bed 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -39,9 +39,9 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { * higher the dimension is, the lower the false negative rate. * @group param */ - final val outputDim: IntParam = new IntParam(this, "outputDim", "output dimension, where" + - "increasing dimensionality lowers the false negative rate, and decreasing dimensionality" + - " improves the running performance", ParamValidators.gt(0)) + final val outputDim: IntParam = new IntParam(this, "outputDim", "The output dimension, where" + + " increasing dimensionality lowers the false negative rate, and decreasing dimensionality" + + " improves the running performance.", ParamValidators.gt(0)) /** @group getParam */ final def getOutputDim: Int = $(outputDim) @@ -109,11 +109,11 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * - Single Probing: Fast, return at most k elements (Probing only one buckets) * - Multiple Probing: Slow, return exact k elements (Probing multiple buckets close to the key) * - * @param dataset the dataset to search for nearest neighbors of the key - * @param key Feature vector representing the item to search for - * @param numNearestNeighbors The maximum number of nearest neighbors - * @param singleProbing True for using Single Probing; false for multiple probing - * @param distCol Output column for storing the distance between each result row and the key + * @param dataset The dataset to search for nearest neighbors of the key. + * @param key Feature vector representing the item to search for. + * @param numNearestNeighbors The maximum number of nearest neighbors. + * @param singleProbing True for using Single Probing; false for multiple probing. + * @param distCol Output column for storing the distance between each result row and the key. * @return A dataset containing at most k items closest to the key. A distCol is added to show * the distance between each row and the key. */ @@ -215,12 +215,12 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the transformed * data when necessary. * - * @param datasetA One of the datasets to join - * @param datasetB Another dataset to join - * @param threshold The threshold for the distance of row pairs - * @param distCol Output column for storing the distance between each result row and the key + * @param datasetA One of the datasets to join. + * @param datasetB Another dataset to join. + * @param threshold The threshold for the distance of row pairs. + * @param distCol Output column for storing the distance between each result row and the key. * @return A joined dataset containing pairs of rows. The original rows are in columns - * "datasetA" and "datasetB", and a distCol is added to show the distance of each pair + * "datasetA" and "datasetB", and a distCol is added to show the distance of each pair. 
*/ def approxSimilarityJoin( datasetA: Dataset[_], diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala index d9d0f32254e2..1cc27ab0b2ec 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala @@ -33,8 +33,8 @@ import org.apache.spark.sql.types.StructType * * Model produced by [[MinHash]], where multiple hash functions are stored. Each hash function is * a perfect hash function: - * `h_i(x) = (x * k_i mod prime) mod numEntries` - * where `k_i` is the i-th coefficient, and both `x` and `k_i` are from `Z_prime^*` + * `h_i(x) = (x * k_i \mod prime) \mod numEntries` + * where `k_i` is the i-th coefficient, and both `x` and `k_i` are from `Z_{prime}^*` * * Reference: * [[https://en.wikipedia.org/wiki/Perfect_hash_function Wikipedia on Perfect Hash Function]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala index 1b524c6710b4..af32bdf07770 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala @@ -60,7 +60,7 @@ private[ml] trait RandomProjectionParams extends Params { * * Model produced by [[RandomProjection]], where multiple random vectors are stored. The vectors * are normalized to be unit vectors and each vector is used in a hash function: - * `h_i(x) = floor(r_i.dot(x) / bucketLength)` + * `h_i(x) = floor(r_i * x / bucketLength)` * where `r_i` is the i-th random unit vector. The number of buckets will be `(max L2 norm of input * vectors) / bucketLength`. * diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 635cf1304588..5dd7db289263 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -37,6 +37,7 @@ 'IDF', 'IDFModel', 'IndexToString', 'MaxAbsScaler', 'MaxAbsScalerModel', + 'MinHash', 'MinHashModel', 'MinMaxScaler', 'MinMaxScalerModel', 'NGram', 'Normalizer', @@ -44,6 +45,7 @@ 'PCA', 'PCAModel', 'PolynomialExpansion', 'QuantileDiscretizer', + 'RandomProjection', 'RandomProjectionModel', 'RegexTokenizer', 'RFormula', 'RFormulaModel', 'SQLTransformer', @@ -651,6 +653,87 @@ def idf(self): return self._call_java("idf") + +class LSHParams(Params): + """ + Mixin for Locality Sensitive Hashing (LSH) algorithm parameters. + """ + + outputDim = Param(Params._dummy(), "outputDim", "The output dimension, where increasing " + + "dimensionality lowers the false negative rate, and decreasing " + + "dimensionality improves the running performance.", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(LSHParams, self).__init__() + + @since("2.1.0") + def setOutputDim(self, value): + """ + Sets the value of :py:attr:`outputDim`. + """ + return self._set(outputDim=value) + + @since("2.1.0") + def getOutputDim(self): + """ + Gets the value of outputDim or its default value. + """ + return self.getOrDefault(self.outputDim) + + +class LSHModel(): + """ + Mixin for Locality Sensitive Hashing (LSH) models. + """ + + @since("2.1.0") + def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, singleProbing=True, + distCol="distCol"): + """ + Given a large dataset and an item, approximately find at most k items which have the + closest distance to the item.
If the :py:attr:`outputCol` is missing, the method will + transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows + caching of the transformed data when necessary. + + This method implements two ways of fetching k nearest neighbors: + + * Single Probing: Fast, return at most k elements (Probing only one bucket) + + * Multiple Probing: Slow, return exactly k elements (Probing multiple buckets close to \ + the key) + + :param dataset: The dataset to search for nearest neighbors of the key. + :param key: Feature vector representing the item to search for. + :param numNearestNeighbors: The maximum number of nearest neighbors. + :param singleProbing: True for using single probing (default); false for multiple probing. + :param distCol: Output column for storing the distance between each result row and the key. + Use "distCol" as default value if it's not specified. + :return: A dataset containing at most k items closest to the key. A distCol is added + to show the distance between each row and the key. + """ + return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors, + singleProbing, distCol) + + @since("2.1.0") + def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"): + """ + Join two datasets to approximately find all pairs of rows whose distance is smaller than + the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data; + if the :py:attr:`outputCol` exists, it will use that. This allows caching of the + transformed data when necessary. + + :param datasetA: One of the datasets to join. + :param datasetB: Another dataset to join. + :param threshold: The threshold for the distance of row pairs. + :param distCol: Output column for storing the distance between each result row and the key. + Use "distCol" as default value if it's not specified. + :return: A joined dataset containing pairs of rows. The original rows are in columns + "datasetA" and "datasetB", and a distCol is added to show the distance of + each pair. + """ + return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol) + + @inherit_doc class MaxAbsScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable): """ @@ -731,6 +814,119 @@ def maxAbs(self): return self._call_java("maxAbs") + +@inherit_doc +class MinHash(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed, + JavaMLReadable, JavaMLWritable): + + """ + .. note:: Experimental + + LSH class for Jaccard distance. + The input can be dense or sparse vectors, but it is more efficient if it is sparse. + For example, `Vectors.sparse(10, [(2, 1.0), (3, 1.0), (5, 1.0)])` + means there are 10 elements in the space. This set contains elem 2, elem 3 and elem 5. + Also, any input vector must have at least 1 non-zero index, and all non-zero values + are treated as binary "1" values. + + .. seealso:: `MinHash <https://en.wikipedia.org/wiki/MinHash>`_ + + >>> from pyspark.ml.linalg import Vectors + >>> data = [(Vectors.sparse(10, [0, 1], [1.0, 1.0]),), + ... (Vectors.sparse(10, [1, 2], [1.0, 1.0]),), + ... (Vectors.sparse(10, [2, 3], [1.0, 1.0]),), + ... (Vectors.sparse(10, [3, 4], [1.0, 1.0]),), + ...
(Vectors.sparse(10, [4, 5], [1.0, 1.0]),)] + >>> df = spark.createDataFrame(data, ["keys"]) + >>> mh = MinHash(inputCol="keys", outputCol="values", seed=12345) + >>> model = mh.fit(df) + >>> model.numEntries + 20 + >>> model.randCoefficients + [776966252] + >>> model.transform(df).head() + Row(keys=SparseVector(10, {0: 1.0, 1: 1.0}), values=DenseVector([4.0])) + >>> data2 = [(Vectors.sparse(10, [5, 6], [1.0, 1.0]),), + ... (Vectors.sparse(10, [6, 7], [1.0, 1.0]),), + ... (Vectors.sparse(10, [7, 8], [1.0, 1.0]),), + ... (Vectors.sparse(10, [8, 9], [1.0, 1.0]),)] + >>> df2 = spark.createDataFrame(data2, ["keys"]) + >>> model.approxNearestNeighbors(df2, Vectors.sparse(10, [5, 8], [1.0, 1.0]), 1).collect() + [Row(keys=SparseVector(10, {5: 1.0, 6: 1.0}), values=DenseVector([6.0]), distCol=0.666...)] + >>> model.approxSimilarityJoin(df, df2, 1.0).select("distCol").head()[0] + 0.666... + >>> mhPath = temp_path + "/mh" + >>> mh.save(mhPath) + >>> mh2 = MinHash.load(mhPath) + >>> mh2.getOutputCol() == mh.getOutputCol() + True + >>> modelPath = temp_path + "/mh-model" + >>> model.save(modelPath) + >>> model2 = MinHashModel.load(modelPath) + >>> model2.numEntries == model.numEntries + True + >>> model2.randCoefficients == model.randCoefficients + True + + .. versionadded:: 2.1.0 + """ + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, seed=None, outputDim=1): + """ + __init__(self, inputCol=None, outputCol=None, seed=None, outputDim=1) + """ + super(MinHash, self).__init__() + self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.MinHash", self.uid) + self._setDefault(outputDim=1) + kwargs = self.__init__._input_kwargs + self.setParams(**kwargs) + + @keyword_only + @since("2.1.0") + def setParams(self, inputCol=None, outputCol=None, seed=None, outputDim=1): + """ + setParams(self, inputCol=None, outputCol=None, seed=None, outputDim=1) + Sets params for this MinHash. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + def _create_model(self, java_model): + return MinHashModel(java_model) + + +class MinHashModel(JavaModel, LSHModel, JavaMLReadable, JavaMLWritable): + """ + .. note:: Experimental + + Model produced by :py:class:`MinHash`, where multiple hash functions are stored. + Each hash function is a perfect hash function: + :math:`h_i(x) = (x * k_i \mod prime) \mod numEntries` + where :math:`k_i` is the i-th coefficient, and both :math:`x` and :math:`k_i` + are from :math:`Z_{prime}^*`. + + .. seealso:: `Perfect Hash Function <https://en.wikipedia.org/wiki/Perfect_hash_function>`_ + + .. versionadded:: 2.1.0 + """ + + @property + @since("2.1.0") + def numEntries(self): + """ + The number of entries of the hash functions. + """ + return self._call_java("numEntries") + + @property + @since("2.1.0") + def randCoefficients(self): + """ + An array of random coefficients, each used by one hash function. + """ + return self._call_java("randCoefficients") + + +@inherit_doc class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable): """ @@ -1253,6 +1449,121 @@ def _create_model(self, java_model): outputCol=self.getOutputCol()) + +@inherit_doc +class RandomProjection(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed, + JavaMLReadable, JavaMLWritable): + """ + .. note:: Experimental + + LSH class for Euclidean distance metrics. + The input is dense or sparse vectors, each of which represents a point in the Euclidean + distance space. The output will be vectors of configurable dimension. Hash values in the + same dimension are calculated by the same hash function. + + ..
seealso:: `Stable Distributions \ + `_ + .. seealso:: `Hashing for Similarity Search: A Survey `_ + + >>> from pyspark.ml.linalg import Vectors + >>> data = [(Vectors.dense([-1.0, -1.0 ]),), + ... (Vectors.dense([-1.0, 1.0 ]),), + ... (Vectors.dense([1.0, -1.0 ]),), + ... (Vectors.dense([1.0, 1.0]),)] + >>> df = spark.createDataFrame(data, ["keys"]) + >>> rp = RandomProjection(inputCol="keys", outputCol="values", seed=12345, bucketLength=1.0) + >>> model = rp.fit(df) + >>> model.randUnitVectors + [DenseVector([-0.3041, 0.9527])] + >>> model.transform(df).head() + Row(keys=DenseVector([-1.0, -1.0]), values=DenseVector([-1.0])) + >>> data2 = [(Vectors.dense([2.0, 2.0 ]),), + ... (Vectors.dense([2.0, 3.0 ]),), + ... (Vectors.dense([3.0, 2.0 ]),), + ... (Vectors.dense([3.0, 3.0]),)] + >>> df2 = spark.createDataFrame(data2, ["keys"]) + >>> model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect() + [Row(keys=DenseVector([2.0, 2.0]), values=DenseVector([1.0]), distCol=1.0)] + >>> model.approxSimilarityJoin(df, df2, 3.0).select("distCol").head()[0] + 2.236... + >>> rpPath = temp_path + "/rp" + >>> rp.save(rpPath) + >>> rp2 = RandomProjection.load(rpPath) + >>> rp2.getBucketLength() == rp.getBucketLength() + True + >>> modelPath = temp_path + "/rp-model" + >>> model.save(modelPath) + >>> model2 = RandomProjectionModel.load(modelPath) + >>> model2.randUnitVectors == model.randUnitVectors + True + + .. versionadded:: 2.1.0 + """ + + bucketLength = Param(Params._dummy(), "bucketLength", "the length of each hash bucket, " + + "a larger bucket lowers the false negative rate.", + typeConverter=TypeConverters.toFloat) + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, seed=None, outputDim=1, bucketLength=None): + """ + __init__(self, inputCol=None, outputCol=None, seed=None, outputDim=1, bucketLength=None) + """ + super(RandomProjection, self).__init__() + self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.RandomProjection", + self.uid) + self._setDefault(outputDim=1) + kwargs = self.__init__._input_kwargs + self.setParams(**kwargs) + + @keyword_only + @since("2.1.0") + def setParams(self, inputCol=None, outputCol=None, seed=None, outputDim=1, bucketLength=None): + """ + setParams(self, inputCol=None, outputCol=None, seed=None, outputDim=1, bucketLength=None) + Sets params for this RandomProjection. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + @since("2.1.0") + def setBucketLength(self, value): + """ + Sets the value of :py:attr:`bucketLength`. + """ + return self._set(bucketLength=value) + + @since("2.1.0") + def getBucketLength(self): + """ + Gets the value of bucketLength or its default value. + """ + return self.getOrDefault(self.bucketLength) + + def _create_model(self, java_model): + return RandomProjectionModel(java_model) + + +class RandomProjectionModel(JavaModel, LSHModel, JavaMLReadable, JavaMLWritable): + """ + .. note:: Experimental + + Model fitted by :py:class:`RandomProjection`, where multiple random vectors are stored. + The vectors are normalized to be unit vectors and each vector is used in a hash function: + :math:`h_i(x) = floor(r_i * x / bucketLength)` where :math:`r_i` is the i-th random unit + vector. The number of buckets will be `(max L2 norm of input vectors) / bucketLength`. + + .. versionadded:: 2.1.0 + """ + + @property + @since("2.1.0") + def randUnitVectors(self): + """ + An array of random unit vectors. Each vector represents a hash function. 
+ """ + return self._call_java("randUnitVectors") + + @inherit_doc @ignore_unicode_prefix class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):