Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix typo #3

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions markdown/1-Overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object GroupByTest {
- 执行 RDD 上的 transformation 操作(这里是 flatMap)以后,生成 FlatMappedRDD,其中每个 partition 包含一个 Array[(Int, Array[Byte])]。
- 第一个 count() 执行时,先在每个 partition 上执行 count,然后执行结果被发送到 driver,最后在 driver 端进行 sum。
- 由于 FlatMappedRDD 被 cache 到内存,因此这里将里面的 partition 都换了一种颜色表示。
- groupByKey 产生了后面三个 RDD,为什么产生这三个在后面章节讨论
- groupByKey 产生了后面两个 RDD,为什么产生这两个在后面章节讨论
- 如果 job 需要 shuffle,一般会产生 ShuffledRDD。该 RDD 与前面的 RDD 的关系类似于 Hadoop 中 mapper 输出数据与 reducer 输入数据之间的关系。
- MapPartitionsRDD 里包含 groupByKey() 的结果。
- 最后将 MapPartitionsRDD 中的 每个value(也就是Array[Byte])都转换成 Iterable 类型。
Expand Down Expand Up @@ -173,4 +173,4 @@ Hi,文章写得很赞~关于OverView中如何配置多个Backend进程的问

Backend个数发生变化情况:1、启动一个新的Application(每个APP都会launceExecutor,此时会生成此进程)2、还可以通过设置SPARK\_WORKER\_INSTANCES参数来增加Backend个数。图可以依此稍做改动。

Backend进程是SparkContext初始化taskcScheduler,taskcScheduler初始化SparkDeploySchedulerBackend里appDesc里的command...顺藤摸瓜即可。。CoarseGrainedExecutorBackend
Backend进程是SparkContext初始化taskcScheduler,taskcScheduler初始化SparkDeploySchedulerBackend里appDesc里的command...顺藤摸瓜即可。。CoarseGrainedExecutorBackend
4 changes: 2 additions & 2 deletions markdown/2-JobLogicalPlan.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)

![reduceyByKey](PNGfigures/reduceByKey.png)

reduceyByKey() 相当于传统的 MapReduce,整个也数据流与 Hadoop 中的数据流基本一样。reduceyByKey() 默认在 map 端开启 combine(),因此在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。
reduceyByKey() 相当于传统的 MapReduce,整个数据流也与 Hadoop 中的数据流基本一样。reduceyByKey() 默认在 map 端开启 combine(),因此在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。

**3) distinct(numPartitions)**

Expand Down Expand Up @@ -242,4 +242,4 @@ RDD 本身的依赖关系由 transformation() 生成的每一个 RDD 本身语

RDD 中 partition 依赖关系分为 NarrowDependency 和 ShuffleDependency。前者是完全伊依赖,后者是部分依赖。NarrowDependency 里面又包含多种情况,只有前后两个 RDD 的 partition 个数以及 partitioner 都一样,才会出现 NarrowDependency。

从数据处理逻辑的角度来看,MapReduce 相当于 Spark 中的 map() + reduceByKey(),但严格来讲 MapReduce 中的 reduce() 要比 reduceByKey() 的功能强大些,详细差别会在 Shuffle details 一章中继续讨论。
从数据处理逻辑的角度来看,MapReduce 相当于 Spark 中的 map() + reduceByKey(),但严格来讲 MapReduce 中的 reduce() 要比 reduceByKey() 的功能强大些,详细差别会在 Shuffle details 一章中继续讨论。
14 changes: 7 additions & 7 deletions markdown/3-JobPhysicalPlan.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

所有的粗箭头组合成第一个 task,该 task 计算结束后顺便将 CoGroupedRDD 中已经计算得到的第二个和第三个 partition 存起来。之后第二个 task(细实线)只需计算两步,第三个 task(细虚线)也只需要计算两步,最后得到结果。

这个想法由两个不靠谱的地方
这个想法有两个不靠谱的地方

- 第一个 task 太大,碰到 ShuffleDependency 后,不得不计算 shuffle 依赖的 RDDs 的所有 partitions,而且都在这一个 task 里面计算。
- 需要设计巧妙的算法来判断哪个 RDD 中的哪些 partition 需要 cache。而且 cache 会占用存储空间。
Expand Down Expand Up @@ -113,7 +113,7 @@
## 生成 job
前面介绍了逻辑和物理执行图的生成原理,那么,**怎么触发 job 的生成?已经介绍了 task,那么 job 是什么?**

下表列出了可以触发生成执行图生成典型 [action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是 `processPartition()`,定义如何计算 partition 中的 records 得到 result。第三列是 `resultHandler()`,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。
下表列出了可以触发执行图生成的典型 [action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是 `processPartition()`,定义如何计算 partition 中的 records 得到 result。第三列是 `resultHandler()`,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。


| Action | finalRDD(records) => result | compute(results) |
Expand All @@ -122,9 +122,9 @@
| collect() |Array[records] => result | Array[result] |
| count() | count(records) => result | sum(result) |
| foreach(f) | f(records) => result | Array[result] |
| take(n) | record (i<=n) => result | Array{result] |
| first() | record 1 => result | Array{result] |
| takeSample() | selected records => result | Array{result] |
| take(n) | record (i<=n) => result | Array[result] |
| first() | record 1 => result | Array[result] |
| takeSample() | selected records => result | Array[result] |
| takeOrdered(n, [ordering]) | TopN(records) => result | TopN(results) |
| saveAsHadoopFile(path) | records => write(records) | null |
| countByKey() | (K, V) => Map(K, count(K)) | (Map, Map) => Map(K, count(K)) |
Expand Down Expand Up @@ -155,7 +155,7 @@
1. 先确定该 stage 的 missingParentStages,使用`getMissingParentStages(stage)`。如果 parentStages 都可能已经执行过了,那么就为空了。
2. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
3. 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用`submitMissingTasks(stage, jobId)`来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用`taskScheduler.submitTasks(taskSet)`来提交一整个 taskSet。
4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步时通知`backend.reviveOffers()`去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步是通知`backend.reviveOffers()`去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,`backend.reviveOffers()`其实是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 消息后,会调用`launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))` 来 launch tasks。scheduler 就是 TaskSchedulerImpl。`scheduler.resourceOffers()`从 FIFO 或者 Fair 调度器那里获得排序后的 TaskSetManager,并经过`TaskSchedulerImpl.resourceOffer()`,考虑 locality 等因素来确定 task 的全部信息 TaskDescription。调度细节这里暂不讨论。
6. DriverActor 中的 launchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSize,那么直接将 task 送到 executor 那里执行`executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))`。

Expand Down Expand Up @@ -221,4 +221,4 @@ object complexJob {
println(result.toDebugString)
}
}
```
```
2 changes: 1 addition & 1 deletion markdown/4-shuffleDetails.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,6 @@ ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V
## Discussion
通过本章的介绍可以发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。

这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle write,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。
这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle read,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。

另外,Jerry Shao 写的 [详细探究Spark的shuffle实现](http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/) 很赞,里面还介绍了 shuffle 过程在 Spark 中的进化史。目前 sort-based 的 shuffle 也在实现当中,stay tuned。
Expand Down