Parallel execution of scalacheck test cases for streaming properties #19

Open
juanrh opened this issue Aug 25, 2015 · 2 comments

Comments


juanrh commented Aug 25, 2015

DStreamProp.forAllAlways executes the test cases sequentially, which is fine for local tests with a finely tuned batch interval. Adapting the ideas of StreamingContextDirectReceiverTest, the test cases could perhaps be executed in parallel. Instead of using a single input DStream that multiplexes the test cases by using pairs, which prevents using DStream transformers as test subjects, the idea is to use a Vector (or another fast immutable indexed Seq) of DStreams. We need access to the number of workers as an implicit value wrapping an Int (like Parallelism), and we give each worker an index into that Vector of DStreams with something like:

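  import java.util.concurrent.atomic.AtomicInteger

  // Shared counter: each thread takes the next free id on its first access
  val counter2 = new AtomicInteger(0)
  val id = new ThreadLocal[Int] {
    override def initialValue(): Int = counter2.getAndIncrement()
  }
  for (_ <- 1 to 4) {
    new Thread(new Runnable {
      def run(): Unit = {
        println(s"I got id ${id.get}, said ${Thread.currentThread()}")
        Thread.sleep(500)
        // Same thread, so the same id is returned
        println(s"I told you I got id ${id.get}, said ${Thread.currentThread()}")
      }
    }).start()
  }
  Thread.sleep(2000) // crude wait for the worker threads to finish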

This way each worker generates data for a different DStream, hopefully eliminating thread-safety problems, and the action checking the result just iterates over the Vector of DStreams, similarly to what is done in StreamingContextDirectReceiverTest.
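A minimal sketch of that layout, without Spark: plain concurrent queues stand in for the DStreams, and `Parallelism`, `WorkerSlotsSketch` and `collect` are hypothetical names used only for illustration. It assumes exactly one thread per worker, so the thread-local ids stay within the bounds of the Vector.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical wrapper for the number of workers, passed implicitly
final case class Parallelism(numWorkers: Int)

object WorkerSlotsSketch {
  // Starts one thread per worker; each thread writes only to its own slot.
  // Returns what the driver reads from each slot, in slot order.
  def collect()(implicit par: Parallelism): Vector[List[Int]] = {
    // Stand-in for the Vector of DStreams: one queue per worker
    val slots: Vector[ConcurrentLinkedQueue[Int]] =
      Vector.fill(par.numWorkers)(new ConcurrentLinkedQueue[Int]())

    val counter = new AtomicInteger(0)
    val workerId = new ThreadLocal[Int] {
      override def initialValue(): Int = counter.getAndIncrement()
    }

    val threads = (1 to par.numWorkers).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          val i = workerId.get // index of this worker's slot
          // "Generate" three records tagged with the worker index
          (0 until 3).foreach(k => slots(i).add(i * 10 + k))
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    // Driver side: iterate over every slot of the shared Vector
    slots.map { q =>
      val it = q.iterator()
      val buf = List.newBuilder[Int]
      while (it.hasNext) buf += it.next()
      buf.result()
    }
  }
}
```

With `implicit val par: Parallelism = Parallelism(4)` in scope, `WorkerSlotsSketch.collect()` returns one list per worker, and no slot is ever written by two threads.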


juanrh commented Aug 26, 2015

The Vector of DStreams cannot be replaced by a ThreadLocal variable, because the driver thread needs to access each of the DStreams in the Vector.
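A small sketch of why, using only `java.lang.ThreadLocal`: a value set by a worker thread is not visible from any other thread, so the driver would only ever see its own initial value. `ThreadLocalVisibilitySketch` and `demo` are hypothetical names, and a String stands in for a DStream.

```scala
import java.util.concurrent.atomic.AtomicReference

object ThreadLocalVisibilitySketch {
  // Returns (value seen by the worker, value seen by the calling "driver" thread)
  def demo(): (String, String) = {
    val slot = new ThreadLocal[String] {
      override def initialValue(): String = "unset"
    }
    val seenByWorker = new AtomicReference[String]("")
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        slot.set("worker stream")    // visible only inside this thread
        seenByWorker.set(slot.get)
      }
    })
    worker.start()
    worker.join()
    // The calling thread gets its own copy, still at the initial value
    (seenByWorker.get, slot.get)
  }
}
```

Calling `ThreadLocalVisibilitySketch.demo()` yields `("worker stream", "unset")`: the worker sees what it stored, while the driver does not.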


juanrh commented Aug 26, 2015

If we are already specifying the number of workers outside the prop, we could think of it not as a number of workers but as a number of multiplexed test cases, and work in a single thread as in actorSendingPropMultiplex from https://github.com/juanrh/sscheck/blob/streamingDataSendExperiments/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/StreamingContextActorReceiverTest.scala. This way the program is simpler and only one thread has access to the Vector of DStreams. If we later want to parallelize, we can create the threads or a thread pool ourselves and have more control over them. Some care must be taken to avoid blocking the driver thread.
The downside is that we cannot use the shrinking mechanism of ScalaCheck this way, because a test case can fail during the execution of another one, and we just change the message of the Specs2 Result so that this is not noticed. So maybe letting ScalaCheck handle the workers is a good idea anyway. Currently we don't use shrinking, but it could be optionally enabled in the future, and it is an important feature.
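A rough sketch of the single-threaded multiplexing, in plain Scala with no Spark or ScalaCheck involved: `MultiplexSketch`, `runMultiplexed`, and the `Batch` and `TestCase` aliases are hypothetical names, and lists of Ints stand in for batches. It is not the code of actorSendingPropMultiplex.

```scala
object MultiplexSketch {
  // A test case is a sequence of batches; a batch is a list of records
  type Batch = List[Int]
  type TestCase = List[Batch]

  // Feeds the batches of all test cases in lockstep from a single thread:
  // at step t, slot i receives batch t of test case i, if it has one.
  // Returns, per slot, whether `check` held for every batch it received.
  def runMultiplexed(cases: Vector[TestCase])(check: Batch => Boolean): Vector[Boolean] = {
    val steps = if (cases.isEmpty) 0 else cases.map(_.length).max
    val ok = Array.fill(cases.length)(true)
    for (t <- 0 until steps; i <- cases.indices) {
      cases(i).lift(t).foreach { batch =>
        if (!check(batch)) ok(i) = false
      }
    }
    ok.toVector
  }
}

// Usage: the second test case contains a negative record
val cases: Vector[MultiplexSketch.TestCase] =
  Vector(List(List(1, 2), List(3)), List(List(4), List(-1), List(5)))
val outcome = MultiplexSketch.runMultiplexed(cases)(_.forall(_ >= 0))
// outcome == Vector(true, false)
```

The failure is only known by slot index once the interleaved run is over, which illustrates why mapping it back to the ScalaCheck test case that produced it, and so shrinking it, would need extra bookkeeping.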
