diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index f6288e7c32d9..c510c30d16a2 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -122,6 +122,8 @@ There are no deprecations.
 * [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027):
  We are now setting the default parallelism used in `OneVsRest` to be 1 (i.e. serial), in 2.2 and
  earlier version, the `OneVsRest` parallelism would be parallelism of the default threadpool in scala.
+* [SPARK-19781](https://issues.apache.org/jira/browse/SPARK-19781):
+ `Bucketizer` now creates an additional bucket for NULL values as well as NaN.
 
 ## From 2.1 to 2.2
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
index 8299a3e95d82..b5eeb1a7dca3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -53,7 +53,8 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
    * Values at -inf, inf must be explicitly provided to cover all Double values;
    * otherwise, values outside the splits specified will be treated as errors.
    *
-   * See also [[handleInvalid]], which can optionally create an additional bucket for NaN values.
+   * See also [[handleInvalid]], which can optionally create an additional bucket for NaN/NULL
+   * values.
    *
    * @group param
    */
@@ -163,7 +164,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
 
     val (filteredDataset, keepInvalid) = {
       if (getHandleInvalid == Bucketizer.SKIP_INVALID) {
-        // "skip" NaN option is set, will filter out NaN values in the dataset
+        // "skip" NaN/NULL option is set, will filter out NaN/NULL values in the dataset
         (dataset.na.drop(inputColumns).toDF(), false)
       } else {
         (dataset.toDF(), getHandleInvalid == Bucketizer.KEEP_INVALID)
@@ -177,8 +178,10 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
     }
 
     val bucketizers: Seq[UserDefinedFunction] = seqOfSplits.zipWithIndex.map { case (splits, idx) =>
-      udf { (feature: Double) =>
-        Bucketizer.binarySearchForBuckets(splits, feature, keepInvalid)
+      udf { (feature: java.lang.Double) =>
+        Bucketizer.binarySearchForBuckets(splits,
+          if (feature == null) None else Option(feature),
+          keepInvalid)
       }.withName(s"bucketizer_$idx")
     }
 
@@ -259,34 +262,34 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
    * Binary searching in several buckets to place each data point.
    * @param splits array of split points
    * @param feature data point
-   * @param keepInvalid NaN flag.
-   *                    Set "true" to make an extra bucket for NaN values;
-   *                    Set "false" to report an error for NaN values
+   * @param keepInvalid NaN/NULL flag.
+   *                    Set "true" to make an extra bucket for NaN/NULL values;
+   *                    Set "false" to report an error for NaN/NULL values
    * @return bucket for each data point
    * @throws SparkException if a feature is < splits.head or > splits.last
    */
   private[feature] def binarySearchForBuckets(
       splits: Array[Double],
-      feature: Double,
+      feature: Option[Double],
       keepInvalid: Boolean): Double = {
-    if (feature.isNaN) {
+    if (feature.isEmpty || feature.get.isNaN) {
       if (keepInvalid) {
         splits.length - 1
       } else {
-        throw new SparkException("Bucketizer encountered NaN value. To handle or skip NaNs," +
-          " try setting Bucketizer.handleInvalid.")
+        throw new SparkException("Bucketizer encountered NaN/NULL values. " +
+          "To handle or skip NaNs/NULLs, try setting Bucketizer.handleInvalid.")
       }
-    } else if (feature == splits.last) {
+    } else if (feature.get == splits.last) {
       splits.length - 2
     } else {
-      val idx = ju.Arrays.binarySearch(splits, feature)
+      val idx = ju.Arrays.binarySearch(splits, feature.get)
       if (idx >= 0) {
         idx
       } else {
         val insertPos = -idx - 1
         if (insertPos == 0 || insertPos == splits.length) {
-          throw new SparkException(s"Feature value $feature out of Bucketizer bounds" +
+          throw new SparkException(s"Feature value ${feature.get} out of Bucketizer bounds" +
             s" [${splits.head}, ${splits.last}]. Check your features, or loosen " +
             s"the lower/upper bound constraints.")
         } else {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
index d9c97ae8067d..8943fd3ec57f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
@@ -132,6 +132,77 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     assert(bucketizer.transform(df).count() == 2)
   }
 
+  test("Bucket continuous features, with NULL data and non-NaN splits") {
+    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
+    val validData: Array[java.lang.Double] =
+      Array(Double.box(-0.9), Double.box(-0.5), Double.box(-0.3),
+        Double.box(0.0), Double.box(0.2), Double.box(0.5),
+        Double.box(0.9), null, null, null)
+    val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0)
+    val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")
+
+    val bucketizer: Bucketizer = new Bucketizer()
+      .setInputCol("feature")
+      .setOutputCol("result")
+      .setSplits(splits)
+
+    bucketizer.setHandleInvalid("keep")
+    bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
+      case Row(x: Double, y: Double) =>
+        assert(x === y,
+          s"The feature value is not correct after bucketing. Expected $y but found $x")
+    }
+
+    bucketizer.setHandleInvalid("skip")
+    val skipResults: Array[Double] = bucketizer.transform(dataFrame)
+      .select("result").as[Double].collect()
+    assert(skipResults.length === 7)
+    assert(skipResults.forall(_ !== 4.0))
+
+    bucketizer.setHandleInvalid("error")
+    withClue("Bucketizer should throw error when setHandleInvalid=error and given NULL values") {
+      intercept[SparkException] {
+        bucketizer.transform(dataFrame).collect()
+      }
+    }
+  }
+
+  test("Bucket continuous features, with NULL and NaN data but non-NaN splits") {
+    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
+    val validData: Array[java.lang.Double] =
+      Array(Double.box(-0.9), Double.box(-0.5), Double.box(-0.3),
+        Double.box(0.0), Double.box(0.2), Double.box(0.5),
+        Double.box(0.9), null, Double.box(Double.NaN), null)
+    val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0)
+    val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")
+
+    val bucketizer: Bucketizer = new Bucketizer()
+      .setInputCol("feature")
+      .setOutputCol("result")
+      .setSplits(splits)
+
+    bucketizer.setHandleInvalid("keep")
+    bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
+      case Row(x: Double, y: Double) =>
+        assert(x === y,
+          s"The feature value is not correct after bucketing. Expected $y but found $x")
+    }
+
+    bucketizer.setHandleInvalid("skip")
+    val skipResults: Array[Double] = bucketizer.transform(dataFrame)
+      .select("result").as[Double].collect()
+    assert(skipResults.length === 7)
+    assert(skipResults.forall(_ !== 4.0))
+
+    bucketizer.setHandleInvalid("error")
+    withClue("Bucketizer should throw error when setHandleInvalid=error and given NaN or NULL " +
+      "values") {
+      intercept[SparkException] {
+        bucketizer.transform(dataFrame).collect()
+      }
+    }
+  }
+
   test("Bucket continuous features, with NaN splits") {
     val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN)
     withClue("Invalid NaN split was not caught during Bucketizer initialization") {
@@ -162,7 +233,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
       val splits: Array[Double] = Double.NegativeInfinity +:
         Array.fill(10)(Random.nextDouble()).sorted :+ Double.PositiveInfinity
       val bsResult = Vectors.dense(data.map(x =>
-        Bucketizer.binarySearchForBuckets(splits, x, false)))
+        Bucketizer.binarySearchForBuckets(splits, Option(x), false)))
       val lsResult = Vectors.dense(data.map(x => BucketizerSuite.linearSearchForBuckets(splits, x)))
       assert(bsResult ~== lsResult absTol 1e-5)
     }
@@ -430,7 +501,7 @@ private object BucketizerSuite extends SparkFunSuite {
   /** Check all values in splits, plus values between all splits. */
   def checkBinarySearch(splits: Array[Double]): Unit = {
     def testFeature(feature: Double, expectedBucket: Double): Unit = {
-      assert(Bucketizer.binarySearchForBuckets(splits, feature, false) === expectedBucket,
+      assert(Bucketizer.binarySearchForBuckets(splits, Option(feature), false) === expectedBucket,
        s"Expected feature value $feature to be in bucket $expectedBucket with splits:" +
        s" ${splits.mkString(", ")}")
     }
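
Below is a minimal, illustrative sketch (not part of the patch) of how the behaviour described in the migration note could be exercised from user code. It assumes a local SparkSession and reuses the splits and handleInvalid settings from the new tests; the object name and app name are made up for the example.

```scala
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.SparkSession

object BucketizerNullExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("BucketizerNullExample")
      .getOrCreate()
    import spark.implicits._

    // Same splits as the new tests: four regular buckets covering all Doubles.
    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

    // Boxed Doubles so the feature column can actually contain NULLs as well as NaN.
    val data: Seq[java.lang.Double] =
      Seq(Double.box(-0.9), Double.box(0.2), Double.box(Double.NaN), null, null)
    val df = data.toDF("feature")

    val bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)
      // "keep" puts NaN/NULL into an extra bucket (splits.length - 1 = 4 here);
      // "skip" drops such rows; "error" (the default) makes transform() throw.
      .setHandleInvalid("keep")

    bucketizer.transform(df).show()
    spark.stop()
  }
}
```

With these splits the regular buckets are 0 through 3, so under "keep" both NULL and NaN land in the extra bucket 4 (splits.length - 1), which is what the new tests assert; "skip" drops those rows via na.drop, and the default "error" makes transform() fail with a SparkException.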