From d58b8459495ae656751144bb19531d24ced8b7a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Tue, 14 Jul 2015 13:55:29 +0200 Subject: [PATCH 1/6] [SPARK-8979][Streaming] Implements a PIDRateEstimator --- .../scheduler/rate/PIDRateEstimator.scala | 92 ++++++++++++++ .../rate/PIDRateEstimatorSuite.scala | 112 ++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala new file mode 100644 index 0000000000000..ab1970e9aadcd --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler.rate + +import org.apache.spark.Logging + +/** + * Implements a proportional-integral-derivative (PID) controller which acts on + * the speed of ingestion of elements into Spark Streaming. + * + * @param batchDurationMillis the batch duration, in milliseconds + * @param proportional how much the correction should depend on the current + * error, + * @param integral how much the correction should depend on the accumulation + * of past errors, + * @param derivative how much the correction should depend on a prediction + * of future errors, based on current rate of change + */ +private[streaming] class PIDRateEstimator(batchIntervalMillis: Long, + proportional: Double = -1D, + integral: Double = -.2D, + derivative: Double = 0D) + extends RateEstimator with Logging { + + private var init: Boolean = true + private var latestTime : Long = -1L + private var latestSpeed : Double = -1D + private var latestError : Double = -1L + + if (batchIntervalMillis <= 0) logError("Specified batch interval ${batchIntervalMillis} " + + "in PIDRateEstimator is invalid.") + + def compute(time: Long, // in milliseconds + elements: Long, + processingDelay: Long, // in milliseconds + schedulingDelay: Long // in milliseconds + ): Option[Double] = { + + this.synchronized { + if (time > latestTime && processingDelay > 0 && batchIntervalMillis > 0) { + + // in seconds, should be close to batchDuration + val delaySinceUpdate = (time - latestTime).toDouble / 1000 + + // in elements/second + val processingSpeed = elements.toDouble / processingDelay * 1000 + + // in elements/second + val error = latestSpeed - processingSpeed + + // in elements/second + val sumError = schedulingDelay.toDouble * processingSpeed / batchIntervalMillis + + // in elements/(second ^ 2) + val dError = (error - latestError) / delaySinceUpdate + + val newSpeed = (latestSpeed + proportional * error + + integral * sumError + + derivative * dError) max 0D + latestTime = time + if (init) { + latestSpeed = processingSpeed + latestError = 0D + init = false + + None + } else { + latestSpeed = newSpeed + latestError = error + + Some(newSpeed) + } + } else None + } + } + +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala new file mode 100644 index 0000000000000..a1722b2ed6068 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler.rate + +import org.apache.spark.SparkFunSuite +import org.scalatest._ +import org.scalatest.Matchers +import org.scalatest.Inspectors._ + +import scala.util.Random + +class PIDRateEstimatorSuite extends SparkFunSuite with Matchers { + + test("first bound is None"){ + val p = new PIDRateEstimator(20, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) should equal (None) + } + + test("second bound is rate"){ + val p = new PIDRateEstimator(20, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) + // 1000 elements / s + p.compute(10, 10, 10, 0) should equal (Some(1000)) + } + + test("works even with no time between updates"){ + val p = new PIDRateEstimator(20, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) + p.compute(10, 10, 10, 0) + p.compute(10, 10, 10, 0) should equal (None) + } + + test("works even with a zero batch interval"){ + val p = new PIDRateEstimator(0, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) should equal (None) + p.compute(10, 10, 10, 0) should equal (None) + } + + test("bound is never negative"){ + val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val elements = List.fill(50)(0) // no processing + val proc = List.fill(50)(20) // 20ms of processing + val sched = List.fill(50)(100) // strictly positive accumulation + val res = for (i <- List.range(0, 50)) yield + p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal (None) + res.tail should equal (List.fill(49)(Some(0D))) + } + + + test("with no accumulated or positive error, |I| > 0, follow the processing speed"){ + val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val elements = List.tabulate(50)(x => x * 20) // increasing + val proc = List.fill(50)(20) // 20ms of processing + val sched = List.fill(50)(0) + val res = for (i <- List.range(0, 50)) yield + p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal (None) + res.tail should equal (List.tabulate(50)(x => Some(x * 1000D)).tail) + } + + test("with no accumulated but some positive error, |I| > 0, follow the processing speed"){ + val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val elements = List.tabulate(50)(x => (50-x) * 20) // decreasing + val proc = List.fill(50)(20) // 20ms of processing + val sched = List.fill(50)(0) + val res = for (i <- List.range(0, 50)) yield + p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal (None) + res.tail should equal (List.tabulate(50)(x => Some((50-x) * 1000D)).tail) + } + + test("with some accumulated and some positive error, |I| > 0, stay below the processing speed"){ + val p = new PIDRateEstimator(20, -1D, -.01D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val rng = new Random() + val elements = List.tabulate(50)(x => rng.nextInt(1000)) + val procDelayMs = 20 + val proc = List.fill(50)(procDelayMs) // 20ms of processing + val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait + val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000) + + val res = for (i <- List.range(0, 50)) yield + p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal (None) + forAll (List.range(1, 50)){ (n) => + res(n) should not be None + if (res(n).get > 0 && sched(n) > 0) { + res(n).get should be < speeds(n) + } + } + } + +} From d0bdf7c02c0d9ac057076c476068a901c0eeabf5 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Fri, 24 Jul 2015 19:02:26 +0200 Subject: [PATCH 2/6] Update to latest version of the code, various style and name improvements. --- .../dstream/ReceiverInputDStream.scala | 3 +- .../scheduler/rate/PIDRateEstimator.scala | 56 ++++--- .../scheduler/rate/RateEstimator.scala | 10 +- .../rate/PIDRateEstimatorSuite.scala | 141 ++++++++---------- 4 files changed, 106 insertions(+), 104 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 646a8c3530a62..5d4bb2080c016 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -46,7 +46,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont */ override protected[streaming] val rateController: Option[RateController] = { if (RateController.isBackPressureEnabled(ssc.conf)) { - RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) } + RateEstimator.create(ssc.conf, ssc.graph.batchDuration) + .map { new ReceiverRateController(id, _) } } else { None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala index ab1970e9aadcd..df8189b2fdb42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -17,33 +17,42 @@ package org.apache.spark.streaming.scheduler.rate -import org.apache.spark.Logging - /** * Implements a proportional-integral-derivative (PID) controller which acts on - * the speed of ingestion of elements into Spark Streaming. + * the speed of ingestion of elements into Spark Streaming. A PID controller works + * by calculating an '''error''' between a measured output and a desired setpoint. In the + * case of Spark Streaming the error is the difference between the measured processing + * rate (number of elements/processing delay) and the previous rate. + * + * @see https://en.wikipedia.org/wiki/PID_controller * * @param batchDurationMillis the batch duration, in milliseconds * @param proportional how much the correction should depend on the current - * error, + * error. This term usually provides the bulk of correction. A value too large would + * make the controller overshoot the setpoint, while a small value would make the + * controller too insensitive. The default value is -1. * @param integral how much the correction should depend on the accumulation - * of past errors, + * of past errors. This term accelerates the movement towards the setpoint, but a large + * value may lead to overshooting. The default value is -0.2. * @param derivative how much the correction should depend on a prediction - * of future errors, based on current rate of change + * of future errors, based on current rate of change. This term is not used very often, + * as it impacts stability of the system. The default value is 0. */ -private[streaming] class PIDRateEstimator(batchIntervalMillis: Long, +private[streaming] class PIDRateEstimator( + batchIntervalMillis: Long, proportional: Double = -1D, integral: Double = -.2D, derivative: Double = 0D) - extends RateEstimator with Logging { + extends RateEstimator { - private var init: Boolean = true - private var latestTime : Long = -1L - private var latestSpeed : Double = -1D - private var latestError : Double = -1L + private var firstRun: Boolean = true + private var latestTime: Long = -1L + private var latestRate: Double = -1D + private var latestError: Double = -1L - if (batchIntervalMillis <= 0) logError("Specified batch interval ${batchIntervalMillis} " + - "in PIDRateEstimator is invalid.") + require( + batchIntervalMillis > 0, + s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.") def compute(time: Long, // in milliseconds elements: Long, @@ -61,7 +70,7 @@ private[streaming] class PIDRateEstimator(batchIntervalMillis: Long, val processingSpeed = elements.toDouble / processingDelay * 1000 // in elements/second - val error = latestSpeed - processingSpeed + val error = latestRate - processingSpeed // in elements/second val sumError = schedulingDelay.toDouble * processingSpeed / batchIntervalMillis @@ -69,24 +78,23 @@ private[streaming] class PIDRateEstimator(batchIntervalMillis: Long, // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate - val newSpeed = (latestSpeed + proportional * error + - integral * sumError + - derivative * dError) max 0D + val newRate = (latestRate + proportional * error + + integral * sumError + + derivative * dError).max(0.0) latestTime = time - if (init) { - latestSpeed = processingSpeed + if (firstRun) { + latestRate = processingSpeed latestError = 0D - init = false + firstRun = false None } else { - latestSpeed = newSpeed + latestRate = newRate latestError = error - Some(newSpeed) + Some(newRate) } } else None } } - } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index a08685119e5d5..bb92d0a55c643 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler.rate import org.apache.spark.SparkConf import org.apache.spark.SparkException +import org.apache.spark.streaming.Duration /** * A component that estimates the rate at wich an InputDStream should ingest @@ -52,8 +53,11 @@ object RateEstimator { * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any * known estimators. */ - def create(conf: SparkConf): Option[RateEstimator] = - conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator => - throw new IllegalArgumentException(s"Unkown rate estimator: $estimator") + def create(conf: SparkConf, batchInterval: Duration): Option[RateEstimator] = + conf.getOption("spark.streaming.backpressure.rateEstimator").map { + case "pid" => + new PIDRateEstimator(batchInterval.milliseconds) + case estimator => + throw new IllegalArgumentException(s"Unkown rate estimator: $estimator") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala index a1722b2ed6068..21678b3449cfb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala @@ -17,96 +17,85 @@ package org.apache.spark.streaming.scheduler.rate -import org.apache.spark.SparkFunSuite +import scala.util.Random + import org.scalatest._ import org.scalatest.Matchers import org.scalatest.Inspectors._ -import scala.util.Random +import org.apache.spark.SparkFunSuite class PIDRateEstimatorSuite extends SparkFunSuite with Matchers { - test("first bound is None"){ - val p = new PIDRateEstimator(20, -1D, 0D, 0D) - p.compute(0, 10, 10, 0) should equal (None) - } + test("first bound is None") { + val p = new PIDRateEstimator(20, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) should equal(None) + } - test("second bound is rate"){ - val p = new PIDRateEstimator(20, -1D, 0D, 0D) - p.compute(0, 10, 10, 0) - // 1000 elements / s - p.compute(10, 10, 10, 0) should equal (Some(1000)) - } + test("second bound is rate") { + val p = new PIDRateEstimator(20, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) + // 1000 elements / s + p.compute(10, 10, 10, 0) should equal(Some(1000)) + } - test("works even with no time between updates"){ - val p = new PIDRateEstimator(20, -1D, 0D, 0D) - p.compute(0, 10, 10, 0) - p.compute(10, 10, 10, 0) - p.compute(10, 10, 10, 0) should equal (None) - } + test("works even with no time between updates") { + val p = new PIDRateEstimator(20, -1D, 0D, 0D) + p.compute(0, 10, 10, 0) + p.compute(10, 10, 10, 0) + p.compute(10, 10, 10, 0) should equal(None) + } - test("works even with a zero batch interval"){ - val p = new PIDRateEstimator(0, -1D, 0D, 0D) - p.compute(0, 10, 10, 0) should equal (None) - p.compute(10, 10, 10, 0) should equal (None) - } + test("bound is never negative") { + val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val elements = List.fill(50)(0) // no processing + val proc = List.fill(50)(20) // 20ms of processing + val sched = List.fill(50)(100) // strictly positive accumulation + val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal(None) + res.tail should equal(List.fill(49)(Some(0D))) + } - test("bound is never negative"){ - val p = new PIDRateEstimator(20, -1D, -1D, 0D) - val times = List.tabulate(50)(x => x * 20) // every 20ms - val elements = List.fill(50)(0) // no processing - val proc = List.fill(50)(20) // 20ms of processing - val sched = List.fill(50)(100) // strictly positive accumulation - val res = for (i <- List.range(0, 50)) yield - p.compute(times(i), elements(i), proc(i), sched(i)) - res.head should equal (None) - res.tail should equal (List.fill(49)(Some(0D))) - } + test("with no accumulated or positive error, |I| > 0, follow the processing speed") { + val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val elements = List.tabulate(50)(x => x * 20) // increasing + val proc = List.fill(50)(20) // 20ms of processing + val sched = List.fill(50)(0) + val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal(None) + res.tail should equal(List.tabulate(50)(x => Some(x * 1000D)).tail) + } + test("with no accumulated but some positive error, |I| > 0, follow the processing speed") { + val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val elements = List.tabulate(50)(x => (50 - x) * 20) // decreasing + val proc = List.fill(50)(20) // 20ms of processing + val sched = List.fill(50)(0) + val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal(None) + res.tail should equal(List.tabulate(50)(x => Some((50 - x) * 1000D)).tail) + } - test("with no accumulated or positive error, |I| > 0, follow the processing speed"){ - val p = new PIDRateEstimator(20, -1D, -1D, 0D) - val times = List.tabulate(50)(x => x * 20) // every 20ms - val elements = List.tabulate(50)(x => x * 20) // increasing - val proc = List.fill(50)(20) // 20ms of processing - val sched = List.fill(50)(0) - val res = for (i <- List.range(0, 50)) yield - p.compute(times(i), elements(i), proc(i), sched(i)) - res.head should equal (None) - res.tail should equal (List.tabulate(50)(x => Some(x * 1000D)).tail) - } + test("with some accumulated and some positive error, |I| > 0, stay below the processing speed") { + val p = new PIDRateEstimator(20, -1D, -.01D, 0D) + val times = List.tabulate(50)(x => x * 20) // every 20ms + val rng = new Random() + val elements = List.tabulate(50)(x => rng.nextInt(1000)) + val procDelayMs = 20 + val proc = List.fill(50)(procDelayMs) // 20ms of processing + val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait + val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000) - test("with no accumulated but some positive error, |I| > 0, follow the processing speed"){ - val p = new PIDRateEstimator(20, -1D, -1D, 0D) - val times = List.tabulate(50)(x => x * 20) // every 20ms - val elements = List.tabulate(50)(x => (50-x) * 20) // decreasing - val proc = List.fill(50)(20) // 20ms of processing - val sched = List.fill(50)(0) - val res = for (i <- List.range(0, 50)) yield - p.compute(times(i), elements(i), proc(i), sched(i)) - res.head should equal (None) - res.tail should equal (List.tabulate(50)(x => Some((50-x) * 1000D)).tail) - } - - test("with some accumulated and some positive error, |I| > 0, stay below the processing speed"){ - val p = new PIDRateEstimator(20, -1D, -.01D, 0D) - val times = List.tabulate(50)(x => x * 20) // every 20ms - val rng = new Random() - val elements = List.tabulate(50)(x => rng.nextInt(1000)) - val procDelayMs = 20 - val proc = List.fill(50)(procDelayMs) // 20ms of processing - val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait - val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000) - - val res = for (i <- List.range(0, 50)) yield - p.compute(times(i), elements(i), proc(i), sched(i)) - res.head should equal (None) - forAll (List.range(1, 50)){ (n) => - res(n) should not be None - if (res(n).get > 0 && sched(n) > 0) { - res(n).get should be < speeds(n) - } + val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i)) + res.head should equal(None) + forAll(List.range(1, 50)) { (n) => + res(n) should not be None + if (res(n).get > 0 && sched(n) > 0) { + res(n).get should be < speeds(n) } } - + } } From 26cfd78c339e58e71c138e424952002f13595389 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Thu, 30 Jul 2015 13:37:51 +0200 Subject: [PATCH 3/6] A couple of variable renames. --- .../streaming/scheduler/rate/PIDRateEstimator.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala index df8189b2fdb42..3f7a85c9f94b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler.rate /** * Implements a proportional-integral-derivative (PID) controller which acts on * the speed of ingestion of elements into Spark Streaming. A PID controller works - * by calculating an '''error''' between a measured output and a desired setpoint. In the + * by calculating an '''error''' between a measured output and a desired value. In the * case of Spark Streaming the error is the difference between the measured processing * rate (number of elements/processing delay) and the previous rate. * @@ -58,7 +58,7 @@ private[streaming] class PIDRateEstimator( elements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long // in milliseconds - ): Option[Double] = { + ): Option[Double] = { this.synchronized { if (time > latestTime && processingDelay > 0 && batchIntervalMillis > 0) { @@ -67,13 +67,13 @@ private[streaming] class PIDRateEstimator( val delaySinceUpdate = (time - latestTime).toDouble / 1000 // in elements/second - val processingSpeed = elements.toDouble / processingDelay * 1000 + val processingRate = elements.toDouble / processingDelay * 1000 // in elements/second - val error = latestRate - processingSpeed + val error = latestRate - processingRate // in elements/second - val sumError = schedulingDelay.toDouble * processingSpeed / batchIntervalMillis + val sumError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate @@ -83,7 +83,7 @@ private[streaming] class PIDRateEstimator( derivative * dError).max(0.0) latestTime = time if (firstRun) { - latestRate = processingSpeed + latestRate = processingRate latestError = 0D firstRun = false From 7975b0c9703696653563d2b457b4ef071f30bfe9 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Thu, 30 Jul 2015 15:14:52 +0200 Subject: [PATCH 4/6] Add configuration for PID. --- .../streaming/dstream/ReceiverInputDStream.scala | 3 +-- .../streaming/scheduler/rate/RateEstimator.scala | 14 ++++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 5d4bb2080c016..670ef8d296a0b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -46,8 +46,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont */ override protected[streaming] val rateController: Option[RateController] = { if (RateController.isBackPressureEnabled(ssc.conf)) { - RateEstimator.create(ssc.conf, ssc.graph.batchDuration) - .map { new ReceiverRateController(id, _) } + Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration))) } else { None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index bb92d0a55c643..d5c5931029f1b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -49,14 +49,20 @@ object RateEstimator { /** * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`. * - * @return None if there is no configured estimator, otherwise an instance of RateEstimator + * The only known estimator right now is `pid`. + * + * @return An instance of RateEstimator * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any * known estimators. */ - def create(conf: SparkConf, batchInterval: Duration): Option[RateEstimator] = - conf.getOption("spark.streaming.backpressure.rateEstimator").map { + def create(conf: SparkConf, batchInterval: Duration): RateEstimator = + conf.get("spark.streaming.backpressure.rateEstimator", "pid") match { case "pid" => - new PIDRateEstimator(batchInterval.milliseconds) + val proportional = conf.getDouble("spark.streraming.backpressure.pid.proportional", -1.0) + val integral = conf.getDouble("spark.streraming.backpressure.pid.integral", -0.2) + val derived = conf.getDouble("spark.streraming.backpressure.pid.derived", 0.0) + new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived) + case estimator => throw new IllegalArgumentException(s"Unkown rate estimator: $estimator") } From 93b74f884ea17da65297e47ff9a20b53d93225d1 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Thu, 30 Jul 2015 15:56:50 +0200 Subject: [PATCH 5/6] Better explanation of historicalError. --- .../scheduler/rate/PIDRateEstimator.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala index 3f7a85c9f94b5..875cea821a845 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -72,14 +72,23 @@ private[streaming] class PIDRateEstimator( // in elements/second val error = latestRate - processingRate - // in elements/second - val sumError = schedulingDelay.toDouble * processingRate / batchIntervalMillis + // The error integral, based on schedulingDelay as an indicator for accumulated errors + // a scheduling delay s corresponds to s * processingRate overflowing elements. Those + // are elements that couldn't be processed in previous batches, leading to this delay. + // We assume the processingRate didn't change too much. + // from the number of overflowing elements we can calculate the rate at which they would be + // processed by dividing it by the batch interval. This rate is our "historical" error, + // or integral part, since if we subtracted this rate from the previous "calculated rate", + // there wouldn't have been any overflowing elements, and the scheduling delay would have + // been zero. + // (in elements/second) + val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate + proportional * error + - integral * sumError + + integral * historicalError + derivative * dError).max(0.0) latestTime = time if (firstRun) { From aa5b097e3d58f07feb830f0ff5456088064b7fff Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Fri, 31 Jul 2015 11:25:40 +0200 Subject: [PATCH 6/6] Add more comments, made all PID constant parameters positive, a couple more tests. --- .../scheduler/rate/PIDRateEstimator.scala | 49 ++++++++++------ .../scheduler/rate/RateEstimator.scala | 6 +- .../rate/PIDRateEstimatorSuite.scala | 56 +++++++++++++++---- 3 files changed, 81 insertions(+), 30 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala index 875cea821a845..6ae56a68ad88c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -28,20 +28,22 @@ package org.apache.spark.streaming.scheduler.rate * * @param batchDurationMillis the batch duration, in milliseconds * @param proportional how much the correction should depend on the current - * error. This term usually provides the bulk of correction. A value too large would - * make the controller overshoot the setpoint, while a small value would make the - * controller too insensitive. The default value is -1. + * error. This term usually provides the bulk of correction and should be positive or zero. + * A value too large would make the controller overshoot the setpoint, while a small value + * would make the controller too insensitive. The default value is 1. * @param integral how much the correction should depend on the accumulation - * of past errors. This term accelerates the movement towards the setpoint, but a large - * value may lead to overshooting. The default value is -0.2. + * of past errors. This value should be positive or 0. This term accelerates the movement + * towards the desired value, but a large value may lead to overshooting. The default value + * is 0.2. * @param derivative how much the correction should depend on a prediction - * of future errors, based on current rate of change. This term is not used very often, - * as it impacts stability of the system. The default value is 0. + * of future errors, based on current rate of change. This value should be positive or 0. + * This term is not used very often, as it impacts stability of the system. The default + * value is 0. */ private[streaming] class PIDRateEstimator( batchIntervalMillis: Long, - proportional: Double = -1D, - integral: Double = -.2D, + proportional: Double = 1D, + integral: Double = .2D, derivative: Double = 0D) extends RateEstimator { @@ -53,9 +55,19 @@ private[streaming] class PIDRateEstimator( require( batchIntervalMillis > 0, s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.") + require( + proportional >= 0, + s"Proportional term $proportional in PIDRateEstimator should be >= 0.") + require( + integral >= 0, + s"Integral term $integral in PIDRateEstimator should be >= 0.") + require( + derivative >= 0, + s"Derivative term $derivative in PIDRateEstimator should be >= 0.") + def compute(time: Long, // in milliseconds - elements: Long, + numElements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long // in milliseconds ): Option[Double] = { @@ -67,16 +79,19 @@ private[streaming] class PIDRateEstimator( val delaySinceUpdate = (time - latestTime).toDouble / 1000 // in elements/second - val processingRate = elements.toDouble / processingDelay * 1000 + val processingRate = numElements.toDouble / processingDelay * 1000 + // In our system `error` is the difference between the desired rate and the measured rate + // based on the latest batch information. We consider the desired rate to be latest rate, + // which is what this estimator calculated for the previous batch. // in elements/second val error = latestRate - processingRate - // The error integral, based on schedulingDelay as an indicator for accumulated errors - // a scheduling delay s corresponds to s * processingRate overflowing elements. Those + // The error integral, based on schedulingDelay as an indicator for accumulated errors. + // A scheduling delay s corresponds to s * processingRate overflowing elements. Those // are elements that couldn't be processed in previous batches, leading to this delay. - // We assume the processingRate didn't change too much. - // from the number of overflowing elements we can calculate the rate at which they would be + // In the following, we assume the processingRate didn't change too much. + // From the number of overflowing elements we can calculate the rate at which they would be // processed by dividing it by the batch interval. This rate is our "historical" error, // or integral part, since if we subtracted this rate from the previous "calculated rate", // there wouldn't have been any overflowing elements, and the scheduling delay would have @@ -87,8 +102,8 @@ private[streaming] class PIDRateEstimator( // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate - val newRate = (latestRate + proportional * error + - integral * historicalError + + val newRate = (latestRate - proportional * error - + integral * historicalError - derivative * dError).max(0.0) latestTime = time if (firstRun) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index d5c5931029f1b..17ccebc1ed41b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -58,9 +58,9 @@ object RateEstimator { def create(conf: SparkConf, batchInterval: Duration): RateEstimator = conf.get("spark.streaming.backpressure.rateEstimator", "pid") match { case "pid" => - val proportional = conf.getDouble("spark.streraming.backpressure.pid.proportional", -1.0) - val integral = conf.getDouble("spark.streraming.backpressure.pid.integral", -0.2) - val derived = conf.getDouble("spark.streraming.backpressure.pid.derived", 0.0) + val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0) + val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2) + val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0) new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived) case estimator => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala index 21678b3449cfb..97c32d8f2d59e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala @@ -19,35 +19,64 @@ package org.apache.spark.streaming.scheduler.rate import scala.util.Random -import org.scalatest._ +import org.scalatest.Inspectors.forAll import org.scalatest.Matchers -import org.scalatest.Inspectors._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.streaming.Seconds class PIDRateEstimatorSuite extends SparkFunSuite with Matchers { + test("the right estimator is created") { + val conf = new SparkConf + conf.set("spark.streaming.backpressure.rateEstimator", "pid") + val pid = RateEstimator.create(conf, Seconds(1)) + pid.getClass should equal(classOf[PIDRateEstimator]) + } + + test("estimator checks ranges") { + intercept[IllegalArgumentException] { + new PIDRateEstimator(0, 1, 2, 3) + } + intercept[IllegalArgumentException] { + new PIDRateEstimator(100, -1, 2, 3) + } + intercept[IllegalArgumentException] { + new PIDRateEstimator(100, 0, -1, 3) + } + intercept[IllegalArgumentException] { + new PIDRateEstimator(100, 0, 0, -1) + } + } + + private def createDefaultEstimator: PIDRateEstimator = { + new PIDRateEstimator(20, 1D, 0D, 0D) + } + test("first bound is None") { - val p = new PIDRateEstimator(20, -1D, 0D, 0D) + val p = createDefaultEstimator p.compute(0, 10, 10, 0) should equal(None) } test("second bound is rate") { - val p = new PIDRateEstimator(20, -1D, 0D, 0D) + val p = createDefaultEstimator p.compute(0, 10, 10, 0) // 1000 elements / s p.compute(10, 10, 10, 0) should equal(Some(1000)) } test("works even with no time between updates") { - val p = new PIDRateEstimator(20, -1D, 0D, 0D) + val p = createDefaultEstimator p.compute(0, 10, 10, 0) p.compute(10, 10, 10, 0) p.compute(10, 10, 10, 0) should equal(None) } test("bound is never negative") { - val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val p = new PIDRateEstimator(20, 1D, 1D, 0D) + // prepare a series of batch updates, one every 20ms, 0 processed elements, 2ms of processing + // this might point the estimator to try and decrease the bound, but we test it never + // goes below zero, which would be nonsensical. val times = List.tabulate(50)(x => x * 20) // every 20ms val elements = List.fill(50)(0) // no processing val proc = List.fill(50)(20) // 20ms of processing @@ -58,7 +87,10 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers { } test("with no accumulated or positive error, |I| > 0, follow the processing speed") { - val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val p = new PIDRateEstimator(20, 1D, 1D, 0D) + // prepare a series of batch updates, one every 20ms with an increasing number of processed + // elements in each batch, but constant processing time, and no accumulated error. Even though + // the integral part is non-zero, the estimated rate should follow only the proportional term val times = List.tabulate(50)(x => x * 20) // every 20ms val elements = List.tabulate(50)(x => x * 20) // increasing val proc = List.fill(50)(20) // 20ms of processing @@ -69,7 +101,11 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers { } test("with no accumulated but some positive error, |I| > 0, follow the processing speed") { - val p = new PIDRateEstimator(20, -1D, -1D, 0D) + val p = new PIDRateEstimator(20, 1D, 1D, 0D) + // prepare a series of batch updates, one every 20ms with an decreasing number of processed + // elements in each batch, but constant processing time, and no accumulated error. Even though + // the integral part is non-zero, the estimated rate should follow only the proportional term, + // asking for less and less elements val times = List.tabulate(50)(x => x * 20) // every 20ms val elements = List.tabulate(50)(x => (50 - x) * 20) // decreasing val proc = List.fill(50)(20) // 20ms of processing @@ -80,7 +116,7 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers { } test("with some accumulated and some positive error, |I| > 0, stay below the processing speed") { - val p = new PIDRateEstimator(20, -1D, -.01D, 0D) + val p = new PIDRateEstimator(20, 1D, .01D, 0D) val times = List.tabulate(50)(x => x * 20) // every 20ms val rng = new Random() val elements = List.tabulate(50)(x => rng.nextInt(1000))