From a0a32efa47be7dc0a51b71790bbee07620bb7d28 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 27 Jul 2016 02:49:52 -0700 Subject: [PATCH 1/2] Fix GaussianMixture training failed due to feature column type mistake --- .../org/apache/spark/ml/clustering/BisectingKMeans.scala | 2 ++ .../org/apache/spark/ml/clustering/GaussianMixture.scala | 8 +++++--- .../scala/org/apache/spark/ml/clustering/KMeans.scala | 2 ++ .../scala/org/apache/spark/ml/feature/Interaction.scala | 1 + .../scala/org/apache/spark/ml/feature/MinMaxScaler.scala | 3 ++- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/RFormula.scala | 1 + .../org/apache/spark/ml/feature/SQLTransformer.scala | 1 + .../spark/ml/regression/AFTSurvivalRegression.scala | 2 +- .../apache/spark/ml/regression/IsotonicRegression.scala | 3 ++- 10 files changed, 19 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index afb1080b9b7d..a3811f881624 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -99,6 +99,7 @@ class BisectingKMeansModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val predictUDF = udf((vector: Vector) => predict(vector)) dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) } @@ -222,6 +223,7 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): BisectingKMeansModel = { + transformSchema(dataset.schema) val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 81749055c761..2bb3741cace4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -30,7 +30,7 @@ import org.apache.spark.ml.stat.distribution.MultivariateGaussian import org.apache.spark.ml.util._ import org.apache.spark.mllib.clustering.{GaussianMixture => MLlibGM} import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix, - Vector => OldVector, Vectors => OldVectors, VectorUDT => OldVectorUDT} + Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, udf} @@ -61,9 +61,9 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(featuresCol), new OldVectorUDT) + SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) - SchemaUtils.appendColumn(schema, $(probabilityCol), new OldVectorUDT) + SchemaUtils.appendColumn(schema, $(probabilityCol), new VectorUDT) } } @@ -95,6 +95,7 @@ class GaussianMixtureModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val predUDF = udf((vector: Vector) => predict(vector)) val probUDF = udf((vector: Vector) => predictProbability(vector)) dataset.withColumn($(predictionCol), predUDF(col($(featuresCol)))) @@ -317,6 +318,7 @@ class GaussianMixture @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): GaussianMixtureModel = { + transformSchema(dataset.schema) val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 9fb7d6a9a21a..f18720dd2cd9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -120,6 +120,7 @@ class KMeansModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val predictUDF = udf((vector: Vector) => predict(vector)) dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) } @@ -304,6 +305,7 @@ class KMeans @Since("1.5.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { + transformSchema(dataset.schema) val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala index 7b11f86279b9..b007b6816eba 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala @@ -68,6 +68,7 @@ class Interaction @Since("1.6.0") (@Since("1.6.0") override val uid: String) ext @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val inputFeatures = $(inputCols).map(c => dataset.schema(c)) val featureEncoders = getFeatureEncoders(inputFeatures) val featureAttrs = getFeatureAttrs(inputFeatures) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala index 9ed8d83324cf..d5db3394e2f1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala @@ -111,7 +111,7 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String) @Since("2.0.0") override def fit(dataset: Dataset[_]): MinMaxScalerModel = { - transformSchema(dataset.schema, logging = true) + transformSchema(dataset.schema) val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } @@ -170,6 +170,7 @@ class MinMaxScalerModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val originalRange = (originalMax.asBreeze - originalMin.asBreeze).toArray val minArray = originalMin.toArray diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 9a636bd8a5e4..f64f27478b4e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -97,7 +97,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType) + SchemaUtils.checkNumericType(schema, $(inputCol)) val inputFields = schema.fields require(inputFields.forall(_.name != $(outputCol)), s"Output column ${$(outputCol)} already exists.") @@ -108,6 +108,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.0.0") override def fit(dataset: Dataset[_]): Bucketizer = { + transformSchema(dataset.schema) val splits = dataset.stat.approxQuantile($(inputCol), (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) splits(0) = Double.NegativeInfinity diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index c95dacfce8cf..fe2baa217801 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -112,6 +112,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) @Since("2.0.0") override def fit(dataset: Dataset[_]): RFormulaModel = { + transformSchema(dataset.schema) require(isDefined(formula), "Formula must be defined first.") val parsedFormula = RFormulaParser.parse($(formula)) val resolvedFormula = parsedFormula.resolve(dataset.schema) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala index 289037640fd4..e6d09630d199 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala @@ -63,6 +63,7 @@ class SQLTransformer @Since("1.6.0") (@Since("1.6.0") override val uid: String) @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val tableName = Identifiable.randomUID(uid) dataset.createOrReplaceTempView(tableName) val realStatement = $(statement).replace(tableIdentifier, tableName) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 2b9912657f51..e32fbcaa352e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -196,7 +196,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S @Since("2.0.0") override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = { - validateAndTransformSchema(dataset.schema, fitting = true) + transformSchema(dataset.schema) val instances = extractAFTPoints(dataset) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 35396446edc1..ed80641e8139 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -164,7 +164,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri @Since("2.0.0") override def fit(dataset: Dataset[_]): IsotonicRegressionModel = { - validateAndTransformSchema(dataset.schema, fitting = true) + transformSchema(dataset.schema) // Extract columns from data. If dataset is persisted, do not persist oldDataset. val instances = extractWeightedLabeledPoints(dataset) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE @@ -234,6 +234,7 @@ class IsotonicRegressionModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) val predict = dataset.schema($(featuresCol)).dataType match { case DoubleType => udf { feature: Double => oldModel.predict(feature) } From 0663ad9042fc8f348f174dd0f9a02c6e721e8b16 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 28 Jul 2016 06:35:49 -0700 Subject: [PATCH 2/2] Change to use transformSchema(dataset.schema, logging = true) --- .../org/apache/spark/ml/clustering/BisectingKMeans.scala | 4 ++-- .../org/apache/spark/ml/clustering/GaussianMixture.scala | 4 ++-- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 4 ++-- .../main/scala/org/apache/spark/ml/feature/Interaction.scala | 2 +- .../main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala | 4 ++-- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 2 +- .../src/main/scala/org/apache/spark/ml/feature/RFormula.scala | 2 +- .../scala/org/apache/spark/ml/feature/SQLTransformer.scala | 2 +- .../apache/spark/ml/regression/AFTSurvivalRegression.scala | 4 ++-- .../org/apache/spark/ml/regression/IsotonicRegression.scala | 4 ++-- 10 files changed, 16 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index a3811f881624..a97bd0fb16fd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -99,7 +99,7 @@ class BisectingKMeansModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val predictUDF = udf((vector: Vector) => predict(vector)) dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) } @@ -223,7 +223,7 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): BisectingKMeansModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 2bb3741cace4..69f060ad7711 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -95,7 +95,7 @@ class GaussianMixtureModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val predUDF = udf((vector: Vector) => predict(vector)) val probUDF = udf((vector: Vector) => predictProbability(vector)) dataset.withColumn($(predictionCol), predUDF(col($(featuresCol)))) @@ -318,7 +318,7 @@ class GaussianMixture @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): GaussianMixtureModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index f18720dd2cd9..6c46be719674 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -120,7 +120,7 @@ class KMeansModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val predictUDF = udf((vector: Vector) => predict(vector)) dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) } @@ -305,7 +305,7 @@ class KMeans @Since("1.5.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala index b007b6816eba..96d0bdee9e2b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala @@ -68,7 +68,7 @@ class Interaction @Since("1.6.0") (@Since("1.6.0") override val uid: String) ext @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val inputFeatures = $(inputCols).map(c => dataset.schema(c)) val featureEncoders = getFeatureEncoders(inputFeatures) val featureAttrs = getFeatureAttrs(inputFeatures) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala index d5db3394e2f1..068f11a2a573 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala @@ -111,7 +111,7 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String) @Since("2.0.0") override def fit(dataset: Dataset[_]): MinMaxScalerModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } @@ -170,7 +170,7 @@ class MinMaxScalerModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val originalRange = (originalMax.asBreeze - originalMin.asBreeze).toArray val minArray = originalMin.toArray diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index f64f27478b4e..558a7bbf0a2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -108,7 +108,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.0.0") override def fit(dataset: Dataset[_]): Bucketizer = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val splits = dataset.stat.approxQuantile($(inputCol), (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) splits(0) = Double.NegativeInfinity diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index fe2baa217801..2ee899bcca56 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -112,7 +112,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) @Since("2.0.0") override def fit(dataset: Dataset[_]): RFormulaModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) require(isDefined(formula), "Formula must be defined first.") val parsedFormula = RFormulaParser.parse($(formula)) val resolvedFormula = parsedFormula.resolve(dataset.schema) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala index e6d09630d199..259be2679ce1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala @@ -63,7 +63,7 @@ class SQLTransformer @Since("1.6.0") (@Since("1.6.0") override val uid: String) @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val tableName = Identifiable.randomUID(uid) dataset.createOrReplaceTempView(tableName) val realStatement = $(statement).replace(tableIdentifier, tableName) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index e32fbcaa352e..d4ae59defff8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -196,7 +196,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S @Since("2.0.0") override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val instances = extractAFTPoints(dataset) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) @@ -326,7 +326,7 @@ class AFTSurvivalRegressionModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val predictUDF = udf { features: Vector => predict(features) } val predictQuantilesUDF = udf { features: Vector => predictQuantiles(features)} if (hasQuantilesCol) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index ed80641e8139..cd7b4f2a9c56 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -164,7 +164,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri @Since("2.0.0") override def fit(dataset: Dataset[_]): IsotonicRegressionModel = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) // Extract columns from data. If dataset is persisted, do not persist oldDataset. val instances = extractWeightedLabeledPoints(dataset) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE @@ -234,7 +234,7 @@ class IsotonicRegressionModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + transformSchema(dataset.schema, logging = true) val predict = dataset.schema($(featuresCol)).dataType match { case DoubleType => udf { feature: Double => oldModel.predict(feature) }