Skip to content

Commit b5c8d1f

Browse files
rxinaarondav
authored andcommitted
Fixed ShuffleBlockFetcherIteratorSuite.
1 parent 064747b commit b5c8d1f

File tree

3 files changed

+47
-122
lines changed

3 files changed

+47
-122
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,8 @@ private[spark] class BlockManager(
212212
}
213213

214214
/**
215-
* Interface to get local block data.
216-
*
217-
* @return Some(buffer) if the block exists locally, and None if it doesn't.
215+
* Interface to get local block data. Throws an exception if the block cannot be found or
216+
* cannot be read successfully.
218217
*/
219218
override def getBlockData(blockId: String): ManagedBuffer = {
220219
val bid = BlockId(blockId)

core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one or more
3-
* contributor license agreements. See the NOTICE file distributed with
4-
* this work for additional information regarding copyright ownership.
5-
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
8-
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
10-
*
11-
* Unless required by applicable law or agreed to in writing, software
12-
* distributed under the License is distributed on an "AS IS" BASIS,
13-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14-
* See the License for the specific language governing permissions and
15-
* limitations under the License.
16-
*/
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
1717

1818
package org.apache.spark.network.netty
1919

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 30 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -17,136 +17,62 @@
1717

1818
package org.apache.spark.storage
1919

20-
import org.apache.spark.TaskContext
21-
import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
22-
2320
import org.mockito.Mockito._
2421
import org.mockito.Matchers.{any, eq => meq}
2522
import org.mockito.invocation.InvocationOnMock
2623
import org.mockito.stubbing.Answer
2724

2825
import org.scalatest.FunSuite
2926

27+
import org.apache.spark.{SparkConf, TaskContext}
28+
import org.apache.spark.network._
29+
import org.apache.spark.serializer.TestSerializer
3030

31-
class ShuffleBlockFetcherIteratorSuite extends FunSuite {
32-
33-
test("handle local read failures in BlockManager") {
34-
val transfer = mock(classOf[BlockTransferService])
35-
val blockManager = mock(classOf[BlockManager])
36-
doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
3731

38-
val blIds = Array[BlockId](
39-
ShuffleBlockId(0,0,0),
40-
ShuffleBlockId(0,1,0),
41-
ShuffleBlockId(0,2,0),
42-
ShuffleBlockId(0,3,0),
43-
ShuffleBlockId(0,4,0))
44-
45-
val optItr = mock(classOf[Option[Iterator[Any]]])
46-
val answer = new Answer[Option[Iterator[Any]]] {
47-
override def answer(invocation: InvocationOnMock) = Option[Iterator[Any]] {
48-
throw new Exception
49-
}
50-
}
51-
52-
// 3rd block is going to fail
53-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
54-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
55-
doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
56-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
57-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
58-
59-
val bmId = BlockManagerId("test-client", "test-client", 1)
60-
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
61-
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
62-
)
63-
64-
val iterator = new ShuffleBlockFetcherIterator(
65-
new TaskContext(0, 0, 0),
66-
transfer,
67-
blockManager,
68-
blocksByAddress,
69-
null,
70-
48 * 1024 * 1024)
32+
class ShuffleBlockFetcherIteratorSuite extends FunSuite {
7133

72-
// Without exhausting the iterator, the iterator should be lazy and not call
73-
// getLocalShuffleFromDisk.
74-
verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
75-
76-
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
77-
// the 2nd element of the tuple returned by iterator.next should be defined when
78-
// fetching successfully
79-
assert(iterator.next()._2.isDefined,
80-
"1st element should be defined but is not actually defined")
81-
verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any())
82-
83-
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
84-
assert(iterator.next()._2.isDefined,
85-
"2nd element should be defined but is not actually defined")
86-
verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any())
87-
88-
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
89-
// 3rd fetch should be failed
90-
intercept[Exception] {
91-
iterator.next()
92-
}
93-
verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any())
94-
}
34+
val conf = new SparkConf
9535

96-
test("handle local read successes") {
97-
val transfer = mock(classOf[BlockTransferService])
36+
test("handle successful local reads") {
37+
val buf = mock(classOf[ManagedBuffer])
9838
val blockManager = mock(classOf[BlockManager])
9939
doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
10040

101-
val blIds = Array[BlockId](
102-
ShuffleBlockId(0,0,0),
103-
ShuffleBlockId(0,1,0),
104-
ShuffleBlockId(0,2,0),
105-
ShuffleBlockId(0,3,0),
106-
ShuffleBlockId(0,4,0))
107-
108-
val optItr = mock(classOf[Option[Iterator[Any]]])
41+
val blockIds = Array[BlockId](
42+
ShuffleBlockId(0, 0, 0),
43+
ShuffleBlockId(0, 1, 0),
44+
ShuffleBlockId(0, 2, 0),
45+
ShuffleBlockId(0, 3, 0),
46+
ShuffleBlockId(0, 4, 0))
10947

11048
// All blocks should be fetched successfully
111-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
112-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
113-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
114-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
115-
doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
49+
blockIds.foreach { blockId =>
50+
doReturn(buf).when(blockManager).getBlockData(meq(blockId.toString))
51+
}
11652

11753
val bmId = BlockManagerId("test-client", "test-client", 1)
11854
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
119-
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
55+
(bmId, blockIds.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)
12056
)
12157

12258
val iterator = new ShuffleBlockFetcherIterator(
12359
new TaskContext(0, 0, 0),
124-
transfer,
60+
mock(classOf[BlockTransferService]),
12561
blockManager,
12662
blocksByAddress,
127-
null,
63+
new TestSerializer,
12864
48 * 1024 * 1024)
12965

130-
// Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk.
131-
verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
132-
133-
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
134-
assert(iterator.next()._2.isDefined,
135-
"All elements should be defined but 1st element is not actually defined")
136-
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
137-
assert(iterator.next()._2.isDefined,
138-
"All elements should be defined but 2nd element is not actually defined")
139-
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
140-
assert(iterator.next()._2.isDefined,
141-
"All elements should be defined but 3rd element is not actually defined")
142-
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
143-
assert(iterator.next()._2.isDefined,
144-
"All elements should be defined but 4th element is not actually defined")
145-
assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
146-
assert(iterator.next()._2.isDefined,
147-
"All elements should be defined but 5th element is not actually defined")
148-
149-
verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any())
66+
// Local blocks are fetched immediately.
67+
verify(blockManager, times(5)).getBlockData(any())
68+
69+
for (i <- 0 until 5) {
70+
assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
71+
assert(iterator.next()._2.isDefined,
72+
s"iterator should have 5 elements defined but actually has $i elements")
73+
}
74+
// No more fetching of local blocks.
75+
verify(blockManager, times(5)).getBlockData(any())
15076
}
15177

15278
test("handle remote fetch failures in BlockTransferService") {
@@ -173,7 +99,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
17399
transfer,
174100
blockManager,
175101
blocksByAddress,
176-
null,
102+
new TestSerializer,
177103
48 * 1024 * 1024)
178104

179105
iterator.foreach { case (_, iterOption) =>

0 commit comments

Comments
 (0)