mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -22,17 +22,15 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
BisectingKMeansModel => MLlibBisectingKMeansModel}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{IntegerType, StructType}


@@ -75,7 +73,7 @@ private[clustering] trait BisectingKMeansParams extends Params with HasMaxIter
* @return output schema
*/
protected def validateAndTransformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
}
}
@@ -113,7 +111,8 @@ class BisectingKMeansModel private[ml] (
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val predictUDF = udf((vector: Vector) => predict(vector))
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
dataset.withColumn($(predictionCol),
predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
}

@Since("2.0.0")
@@ -132,9 +131,9 @@ class BisectingKMeansModel private[ml] (
*/
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
parentModel.computeCost(data.map(OldVectors.fromML))
SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol)
val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
parentModel.computeCost(data)
}

@Since("2.0.0")
@@ -260,9 +259,7 @@ class BisectingKMeans @Since("2.0.0") (
@Since("2.0.0")
override def fit(dataset: Dataset[_]): BisectingKMeansModel = {
transformSchema(dataset.schema, logging = true)
val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)

val instr = Instrumentation.create(this, rdd)
instr.logParams(featuresCol, predictionCol, k, maxIter, seed, minDivisibleClusterSize)
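
For context, a minimal usage sketch of what the BisectingKMeans changes enable. The toy data and shell-style setup are illustrative assumptions, not part of this patch:

```scala
// Illustrative sketch only (spark-shell style: assumes an existing SparkSession `spark`).
// Features are stored as Array[Double] rather than org.apache.spark.ml.linalg.Vector.
import org.apache.spark.ml.clustering.BisectingKMeans
import spark.implicits._

val df = Seq(Array(0.0, 0.0), Array(0.1, 0.1), Array(9.0, 9.0), Array(9.1, 9.1))
  .map(Tuple1.apply).toDF("features")

// Both fit and computeCost now go through the array-aware conversion helpers.
val model = new BisectingKMeans().setK(2).setSeed(1L).fit(df)
println(model.computeCost(df))
```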
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -33,7 +33,7 @@ import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatr
Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{IntegerType, StructType}


@@ -63,7 +63,7 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w
* @return output schema
*/
protected def validateAndTransformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
val schemaWithPredictionCol = SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
SchemaUtils.appendColumn(schemaWithPredictionCol, $(probabilityCol), new VectorUDT)
}
@@ -109,8 +109,9 @@ class GaussianMixtureModel private[ml] (
transformSchema(dataset.schema, logging = true)
val predUDF = udf((vector: Vector) => predict(vector))
val probUDF = udf((vector: Vector) => predictProbability(vector))
dataset.withColumn($(predictionCol), predUDF(col($(featuresCol))))
.withColumn($(probabilityCol), probUDF(col($(featuresCol))))
dataset
.withColumn($(predictionCol), predUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
.withColumn($(probabilityCol), probUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
}

@Since("2.0.0")
@@ -340,7 +341,8 @@ class GaussianMixture @Since("2.0.0") (
val sc = dataset.sparkSession.sparkContext
val numClusters = $(k)

val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
val instances: RDD[Vector] = dataset
.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map {
case Row(features: Vector) => features
}.cache()

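
The same applies to GaussianMixtureModel.transform, which now resolves the features column through DatasetUtils.columnToVector before applying the prediction and probability UDFs. A hedged sketch, this time with Array[Float] features (again assuming a SparkSession named `spark`):

```scala
// Illustrative sketch only (assumes `spark` and `import spark.implicits._` as above).
// Array[Float] features are converted to ml Vectors on the fly by columnToVector.
import org.apache.spark.ml.clustering.GaussianMixture

val dfFloat = Seq(
  Array(0.0f, 0.0f), Array(0.1f, 0.2f), Array(0.2f, 0.1f),
  Array(9.0f, 9.0f), Array(9.1f, 9.2f), Array(9.2f, 9.1f)
).map(Tuple1.apply).toDF("features")

val gmm = new GaussianMixture().setK(2).setSeed(1L).fit(dfFloat)
gmm.transform(dfFloat).select("prediction", "probability").show(truncate = false)
```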
31 changes: 6 additions & 25 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model, PipelineStage}
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
@@ -34,7 +34,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.VersionUtils.majorVersion

@@ -86,24 +86,13 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
@Since("1.5.0")
def getInitSteps: Int = $(initSteps)

/**
* Validates the input schema.
* @param schema input schema
*/
private[clustering] def validateSchema(schema: StructType): Unit = {
val typeCandidates = List( new VectorUDT,
new ArrayType(DoubleType, false),
new ArrayType(FloatType, false))

SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
}
/**
* Validates and transforms the input schema.
* @param schema input schema
* @return output schema
*/
protected def validateAndTransformSchema(schema: StructType): StructType = {
validateSchema(schema)
SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
}
}
@@ -160,12 +149,8 @@ class KMeansModel private[ml] (
// TODO: Replace the temp fix when we have proper evaluators defined for clustering.
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
validateSchema(dataset.schema)

val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol))
.rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol)
val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
parentModel.computeCost(data)
}

@@ -351,11 +336,7 @@ class KMeans @Since("1.5.0") (
transformSchema(dataset.schema, logging = true)

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val instances: RDD[OldVector] = dataset.select(
DatasetUtils.columnToVector(dataset, getFeaturesCol))
.rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
val instances = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)

if (handlePersistence) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
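
For KMeans the net effect is the same, with the local validateSchema helper folded into SchemaUtils.validateVectorCompatibleColumn. A hedged before/after sketch (toy data and shell-style setup are assumptions):

```scala
// Illustrative sketch only (assumes a SparkSession named `spark`).
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val df = Seq(Array(0.0, 0.0), Array(0.1, 0.1), Array(9.0, 9.0), Array(9.1, 9.1))
  .map(Tuple1.apply).toDF("features")

// Before this change: convert the array column to ml Vectors by hand (still valid).
val toVector = udf { (a: Seq[Double]) => Vectors.dense(a.toArray) }
val modelViaVectors = new KMeans().setK(2).setSeed(1L)
  .fit(df.withColumn("features", toVector(col("features"))))

// After this change: fit on the Array[Double] column directly.
val modelViaArrays = new KMeans().setK(2).setSeed(1L).fit(df)
```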
9 changes: 5 additions & 4 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -43,7 +43,7 @@ import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, StructType}
import org.apache.spark.util.PeriodicCheckpointer
import org.apache.spark.util.VersionUtils

@@ -345,7 +345,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
s" must be >= 1. Found value: $getTopicConcentration")
}
}
SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
SchemaUtils.appendColumn(schema, $(topicDistributionCol), new VectorUDT)
}

@@ -461,7 +461,8 @@ abstract class LDAModel private[ml] (
val transformer = oldLocalModel.getTopicDistributionMethod

val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML }
dataset.withColumn($(topicDistributionCol), t(col($(featuresCol)))).toDF()
dataset.withColumn($(topicDistributionCol),
t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF()
} else {
logWarning("LDAModel.transform was called without any output columns. Set an output column" +
" such as topicDistributionCol to produce results.")
@@ -938,7 +939,7 @@ object LDA extends MLReadable[LDA] {
featuresCol: String): RDD[(Long, OldVector)] = {
dataset
.withColumn("docId", monotonically_increasing_id())
.select("docId", featuresCol)
.select(col("docId"), DatasetUtils.columnToVector(dataset, featuresCol))
.rdd
.map { case Row(docId: Long, features: Vector) =>
(docId, OldVectors.fromML(features))
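
LDA gets the same treatment in getOldDataset, so term-count features stored as arrays are accepted as well. A short hedged sketch:

```scala
// Illustrative sketch only (assumes `spark` and `import spark.implicits._` as above).
// Term counts stored as Array[Double]; getOldDataset converts them for the old LDA code path.
import org.apache.spark.ml.clustering.LDA

val corpus = Seq(
  Array(1.0, 0.0, 2.0, 0.0),
  Array(0.0, 3.0, 0.0, 1.0)
).map(Tuple1.apply).toDF("features")

val ldaModel = new LDA().setK(2).setMaxIter(5).setSeed(1L).fit(corpus)
ldaModel.transform(corpus).select("topicDistribution").show(truncate = false)
```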
13 changes: 11 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala
@@ -17,8 +17,10 @@

package org.apache.spark.ml.util

import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
import org.apache.spark.sql.{Column, Dataset}
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType}

@@ -60,4 +62,11 @@ private[spark] object DatasetUtils {
throw new IllegalArgumentException(s"$other column cannot be cast to Vector")
}
}

def columnToOldVector(dataset: Dataset[_], colName: String): RDD[OldVector] = {
dataset.select(columnToVector(dataset, colName))
.rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
}
}
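
The new columnToOldVector simply composes columnToVector (defined earlier in this file; its body is outside this hunk) with a map to mllib Vectors. For readers of this diff, a sketch of what columnToVector plausibly looks like, inferred from the imports above and the error branch visible in the hunk; an assumption, not a verbatim copy:

```scala
// Sketch (assumed, not verbatim): resolve a features column as a Column of ml Vectors,
// converting Array[Float]/Array[Double] via a udf and passing VectorUDT columns through.
// Relies on the imports already shown in this file's diff.
def columnToVector(dataset: Dataset[_], colName: String): Column = {
  dataset.schema(colName).dataType match {
    case _: VectorUDT =>
      col(colName)
    case dt: ArrayType if dt.elementType == FloatType =>
      udf { (a: Seq[Float]) => Vectors.dense(a.map(_.toDouble).toArray) }.apply(col(colName))
    case dt: ArrayType if dt.elementType == DoubleType =>
      udf { (a: Seq[Double]) => Vectors.dense(a.toArray) }.apply(col(colName))
    case other =>
      throw new IllegalArgumentException(s"$other column cannot be cast to Vector")
  }
}
```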
16 changes: 15 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
@@ -17,7 +17,8 @@

package org.apache.spark.ml.util

import org.apache.spark.sql.types.{DataType, NumericType, StructField, StructType}
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.sql.types._


/**
@@ -101,4 +102,17 @@ private[spark] object SchemaUtils {
require(!schema.fieldNames.contains(col.name), s"Column ${col.name} already exists.")
StructType(schema.fields :+ col)
}

/**
* Check whether the given column in the schema is one of the supported vector types:
* Vector, Array[Float], Array[Double].
* @param schema input schema
* @param colName column name
*/
def validateVectorCompatibleColumn(schema: StructType, colName: String): Unit = {
val typeCandidates = List( new VectorUDT,
new ArrayType(DoubleType, false),
new ArrayType(FloatType, false))
checkColumnTypes(schema, colName, typeCandidates)
}
}
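
A note on what validateVectorCompatibleColumn accepts: the candidates are compared against the column's declared type, and arrays of primitive Double/Float are encoded by Spark with containsNull = false, which appears to be why the candidates are declared that way. A small hedged sketch of schemas that would pass:

```scala
// Illustrative sketch only: schemas whose "features" column passes the new check.
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.sql.types._

val vectorSchema = StructType(Seq(StructField("features", new VectorUDT)))
val doubleArraySchema =
  StructType(Seq(StructField("features", ArrayType(DoubleType, containsNull = false))))
val floatArraySchema =
  StructType(Seq(StructField("features", ArrayType(FloatType, containsNull = false))))

// Inside spark.ml code (SchemaUtils is private[spark]), each of these is accepted:
// SchemaUtils.validateVectorCompatibleColumn(vectorSchema, "features")
// SchemaUtils.validateVectorCompatibleColumn(doubleArraySchema, "features")
// SchemaUtils.validateVectorCompatibleColumn(floatArraySchema, "features")
```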
mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -17,13 +17,16 @@

package org.apache.spark.ml.clustering

import scala.language.existentials

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.clustering.DistanceMeasure
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{DataFrame, Dataset}

class BisectingKMeansSuite
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -182,6 +185,22 @@ class BisectingKMeansSuite

model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
}

test("BisectingKMeans with Array input") {
def trainAndComputeCost(dataset: Dataset[_]): Double = {
val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
model.computeCost(dataset)
}

val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
val trueCost = trainAndComputeCost(newDataset)
val doubleArrayCost = trainAndComputeCost(newDatasetD)
val floatArrayCost = trainAndComputeCost(newDatasetF)

// checking that the costs match is a sufficient sanity check
assert(trueCost ~== doubleArrayCost absTol 1e-6)
assert(trueCost ~== floatArrayCost absTol 1e-6)
}
}

object BisectingKMeansSuite {
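
The test above relies on MLTestingUtils.generateArrayFeatureDataset, whose diff is not shown in this excerpt. A hedged sketch of what such a helper would need to do; an illustrative assumption, not the actual implementation:

```scala
// Sketch (assumed, not verbatim): return the original Vector-featured dataset together
// with copies whose features column is cast to Array[Double] and Array[Float].
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}

def generateArrayFeatureDataset(
    dataset: Dataset[_],
    featuresColName: String = "features"): (Dataset[_], DataFrame, DataFrame) = {
  val toDoubleArray = udf { (v: Vector) => v.toArray }
  val toFloatArray = udf { (v: Vector) => v.toArray.map(_.toFloat) }
  val datasetD = dataset.withColumn(featuresColName, toDoubleArray(col(featuresColName)))
  val datasetF = dataset.withColumn(featuresColName, toFloatArray(col(featuresColName)))
  (dataset, datasetD, datasetF)
}
```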
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -17,15 +17,16 @@

package org.apache.spark.ml.clustering

import scala.language.existentials

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{Dataset, Row}

import org.apache.spark.sql.{DataFrame, Dataset, Row}

class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
with DefaultReadWriteTest {
@@ -256,6 +257,22 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
assert(symmetricMatrix === expectedMatrix)
}

test("GaussianMixture with Array input") {
def trainAndComputeLogLikelihood(dataset: Dataset[_]): Double = {
val model = new GaussianMixture().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
model.summary.logLikelihood
}

val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
val trueLikelihood = trainAndComputeLogLikelihood(newDataset)
val doubleLikelihood = trainAndComputeLogLikelihood(newDatasetD)
val floatLikelihood = trainAndComputeLogLikelihood(newDatasetF)

// checking that the log-likelihoods match is a sufficient sanity check
assert(trueLikelihood ~== doubleLikelihood absTol 1e-6)
assert(trueLikelihood ~== floatLikelihood absTol 1e-6)
}
}

object GaussianMixtureSuite extends SparkFunSuite {