From 32c43c2928868505a3af3a79db8e25a494a7c09a Mon Sep 17 00:00:00 2001
From: freeman
Date: Mon, 18 Aug 2014 22:25:38 -0400
Subject: [PATCH 1/3] Added test for prediction

- Test predictOnValues for accuracy on a test stream

---
 .../StreamingLinearRegressionSuite.scala      | 69 +++++++++++++++++--
 1 file changed, 65 insertions(+), 4 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index 45e25eecf508e..5817bcd78ed78 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -49,7 +49,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
   }
 
   // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
-  test("streaming linear regression parameter accuracy") {
+  test("parameter accuracy") {
 
     val testDir = Files.createTempDir()
     val numBatches = 10
@@ -76,7 +76,6 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
 
     ssc.stop(stopSparkContext=false)
 
-    System.clearProperty("spark.driver.port")
     Utils.deleteRecursively(testDir)
 
     // check accuracy of final parameter estimates
@@ -91,7 +90,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
   }
 
   // Test that parameter estimates improve when learning Y = 10*X1 on streaming data
-  test("streaming linear regression parameter convergence") {
+  test("parameter convergence") {
 
     val testDir = Files.createTempDir()
     val batchDuration = Milliseconds(2000)
@@ -121,7 +120,6 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
 
     ssc.stop(stopSparkContext=false)
 
-    System.clearProperty("spark.driver.port")
     Utils.deleteRecursively(testDir)
 
     val deltas = history.drop(1).zip(history.dropRight(1))
@@ -132,4 +130,67 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
 
   }
 
+  // Test predictions on a stream
+  test("predictions") {
+
+    val trainDir = Files.createTempDir()
+    val testDir = Files.createTempDir()
+    val batchDuration = Milliseconds(1000)
+    val numBatches = 10
+    val nPoints = 100
+
+    val ssc = new StreamingContext(sc, batchDuration)
+    val data = ssc.textFileStream(trainDir.toString).map(LabeledPoint.parse)
+    val model = new StreamingLinearRegressionWithSGD()
+      .setInitialWeights(Vectors.dense(0.0, 0.0))
+      .setStepSize(0.1)
+      .setNumIterations(50)
+
+    model.trainOn(data)
+
+    ssc.start()
+
+    // write training data to a file stream
+    for (i <- 0 until numBatches) {
+      val samples = LinearDataGenerator.generateLinearInput(
+        0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
+      val file = new File(trainDir, i.toString)
+      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
+      Thread.sleep(batchDuration.milliseconds)
+    }
+
+    ssc.stop(stopSparkContext=false)
+
+    Utils.deleteRecursively(trainDir)
+
+    print(model.latestModel().weights.toArray.mkString(" "))
+    print(model.latestModel().intercept)
+
+    val ssc2 = new StreamingContext(sc, batchDuration)
+    val data2 = ssc2.textFileStream(testDir.toString).map(LabeledPoint.parse)
+
+    val history = new ArrayBuffer[Double](numBatches)
+    val predictions = model.predictOnValues(data2.map(x => (x.label, x.features)))
+    val errors = predictions.map(x => math.abs(x._1 - x._2))
+    errors.foreachRDD(rdd => history.append(rdd.reduce(_+_) / nPoints.toDouble))
+
+    ssc2.start()
+
+    // write test data to a file stream
+
+    // make a function
+    for (i <- 0 until numBatches) {
+      val samples = LinearDataGenerator.generateLinearInput(
+        0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
+      val file = new File(testDir, i.toString)
+      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
+      Thread.sleep(batchDuration.milliseconds)
+    }
+
+    println(history)
+
+    ssc2.stop(stopSparkContext=false)
+
+  }
+
 }
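The API exercised by the new test is the trainOn / predictOnValues pair on StreamingLinearRegressionWithSGD: update the model continuously on one labeled stream, then emit (true label, predicted label) pairs for another. A minimal sketch of that usage pattern, assuming two existing DStream[LabeledPoint] inputs (the helper name and both streams are illustrative, not part of the patch):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
    import org.apache.spark.streaming.dstream.DStream

    // hypothetical helper: train on one labeled stream, predict on another
    def trainAndPredict(
        trainingStream: DStream[LabeledPoint],
        testStream: DStream[LabeledPoint]): DStream[(Double, Double)] = {
      val model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.dense(0.0, 0.0)) // one entry per feature
        .setStepSize(0.1)
        .setNumIterations(50)
      // update the model as each training batch arrives
      model.trainOn(trainingStream)
      // emit (true label, prediction) pairs, one batch at a time
      model.predictOnValues(testStream.map(x => (x.label, x.features)))
    }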
From 50eb0bfcf59b8fc3f8ab7b441eb3cbbf46299e1f Mon Sep 17 00:00:00 2001
From: freeman
Date: Tue, 19 Aug 2014 12:53:46 -0400
Subject: [PATCH 2/3] Refactored tests to use streaming test tools

- Made mllib depend on tests from streaming
- Rewrote all streamingLR tests to use the setupStreams & runStreams functions

---
 mllib/pom.xml                                 |   7 +
 .../StreamingLinearRegressionSuite.scala      | 157 ++++++------------
 .../spark/streaming/TestSuiteBase.scala       |   2 +-
 3 files changed, 63 insertions(+), 103 deletions(-)

diff --git a/mllib/pom.xml b/mllib/pom.xml
index fc1ecfbea708f..c7a1e2ae75c84 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -91,6 +91,13 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index 5817bcd78ed78..7437f6ee9899e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -17,20 +17,19 @@
 
 package org.apache.spark.mllib.regression
 
-import java.io.File
-import java.nio.charset.Charset
-
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.Files
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.util.Utils
+import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.TestSuiteBase
+
+class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
 
-class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
+  // use longer wait time to ensure job completion
+  override def maxWaitTimeMillis = 20000
 
   // Assert that two values are equal within tolerance epsilon
   def assertEqual(v1: Double, v2: Double, epsilon: Double) {
@@ -51,32 +50,24 @@
   // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
   test("parameter accuracy") {
 
-    val testDir = Files.createTempDir()
-    val numBatches = 10
-    val batchDuration = Milliseconds(1000)
-    val ssc = new StreamingContext(sc, batchDuration)
-    val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse)
+    // create model
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(0.0, 0.0))
       .setStepSize(0.1)
-      .setNumIterations(50)
-
-    model.trainOn(data)
+      .setNumIterations(25)
 
-    ssc.start()
-
-    // write data to a file stream
-    for (i <- 0 until numBatches) {
-      val samples = LinearDataGenerator.generateLinearInput(
-        0.0, Array(10.0, 10.0), 100, 42 * (i + 1))
-      val file = new File(testDir, i.toString)
-      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
-      Thread.sleep(batchDuration.milliseconds)
+    // generate sequence of simulated data
+    val numBatches = 10
+    val input = (0 until numBatches).map { i =>
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1))
     }
 
-    ssc.stop(stopSparkContext=false)
-
-    Utils.deleteRecursively(testDir)
+    // apply model training to input stream
+    val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+      model.trainOn(inputDStream)
+      inputDStream.count()
+    })
+    runStreams(ssc, numBatches, numBatches)
 
     // check accuracy of final parameter estimates
     assertEqual(model.latestModel().intercept, 0.0, 0.1)
@@ -92,36 +83,31 @@
   // Test that parameter estimates improve when learning Y = 10*X1 on streaming data
   test("parameter convergence") {
 
-    val testDir = Files.createTempDir()
-    val batchDuration = Milliseconds(2000)
-    val ssc = new StreamingContext(sc, batchDuration)
-    val numBatches = 5
-    val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse)
+    // create model
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(0.0))
       .setStepSize(0.1)
-      .setNumIterations(50)
-
-    model.trainOn(data)
+      .setNumIterations(25)
 
-    ssc.start()
-
-    // write data to a file stream
-    val history = new ArrayBuffer[Double](numBatches)
-    for (i <- 0 until numBatches) {
-      val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1))
-      val file = new File(testDir, i.toString)
-      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
-      Thread.sleep(batchDuration.milliseconds)
-      // wait an extra few seconds to make sure the update finishes before new data arrive
-      Thread.sleep(4000)
-      history.append(math.abs(model.latestModel().weights(0) - 10.0))
+    // generate sequence of simulated data
+    val numBatches = 10
+    val input = (0 until numBatches).map { i =>
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1))
     }
 
-    ssc.stop(stopSparkContext=false)
+    // create buffer to store intermediate fits
+    val history = new ArrayBuffer[Double](numBatches)
 
-    Utils.deleteRecursively(testDir)
+    // apply model training to input stream, storing the intermediate results
+    // (we add a count to ensure the result is a DStream)
+    val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+      model.trainOn(inputDStream)
+      inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - 10.0)))
+      inputDStream.count()
+    })
+    runStreams(ssc, numBatches, numBatches)
 
+    // compute change in error
     val deltas = history.drop(1).zip(history.dropRight(1))
     // check error stability (it always either shrinks, or increases with small tol)
     assert(deltas.forall(x => (x._1 - x._2) <= 0.1))
@@ -133,63 +119,30 @@
   // Test predictions on a stream
   test("predictions") {
 
-    val trainDir = Files.createTempDir()
-    val testDir = Files.createTempDir()
-    val batchDuration = Milliseconds(1000)
-    val numBatches = 10
-    val nPoints = 100
-
-    val ssc = new StreamingContext(sc, batchDuration)
-    val data = ssc.textFileStream(trainDir.toString).map(LabeledPoint.parse)
+    // create model initialized with true weights
     val model = new StreamingLinearRegressionWithSGD()
-      .setInitialWeights(Vectors.dense(0.0, 0.0))
+      .setInitialWeights(Vectors.dense(10.0, 10.0))
      .setStepSize(0.1)
-      .setNumIterations(50)
+      .setNumIterations(25)
 
-    model.trainOn(data)
-
-    ssc.start()
-
-    // write training data to a file stream
-    for (i <- 0 until numBatches) {
-      val samples = LinearDataGenerator.generateLinearInput(
-        0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
-      val file = new File(trainDir, i.toString)
-      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
-      Thread.sleep(batchDuration.milliseconds)
-    }
-
-    ssc.stop(stopSparkContext=false)
-
-    Utils.deleteRecursively(trainDir)
-
-    print(model.latestModel().weights.toArray.mkString(" "))
-    print(model.latestModel().intercept)
-
-    val ssc2 = new StreamingContext(sc, batchDuration)
-    val data2 = ssc2.textFileStream(testDir.toString).map(LabeledPoint.parse)
-
-    val history = new ArrayBuffer[Double](numBatches)
-    val predictions = model.predictOnValues(data2.map(x => (x.label, x.features)))
-    val errors = predictions.map(x => math.abs(x._1 - x._2))
-    errors.foreachRDD(rdd => history.append(rdd.reduce(_+_) / nPoints.toDouble))
-
-    ssc2.start()
-
-    // write test data to a file stream
-
-    // make a function
-    for (i <- 0 until numBatches) {
-      val samples = LinearDataGenerator.generateLinearInput(
-        0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
-      val file = new File(testDir, i.toString)
-      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
-      Thread.sleep(batchDuration.milliseconds)
+    // generate sequence of simulated data for testing
+    val numBatches = 10
+    val nPoints = 100
+    val testInput = (0 until numBatches).map { i =>
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
     }
 
-    println(history)
-
-    ssc2.stop(stopSparkContext=false)
+    // apply model predictions to test stream
+    val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
+      model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
+    })
+    // collect the output as (true, estimated) tuples
+    val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
+
+    // compute the mean absolute error and check that it's always less than 0.1
+    val errors = output.map(batch => batch.map(
+      p => math.abs(p._1 - p._2)).reduce(_+_) / nPoints.toDouble)
+    assert(errors.forall(x => x <= 0.1))
 
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index cc178fba12c9d..fab80dfb6a242 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -242,7 +242,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
 
     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     val output = outputStream.output
 
     try {
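The refactor leans on the TestSuiteBase harness contract: setupStreams wires a fixed sequence of input batches into an operation of type DStream[U] => DStream[V] and registers a test output stream, while runStreams drives the manual clock through numBatches batches and returns the collected output, one Seq[V] per batch. A simplified sketch of those shapes (illustrative only; the real TestSuiteBase signatures also carry ClassTag bounds and a partition count):

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    // simplified view of the harness used by the rewritten tests
    def setupStreams[U, V](
        input: Seq[Seq[U]],                   // one Seq[U] per input batch
        operation: DStream[U] => DStream[V]   // transformation under test
      ): StreamingContext = ???
    def runStreams[V](
        ssc: StreamingContext,
        numBatches: Int,                      // batches to push through
        numExpectedOutput: Int                // output batches to wait for
      ): Seq[Seq[V]] = ???

This contract is also why the training closures above end with inputDStream.count(): trainOn only registers a side effect, so the count supplies a concrete DStream for the harness to collect.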
From e851ca745883d19bfd7b8d5479ab2124a9bd4278 Mon Sep 17 00:00:00 2001
From: freeman
Date: Tue, 19 Aug 2014 14:07:17 -0400
Subject: [PATCH 3/3] Fixed long lines

---
 .../mllib/regression/StreamingLinearRegressionSuite.scala | 3 +--
 .../scala/org/apache/spark/streaming/TestSuiteBase.scala  | 4 +++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index 7437f6ee9899e..28489410f8225 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -140,8 +140,7 @@ class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
     val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
 
     // compute the mean absolute error and check that it's always less than 0.1
-    val errors = output.map(batch => batch.map(
-      p => math.abs(p._1 - p._2)).reduce(_+_) / nPoints.toDouble)
+    val errors = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints)
     assert(errors.forall(x => x <= 0.1))
 
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index fab80dfb6a242..f095da9cb55d3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -242,7 +242,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
 
     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.
+      filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
+      head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     val output = outputStream.output
 
     try {
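The one-line rewrite in PATCH 3 is behavior-preserving: on a non-empty collection of Doubles, sum is equivalent to reduce(_+_), and because the summands are Doubles the division by the Int nPoints already happens in floating point, making the .toDouble redundant. A self-contained sketch of the check on hypothetical data:

    // (true label, predicted label) pairs for two batches of two points each
    val output: Seq[Seq[(Double, Double)]] =
      Seq(Seq((10.0, 9.95), (0.0, 0.05)), Seq((5.0, 5.02), (1.0, 0.92)))
    val nPoints = 2
    // mean absolute error per batch: Seq(0.05, 0.05)
    val errors = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints)
    assert(errors.forall(_ <= 0.1))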