2 changes: 2 additions & 0 deletions docs/ml-guide.md
@@ -122,6 +122,8 @@ There are no deprecations.
* [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027):
We are now setting the default parallelism used in `OneVsRest` to be 1 (i.e. serial). In 2.2 and
earlier versions, the `OneVsRest` parallelism was the parallelism of Scala's default thread pool.
* [SPARK-19781](https://issues.apache.org/jira/browse/SPARK-19781):
`Bucketizer` now creates an additional bucket for NULL values as well as NaN values when `handleInvalid` is set to `"keep"`.

## From 2.1 to 2.2

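The new `handleInvalid` behavior can be summarized with a short sketch. This is illustrative only (it assumes a `SparkSession` named `spark` and made-up column names; it is not text from the guide):

```scala
import org.apache.spark.ml.feature.Bucketizer

// Two regular buckets (0 and 1) from three splits, plus one extra bucket at
// index splits.length - 1 = 2 that collects both NULL and NaN under "keep".
val splits = Array(Double.NegativeInfinity, 0.0, Double.PositiveInfinity)

val df = spark.createDataFrame(Seq(
  (java.lang.Double.valueOf(-1.0), "negative"),
  (java.lang.Double.valueOf(1.0), "positive"),
  (null.asInstanceOf[java.lang.Double], "missing"),
  (java.lang.Double.valueOf(Double.NaN), "invalid")
)).toDF("feature", "label")

val bucketizer = new Bucketizer()
  .setInputCol("feature")
  .setOutputCol("bucket")
  .setSplits(splits)
  .setHandleInvalid("keep") // "skip" drops NULL/NaN rows; "error" throws SparkException

bucketizer.transform(df).show()
// -1.0 -> 0.0, 1.0 -> 1.0, null -> 2.0, NaN -> 2.0
```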
31 changes: 17 additions & 14 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -53,7 +53,8 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
* Values at -inf, inf must be explicitly provided to cover all Double values;
* otherwise, values outside the splits specified will be treated as errors.
*
* See also [[handleInvalid]], which can optionally create an additional bucket for NaN values.
* See also [[handleInvalid]], which can optionally create an additional bucket for NaN/NULL
* values.
*
* @group param
*/

Review comment (Member): This sounds like a behavior change; we should add an item in the migration guide of the ML docs.

Reply (Author): @viirya done.
@@ -163,7 +164,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String

val (filteredDataset, keepInvalid) = {
if (getHandleInvalid == Bucketizer.SKIP_INVALID) {
// "skip" NaN option is set, will filter out NaN values in the dataset
// "skip" NaN/NULL option is set, will filter out NaN/NULL values in the dataset
(dataset.na.drop(inputColumns).toDF(), false)
} else {
(dataset.toDF(), getHandleInvalid == Bucketizer.KEEP_INVALID)
@@ -177,8 +178,10 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
}

val bucketizers: Seq[UserDefinedFunction] = seqOfSplits.zipWithIndex.map { case (splits, idx) =>
udf { (feature: Double) =>
Bucketizer.binarySearchForBuckets(splits, feature, keepInvalid)
udf { (feature: java.lang.Double) =>
  // Guard before unboxing: with an expected type of Option[Double],
  // Option(feature) would unbox a null java.lang.Double and throw an NPE.
  Bucketizer.binarySearchForBuckets(splits,
    if (feature == null) None else Option(feature),
    keepInvalid)
}.withName(s"bucketizer_$idx")
}
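The explicit null check above is not redundant. A minimal standalone sketch (plain Scala, no Spark; names are illustrative) of why `Option(feature)` alone would fail:

```scala
// Plain-Scala sketch (illustrative, not Spark code): with an expected type of
// Option[Double], the compiler unboxes the java.lang.Double via
// Predef.Double2double before Option.apply runs, so a null input would throw
// a NullPointerException without the explicit guard.
object NullGuardSketch {
  def toOption(feature: java.lang.Double): Option[Double] =
    if (feature == null) None else Option(feature) // unboxing only happens on non-null

  def main(args: Array[String]): Unit = {
    println(toOption(java.lang.Double.valueOf(1.5))) // Some(1.5)
    println(toOption(null))                          // None, no NPE
  }
}
```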

@@ -259,34 +262,34 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
* Binary search over the split points to place each data point in a bucket.
* @param splits array of split points
* @param feature data point
* @param keepInvalid NaN flag.
* Set "true" to make an extra bucket for NaN values;
* Set "false" to report an error for NaN values
* @param keepInvalid NaN/NULL flag.
* Set "true" to make an extra bucket for NaN/NULL values;
* Set "false" to report an error for NaN/NULL values
* @return bucket for each data point
* @throws SparkException if a feature is < splits.head or > splits.last
*/

private[feature] def binarySearchForBuckets(
splits: Array[Double],
feature: Double,
feature: Option[Double],
keepInvalid: Boolean): Double = {
if (feature.isNaN) {
if (feature.isEmpty || feature.get.isNaN) {
if (keepInvalid) {
splits.length - 1
} else {
throw new SparkException("Bucketizer encountered NaN value. To handle or skip NaNs," +
" try setting Bucketizer.handleInvalid.")
throw new SparkException("Bucketizer encountered NaN/NULL values. " +
"To handle or skip NaNs/NULLs, try setting Bucketizer.handleInvalid.")
}
} else if (feature == splits.last) {
} else if (feature.get == splits.last) {
splits.length - 2
} else {
val idx = ju.Arrays.binarySearch(splits, feature)
val idx = ju.Arrays.binarySearch(splits, feature.get)
if (idx >= 0) {
idx
} else {
val insertPos = -idx - 1
if (insertPos == 0 || insertPos == splits.length) {
throw new SparkException(s"Feature value $feature out of Bucketizer bounds" +
throw new SparkException(s"Feature value ${feature.get} out of Bucketizer bounds" +
s" [${splits.head}, ${splits.last}]. Check your features, or loosen " +
s"the lower/upper bound constraints.")
} else {
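For reference, the insertion-point arithmetic that `binarySearchForBuckets` builds on can be shown in a self-contained sketch (illustrative names, simplified to valid in-range inputs; not Spark's code):

```scala
import java.util.Arrays

// Self-contained sketch of the bucket arithmetic (illustrative, simplified):
// Arrays.binarySearch returns the match index for exact hits and
// -(insertionPoint) - 1 otherwise, so a non-matching value falls into bucket
// insertionPoint - 1. The last split is treated as inclusive.
object BucketIndexSketch {
  def bucketOf(splits: Array[Double], x: Double): Int = {
    if (x == splits.last) splits.length - 2
    else {
      val idx = Arrays.binarySearch(splits, x)
      if (idx >= 0) idx else (-idx - 1) - 1 // insertion point minus one
    }
  }

  def main(args: Array[String]): Unit = {
    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
    println(bucketOf(splits, -0.3)) // 1: falls in [-0.5, 0.0)
    println(bucketOf(splits, 0.0))  // 2: exact match on splits(2)
  }
}
```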
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
@@ -132,6 +132,77 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
assert(bucketizer.transform(df).count() == 2)
}

test("Bucket continuous features, with NULL data and non-NaN splits") {
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val validData: Array[java.lang.Double] =
Array(Double.box(-0.9), Double.box(-0.5), Double.box(-0.3),
Double.box(0.0), Double.box(0.2), Double.box(0.5),
Double.box(0.9), null, null, null)
val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0)
val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")

val bucketizer: Bucketizer = new Bucketizer()
.setInputCol("feature")
.setOutputCol("result")
.setSplits(splits)

bucketizer.setHandleInvalid("keep")
bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
case Row(x: Double, y: Double) =>
assert(x === y,
s"The feature value is not correct after bucketing. Expected $y but found $x")
}

bucketizer.setHandleInvalid("skip")
val skipResults: Array[Double] = bucketizer.transform(dataFrame)
.select("result").as[Double].collect()
assert(skipResults.length === 7)
assert(skipResults.forall(_ !== 4.0))

bucketizer.setHandleInvalid("error")
withClue("Bucketizer should throw error when setHandleInvalid=error and given NULL values") {
intercept[SparkException] {
bucketizer.transform(dataFrame).collect()
}
}
}

test("Bucket continuous features, with NULL and NaN data but non-NaN splits") {
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val validData: Array[java.lang.Double] =
Array(Double.box(-0.9), Double.box(-0.5), Double.box(-0.3),
Double.box(0.0), Double.box(0.2), Double.box(0.5),
Double.box(0.9), null, Double.box(Double.NaN), null)
val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0)
val dataFrame: DataFrame = validData.zip(expectedBuckets).toSeq.toDF("feature", "expected")

val bucketizer: Bucketizer = new Bucketizer()
.setInputCol("feature")
.setOutputCol("result")
.setSplits(splits)

bucketizer.setHandleInvalid("keep")
bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
case Row(x: Double, y: Double) =>
assert(x === y,
s"The feature value is not correct after bucketing. Expected $y but found $x")
}

bucketizer.setHandleInvalid("skip")
val skipResults: Array[Double] = bucketizer.transform(dataFrame)
.select("result").as[Double].collect()
assert(skipResults.length === 7)
assert(skipResults.forall(_ !== 4.0))

bucketizer.setHandleInvalid("error")
withClue("Bucketizer should throw error when setHandleInvalid=error and given NaN or NULL " +
"values") {
intercept[SparkException] {
bucketizer.transform(dataFrame).collect()
}
}
}

test("Bucket continuous features, with NaN splits") {
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN)
withClue("Invalid NaN split was not caught during Bucketizer initialization") {
@@ -162,7 +233,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
val splits: Array[Double] = Double.NegativeInfinity +:
Array.fill(10)(Random.nextDouble()).sorted :+ Double.PositiveInfinity
val bsResult = Vectors.dense(data.map(x =>
Bucketizer.binarySearchForBuckets(splits, x, false)))
Bucketizer.binarySearchForBuckets(splits, Option(x), false)))
val lsResult = Vectors.dense(data.map(x => BucketizerSuite.linearSearchForBuckets(splits, x)))
assert(bsResult ~== lsResult absTol 1e-5)
}
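`linearSearchForBuckets` itself is outside this diff; a plausible reference implementation of the linear scan it performs (an assumption for illustration, not Spark's actual code) would be:

```scala
// Assumed sketch of the linear-search reference used by the test above;
// the real BucketizerSuite.linearSearchForBuckets is not shown in this diff.
def linearSearchForBuckets(splits: Array[Double], feature: Double): Double = {
  require(feature >= splits.head && feature <= splits.last, "feature out of bounds")
  if (feature == splits.last) splits.length - 2
  else {
    var idx = 0
    while (feature >= splits(idx + 1)) idx += 1 // advance until feature < next split
    idx
  }
}
```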
@@ -430,7 +501,7 @@ private object BucketizerSuite extends SparkFunSuite {
/** Check all values in splits, plus values between all splits. */
def checkBinarySearch(splits: Array[Double]): Unit = {
def testFeature(feature: Double, expectedBucket: Double): Unit = {
assert(Bucketizer.binarySearchForBuckets(splits, feature, false) === expectedBucket,
assert(Bucketizer.binarySearchForBuckets(splits, Option(feature), false) === expectedBucket,
s"Expected feature value $feature to be in bucket $expectedBucket with splits:" +
s" ${splits.mkString(", ")}")
}