From 87dd537e591af257007f86a600e39c97b87cae64 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 2 Feb 2015 22:16:42 -0800
Subject: [PATCH 1/3] [SPARK-5549] Define TaskContext interface in Scala.

So the interface documentation shows up in ScalaDoc.
---
 .../org/apache/spark/TaskContext.scala}       | 102 +++++++++---------
 .../org/apache/spark/TaskContextImpl.scala    |   8 +-
 .../JavaTaskCompletionListenerImpl.java       |   3 +-
 .../spark/JavaTaskContextCompileCheck.java    |  41 +++++++
 4 files changed, 100 insertions(+), 54 deletions(-)
 rename core/src/main/{java/org/apache/spark/TaskContext.java => scala/org/apache/spark/TaskContext.scala} (50%)
 rename core/src/test/java/{org/apache/spark/util => test/org/apache/spark}/JavaTaskCompletionListenerImpl.java (93%)
 create mode 100644 core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java

diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/scala/org/apache/spark/TaskContext.scala
similarity index 50%
rename from core/src/main/java/org/apache/spark/TaskContext.java
rename to core/src/main/scala/org/apache/spark/TaskContext.scala
index 095f9fb94fdf..db924c65e40f 100644
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -15,112 +15,116 @@
  * limitations under the License.
  */
 
-package org.apache.spark;
+package org.apache.spark
 
-import java.io.Serializable;
+import java.io.Serializable
 
-import scala.Function0;
-import scala.Function1;
-import scala.Unit;
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.TaskCompletionListener
 
-import org.apache.spark.annotation.DeveloperApi;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.util.TaskCompletionListener;
 
-/**
- * Contextual information about a task which can be read or mutated during
- * execution. To access the TaskContext for a running task use
- * TaskContext.get().
- */
-public abstract class TaskContext implements Serializable {
+object TaskContext {
   /**
    * Return the currently active TaskContext. This can be called inside of
    * user functions to access contextual information about running tasks.
    */
-  public static TaskContext get() {
-    return taskContext.get();
-  }
+  def get(): TaskContext = taskContext.get
+
+  private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
+
+  private[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
+
+  private[spark] def unset(): Unit = taskContext.remove()
+}
 
-  private static ThreadLocal<TaskContext> taskContext =
-    new ThreadLocal<TaskContext>();
 
-  static void setTaskContext(TaskContext tc) {
-    taskContext.set(tc);
-  }
+/**
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task, use:
+ * {{{
+ *   org.apache.spark.TaskContext.get()
+ * }}}
+ */
+abstract class TaskContext extends Serializable {
+  // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
+  // from generating a static get method (based on the companion object's get method).
 
-  static void unset() {
-    taskContext.remove();
-  }
+  // Note: getters in this class are defined with parentheses to maintain backward compatibility.
 
   /**
-   * Whether the task has completed.
+   * Returns true if the task has completed.
    */
-  public abstract boolean isCompleted();
+  def isCompleted(): Boolean
 
   /**
-   * Whether the task has been killed.
+   * Returns true if the task has been killed.
   */
-  public abstract boolean isInterrupted();
+  def isInterrupted(): Boolean
 
-  /** @deprecated use {@link #isRunningLocally()} */
-  @Deprecated
-  public abstract boolean runningLocally();
+  /** @deprecated use {@link #isRunningLocally()} */
+  @deprecated("use isRunningLocally", "1.2.0")
+  def runningLocally(): Boolean
 
-  public abstract boolean isRunningLocally();
+  /**
+   * Returns true if the task is running locally in the driver program.
+   * @return whether the task is running locally
+   */
+  def isRunningLocally(): Boolean
 
   /**
-   * Add a (Java friendly) listener to be executed on task completion.
+   * Adds a (Java friendly) listener to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
-  public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener);
+  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
 
   /**
-   * Add a listener in the form of a Scala closure to be executed on task completion.
+   * Adds a listener in the form of a Scala closure to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
-  public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f);
+  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
 
   /**
-   * Add a callback function to be executed on task completion. An example use
+   * Adds a callback function to be executed on task completion. An example use
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    *
-   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
+   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
    *
    * @param f Callback function.
    */
-  @Deprecated
-  public abstract void addOnCompleteCallback(final Function0<Unit> f);
+  @deprecated("use addTaskCompletionListener", "1.2.0")
+  def addOnCompleteCallback(f: () => Unit)
 
   /**
    * The ID of the stage that this task belongs to.
    */
-  public abstract int stageId();
+  def stageId(): Int
 
   /**
    * The ID of the RDD partition that is computed by this task.
    */
-  public abstract int partitionId();
+  def partitionId(): Int
 
   /**
    * How many times this task has been attempted. The first task attempt will be assigned
   * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
    */
-  public abstract int attemptNumber();
+  def attemptNumber(): Int
 
-  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
-  @Deprecated
-  public abstract long attemptId();
+  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
+  @deprecated("use attemptNumber", "1.3.0")
+  def attemptId(): Long
 
   /**
    * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
   * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
   */
-  public abstract long taskAttemptId();
+  def taskAttemptId(): Long
 
   /** ::DeveloperApi:: */
   @DeveloperApi
-  public abstract TaskMetrics taskMetrics();
+  def taskMetrics(): TaskMetrics
 }
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 9bb0c61e441f..337c8e4ebebc 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,7 @@ private[spark] class TaskContextImpl(
   with Logging {
 
   // For backwards-compatibility; this method is now deprecated as of 1.3.0.
-  override def attemptId: Long = taskAttemptId
+  override def attemptId(): Long = taskAttemptId
 
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -87,10 +87,10 @@ private[spark] class TaskContextImpl(
     interrupted = true
   }
 
-  override def isCompleted: Boolean = completed
+  override def isCompleted(): Boolean = completed
 
-  override def isRunningLocally: Boolean = runningLocally
+  override def isRunningLocally(): Boolean = runningLocally
 
-  override def isInterrupted: Boolean = interrupted
+  override def isInterrupted(): Boolean = interrupted
 }
diff --git a/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java b/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
similarity index 93%
rename from core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java
rename to core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
index e9ec700e32e1..e38bc38949d7 100644
--- a/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util;
+package test.org.apache.spark;
 
 import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
 
 /**
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
new file mode 100644
index 000000000000..4a918f725dc9
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import org.apache.spark.TaskContext;
+
+/**
+ * Something to make sure that TaskContext can be used in Java.
+ */
+public class JavaTaskContextCompileCheck {
+
+  public static void test() {
+    TaskContext tc = TaskContext.get();
+
+    tc.isCompleted();
+    tc.isInterrupted();
+    tc.isRunningLocally();
+
+    tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
+
+    tc.attemptNumber();
+    tc.partitionId();
+    tc.stageId();
+    tc.taskAttemptId();
+  }
+}

From 573756f6c1a7aa3a7bfd490068d8b2d2a459d357 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 2 Feb 2015 22:36:02 -0800
Subject: [PATCH 2/3] style fixes and javadoc fixes.

---
 .../scala/org/apache/spark/TaskContext.scala  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index db924c65e40f..f7dd5dcef3d6 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -33,9 +33,17 @@ object TaskContext {
 
   private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
 
-  private[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
+  // Note: protected[spark] instead of private[spark] to prevent the following two from
+  // showing up in JavaDoc.
+  /**
+   * Set the thread local TaskContext. Internal to Spark.
+   */
+  protected[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
 
-  private[spark] def unset(): Unit = taskContext.remove()
+  /**
+   * Unset the thread local TaskContext. Internal to Spark.
+   */
+  protected[spark] def unset(): Unit = taskContext.remove()
 }
 
@@ -62,7 +70,6 @@ abstract class TaskContext extends Serializable {
    */
   def isInterrupted(): Boolean
 
-  /** @deprecated use {@link #isRunningLocally()} */
   @deprecated("use isRunningLocally", "1.2.0")
   def runningLocally(): Boolean
 
@@ -91,8 +98,6 @@ abstract class TaskContext extends Serializable {
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    *
-   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
-   *
    * @param f Callback function.
    */
   @deprecated("use addTaskCompletionListener", "1.2.0")
   def addOnCompleteCallback(f: () => Unit)
@@ -114,7 +119,6 @@ abstract class TaskContext extends Serializable {
    */
   def attemptNumber(): Int
 
-  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
   @deprecated("use attemptNumber", "1.3.0")
   def attemptId(): Long
 

From 2480a177df2023238778fb217fb9b3e4794a9b82 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 2 Feb 2015 22:38:32 -0800
Subject: [PATCH 3/3] comment

---
 core/src/main/scala/org/apache/spark/TaskContext.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index f7dd5dcef3d6..af9c138f9787 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -58,6 +58,8 @@ abstract class TaskContext extends Serializable {
   // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
   // from generating a static get method (based on the companion object's get method).
 
+  // Note: Update JavaTaskContextCompileCheck when new methods are added to this class.
+
   // Note: getters in this class are defined with parentheses to maintain backward compatibility.
 
   /**
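
As a usage sketch (not part of these patches): the interface defined in patch 1 is typically exercised from inside a task, via the companion object's get(). In the sketch below, `rdd` stands for any RDD[Int] in a surrounding Spark program, and the listener body is made up for illustration.

    import org.apache.spark.TaskContext

    val result = rdd.mapPartitions { iter =>
      // TaskContext.get() reads the thread local set by the executor,
      // so it is only meaningful inside a running task.
      val tc = TaskContext.get()

      // Scala-closure form of the completion listener; it fires on
      // success, failure, or cancellation, e.g. to close an input stream.
      tc.addTaskCompletionListener { (ctx: TaskContext) =>
        println(s"stage ${ctx.stageId()}, partition ${ctx.partitionId()} done")
      }

      // Read-only task metadata exposed by the interface.
      val attempt = tc.attemptNumber() // 0 on the first attempt
      iter.map(_ + attempt)
    }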
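
The companion object's setTaskContext/unset pair is what makes TaskContext.get() work: Spark's executor-side task runner, which this diff does not touch, brackets each task with them. The following is a simplified, hypothetical sketch of that pattern; runWithContext and TaskContextLifecycleSketch are made-up names, and since the two methods are protected[spark] after patch 2, code like this would have to live under the org.apache.spark package.

    package org.apache.spark

    // Hypothetical helper illustrating the thread-local lifecycle; the real
    // driver of setTaskContext/unset is Spark's task runner, not shown here.
    private[spark] object TaskContextLifecycleSketch {
      def runWithContext[T](context: TaskContext)(body: => T): T = {
        TaskContext.setTaskContext(context) // make get() return this context
        try {
          body // user code invoked here can call TaskContext.get()
        } finally {
          TaskContext.unset() // always clear the thread local, even on failure
        }
      }
    }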