Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,14 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
case e: EncryptedManagedBuffer =>
result.success(e)
case _ =>
val ret = ByteBuffer.allocate(data.size.toInt)
ret.put(data.nioByteBuffer())
ret.flip()
result.success(new NioManagedBuffer(ret))
try {
val ret = ByteBuffer.allocate(data.size.toInt)
ret.put(data.nioByteBuffer())
ret.flip()
result.success(new NioManagedBuffer(ret))
} catch {
case e: Throwable => result.failure(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to catching and ignoring Throwable (after result.failures) is to use a boolean flag to indicate if result.success was signalled - and if not, invoke result.failure in finally.

var success = false
try {
  ...
  result.success(...)
  success = true
} finally {
  if (! success) result.failure(e)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to get the e, I think the catch is necessary here.

Copy link
Contributor

@mridulm mridulm Jan 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - it is required for the failure arg.
But I dont like the throwable being ignored in the catch once failure is signalled.
For example, interruption, OOM, thread death, etc - we should rethrow it once promise is updated.

Copy link
Contributor

@cloud-fan cloud-fan Jan 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not ignored, the exception will be rethrown in ThreadUtils.awaitResult at the end of fetchBlockSync. Maybe we should not use Promise, and do it manually to be more explicit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is a common anti pattern in spark; the issue is basically thread/pool specific exceptions start leaking to other threads.
Though my basic concern was regarding things like OOM, and they do get propagated - so I guess we can live with this for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promise is the most convenient way to turn an async listener to a Scala Future in order to call blocking APIs provided by Future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's propagated and the code calling fetchBlockSync will handle it.

}
}
}
}, tempFileManager)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import java.io.InputStream
import java.nio.ByteBuffer

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.ClassTag

import org.scalatest.concurrent._

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
import org.apache.spark.storage.{BlockId, StorageLevel}

class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {

implicit val defaultSignaler: Signaler = ThreadSignaler

test("fetchBlockSync should not hang when BlockFetchingListener.onBlockFetchSuccess fails") {
// Create a mocked `BlockTransferService` to call `BlockFetchingListener.onBlockFetchSuccess`
// with a bad `ManagedBuffer` which will trigger an exception in `onBlockFetchSuccess`.
val blockTransferService = new BlockTransferService {
override def init(blockDataManager: BlockDataManager): Unit = {}

override def close(): Unit = {}

override def port: Int = 0

override def hostName: String = "localhost-unused"

override def fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
tempFileManager: DownloadFileManager): Unit = {
// Notify BlockFetchingListener with a bad ManagedBuffer asynchronously
new Thread() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we create a thread here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, BlockFetchingListener will be called in the same thread and fail at once before Thread.awaitResult is called. Then the test won't be able to reproduce the issue.

override def run(): Unit = {
// This is a bad buffer to trigger `IllegalArgumentException` in
// `BlockFetchingListener.onBlockFetchSuccess`. The real issue we hit is
// `ByteBuffer.allocate` throws `OutOfMemoryError`, but we cannot make it happen in
// a test. Instead, we use a negative size value to make `ByteBuffer.allocate` fail,
// and this should trigger the same code path as `OutOfMemoryError`.
val badBuffer = new ManagedBuffer {
override def size(): Long = -1

override def nioByteBuffer(): ByteBuffer = null

override def createInputStream(): InputStream = null

override def retain(): ManagedBuffer = this

override def release(): ManagedBuffer = this

override def convertToNetty(): AnyRef = null
}
listener.onBlockFetchSuccess("block-id-unused", badBuffer)
}
}.start()
}

override def uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit] = {
// This method is unused in this test
throw new UnsupportedOperationException("uploadBlock")
}
}

val e = intercept[SparkException] {
failAfter(10.seconds) {
blockTransferService.fetchBlockSync(
"localhost-unused", 0, "exec-id-unused", "block-id-unused", null)
}
}
assert(e.getCause.isInstanceOf[IllegalArgumentException])
}
}