From 1b3173c590718d167331fc0a994359ddace5b964 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Tue, 25 Jun 2019 17:55:27 +0800 Subject: [PATCH 1/3] [SPARK-28160][Core] Fix a bug that TransportClient.sendRpcSync may hang forever --- .../spark/network/client/TransportClient.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 20d840baeaf6..6478447cc0bc 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -237,11 +237,15 @@ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) { sendRpc(message, new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { - ByteBuffer copy = ByteBuffer.allocate(response.remaining()); - copy.put(response); - // flip "copy" to make it readable - copy.flip(); - result.set(copy); + try { + ByteBuffer copy = ByteBuffer.allocate(response.remaining()); + copy.put(response); + // flip "copy" to make it readable + copy.flip(); + result.set(copy); + } catch (Throwable t) { + result.setException(t); + } } @Override From 794602ac4aafd7ca8123ead016d226fbadf2e804 Mon Sep 17 00:00:00 2001 From: lajin Date: Wed, 26 Jun 2019 21:25:34 +0800 Subject: [PATCH 2/3] bug in ExternalShuffleClient --- .../network/shuffle/ExternalShuffleClient.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 0e11d2124ada..f1ce8a101b7a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -163,9 +163,16 @@ public Future removeBlocks( client.sendRpc(removeBlocksMessage, new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { - BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); - numRemovedBlocksFuture.complete(((BlocksRemoved)msgObj).numRemovedBlocks); - client.close(); + try { + BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); + numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks); + } catch (Exception e) { + logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) + + " via external shuffle service from executor: " + execId, e); + numRemovedBlocksFuture.complete(0); + } finally { + client.close(); + } } @Override From d2330cc8b4a74378556f7ae5924b3bb9388219d2 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 28 Jun 2019 12:40:03 +0800 Subject: [PATCH 3/3] address comment --- .../java/org/apache/spark/network/client/TransportClient.java | 1 + .../apache/spark/network/shuffle/ExternalShuffleClient.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 6478447cc0bc..b018197deaf2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -244,6 +244,7 @@ public void onSuccess(ByteBuffer response) { copy.flip(); result.set(copy); } catch (Throwable t) { + logger.warn("Error in responding PRC callback", t); result.setException(t); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index f1ce8a101b7a..2100b8946802 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -166,9 +166,9 @@ public void onSuccess(ByteBuffer response) { try { BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks); - } catch (Exception e) { + } catch (Throwable t) { logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) + - " via external shuffle service from executor: " + execId, e); + " via external shuffle service from executor: " + execId, t); numRemovedBlocksFuture.complete(0); } finally { client.close();