7 changes: 7 additions & 0 deletions mllib/pom.xml
@@ -91,6 +91,13 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
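The `<type>test-jar</type>` dependency is what puts spark-streaming's test utilities (notably TestSuiteBase, used below) on mllib's test classpath. As an aside not taken from this PR, the rough sbt equivalent would use the "tests" classifier; sparkVersion is a placeholder here:

    // sbt sketch (assumption, not part of this PR): depend on the
    // spark-streaming test artifact via its "tests" classifier
    libraryDependencies +=
      "org.apache.spark" %% "spark-streaming" % sparkVersion % "test" classifier "tests"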
mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -17,20 +17,19 @@
 
 package org.apache.spark.mllib.regression
 
-import java.io.File
-import java.nio.charset.Charset
-
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.Files
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.util.Utils
+import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.TestSuiteBase
 
-class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
+class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
+
+  // use longer wait time to ensure job completion
+  override def maxWaitTimeMillis = 20000
 
   // Assert that two values are equal within tolerance epsilon
   def assertEqual(v1: Double, v2: Double, epsilon: Double) {
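For context, TestSuiteBase (made visible by the new test-jar dependency) drives a StreamingContext over canned batches instead of file streams and wall-clock sleeps. A minimal sketch of the pattern, with a hypothetical suite name and assuming the setupStreams/runStreams signatures used in this diff:

    import org.scalatest.FunSuite
    import org.apache.spark.streaming.TestSuiteBase
    import org.apache.spark.streaming.dstream.DStream

    // hypothetical example suite, not part of this PR
    class DoublingSuite extends FunSuite with TestSuiteBase {
      test("map doubles each element") {
        // one inner Seq per batch
        val input = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
        // setupStreams wires the input into a test input stream,
        // applies the operation, and attaches a test output sink
        val ssc = setupStreams(input, (ds: DStream[Int]) => ds.map(_ * 2))
        // runStreams processes exactly numBatches batches and collects the output
        val output: Seq[Seq[Int]] =
          runStreams[Int](ssc, numBatches = 2, numExpectedOutput = 2)
        assert(output.flatten === Seq(2, 4, 6, 8, 10, 12))
      }
    }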
@@ -49,35 +48,26 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
   }
 
   // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
-  test("streaming linear regression parameter accuracy") {
+  test("parameter accuracy") {
 
-    val testDir = Files.createTempDir()
-    val numBatches = 10
-    val batchDuration = Milliseconds(1000)
-    val ssc = new StreamingContext(sc, batchDuration)
-    val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse)
+    // create model
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(0.0, 0.0))
       .setStepSize(0.1)
-      .setNumIterations(50)
+      .setNumIterations(25)
 
-    model.trainOn(data)
-
-    ssc.start()
-
-    // write data to a file stream
-    for (i <- 0 until numBatches) {
-      val samples = LinearDataGenerator.generateLinearInput(
-        0.0, Array(10.0, 10.0), 100, 42 * (i + 1))
-      val file = new File(testDir, i.toString)
-      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
-      Thread.sleep(batchDuration.milliseconds)
+    // generate sequence of simulated data
+    val numBatches = 10
+    val input = (0 until numBatches).map { i =>
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1))
     }
 
-    ssc.stop(stopSparkContext=false)
-
-    System.clearProperty("spark.driver.port")
-    Utils.deleteRecursively(testDir)
+    // apply model training to input stream
+    val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+      model.trainOn(inputDStream)
+      inputDStream.count()
+    })
+    runStreams(ssc, numBatches, numBatches)
 
     // check accuracy of final parameter estimates
     assertEqual(model.latestModel().intercept, 0.0, 0.1)
@@ -91,39 +81,33 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
   }
 
   // Test that parameter estimates improve when learning Y = 10*X1 on streaming data
-  test("streaming linear regression parameter convergence") {
+  test("parameter convergence") {
 
-    val testDir = Files.createTempDir()
-    val batchDuration = Milliseconds(2000)
-    val ssc = new StreamingContext(sc, batchDuration)
-    val numBatches = 5
-    val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse)
+    // create model
     val model = new StreamingLinearRegressionWithSGD()
       .setInitialWeights(Vectors.dense(0.0))
      .setStepSize(0.1)
-      .setNumIterations(50)
-
-    model.trainOn(data)
-
-    ssc.start()
+      .setNumIterations(25)
 
-    // write data to a file stream
-    val history = new ArrayBuffer[Double](numBatches)
-    for (i <- 0 until numBatches) {
-      val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1))
-      val file = new File(testDir, i.toString)
-      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
-      Thread.sleep(batchDuration.milliseconds)
-      // wait an extra few seconds to make sure the update finishes before new data arrive
-      Thread.sleep(4000)
-      history.append(math.abs(model.latestModel().weights(0) - 10.0))
+    // generate sequence of simulated data
+    val numBatches = 10
+    val input = (0 until numBatches).map { i =>
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1))
     }
 
-    ssc.stop(stopSparkContext=false)
+    // create buffer to store intermediate fits
+    val history = new ArrayBuffer[Double](numBatches)
 
-    System.clearProperty("spark.driver.port")
-    Utils.deleteRecursively(testDir)
+    // apply model training to input stream, storing the intermediate results
+    // (we add a count to ensure the result is a DStream)
+    val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+      model.trainOn(inputDStream)
+      inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - 10.0)))
+      inputDStream.count()
+    })
+    runStreams(ssc, numBatches, numBatches)
 
     // compute change in error
     val deltas = history.drop(1).zip(history.dropRight(1))
     // check error stability (it always either shrinks, or increases with small tol)
     assert(deltas.forall(x => (x._1 - x._2) <= 0.1))
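To make the stability assertion concrete: history.drop(1).zip(history.dropRight(1)) pairs each per-batch error with its predecessor as (current, previous), and the test tolerates at most a 0.1 increase from one batch to the next. A tiny worked example with made-up error values:

    // hypothetical |weights(0) - 10.0| values recorded after each batch
    val history = Seq(5.0, 3.2, 2.1, 2.15, 1.4)
    // (current, previous) pairs: (3.2,5.0), (2.1,3.2), (2.15,2.1), (1.4,2.15)
    val deltas = history.drop(1).zip(history.dropRight(1))
    // error may shrink freely, but may grow by at most 0.1 (here 2.15 - 2.1 = 0.05)
    assert(deltas.forall { case (cur, prev) => cur - prev <= 0.1 })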
@@ -132,4 +116,33 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
 
[Contributor review comment] nit: extra line.

   }
 
+  // Test predictions on a stream
+  test("predictions") {
+
+    // create model initialized with true weights
+    val model = new StreamingLinearRegressionWithSGD()
+      .setInitialWeights(Vectors.dense(10.0, 10.0))
+      .setStepSize(0.1)
+      .setNumIterations(25)
+
+    // generate sequence of simulated data for testing
+    val numBatches = 10
+    val nPoints = 100
+    val testInput = (0 until numBatches).map { i =>
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
+    }
+
+    // apply model predictions to test stream
+    val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
+      model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
+    })
+    // collect the output as (true, estimated) tuples
+    val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
+
+    // compute the mean absolute error and check that it's always less than 0.1
+    val errors = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints)
+    assert(errors.forall(x => x <= 0.1))
+

[Contributor review comment] nit: extra line.

+  }
+

[Contributor review comment] nit: extra line.

 }
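To spell out the error check in the predictions test: each output batch is a sequence of (true label, prediction) pairs, and the mean absolute error per batch must stay at or below 0.1. A worked example with fabricated values, for illustration only:

    // one batch of (true, estimated) pairs, made-up numbers
    val batch = Seq((10.0, 9.97), (20.0, 20.05), (0.0, -0.02))
    // mean absolute error: (0.03 + 0.05 + 0.02) / 3 = 0.0333...
    val mae = batch.map { case (t, e) => math.abs(t - e) }.sum / batch.size
    assert(mae <= 0.1)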
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -242,7 +242,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
 
     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.
+      filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
+      head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     val output = outputStream.output
 
     try {
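This filter matters because the mllib tests above register extra output operations: trainOn and foreachRDD each add an output stream to the graph, so getOutputStreams.head is no longer guaranteed to be the test sink. A sketch of the failure mode the change guards against, using the names from the diff:

    // with more than one output stream registered, blindly taking
    // head can grab a ForEachDStream added by trainOn/foreachRDD;
    // selecting by type finds the test sink wherever it sits
    val testSink = ssc.graph.getOutputStreams
      .filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]])
      .head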