From c27d3fe20b643a03386ff826592a30ad818f129d Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Tue, 4 Aug 2015 17:23:29 +0800 Subject: [PATCH 1/7] catch exception avoid task fail --- .../org/apache/spark/storage/BlockManager.scala | 10 ++++++++-- .../apache/spark/storage/BlockManagerSuite.scala | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 86493673d958..b6417e36e5f3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -592,8 +592,14 @@ private[spark] class BlockManager( val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") - val data = blockTransferService.fetchBlockSync( - loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() + val data = try { + blockTransferService.fetchBlockSync( + loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() + } catch { + case e: Throwable => + logWarning(s"Exception during getting remote block $blockId from $loc", e) + null + } if (data != null) { if (asBlockResult) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f480fd107a0c..3942795f0a0c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -443,6 +443,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } + test("block manager crash test") { + conf.set("spark.network.timeout", "5s") + store = makeBlockManager(8000) + store2 = makeBlockManager(8000) + val list1 = List(new Array[Byte](4000)) + store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + val list1get = store.getRemote("list1") + assert(list1get.isDefined, "list1get expected to be fetched") + //simulate block manager crash + store2.stop() + val list1getagain = store.getRemoteBytes("list1") + assert(!list1getagain.isDefined, "list1getagain expected to be not fetched") + conf.set("spark.network.timeout", "120s") + } + test("in-memory LRU storage") { store = makeBlockManager(12000) val a1 = new Array[Byte](4000) From 7deba6c34d3a236d8531317693dcfa4e94e1b158 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Tue, 4 Aug 2015 20:38:07 +0800 Subject: [PATCH 2/7] fix style --- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 3942795f0a0c..c6dbe8be1ca4 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -449,12 +449,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store2 = makeBlockManager(8000) val list1 = List(new Array[Byte](4000)) store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - val list1get = store.getRemote("list1") - assert(list1get.isDefined, "list1get expected to be fetched") - //simulate block manager crash + val list1Get = 
store.getRemote("list1") + assert(list1Get.isDefined, "list1get expected to be fetched") + // simulate block manager crashed store2.stop() - val list1getagain = store.getRemoteBytes("list1") - assert(!list1getagain.isDefined, "list1getagain expected to be not fetched") + val list1GetAgain = store.getRemoteBytes("list1") + assert(!list1GetAgain.isDefined, "list1getagain expected to be not fetched") conf.set("spark.network.timeout", "120s") } From b273e2c959602e10a280bc06b7f13e6474a27480 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Wed, 5 Aug 2015 13:06:20 +0800 Subject: [PATCH 3/7] address comments --- .../apache/spark/storage/BlockManager.scala | 7 ++- .../spark/storage/BlockManagerSuite.scala | 47 ++++++++++++++----- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b6417e36e5f3..6aacb6d82e74 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -590,15 +590,20 @@ private[spark] class BlockManager( private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) + var failTimes = 1 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { - case e: Throwable => + case e: IOException if failTimes < locations.size => + // return null when IOException throw ,so we can fetch block + // from another location if there still have locations logWarning(s"Exception during getting remote block $blockId from $loc", e) + failTimes += 1 null + case t: Throwable => throw t } if (data != null) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c6dbe8be1ca4..bafdd7bbedfd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -47,6 +47,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private val conf = new SparkConf(false) var store: BlockManager = null var store2: BlockManager = null + var store3: BlockManager = null var rpcEnv: RpcEnv = null var master: BlockManagerMaster = null conf.set("spark.authenticate", "false") @@ -99,6 +100,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store2.stop() store2 = null } + if (store3 != null) { + store3.stop() + store3 = null + } rpcEnv.shutdown() rpcEnv.awaitTermination() rpcEnv = null @@ -443,19 +448,35 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } - test("block manager crash test") { - conf.set("spark.network.timeout", "5s") - store = makeBlockManager(8000) - store2 = makeBlockManager(8000) - val list1 = List(new Array[Byte](4000)) - store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - val list1Get = store.getRemote("list1") - assert(list1Get.isDefined, "list1get expected to be fetched") - // simulate block manager crashed - store2.stop() - val list1GetAgain = store.getRemoteBytes("list1") - assert(!list1GetAgain.isDefined, 
"list1getagain expected to be not fetched") - conf.set("spark.network.timeout", "120s") + test("(SPARK-9591)getRemoteBytes from another location when IOException throw") { + try { + conf.set("spark.network.timeout", "2s") + store = makeBlockManager(8000, "excutor1") + store2 = makeBlockManager(8000, "excutor2") + store3 = makeBlockManager(8000, "executor3") + val list1 = List(new Array[Byte](4000)) + store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + + var list1Get = store.getRemoteBytes("list1") + assert(list1Get.isDefined, "list1Get expected to be fetched") + // block manager exit + store2.stop() + store2 = null + list1Get = store.getRemoteBytes("list1") + + // get `list1` block + assert(list1Get.isDefined, "list1Get expected to be fetched") + + store3.stop() + store3 = null + // exception throw because there is no locations + intercept[java.io.IOException] { + list1Get = store.getRemoteBytes("list1") + } + } finally { + conf.remove("spark.network.timeout") + } } test("in-memory LRU storage") { From bb0fe187e996b630a391a9a916b107caaf45f617 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Wed, 5 Aug 2015 20:14:56 +0800 Subject: [PATCH 4/7] fix typos and add fail times to log --- .../scala/org/apache/spark/storage/BlockManager.scala | 8 ++++---- .../org/apache/spark/storage/BlockManagerSuite.scala | 7 ++----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6aacb6d82e74..e4b79a38df29 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -590,18 +590,18 @@ private[spark] class BlockManager( private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) - var failTimes = 1 + var failTimes = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { - case e: IOException if failTimes < locations.size => - // return null when IOException throw ,so we can fetch block + case e: IOException if failTimes < locations.size - 1 => + // Return null when IOException throw, so we can fetch block // from another location if there still have locations - logWarning(s"Exception during getting remote block $blockId from $loc", e) failTimes += 1 + logWarning(s"Try ${failTimes} times getting remote block $blockId from $loc failed:", e) null case t: Throwable => throw t } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index bafdd7bbedfd..c53121c03609 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -451,23 +451,20 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("(SPARK-9591)getRemoteBytes from another location when IOException throw") { try { conf.set("spark.network.timeout", "2s") - store = makeBlockManager(8000, "excutor1") - store2 = makeBlockManager(8000, "excutor2") + store = makeBlockManager(8000, "executor1") + 
store2 = makeBlockManager(8000, "executor2") store3 = makeBlockManager(8000, "executor3") val list1 = List(new Array[Byte](4000)) store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - var list1Get = store.getRemoteBytes("list1") assert(list1Get.isDefined, "list1Get expected to be fetched") // block manager exit store2.stop() store2 = null list1Get = store.getRemoteBytes("list1") - // get `list1` block assert(list1Get.isDefined, "list1Get expected to be fetched") - store3.stop() store3 = null // exception throw because there is no locations From b23f39a322299a4edfeac2fd46c89fb109ec1512 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Fri, 7 Aug 2015 11:41:30 +0800 Subject: [PATCH 5/7] throw exception when there no location to fetch --- .../spark/storage/BlockFetchException.scala | 21 +++++++++++++++++++ .../apache/spark/storage/BlockManager.scala | 15 +++++++------ .../spark/storage/BlockManagerSuite.scala | 10 ++++++--- 3 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala new file mode 100644 index 000000000000..535c9fa75b34 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.storage + +private[spark] +case class BlockFetchException(throwable: Throwable) extends Exception(throwable) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e4b79a38df29..b4012c412b88 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -590,20 +590,23 @@ private[spark] class BlockManager( private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) - var failTimes = 0 + var attemptTimes = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { - case e: IOException if failTimes < locations.size - 1 => - // Return null when IOException throw, so we can fetch block + case t: Throwable if attemptTimes < locations.size - 1 => + // Return null when Exception throw, so we can fetch block // from another location if there still have locations - failTimes += 1 - logWarning(s"Try ${failTimes} times getting remote block $blockId from $loc failed:", e) + attemptTimes += 1 + logWarning(s"Try $attemptTimes times getting remote block $blockId from $loc failed.", t) null - case t: Throwable => throw t + case t: Throwable => + // Throw BlockFetchException wraps the last Exception when + // there is no block we can fetch + throw new BlockFetchException(t) } if (data != null) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c53121c03609..f25e23338590 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -448,7 +448,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } - test("(SPARK-9591)getRemoteBytes from another location when IOException throw") { + test("SPARK-9591:getRemoteBytes from another location when IOException throw") { + val origTimeoutOpt = conf.getOption("spark.network.timeout") try { conf.set("spark.network.timeout", "2s") store = makeBlockManager(8000, "executor1") @@ -468,11 +469,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store3.stop() store3 = null // exception throw because there is no locations - intercept[java.io.IOException] { + intercept[BlockFetchException] { list1Get = store.getRemoteBytes("list1") } } finally { - conf.remove("spark.network.timeout") + origTimeoutOpt match { + case Some(t) => conf.set("spark.network.timeout", t) + case None => conf.remove("spark.network.timeout") + } } } From 75db3340384def0b476482d9d9a8ff90f7b5e9f9 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Sun, 9 Aug 2015 20:43:42 +0800 Subject: [PATCH 6/7] add more informations when exception occur --- .../scala/org/apache/spark/storage/BlockFetchException.scala | 5 ++++- .../main/scala/org/apache/spark/storage/BlockManager.scala | 3 ++- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala index 535c9fa75b34..f6e46ae9a481 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala @@ -17,5 +17,8 @@ package org.apache.spark.storage +import org.apache.spark.SparkException + private[spark] -case class BlockFetchException(throwable: Throwable) extends Exception(throwable) +case class BlockFetchException(messages: String, throwable: Throwable) + extends SparkException(messages, throwable) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b4012c412b88..f13f4f0e3b83 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -606,7 +606,8 @@ private[spark] class BlockManager( case t: Throwable => // Throw BlockFetchException wraps the last Exception when // there is no block we can fetch - throw new BlockFetchException(t) + throw new BlockFetchException(s"Failed to fetch block from" + + s" ${locations.size} locations. Most recent failure cause:", t) } if (data != null) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f25e23338590..e5b54d66c815 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -448,7 +448,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } - test("SPARK-9591:getRemoteBytes from another location when IOException throw") { + test("SPARK-9591: getRemoteBytes from another location when Exception throw") { val origTimeoutOpt = conf.getOption("spark.network.timeout") try { conf.set("spark.network.timeout", "2s") From 6c6d53d61d293e2c14ed3f776b8fb20397a6b332 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Wed, 2 Sep 2015 10:55:42 +0800 Subject: [PATCH 7/7] address @andrewor14 comments, do not catch Fatal exception --- .../apache/spark/storage/BlockManager.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f13f4f0e3b83..87aad0efadc1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -23,6 +23,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{ExecutionContext, Await, Future} import scala.concurrent.duration._ +import scala.util.control.NonFatal import scala.util.Random import sun.nio.ch.DirectBuffer @@ -590,24 +591,25 @@ private[spark] class BlockManager( private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) - var attemptTimes = 0 + var numFetchFailures = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { - case t: Throwable if attemptTimes < locations.size - 1 => - 
// Return null when Exception throw, so we can fetch block - // from another location if there still have locations - attemptTimes += 1 - logWarning(s"Try $attemptTimes times getting remote block $blockId from $loc failed.", t) - null - case t: Throwable => - // Throw BlockFetchException wraps the last Exception when - // there is no block we can fetch - throw new BlockFetchException(s"Failed to fetch block from" + - s" ${locations.size} locations. Most recent failure cause:", t) + case NonFatal(e) => + numFetchFailures += 1 + if (numFetchFailures == locations.size) { + // An exception is thrown while fetching this block from all locations + throw new BlockFetchException(s"Failed to fetch block from" + + s" ${locations.size} locations. Most recent failure cause:", e) + } else { + // This location failed, so we retry fetch from a different one by returning null here + logWarning(s"Failed to fetch remote block $blockId " + + s"from $loc (failed attempt $numFetchFailures)", e) + null + } } if (data != null) {
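
The seven patches above converge on the shape in [PATCH 7/7]: catch only non-fatal exceptions per location, keep trying the remaining (shuffled) locations, and wrap the most recent cause in a BlockFetchException once every location has failed; an empty location list still yields None. The Scala sketch below is a minimal, self-contained rendering of that control flow for illustration only — the `Location` alias, `fetchFrom`, and the `main` driver are hypothetical stand-ins, not Spark's BlockTransferService API.

```scala
import java.io.IOException

import scala.util.Random
import scala.util.control.NonFatal

// Stand-in for the patch's BlockFetchException (the real one extends SparkException).
class BlockFetchException(message: String, cause: Throwable)
  extends Exception(message, cause)

object FetchRetrySketch {
  type Location = String

  // Hypothetical stand-in for blockTransferService.fetchBlockSync(...).nioByteBuffer():
  // returns the block's bytes, or throws when the location is unreachable.
  def fetchFrom(loc: Location, blockId: String): Array[Byte] =
    if (loc.startsWith("dead")) throw new IOException(s"connection to $loc failed")
    else s"payload-of-$blockId".getBytes("UTF-8")

  // Mirrors doGetRemote after [PATCH 7/7]: a non-fatal failure is logged and the
  // next location is tried; the last failure is rethrown wrapped; fatal errors
  // (e.g. OutOfMemoryError) are not matched by NonFatal and propagate untouched.
  def getRemoteBytes(blockId: String, allLocations: Seq[Location]): Option[Array[Byte]] = {
    val locations = Random.shuffle(allLocations)
    var numFetchFailures = 0
    for (loc <- locations) {
      val data =
        try {
          fetchFrom(loc, blockId)
        } catch {
          case NonFatal(e) =>
            numFetchFailures += 1
            if (numFetchFailures == locations.size) {
              // Every location failed; surface the most recent cause.
              throw new BlockFetchException(
                s"Failed to fetch block from ${locations.size} locations. " +
                  "Most recent failure cause:", e)
            } else {
              // This location failed; return null so the loop tries another one.
              println(s"Failed to fetch $blockId from $loc (failed attempt $numFetchFailures): $e")
              null
            }
        }
      if (data != null) {
        return Some(data)
      }
    }
    None // the master reported no locations for this block
  }

  def main(args: Array[String]): Unit = {
    // One dead location plus one live one: the retry loop still succeeds.
    println(getRemoteBytes("list1", Seq("dead-exec1", "live-exec2")).isDefined) // true
    // Only dead locations: the wrapped BlockFetchException is thrown.
    try getRemoteBytes("list1", Seq("dead-exec1", "dead-exec2"))
    catch { case e: BlockFetchException => println(s"caught: ${e.getMessage}") }
  }
}
```

Catching NonFatal rather than Throwable is the point of the final commit ("do not catch Fatal exception"): counting an OutOfMemoryError or a linkage error as just another failed attempt would mask a dying executor, so only recoverable failures participate in the retry, and only while untried locations remain.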