
Commit 26b03b2

checkSum leads to NPE
1 parent cdab885 commit 26b03b2
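Judging from the diff, the NPE comes from TorrentBroadcast's checksum array: the driver-side instance built through newExecutorBroadcast never runs writeBlocks, so its checksums field stayed null, and the old field layout declared checksums below numBlocks, whose initializer is the one that may call writeBlocks. The commit therefore threads the executor-computed checksums back to the driver (uploadBroadcast now returns them alongside the block count) and moves the checksum fields above numBlocks. The ordering half of the fix is plain Scala semantics; here is a minimal standalone sketch of the trap (not Spark code, all names made up): field initializers run top to bottom, so a later initializer overwrites whatever an earlier initializer's helper stored in that field.

    class DeclOrder(preset: Option[Array[Int]]) {
      // numBlocks is initialized first; write() runs while `sums` is still null,
      // and the `sums` initializer below then runs and resets the field.
      private val numBlocks: Int = write()
      private var sums: Array[Int] = preset.getOrElse(null)

      private def write(): Int = { sums = Array(111, 222); 2 }

      def report: Seq[Int] = Seq(numBlocks) ++ sums
    }

    object DeclOrder extends App {
      new DeclOrder(None).report   // NullPointerException: `sums` was reset to null after write()
    }

This is the same constraint the "this must be after checkSums" comment in TorrentBroadcast.scala below is guarding.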

5 files changed: +34, -23 lines changed


core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala

Lines changed: 3 additions & 2 deletions
@@ -48,10 +48,11 @@ private[spark] trait BroadcastFactory {
   def newExecutorBroadcast[T: ClassTag](
       value: T,
       id: Long,
-      nBlocks: Int): Broadcast[T]
+      nBlocks: Int,
+      cSums: Array[Int]): Broadcast[T]
 
   // Called from executor to put broadcast data to blockmanager.
-  def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int
+  def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int]
 
   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
 
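The contract both methods now share (the getNumBlocksAndChecksums change further down spells it out) is a single flat Seq[Int]: the head is the block count, the tail is one checksum per block. A plain-Scala round trip of that encoding, with made-up values:

    object SeqEncoding extends App {
      // head = block count, tail = per-block checksums (values illustrative)
      def pack(numBlocks: Int, checksums: Array[Int]): Seq[Int] = Seq(numBlocks) ++ checksums
      def unpack(report: Seq[Int]): (Int, Array[Int]) = (report.head, report.tail.toArray)

      val report = pack(3, Array(111, 222, 333))   // Seq(3, 111, 222, 333)
      val (nBlocks, cSums) = unpack(report)        // nBlocks = 3, cSums = Array(111, 222, 333)
      println(s"$nBlocks blocks, checksums ${cSums.mkString(",")}")
    }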

core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

Lines changed: 4 additions & 3 deletions
@@ -93,16 +93,17 @@ private[spark] class BroadcastManager(
   def uploadBroadcast[T: ClassTag](
       value_ : T,
       id: Long
-  ): Int = {
+  ): Seq[Int] = {
     broadcastFactory.uploadBroadcast[T](value_, id)
   }
 
   // Called from driver to create broadcast with specified id
   def newExecutorBroadcast[T: ClassTag](
       value_ : T,
       id: Long,
-      nBlocks: Int): Broadcast[T] = {
-    broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks)
+      nBlocks: Int,
+      cSums: Array[Int]): Broadcast[T] = {
+    broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks, cSums)
   }
 
   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
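BroadcastManager itself only forwards to the configured BroadcastFactory; the intended call order is what the RDD.scala hunk at the bottom of this commit implements. Condensed to the two manager calls, it looks roughly like the sketch below (a sketch only: it assumes a live SparkEnv, the two steps really run in different JVMs, and payload/bid are illustrative placeholders):

    import org.apache.spark.SparkEnv

    def executorSideThenDriverSide(payload: Array[Byte], bid: Long) = {
      // 1. In a task on an executor: write the blocks, get back Seq(numBlocks, checksum*).
      val report: Seq[Int] = SparkEnv.get.broadcastManager.uploadBroadcast(payload, bid)

      // 2. Back on the driver: build the Broadcast handle without rewriting any blocks,
      //    handing it the block count and the checksums the executor just computed.
      SparkEnv.get.broadcastManager.newExecutorBroadcast(
        Array.empty[Byte], bid, report.head, report.tail.toArray)
    }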

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 13 additions & 8 deletions
@@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     obj: T,
     id: Long,
     isExecutorSide: Boolean = false,
-    nBlocks: Option[Int] = None)
+    nBlocks: Option[Int] = None,
+    cSums: Option[Array[Int]] = None)
   extends Broadcast[T](id) with Logging with Serializable {
 
   /**
@@ -90,6 +91,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
   @transient private var blockSize: Int = _
 
+  /** Whether to generate checksum for blocks or not. */
+  private var checksumEnabled: Boolean = false
+  /** The checksum for all the blocks. */
+  private var checksums: Array[Int] = cSums.getOrElse(null)
+
   private def setConf(conf: SparkConf) {
     compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
       Some(CompressionCodec.createCodec(conf))
@@ -104,15 +110,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
   private val broadcastId = BroadcastBlockId(id)
 
-  def getNumBlocks: Int = numBlocks
+  def getNumBlocksAndChecksums: Seq[Int] = if (checksumEnabled) {
+    Seq(numBlocks) ++ checksums
+  } else {
+    Seq(numBlocks)
+  }
 
   /** Total number of blocks this broadcast variable contains. */
-  private val numBlocks: Int = nBlocks.getOrElse(writeBlocks(obj))
-
-  /** Whether to generate checksum for blocks or not. */
-  private var checksumEnabled: Boolean = false
-  /** The checksum for all the blocks. */
-  private var checksums: Array[Int] = _
+  private val numBlocks: Int = nBlocks.getOrElse(writeBlocks(obj)) // this must be after checkSums
 
   override protected def getValue() = {
     _value
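For reference, the values being threaded around here are per-block checksums. This diff does not touch the checksum calculation itself, but in upstream Spark each Int is an Adler32 digest of one block's bytes, roughly as below; treat the exact algorithm and the helper name as an assumption about this fork, shown only to make clear what the Array[Int] contains.

    import java.nio.ByteBuffer
    import java.util.zip.Adler32

    object ChecksumSketch extends App {
      // Roughly what one entry of the checksums array holds: an Adler32 digest of a
      // block's bytes, truncated to Int.
      def calcChecksum(block: ByteBuffer): Int = {
        val adler = new Adler32()
        adler.update(block.duplicate())   // digest a copy so the caller's position is untouched
        adler.getValue.toInt
      }

      println(calcChecksum(ByteBuffer.wrap("one broadcast block".getBytes("UTF-8"))))
    }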

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala

Lines changed: 6 additions & 4 deletions
@@ -37,12 +37,14 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
   override def newExecutorBroadcast[T: ClassTag](
       value: T,
       id: Long,
-      nBlocks: Int): Broadcast[T] = {
-    new TorrentBroadcast[T](value, id, true, Option(nBlocks))
+      nBlocks: Int,
+      cSums: Array[Int]): Broadcast[T] = {
+    new TorrentBroadcast[T](value, id, true, Option(nBlocks), Option(cSums))
   }
 
-  override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int = {
-    new TorrentBroadcast[T](value_, id, true).getNumBlocks
+  override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] = {
+    val executorBroadcast = new TorrentBroadcast[T](value_, id, true)
+    executorBroadcast.getNumBlocksAndChecksums
   }
 
   override def stop() { }
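The factory now exposes the two construction paths explicitly: uploadBroadcast builds a TorrentBroadcast without nBlocks/cSums, so the constructor writes the blocks and computes the checksums itself, while newExecutorBroadcast supplies both and writes nothing. A small Scala note on the Option(...) wrapping used for cSums: Option(null) collapses to None, so a null array from a caller shows up as "no checksums supplied" (the same default the constructor already uses) rather than as Some(null).

    object OptionNote extends App {
      val missing: Array[Int] = null
      assert(Option(missing).isEmpty)    // Option(null) == None
      assert(Some(missing).isDefined)    // Some(null) would keep the null wrapped
      println("Option(null) collapses to None")
    }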

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 8 additions & 6 deletions
@@ -949,15 +949,17 @@ abstract class RDD[T: ClassTag](
     val id = sc.env.broadcastManager.newBroadcastId
 
     // first: write blocks to block manager from executor.
-    val nBlocks = coalesce(1).mapPartitions { iter =>
-      val numBlocks =
-        SparkEnv.get.broadcastManager.uploadBroadcast(transFunc.transform(iter.toArray), id)
-      Seq(numBlocks).iterator
-    }.collect().head
+    val numBlocksAndChecksums = coalesce(1).mapPartitions { iter =>
+      SparkEnv.get.broadcastManager
+        .uploadBroadcast(transFunc.transform(iter.toArray), id).iterator
+    }.collect()
 
     // then: create broadcast from driver, this will not write blocks
     val res = SparkEnv.get.broadcastManager.newExecutorBroadcast(
-      transFunc.transform(Array.empty[T]), id, nBlocks)
+      transFunc.transform(Array.empty[T]),
+      id,
+      numBlocksAndChecksums.head,
+      numBlocksAndChecksums.tail)
 
     val callSite = sc.getCallSite
     logInfo("Created executor side broadcast " + res.id + " from " + callSite.shortForm)
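Because the upload runs in a single coalesced partition, collect() returns exactly one flat report, and it has one of two shapes depending on whether the uploading executor had checksums enabled (see getNumBlocksAndChecksums above). Either way head/tail splits it cleanly, with an empty cSums array in the disabled case. Illustrative values:

    object ReportShapes extends App {
      val checksumsOn  = Array(3, 111, 222, 333)   // head = 3 blocks, tail = their checksums
      val checksumsOff = Array(3)                  // checksums disabled: tail is empty
      assert(checksumsOn.tail.length == checksumsOn.head)
      assert(checksumsOff.tail.isEmpty)            // newExecutorBroadcast then receives an empty cSums array
    }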
