diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f5b775da7930a..e31cd7fe20553 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -937,7 +937,7 @@ for details.
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
- | mapPartitions(func) |
+ mapPartitions(func) |
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type
Iterator<T> => Iterator<U> when running on an RDD of type T. |
@@ -964,7 +964,7 @@ for details.
Return a new dataset that contains the distinct elements of the source dataset. |
- | groupByKey([numTasks]) |
+ groupByKey([numTasks]) |
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using reduceByKey or aggregateByKey will yield much better
@@ -975,25 +975,25 @@ for details.
|
- | reduceByKey(func, [numTasks]) |
+ reduceByKey(func, [numTasks]) |
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
- | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
+ aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
- | sortByKey([ascending], [numTasks]) |
+ sortByKey([ascending], [numTasks]) |
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
- | join(otherDataset, [numTasks]) |
+ join(otherDataset, [numTasks]) |
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
|
- | cogroup(otherDataset, [numTasks]) |
+ cogroup(otherDataset, [numTasks]) |
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
@@ -1006,17 +1006,17 @@ for details.
process's stdin and lines output to its stdout are returned as an RDD of strings.
- | coalesce(numPartitions) |
+ coalesce(numPartitions) |
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
after filtering down a large dataset. |
| repartition(numPartitions) |
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
- This always shuffles all data over the network. |
+ This always shuffles all data over the network.
- | repartitionAndSortWithinPartitions(partitioner) |
+ repartitionAndSortWithinPartitions(partitioner) |
Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling repartition and then sorting within
each partition because it can push the sorting down into the shuffle machinery. |
@@ -1080,7 +1080,7 @@ for details.
SparkContext.objectFile().
- | countByKey() |
+ countByKey() |
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
@@ -1090,6 +1090,67 @@ for details.
+### Shuffle operations
+
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that it's grouped differently across partitions. This typically
+involves copying data across executors and machines, making the shuffle a complex and
+costly operation.
+
+#### Background
+
+To understand what happens during the shuffle we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to compute the result.
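+
+For instance, a minimal word-count sketch (the input data here is hypothetical) shows the pattern:
+
+{% highlight scala %}
+// Build (word, 1) pairs and combine the values for each word with reduceByKey.
+val lines = sc.parallelize(Seq("a a b", "b c"))
+val counts = lines
+  .flatMap(line => line.split(" "))   // one record per word
+  .map(word => (word, 1))             // (K, V) pairs
+  .reduceByKey((a, b) => a + b)       // all values for a key must meet on one partition
+counts.collect()                      // e.g. Array((a,2), (b,2), (c,1))
+{% endhighlight %}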
+
+In Spark, data is generally not distributed across partitions to be in the necessary place for a
+specific operation. During computations, a single task will operate on a single partition - thus, to
+organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
+all-to-all operation. It must read from all partitions to find all the values for all keys, and then
+organize those such that all values for any key lie within the same partition - this is called the
+**shuffle**.
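+
+As a sketch of what this means in practice (the data and partition counts are hypothetical), the
+contents of each post-shuffle partition can be inspected with `glom()`:
+
+{% highlight scala %}
+val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2), ("c", 1)), 4)
+val reduced = pairs.reduceByKey((a, b) => a + b, 2)
+// glom() exposes each partition as an array; every key's values appear in exactly one partition
+reduced.glom().collect().foreach(partition => println(partition.mkString(", ")))
+{% endhighlight %}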
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, and so
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
+ordered data following shuffle then it's possible to use:
+
+* `mapPartitions` to sort each partition using, for example, `.sorted`
+* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
+* `sortBy` to make a globally ordered RDD
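+
+For example (a sketch with hypothetical data; each option is independent of the others):
+
+{% highlight scala %}
+import org.apache.spark.HashPartitioner
+
+val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))
+
+// Sort within each partition after a shuffle has already happened
+val sortedPerPartition = pairs.mapPartitions(iter => iter.toSeq.sortBy(_._1).iterator)
+
+// Repartition and sort by key in one pass, pushing the sort into the shuffle machinery
+val repartitionedSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
+
+// Produce a globally ordered RDD
+val globallySorted = pairs.sortBy(_._1)
+{% endhighlight %}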
+
+Operations which can cause a shuffle include **repartition** operations like
+[`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
+(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
+**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
+
+#### Performance Impact
+
+The **shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
+organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.
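+
+As an illustration of the map-side structures mentioned above, `aggregateByKey` merges values into
+a per-key buffer before the shuffle (a sketch with hypothetical data):
+
+{% highlight scala %}
+val scores = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 3)))
+// Compute a (sum, count) pair per key; the aggregated type differs from the input value type.
+val sumAndCount = scores.aggregateByKey((0, 0))(
+  (acc, v) => (acc._1 + v, acc._2 + 1),                      // seqOp: applied map-side, within a partition
+  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))    // combOp: merges partial results after the shuffle
+val averages = sumAndCount.mapValues { case (sum, count) => sum.toDouble / count }
+{% endhighlight %}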
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume a large amount of disk space. The files are preserved so that
+the shuffle output doesn't need to be re-created if the lineage is re-computed. The temporary storage
+directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark
+context.
+
+Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
+'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
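+
+As one illustrative sketch (the property values and path below are assumptions, not recommendations),
+such parameters, along with the `spark.local.dir` scratch directory mentioned above, can be set on
+the `SparkConf` used to create the context:
+
+{% highlight scala %}
+import org.apache.spark.{SparkConf, SparkContext}
+
+val conf = new SparkConf()
+  .setAppName("ShuffleTuningSketch")
+  .set("spark.local.dir", "/mnt/spark-scratch")   // illustrative directory for shuffle and spill files
+  .set("spark.shuffle.compress", "true")          // compress map output files
+val sc = new SparkContext(conf)
+{% endhighlight %}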
+
## RDD Persistence
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory