@@ -73,27 +73,38 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
7373 val inputInfo = InputInfo (id, blockInfos.map(_.numRecords).sum)
7474 ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
7575
76- // Are WAL record handles present with all the blocks
77- val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
76+ if (blockInfos.nonEmpty) {
77+ // Are WAL record handles present with all the blocks
78+ val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
7879
79- if (areWALRecordHandlesPresent) {
80- // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
81- val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
82- val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
83- new WriteAheadLogBackedBlockRDD [T ](
84- ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
85- } else {
86- // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
87- // then that is unexpected and log a warning accordingly.
88- if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
89- if (WriteAheadLogUtils .enableReceiverLog(ssc.conf)) {
90- logError(" Some blocks do not have Write Ahead Log information; " +
91- " this is unexpected and data may not be recoverable after driver failures" )
92- } else {
93- logWarning(" Some blocks have Write Ahead Log information; this is unexpected" )
80+ if (areWALRecordHandlesPresent) {
81+ // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
82+ val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
83+ val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
84+ new WriteAheadLogBackedBlockRDD [T ](
85+ ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
86+ } else {
87+ // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
88+ // others then that is unexpected and log a warning accordingly.
89+ if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
90+ if (WriteAheadLogUtils .enableReceiverLog(ssc.conf)) {
91+ logError(" Some blocks do not have Write Ahead Log information; " +
92+ " this is unexpected and data may not be recoverable after driver failures" )
93+ } else {
94+ logWarning(" Some blocks have Write Ahead Log information; this is unexpected" )
95+ }
9496 }
97+ new BlockRDD [T ](ssc.sc, blockIds)
98+ }
99+ } else {
100+ // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
101+ // according to the configuration
102+ if (WriteAheadLogUtils .enableReceiverLog(ssc.conf)) {
103+ new WriteAheadLogBackedBlockRDD [T ](
104+ ssc.sparkContext, Array .empty, Array .empty, Array .empty)
105+ } else {
106+ new BlockRDD [T ](ssc.sc, Array .empty)
95107 }
96- new BlockRDD [T ](ssc.sc, blockIds)
97108 }
98109 }
99110 }
0 commit comments