diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 159dd0ecb590..8b9fc9f09686 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -429,7 +429,25 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be val defaultCheckpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath try { - startedTest.foreach { action => + val actionIterator = startedTest.iterator.buffered + while (actionIterator.hasNext) { + // Synchronize sequential addDataMemory actions. + val addDataMemoryActions = ArrayBuffer[AddDataMemory[_]]() + while (actionIterator.hasNext && actionIterator.head.isInstanceOf[AddDataMemory[_]]) { + addDataMemoryActions.append(actionIterator.next().asInstanceOf[AddDataMemory[_]]) + } + if (addDataMemoryActions.nonEmpty) { + val synchronizeAll = addDataMemoryActions + .map(t => t.source.synchronized[Unit] _) + .reduce(_.compose(_)) + synchronizeAll { + addDataMemoryActions.foreach(handleAction) + } + } else { + handleAction(actionIterator.next()) + } + } + def handleAction: StreamAction => Unit = { action => logInfo(s"Processing test stream action: $action") action match { case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>