Skip to content

Commit 8ff4417

Browse files
sarutak authored and mateiz committed
[SPARK-2670] FetchFailedException should be thrown when local fetch has failed
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #1578 from sarutak/SPARK-2670 and squashes the following commits: 85c8938 [Kousuke Saruta] Removed useless results.put for fail fast e8713cc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2670 d353984 [Kousuke Saruta] Refined assertion messages in BlockFetcherIteratorSuite.scala 03bcb02 [Kousuke Saruta] Merge branch 'SPARK-2670' of github.com:sarutak/spark into SPARK-2670 5d05855 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2670 4fca130 [Kousuke Saruta] Added test cases for BasicBlockFetcherIterator b7b8250 [Kousuke Saruta] Modified BasicBlockFetchIterator to fail fast when local fetch error has been occurred a3a9be1 [Kousuke Saruta] Modified BlockFetcherIterator for SPARK-2670 460dc01 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2670 e310c0b [Kousuke Saruta] Modified BlockFetcherIterator to handle local fetch failure as fatch fail
1 parent cb9e7d5 commit 8ff4417

File tree

2 files changed

+151
-8
lines changed

2 files changed

+151
-8
lines changed

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,17 @@ object BlockFetcherIterator {
200200
// these all at once because they will just memory-map some files, so they won't consume
201201
// any memory that might exceed our maxBytesInFlight
202202
for (id <- localBlocksToFetch) {
203-
getLocalFromDisk(id, serializer) match {
204-
case Some(iter) => {
205-
// Pass 0 as size since it's not in flight
206-
results.put(new FetchResult(id, 0, () => iter))
207-
logDebug("Got local block " + id)
208-
}
209-
case None => {
210-
throw new BlockException(id, "Could not get block " + id + " from local machine")
203+
try {
204+
// getLocalFromDisk never returns None; it throws BlockException instead
205+
val iter = getLocalFromDisk(id, serializer).get
206+
// Pass 0 as size since it's not in flight
207+
results.put(new FetchResult(id, 0, () => iter))
208+
logDebug("Got local block " + id)
209+
} catch {
210+
case e: Exception => {
211+
logError(s"Error occurred while fetching local blocks", e)
212+
results.put(new FetchResult(id, -1, null))
213+
return
211214
}
212215
}
213216
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.storage
19+
20+
import org.scalatest.{FunSuite, Matchers}
21+
import org.scalatest.PrivateMethodTester._
22+
23+
import org.mockito.Mockito._
24+
import org.mockito.Matchers.{any, eq => meq}
25+
import org.mockito.stubbing.Answer
26+
import org.mockito.invocation.InvocationOnMock
27+
28+
import org.apache.spark._
29+
import org.apache.spark.storage.BlockFetcherIterator._
30+
import org.apache.spark.network.{ConnectionManager, ConnectionManagerId,
31+
Message}
32+
33+
class BlockFetcherIteratorSuite extends FunSuite with Matchers {
34+
35+
test("block fetch from local fails using BasicBlockFetcherIterator") {
36+
val blockManager = mock(classOf[BlockManager])
37+
val connManager = mock(classOf[ConnectionManager])
38+
doReturn(connManager).when(blockManager).connectionManager
39+
doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId
40+
41+
doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
42+
43+
val blIds = Array[BlockId](
44+
ShuffleBlockId(0,0,0),
45+
ShuffleBlockId(0,1,0),
46+
ShuffleBlockId(0,2,0),
47+
ShuffleBlockId(0,3,0),
48+
ShuffleBlockId(0,4,0))
49+
50+
val optItr = mock(classOf[Option[Iterator[Any]]])
51+
val answer = new Answer[Option[Iterator[Any]]] {
52+
override def answer(invocation: InvocationOnMock) = Option[Iterator[Any]] {
53+
throw new Exception
54+
}
55+
}
56+
57+
// 3rd block is going to fail
58+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any())
59+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any())
60+
doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any())
61+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
62+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
63+
64+
val bmId = BlockManagerId("test-client", "test-client",1 , 0)
65+
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
66+
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
67+
)
68+
69+
val iterator = new BasicBlockFetcherIterator(blockManager,
70+
blocksByAddress, null)
71+
72+
iterator.initialize()
73+
74+
// getLocalFromDisk should be invoked only 3 times, because the 3rd invocation fails
75+
verify(blockManager, times(3)).getLocalFromDisk(any(), any())
76+
77+
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
78+
// the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
79+
assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined")
80+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
81+
assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined")
82+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
83+
// The 3rd fetch should fail
84+
assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined")
85+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
86+
// Don't call next() after fetching a non-defined element, even if there are remaining elements in the iterator.
87+
// Otherwise, BasicBlockFetcherIterator hangs up.
88+
}
89+
90+
91+
test("block fetch from local succeed using BasicBlockFetcherIterator") {
92+
val blockManager = mock(classOf[BlockManager])
93+
val connManager = mock(classOf[ConnectionManager])
94+
doReturn(connManager).when(blockManager).connectionManager
95+
doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId
96+
97+
doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
98+
99+
val blIds = Array[BlockId](
100+
ShuffleBlockId(0,0,0),
101+
ShuffleBlockId(0,1,0),
102+
ShuffleBlockId(0,2,0),
103+
ShuffleBlockId(0,3,0),
104+
ShuffleBlockId(0,4,0))
105+
106+
val optItr = mock(classOf[Option[Iterator[Any]]])
107+
108+
// All blocks should be fetched successfully
109+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any())
110+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any())
111+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any())
112+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
113+
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
114+
115+
val bmId = BlockManagerId("test-client", "test-client",1 , 0)
116+
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
117+
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
118+
)
119+
120+
val iterator = new BasicBlockFetcherIterator(blockManager,
121+
blocksByAddress, null)
122+
123+
iterator.initialize()
124+
125+
// getLocalFromDisk should be invoked for all 5 blocks
126+
verify(blockManager, times(5)).getLocalFromDisk(any(), any())
127+
128+
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
129+
assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
130+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
131+
assert(iterator.next._2.isDefined, "All elements should be defined but 2nd element is not actually defined")
132+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
133+
assert(iterator.next._2.isDefined, "All elements should be defined but 3rd element is not actually defined")
134+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
135+
assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined")
136+
assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
137+
assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
138+
}
139+
140+
}

0 commit comments

Comments
 (0)