Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ trait FutureAction[T] extends Future[T] {
*/
override def value: Option[Try[T]]

// These two methods must be implemented in Scala 2.12, but won't be used by Spark
// These two methods must be implemented in Scala 2.12. They're implemented as a no-op here
// and then filled in with a real implementation in the two subclasses below. The no-op exists
// here so that those implementations can declare "override", necessary in 2.12, while working
// in 2.11, where the method doesn't exist in the superclass.
// After 2.11 support goes away, remove these two:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case it wasn't clear, this change ought to have no effect at all in 2.11


def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] =
throw new UnsupportedOperationException()
Expand All @@ -113,6 +117,42 @@ trait FutureAction[T] extends Future[T] {

}

/**
* Scala 2.12 defines the two new transform/transformWith methods mentioned above. Impementing
* these for 2.12 in the Spark class here requires delegating to these same methods in an
* underlying Future object. But that only exists in 2.12. But these methods are only called
* in 2.12. So define helper shims to access these methods on a Future by reflection.
*/
private[spark] object FutureAction {

private val transformTryMethod =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having to access this by reflection is a little annoying but keeping 2.11 + 2.12 compatibility in one source file is nice, and doesn't entail much of any runtime overhead

try {
classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext])
} catch {
case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
}

private val transformWithTryMethod =
try {
classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext])
} catch {
case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
}

private[spark] def transform[T, S](
future: Future[T],
f: (Try[T]) => Try[S],
executor: ExecutionContext): Future[S] =
transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]

private[spark] def transformWith[T, S](
future: Future[T],
f: (Try[T]) => Future[S],
executor: ExecutionContext): Future[S] =
transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]

}


/**
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
Expand Down Expand Up @@ -153,6 +193,18 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
jobWaiter.completionFuture.value.map {res => res.map(_ => resultFunc)}

def jobIds: Seq[Int] = Seq(jobWaiter.jobId)

override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transform(
jobWaiter.completionFuture,
(u: Try[Unit]) => f(u.map(_ => resultFunc)),
e)

override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transformWith(
jobWaiter.completionFuture,
(u: Try[Unit]) => f(u.map(_ => resultFunc)),
e)
}


Expand Down Expand Up @@ -246,6 +298,11 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T])

def jobIds: Seq[Int] = subActions.flatMap(_.jobIds)

override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transform(p.future, f, e)

override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transformWith(p.future, f, e)
}


Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2692,7 +2692,7 @@
<profile>
<id>scala-2.12</id>
<properties>
<scala.version>2.12.3</scala.version>
<scala.version>2.12.4</scala.version>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update picks up a fix for another scalac bug that also prevents the 2.12 build from working, but, has no effect on 2.11

<scala.binary.version>2.12</scala.binary.version>
</properties>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream}
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StateStoreMetrics, UnsafeRowPair}
import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{DataType, IntegerType}

Expand Down Expand Up @@ -1201,7 +1200,7 @@ object FlatMapGroupsWithStateSuite {
} catch {
case u: UnsupportedOperationException =>
return
case _ =>
case _: Throwable =>
throw new TestFailedException("Unexpected exception when trying to get watermark", 20)
}
throw new TestFailedException("Could get watermark when not expected", 20)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
assert(returnedValue === expectedReturnValue, "Returned value does not match expected")
}
}
AwaitTerminationTester.test(expectedBehavior, awaitTermFunc)
AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest here are just 2.12 warning cleanups by the by

true // If the control reached here, then everything worked as expected
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private[streaming] class FileBasedWriteAheadLog(
def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => reader.close())
}
if (!closeFileAfterWrite) {
logFilesToRead.iterator.map(readFile).flatten.asJava
Expand Down