diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 46a0730f5ddb..13dab0c5e663 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.ml.feature
 
+import breeze.linalg.{DenseVector => BDV}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -86,10 +85,14 @@ final class IDF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): IDFModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
+    val input: RDD[Vector] = dataset.select($(inputCol)).rdd.map {
+      case Row(v: Vector) => v
     }
-    val idf = new feature.IDF($(minDocFreq)).fit(input)
+    val idf = input.treeAggregate(
+      new IDF.DocumentFrequencyAggregator(minDocFreq = $(minDocFreq)))(
+      seqOp = (df, v) => df.add(v),
+      combOp = (df1, df2) => df1.merge(df2)
+    ).idf()
     copyValues(new IDFModel(uid, idf).setParent(this))
   }
 
@@ -107,6 +110,92 @@ object IDF extends DefaultParamsReadable[IDF] {
 
   @Since("1.6.0")
   override def load(path: String): IDF = super.load(path)
+
+  /** Document frequency aggregator. */
+  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
+
+    /** number of documents */
+    private var m = 0L
+    /** document frequency vector */
+    private var df: BDV[Long] = _
+
+
+    def this() = this(0)
+
+    /** Adds a new document. */
+    def add(doc: Vector): this.type = {
+      if (isEmpty) {
+        df = BDV.zeros(doc.size)
+      }
+      doc match {
+        case SparseVector(size, indices, values) =>
+          val nnz = indices.length
+          var k = 0
+          while (k < nnz) {
+            if (values(k) > 0) {
+              df(indices(k)) += 1L
+            }
+            k += 1
+          }
+        case DenseVector(values) =>
+          val n = values.length
+          var j = 0
+          while (j < n) {
+            if (values(j) > 0.0) {
+              df(j) += 1L
+            }
+            j += 1
+          }
+        case other =>
+          throw new UnsupportedOperationException(
+            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+      }
+      m += 1L
+      this
+    }
+
+    /** Merges another. */
+    def merge(other: DocumentFrequencyAggregator): this.type = {
+      if (!other.isEmpty) {
+        m += other.m
+        if (df == null) {
+          df = other.df.copy
+        } else {
+          df += other.df
+        }
+      }
+      this
+    }
+
+    private def isEmpty: Boolean = m == 0L
+
+    /** Returns the current IDF vector. */
+    def idf(): Vector = {
+      if (isEmpty) {
+        throw new IllegalStateException("Haven't seen any document yet.")
+      }
+      val n = df.length
+      val inv = new Array[Double](n)
+      var j = 0
+      while (j < n) {
+        /*
+         * If the term is not present in the minimum
+         * number of documents, set IDF to 0. This
+         * will cause multiplication in IDFModel to
+         * set TF-IDF to 0.
+         *
+         * Since arrays are initialized to 0 by default,
+         * we just omit changing those entries.
+         */
+        if (df(j) >= minDocFreq) {
+          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
+        }
+        j += 1
+      }
+      Vectors.dense(inv)
+    }
+  }
+
 }
 
 /**
@@ -115,7 +204,7 @@ object IDF extends DefaultParamsReadable[IDF] {
 @Since("1.4.0")
 class IDFModel private[ml] (
     @Since("1.4.0") override val uid: String,
-    idfModel: feature.IDFModel)
+    @Since("2.2.0") idfVector: Vector)
   extends Model[IDFModel] with IDFBase with MLWritable {
 
   import IDFModel._
@@ -131,9 +220,8 @@ class IDFModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
-    val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
-    dataset.withColumn($(outputCol), idf(col($(inputCol))))
+    val idfUDF = udf { vec: Vector => IDFModel.transform(idf, vec) }
+    dataset.withColumn($(outputCol), idfUDF(col($(inputCol))))
   }
 
   @Since("1.4.0")
@@ -143,13 +231,11 @@ class IDFModel private[ml] (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): IDFModel = {
-    val copied = new IDFModel(uid, idfModel)
+    val copied = new IDFModel(uid, idf)
     copyValues(copied, extra).setParent(parent)
   }
 
-  /** Returns the IDF vector. */
-  @Since("2.0.0")
-  def idf: Vector = idfModel.idf.asML
+  def idf: Vector = idfVector
 
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
@@ -181,7 +267,7 @@ object IDFModel extends MLReadable[IDFModel] {
       val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
         .select("idf")
         .head()
-      val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
+      val model = new IDFModel(metadata.uid, idf)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }
@@ -192,4 +278,27 @@ object IDFModel extends MLReadable[IDFModel] {
 
   @Since("1.6.0")
   override def load(path: String): IDFModel = super.load(path)
+
+  private def transform(idf: Vector, v: Vector): Vector = {
+    val newSize = v.size
+    v match {
+      case SparseVector(_, indices, values) =>
+        val nnz = indices.length
+        val newValues = new Array[Double](nnz)
+        var k = 0
+        while (k < nnz) {
+          newValues(k) = values(k) * idf(indices(k))
+          k += 1
+        }
+        Vectors.sparse(newSize, indices, newValues)
+      case DenseVector(values) =>
+        val newValues = new Array[Double](newSize)
+        var j = 0
+        while (j < newSize) {
+          newValues(j) = values(j) * idf(j)
+          j += 1
+        }
+        Vectors.dense(newValues)
+    }
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 5325d95526a5..ce36974f58ff 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -22,8 +22,6 @@
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
-import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
@@ -46,7 +44,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
   test("params") {
     ParamsSuite.checkParams(new IDF)
-    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
+    val model = new IDFModel("idf", Vectors.dense(1.0))
     ParamsSuite.checkParams(model)
   }
 
@@ -112,7 +110,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
   }
 
   test("IDFModel read/write") {
-    val instance = new IDFModel("myIDFModel", new OldIDFModel(Vectors.dense(1.0, 2.0)))
+    val instance = new IDFModel("myIDFModel", Vectors.dense(1.0, 2.0))
      .setInputCol("myInputCol")
      .setOutputCol("myOutputCol")
    val newInstance = testDefaultReadWrite(instance)
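Note (not part of the patch): the new `fit` path counts, for every term, how many documents contain it (`DocumentFrequencyAggregator`), weights each term by `log((m + 1) / (df + 1))`, and zeroes terms whose document frequency is below `minDocFreq`, so that the element-wise multiplication in `IDFModel.transform` zeroes the corresponding TF-IDF entries. Below is a minimal standalone sketch of that arithmetic, using a made-up toy corpus and a hypothetical `IdfSketch` object rather than the Spark code path:

```scala
// Illustrative sketch only: mirrors the IDF arithmetic introduced by this patch
// (DocumentFrequencyAggregator + IDFModel.transform) on a tiny in-memory corpus.
object IdfSketch {
  def main(args: Array[String]): Unit = {
    // Toy corpus: three documents, three terms, values are term frequencies.
    val docs = Seq(
      Array(1.0, 1.0, 0.0),
      Array(0.0, 1.0, 2.0),
      Array(0.0, 1.0, 0.0))
    val m = docs.length          // number of documents
    val minDocFreq = 1           // plays the role of $(minDocFreq)

    // Document frequency: in how many documents does each term occur?
    val df = Array.ofDim[Long](docs.head.length)
    for (doc <- docs; j <- doc.indices if doc(j) > 0.0) df(j) += 1L

    // IDF as in DocumentFrequencyAggregator.idf():
    // log((m + 1) / (df + 1)), and 0 for terms below minDocFreq.
    val idf = df.map(d => if (d >= minDocFreq) math.log((m + 1.0) / (d + 1.0)) else 0.0)
    println(idf.mkString(", "))  // the term present in every document gets IDF 0

    // TF-IDF for one document: element-wise product, as in IDFModel.transform.
    val tfidf = docs.head.zip(idf).map { case (tf, w) => tf * w }
    println(tfidf.mkString(", "))
  }
}
```

Running the sketch prints an IDF of 0 for the term that occurs in every document, which is the behaviour the in-code comment in `DocumentFrequencyAggregator.idf()` describes for rare and ubiquitous terms.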