Skip to content

Commit

Permalink
Merge remote-tracking branch 'asf/master'
Browse files Browse the repository at this point in the history
Conflicts:
	project/SparkBuild.scala
  • Loading branch information
harishreedharan committed Jul 28, 2014
2 parents 7a1bc6e + d7eac4c commit 981bf62
Show file tree
Hide file tree
Showing 129 changed files with 2,986 additions and 2,621 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ unit-tests.log
rat-results.txt
scalastyle.txt
conf/*.conf
scalastyle-output.xml

# For Hive
metastore_db/
metastore/
warehouse/
TempStatsStore/
sql/hive-thriftserver/test_warehouses
10 changes: 0 additions & 10 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,6 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.10</artifactId>
<properties>
<sbt.project.name>bagel</sbt.project.name>
<sbt.project.name>bagel</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
Expand Down
45 changes: 0 additions & 45 deletions bin/beeline

This file was deleted.

1 change: 0 additions & 1 deletion bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
fi

Expand Down
4 changes: 2 additions & 2 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
fi
}

Expand Down
2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
36 changes: 0 additions & 36 deletions bin/spark-sql

This file was deleted.

2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<properties>
<sbt.project.name>core</sbt.project.name>
<sbt.project.name>core</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Core</name>
Expand Down
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
combiners.insertAll(iter)
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
Expand Down
72 changes: 51 additions & 21 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

Expand All @@ -30,7 +30,7 @@ import org.apache.spark.storage._
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()
private val loading = new mutable.HashSet[RDDBlockId]

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
Expand Down Expand Up @@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}

/**
* Cache the values of a partition, keeping track of any updates in the storage statuses
* of other blocks along the way.
* Cache the values of a partition, keeping track of any updates in the storage statuses of
* other blocks along the way.
*
* The effective storage level refers to the level that actually specifies BlockManager put
* behavior, not the level originally specified by the user. This is mainly for forcing a
* MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
* while preserving the the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
storageLevel: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {

if (!storageLevel.useMemory) {
/* This RDD is not to be cached in memory, so we can just pass the computed values
* as an iterator directly to the BlockManager, rather than first fully unrolling
* it in memory. The latter option potentially uses much more memory and risks OOM
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

val putLevel = effectiveStorageLevel.getOrElse(level)
if (!putLevel.useMemory) {
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/* This RDD is to be cached in memory. In this case we cannot pass the computed values
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back, e.g.
* when the entirety of the RDD does not fit in memory. */
val elements = new ArrayBuffer[Any]
elements ++= values
updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

Expand Down Expand Up @@ -62,7 +63,8 @@ class ShuffleDependency[K, V, C](
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
val mapSideCombine: Boolean = false,
val sortOrder: Option[SortOrder] = None)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {

// A mapping of thread ID to amount of memory used for shuffle in bytes
// A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{EmptyRDD, RDD}

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
Expand Down Expand Up @@ -112,6 +112,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork

def startTime: java.lang.Long = sc.startTime

/** The version of Spark on which this application is running. */
def version: String = sc.version

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: java.lang.Integer = sc.defaultParallelism

Expand All @@ -132,6 +135,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T]: JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
JavaRDD.fromRDD(new EmptyRDD[T](sc))
}


/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
parallelize(list, sc.defaultParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

private[spark] class PythonRDD[T: ClassTag](
parent: RDD[T],
private[spark] class PythonRDD(
parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
Expand Down
14 changes: 2 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ object SparkSubmit {
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

// A special jar name that indicates the class being run is inside of Spark itself, and therefore
// no user jar is needed.
private val SPARK_INTERNAL = "spark-internal"

// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
Expand Down Expand Up @@ -261,9 +257,7 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--jar", args.primaryResource)
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
Expand Down Expand Up @@ -338,7 +332,7 @@ object SparkSubmit {
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
!isShell(primaryResource) && !isPython(primaryResource)
}

/**
Expand All @@ -355,10 +349,6 @@ object SparkSubmit {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}

private[spark] def isInternal(primaryResource: String): Boolean = {
primaryResource == SPARK_INTERNAL
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
Expand Down
Loading

0 comments on commit 981bf62

Please sign in to comment.