From d540be6bb051a33d2f6bd69a49fbe11afe9f0a65 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Tue, 20 Feb 2018 15:34:16 -0800
Subject: [PATCH 1/3] just use synchronization

---
 .../spark/sql/streaming/StreamTest.scala | 60 +++++++++++++------
 1 file changed, 42 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 37fe595529ba..38df23b6f054 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -110,6 +110,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
    * offset of added data.
    */
   def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset)
+
+  def addAllData(query: Option[StreamExecution]): Seq[(BaseStreamingSource, Offset)] = {
+    Seq(addData(query))
+  }
 }
 
 /** A trait that can be extended when testing a source. */
@@ -429,7 +433,25 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     val defaultCheckpointLocation =
       Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     try {
-      startedTest.foreach { action =>
+      val actionIterator = startedTest.iterator.buffered
+      while (actionIterator.hasNext) {
+        // Synchronize sequential addDataMemory actions.
+        val addDataMemoryActions = ArrayBuffer[AddDataMemory[_]]()
+        while (actionIterator.hasNext && actionIterator.head.isInstanceOf[AddDataMemory[_]]) {
+          addDataMemoryActions.append(actionIterator.next().asInstanceOf[AddDataMemory[_]])
+        }
+        if (addDataMemoryActions.nonEmpty) {
+          val synchronizeAll = addDataMemoryActions
+            .map(t => t.source.synchronized[Unit] _)
+            .reduce(_.compose(_))
+          synchronizeAll {
+            addDataMemoryActions.foreach(handleAction)
+          }
+        } else {
+          handleAction(actionIterator.next())
+        }
+      }
+      def handleAction: StreamAction => Unit = { action =>
         logInfo(s"Processing test stream action: $action")
         action match {
           case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
@@ -599,22 +621,23 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             }
             // Add data
             val queryToUse = Option(currentStream).orElse(Option(lastStream))
-            val (source, offset) = a.addData(queryToUse)
-
-            def findSourceIndex(plan: LogicalPlan): Option[Int] = {
-              plan
-                .collect {
-                  case StreamingExecutionRelation(s, _) => s
-                  case DataSourceV2Relation(_, r) => r
-                }
-                .zipWithIndex
-                .find(_._1 == source)
-                .map(_._2)
-            }
+            a.addAllData(queryToUse).foreach { tuple =>
+              val (source, offset) = tuple
+
+              def findSourceIndex(plan: LogicalPlan): Option[Int] = {
+                plan
+                  .collect {
+                    case StreamingExecutionRelation(s, _) => s
+                    case DataSourceV2Relation(_, r) => r
+                  }
+                  .zipWithIndex
+                  .find(_._1 == source)
+                  .map(_._2)
+              }
 
-            // Try to find the index of the source to which data was added. Either get the index
-            // from the current active query or the original input logical plan.
-            val sourceIndex =
+              // Try to find the index of the source to which data was added. Either get the index
+              // from the current active query or the original input logical plan.
+              val sourceIndex =
               queryToUse.flatMap { query =>
                 findSourceIndex(query.logicalPlan)
               }.orElse {
@@ -628,8 +651,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
                   "Could not find index of the source to which data was added")
               }
 
-            // Store the expected offset of added data to wait for it later
-            awaiting.put(sourceIndex, offset)
+              // Store the expected offset of added data to wait for it later
+              awaiting.put(sourceIndex, offset)
+            }
           } catch {
             case NonFatal(e) =>
               failTest("Error adding data", e)

From dce075f53c8a1418dac99c9b7b7f9b7e79ed17ff Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Tue, 20 Feb 2018 15:45:40 -0800
Subject: [PATCH 2/3] fix merge

---
 .../spark/sql/streaming/StreamTest.scala | 37 ++++++++-----------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index d5b0a73b2a8c..72279733024d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -110,10 +110,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
    * offset of added data.
    */
   def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset)
-
-  def addAllData(query: Option[StreamExecution]): Seq[(BaseStreamingSource, Offset)] = {
-    Seq(addData(query))
-  }
 }
 
 /** A trait that can be extended when testing a source. */
@@ -634,25 +630,24 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
                 .map(_._2)
             }
 
-              // Try to find the index of the source to which data was added. Either get the index
-              // from the current active query or the original input logical plan.
-              val sourceIndex =
-              queryToUse.flatMap { query =>
-                findSourceIndex(query.logicalPlan)
-              }.orElse {
-                findSourceIndex(stream.logicalPlan)
-              }.orElse {
-                queryToUse.flatMap { q =>
-                  findSourceIndex(q.lastExecution.logical)
-                }
-              }.getOrElse {
-                throw new IllegalArgumentException(
-                  "Could not find index of the source to which data was added")
+            // Try to find the index of the source to which data was added. Either get the index
+            // from the current active query or the original input logical plan.
+            val sourceIndex =
+            queryToUse.flatMap { query =>
+              findSourceIndex(query.logicalPlan)
+            }.orElse {
+              findSourceIndex(stream.logicalPlan)
+            }.orElse {
+              queryToUse.flatMap { q =>
+                findSourceIndex(q.lastExecution.logical)
               }
-
-              // Store the expected offset of added data to wait for it later
-              awaiting.put(sourceIndex, offset)
+            }.getOrElse {
+              throw new IllegalArgumentException(
+                "Could not find index of the source to which data was added")
             }
+
+            // Store the expected offset of added data to wait for it later
+            awaiting.put(sourceIndex, offset)
           } catch {
             case NonFatal(e) =>
               failTest("Error adding data", e)

From 1df90e796e9388d7992bc55f9f87bfd71af2f7f9 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Tue, 20 Feb 2018 15:48:30 -0800
Subject: [PATCH 3/3] fix indent

---
 .../spark/sql/streaming/StreamTest.scala | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 72279733024d..8b9fc9f09686 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -633,18 +633,18 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             // Try to find the index of the source to which data was added. Either get the index
            // from the current active query or the original input logical plan.
             val sourceIndex =
-            queryToUse.flatMap { query =>
-              findSourceIndex(query.logicalPlan)
-            }.orElse {
-              findSourceIndex(stream.logicalPlan)
-            }.orElse {
-              queryToUse.flatMap { q =>
-                findSourceIndex(q.lastExecution.logical)
+              queryToUse.flatMap { query =>
+                findSourceIndex(query.logicalPlan)
+              }.orElse {
+                findSourceIndex(stream.logicalPlan)
+              }.orElse {
+                queryToUse.flatMap { q =>
+                  findSourceIndex(q.lastExecution.logical)
+                }
+              }.getOrElse {
+                throw new IllegalArgumentException(
+                  "Could not find index of the source to which data was added")
               }
-            }.getOrElse {
-              throw new IllegalArgumentException(
-                "Could not find index of the source to which data was added")
-            }
 
             // Store the expected offset of added data to wait for it later
             awaiting.put(sourceIndex, offset)
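
Aside: PATCH 1/3 batches consecutive AddDataMemory actions by peeking at a BufferedIterator (head) before consuming (next()). Below is a minimal standalone sketch of that run-grouping pattern; Int is a hypothetical stand-in for AddDataMemory and String for any other StreamAction, and none of this is Spark code.

import scala.collection.mutable.ArrayBuffer

object GroupRunsSketch {
  def main(args: Array[String]): Unit = {
    // Mixed "actions": Ints play the role of AddDataMemory, Strings the rest.
    val actionIterator = List(1, 2, "stop", 3, 4).iterator.buffered
    while (actionIterator.hasNext) {
      val run = ArrayBuffer[Int]()
      // head peeks without consuming, so a non-Int element is left
      // for the else branch below.
      while (actionIterator.hasNext && actionIterator.head.isInstanceOf[Int]) {
        run.append(actionIterator.next().asInstanceOf[Int])
      }
      if (run.nonEmpty) {
        println(s"handle run together: $run")  // ArrayBuffer(1, 2), then ArrayBuffer(3, 4)
      } else {
        println(s"handle single action: ${actionIterator.next()}")  // "stop"
      }
    }
  }
}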
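
Aside: the synchronizeAll construction in PATCH 1/3 eta-expands each source's synchronized method into a function of type (=> Unit) => Unit and chains the functions with compose, so a single call runs its body while holding every source's monitor. A minimal sketch of the same trick, where sourceA and sourceB are hypothetical stand-ins for the MemoryStream sources:

object SynchronizeAllSketch {
  def main(args: Array[String]): Unit = {
    val sourceA = new Object
    val sourceB = new Object

    // Eta-expand each monitor's synchronized method to (=> Unit) => Unit.
    val locks: Seq[(=> Unit) => Unit] =
      Seq(sourceA, sourceB).map(s => s.synchronized[Unit] _)

    // Composing nests the blocks:
    //   synchronizeAll(body) == sourceA.synchronized { sourceB.synchronized { body } }
    val synchronizeAll = locks.reduce(_.compose(_))

    synchronizeAll {
      // Both monitors are held here, so a concurrent thread that also
      // synchronizes on either source must wait until the body finishes.
      println("holding all locks")
    }
  }
}

Because compose nests the blocks in the sequence's fixed order, every batched acquisition takes the monitors in the same order, which keeps the grouped actions free of lock-ordering deadlocks as long as this is the only place several source locks are held together.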