diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 5bc6e5a2418a9..b299b9f71f8cf 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -72,21 +72,31 @@ public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
    * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
    * to create a Client.
+   * @param bootstraps TODO
+   * @return TODO
    */
   public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
     return new TransportClientFactory(this, bootstraps);
   }
 
+  /**
+   * @return TODO
+   */
   public TransportClientFactory createClientFactory() {
     return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
   }
 
-  /** Create a server which will attempt to bind to a specific port. */
+  /** Create a server which will attempt to bind to a specific port.
+   * @param port TODO
+   * @return TODO
+   * */
   public TransportServer createServer(int port) {
     return new TransportServer(this, port);
   }
 
-  /** Creates a new server, binding to any available ephemeral port. */
+  /** Creates a new server, binding to any available ephemeral port.
+   * @return TODO
+   * */
   public TransportServer createServer() {
     return new TransportServer(this, 0);
   }
@@ -96,6 +106,8 @@ public TransportServer createServer() {
    * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
    * response messages.
    *
+   * @param channel TODO
+   *
   * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
   * be used to communicate on this channel. The TransportClient is directly associated with a
   * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
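While the tags above are still TODO, the TransportContext factory methods they document are easiest to understand end-to-end. A minimal wiring sketch, not part of this patch; the anonymous ConfigProvider (which falls back to defaults everywhere) and the "localhost" endpoint are placeholders:

import java.util.NoSuchElementException;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportContextSketch {
  public static void main(String[] args) throws Exception {
    // get(name) always throws, so get(name, default) falls back to defaults.
    TransportConf conf = new TransportConf(new ConfigProvider() {
      @Override
      public String get(String name) { throw new NoSuchElementException(name); }
    });
    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
    TransportServer server = context.createServer();  // binds an ephemeral port
    TransportClient client =
        context.createClientFactory().createClient("localhost", server.getPort());
    System.out.println("connected: " + client.isActive());
  }
}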
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
index a415db593a788..0e54d69672aaa 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -36,12 +36,16 @@
  */
 public abstract class ManagedBuffer {
 
-  /** Number of bytes of the data. */
+  /** Number of bytes of the data.
+   * @return TODO
+   * */
   public abstract long size();
 
   /**
    * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
    * returned ByteBuffer should not affect the content of this buffer.
+   * @return TODO
+   * @throws java.io.IOException TODO
    */
   // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
   public abstract ByteBuffer nioByteBuffer() throws IOException;
@@ -50,22 +54,28 @@ public abstract class ManagedBuffer {
    * Exposes this buffer's data as an InputStream. The underlying implementation does not
    * necessarily check for the length of bytes read, so the caller is responsible for making sure
    * it does not go over the limit.
+   * @return TODO
+   * @throws java.io.IOException TODO
    */
   public abstract InputStream createInputStream() throws IOException;
 
   /**
    * Increment the reference count by one if applicable.
+   * @return TODO
    */
   public abstract ManagedBuffer retain();
 
   /**
    * If applicable, decrement the reference count by one and deallocates the buffer if the
    * reference count reaches zero.
+   * @return TODO
    */
   public abstract ManagedBuffer release();
 
   /**
    * Convert the buffer into an Netty object, used to write the data out.
+   * @return TODO
+   * @throws java.io.IOException TODO
    */
   public abstract Object convertToNetty() throws IOException;
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java b/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
index 519e6cb470d0d..3d51fcd3b2839 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
@@ -33,6 +33,8 @@ public interface ChunkReceivedCallback {
    * The given buffer will initially have a refcount of 1, but will be release()'d as soon as this
    * call returns. You must therefore either retain() the buffer or copy its contents before
    * returning.
+   * @param chunkIndex TODO
+   * @param buffer TODO
    */
   void onSuccess(int chunkIndex, ManagedBuffer buffer);
 
@@ -42,6 +44,8 @@ public interface ChunkReceivedCallback {
    *
    * After receiving a failure, the stream may or may not be valid. The client should not assume
    * that the server's side of the stream has been closed.
+   * @param chunkIndex TODO
+   * @param e TODO
    */
   void onFailure(int chunkIndex, Throwable e);
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
index 6ec960d795420..d720f7ed9460e 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
@@ -22,9 +22,13 @@
  * failure.
  */
 public interface RpcResponseCallback {
-  /** Successful serialized result from server. */
+  /** Successful serialized result from server.
+   * @param response TODO
+   * */
   void onSuccess(byte[] response);
 
-  /** Exception either propagated from server or raised on client side. */
+  /** Exception either propagated from server or raised on client side.
+   * @param e TODO
+   * */
   void onFailure(Throwable e);
 }
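The refcounting contract documented on ChunkReceivedCallback.onSuccess() is subtle, so a small illustrative implementation may help; the map-based stash is invented for this sketch and is not part of the patch:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;

public class StashingChunkCallback implements ChunkReceivedCallback {
  private final ConcurrentMap<Integer, ManagedBuffer> chunks =
      new ConcurrentHashMap<Integer, ManagedBuffer>();

  @Override
  public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
    // The caller release()s the buffer as soon as this method returns, so we
    // must retain() it before keeping a reference (per the javadoc above).
    chunks.put(chunkIndex, buffer.retain());
  }

  @Override
  public void onFailure(int chunkIndex, Throwable e) {
    System.err.println("chunk " + chunkIndex + " failed: " + e);
  }
}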
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 4e944114e8176..f24672e613fb6 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -49,7 +49,7 @@
  * to perform this setup.
  *
  * For example, a typical workflow might be:
- * client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
+ * client.sendRPC(new OpenFile("/foo")) -- returns StreamId = 100
  * client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
  * client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
  * ...
@@ -70,11 +70,18 @@ public class TransportClient implements Closeable {
   private final Channel channel;
   private final TransportResponseHandler handler;
 
+  /**
+   * @param channel TODO
+   * @param handler TODO
+   */
   public TransportClient(Channel channel, TransportResponseHandler handler) {
     this.channel = Preconditions.checkNotNull(channel);
     this.handler = Preconditions.checkNotNull(handler);
   }
 
+  /**
+   * @return TODO
+   */
   public boolean isActive() {
     return channel.isOpen() || channel.isActive();
   }
@@ -132,6 +139,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
   /**
    * Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
    * with the server's response or upon any failure.
+   * @param message TODO
+   * @param callback TODO
    */
   public void sendRpc(byte[] message, final RpcResponseCallback callback) {
     final String serverAddr = NettyUtils.getRemoteAddress(channel);
@@ -167,6 +176,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
   /**
    * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
    * a specified timeout for a response.
+   * @param message TODO
+   * @param timeoutMs TODO
+   * @return TODO
    */
   public byte[] sendRpcSync(byte[] message, long timeoutMs) {
     final SettableFuture<byte[]> result = SettableFuture.create();
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
index 65e8020e34121..7cd9a99dbdf8c 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
@@ -27,6 +27,8 @@
  * the JVM itself.
  */
 public interface TransportClientBootstrap {
-  /** Performs the bootstrapping operation, throwing an exception on failure. */
+  /** Performs the bootstrapping operation, throwing an exception on failure.
+   * @param client TODO
+   * */
   public void doBootstrap(TransportClient client) throws RuntimeException;
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 76bce8592816a..5ce060db707a6 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -91,6 +91,10 @@ public TransportClientFactory(
    * This blocks until a connection is successfully established and fully bootstrapped.
    *
    * Concurrency: This method is safe to call from multiple threads.
+   * @param remoteHost TODO
+   * @param remotePort TODO
+   * @return TODO
+   * @throws java.io.IOException TODO
    */
   public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
     // Get connection from the connection pool first.
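To make the "sendRPC returns a StreamId, then fetchChunk per chunk" workflow from the TransportClient javadoc concrete, here is a sketch. The request bytes, the assumed 8-byte stream-id response encoding, and the host/port/timeout values are all hypothetical; real handlers define their own framing:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;

public class FetchSketch {
  static void openAndFetch(TransportClientFactory factory, byte[] openFileRequest,
      ChunkReceivedCallback callback) throws IOException {
    TransportClient client = factory.createClient("shuffle-host", 7077);
    // Assume the server replies to this RPC with an 8-byte stream id.
    byte[] response = client.sendRpcSync(openFileRequest, 10000);
    long streamId = ByteBuffer.wrap(response).getLong();
    client.fetchChunk(streamId, 0, callback);  // chunks arrive asynchronously
    client.fetchChunk(streamId, 1, callback);
  }
}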
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 2044afb0d85db..1b41deec15f15 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -160,7 +160,9 @@ public void handle(ResponseMessage message) {
     }
   }
 
-  /** Returns total number of outstanding requests (fetch requests + rpcs) */
+  /** Returns total number of outstanding requests (fetch requests + rpcs)
+   * @return TODO
+   * */
   @VisibleForTesting
   public int numOutstandingRequests() {
     return outstandingFetches.size() + outstandingRpcs.size();
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
index ff4936470c697..6d7c4062f7a89 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
@@ -53,7 +53,10 @@ public void encode(ByteBuf buf) {
     streamChunkId.encode(buf);
   }
 
-  /** Decoding uses the given ByteBuf as our data, and will retain() it. */
+  /** Decoding uses the given ByteBuf as our data, and will retain() it.
+   * @param buf TODO
+   * @return TODO
+   * */
   public static ChunkFetchSuccess decode(ByteBuf buf) {
     StreamChunkId streamChunkId = StreamChunkId.decode(buf);
     buf.retain();
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/Encodable.java b/network/common/src/main/java/org/apache/spark/network/protocol/Encodable.java
index b4e299471b41a..4d58e7b6776dd 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/Encodable.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/Encodable.java
@@ -30,12 +30,15 @@
  * Additionally, when adding a new Encodable Message, add it to {@link Message.Type}.
  */
 public interface Encodable {
-  /** Number of bytes of the encoded form of this object. */
+  /** Number of bytes of the encoded form of this object.
+   * @return TODO
+   * */
   int encodedLength();
 
   /**
    * Serializes this object by writing into the given ByteBuf.
    * This method must write exactly encodedLength() bytes.
+   * @param buf TODO
    */
   void encode(ByteBuf buf);
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/Message.java b/network/common/src/main/java/org/apache/spark/network/protocol/Message.java
index d568370125fd4..f0eea68ea76e6 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/Message.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/Message.java
@@ -21,7 +21,9 @@
 
 /** An on-the-wire transmittable message. */
 public interface Message extends Encodable {
-  /** Used to identify this request type. */
+  /** Used to identify this request type.
+   * @return TODO
+   * */
   Type type();
 
   /** Preceding every serialized Message is its type, which allows us to deserialize it. */
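The Encodable contract above, encode() must write exactly the encodedLength() it reports, in miniature. StreamIdMessage is a made-up type for illustration, not one of Spark's protocol messages:

import io.netty.buffer.ByteBuf;

import org.apache.spark.network.protocol.Encodable;

public class StreamIdMessage implements Encodable {
  private final long streamId;

  public StreamIdMessage(long streamId) { this.streamId = streamId; }

  // Framing trusts this value, so encode() below writes exactly these 8 bytes.
  @Override
  public int encodedLength() { return 8; }

  @Override
  public void encode(ByteBuf buf) { buf.writeLong(streamId); }
}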
diff --git a/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java b/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
index b80c15106ecbd..d325139b6fe8e 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
@@ -25,12 +25,20 @@
  * Channel.)
  */
 public abstract class MessageHandler<T extends Message> {
-  /** Handles the receipt of a single message. */
+  /**
+   * Handles the receipt of a single message.
+   * @param message TODO
+   */
   public abstract void handle(T message);
 
-  /** Invoked when an exception was caught on the Channel. */
+  /**
+   * Invoked when an exception was caught on the Channel.
+   * @param cause TODO
+   */
   public abstract void exceptionCaught(Throwable cause);
 
-  /** Invoked when the channel this MessageHandler is on has been unregistered. */
+  /**
+   * Invoked when the channel this MessageHandler is on has been unregistered.
+   */
   public abstract void channelUnregistered();
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
index 1502b7489e864..a86b69fe4105f 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
@@ -28,11 +28,21 @@ public NoOpRpcHandler() {
     streamManager = new OneForOneStreamManager();
   }
 
+  /**
+   * @param client A channel client which enables the handler to make requests back to the sender
+   *               of this RPC. This will always be the exact same object for a particular channel.
+   * @param message The serialized bytes of the RPC.
+   * @param callback Callback which should be invoked exactly once upon success or failure of the
+   *                 RPC.
+   */
   @Override
   public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
     throw new UnsupportedOperationException("Cannot handle messages");
   }
 
+  /**
+   * @return TODO
+   */
   @Override
   public StreamManager getStreamManager() { return streamManager; }
 }
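For contrast with NoOpRpcHandler above, a minimal handler that actually completes the callback. The echo behavior is illustrative only and is not something this patch adds:

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;

public class EchoRpcHandler extends RpcHandler {
  private final StreamManager streamManager = new OneForOneStreamManager();

  @Override
  public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
    callback.onSuccess(message);  // invoke the callback exactly once
  }

  @Override
  public StreamManager getStreamManager() { return streamManager; }
}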
diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index 731d48d4d9c6c..f0f9970cc53a0 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -29,7 +29,7 @@
 import org.apache.spark.network.buffer.ManagedBuffer;
 
 /**
- * StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually
+ * StreamManager which allows registration of an Iterator of ManagedBuffer, which are individually
  * fetched as chunks by the client. Each registered buffer is one chunk.
  */
 public class OneForOneStreamManager extends StreamManager {
@@ -95,6 +95,8 @@ public void connectionTerminated(long streamId) {
    * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
    * client connection is closed before the iterator is fully drained, then the remaining buffers
    * will all be release()'d.
+   * @param buffers TODO
+   * @return TODO
    */
   public long registerStream(Iterator<ManagedBuffer> buffers) {
     long myStreamId = nextStreamId.getAndIncrement();
diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
index 2ba92a40f8b0a..b5f1e5bb01248 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
@@ -44,12 +44,14 @@ public abstract void receive(
   /**
    * Returns the StreamManager which contains the state about which streams are currently being
    * fetched by a TransportClient.
+   * @return TODO
    */
   public abstract StreamManager getStreamManager();
 
   /**
    * Invoked when the connection associated with the given client has been invalidated.
    * No further requests will come from this client.
+   * @param client TODO
    */
   public void connectionTerminated(TransportClient client) { }
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 5a9a14a180c10..2a9768053efa5 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -40,10 +40,12 @@
    *
    * @param streamId id of a stream that has been previously registered with the StreamManager.
    * @param chunkIndex 0-indexed chunk of the stream that's requested
+   * @return TODO
    */
   public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
 
   /**
    * Indicates that the TCP connection that was tied to the given stream has been terminated. After
    * this occurs, we are guaranteed not to read from the stream again, so any state can be cleaned
    * up.
+   * @param streamId TODO
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 625c3257d764e..f26b5da329bba 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -50,7 +50,10 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;
 
-  /** Creates a TransportServer that binds to the given port, or to any available if 0. */
+  /** Creates a TransportServer that binds to the given port, or to any available if 0.
+   * @param context TODO
+   * @param portToBind TODO
+   * */
   public TransportServer(TransportContext context, int portToBind) {
     this.context = context;
     this.conf = context.getConf();
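A sketch of the registerStream() contract documented above: each buffer in the iterator becomes one fetchable chunk. The in-memory NioManagedBuffer wrappers and their sample bytes are arbitrary example data, not part of the patch:

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;

public class RegisterStreamSketch {
  public static void main(String[] args) {
    OneForOneStreamManager manager = new OneForOneStreamManager();
    List<ManagedBuffer> chunks = Arrays.<ManagedBuffer>asList(
        new NioManagedBuffer(ByteBuffer.wrap(new byte[] {1, 2, 3})),   // chunk 0
        new NioManagedBuffer(ByteBuffer.wrap(new byte[] {4, 5, 6})));  // chunk 1
    long streamId = manager.registerStream(chunks.iterator());
    // Clients may now fetch chunks 0 and 1 of streamId; each buffer is
    // release()'d after it is transferred, per the javadoc above.
    System.out.println("registered stream " + streamId);
  }
}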
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
index d944d9da1c7f8..b59e1605a6f28 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
@@ -23,7 +23,10 @@
  * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
  */
 public abstract class ConfigProvider {
-  /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
+  /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist.
+   * @param name TODO
+   * @return TODO
+   * */
   public abstract String get(String name);
 
   public String get(String name, String defaultValue) {
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index bf8a1fc42fc6d..033a41a42a9a0 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -41,7 +41,9 @@ public class JavaUtils {
   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
 
-  /** Closes the given object, ignoring IOExceptions. */
+  /** Closes the given object, ignoring IOExceptions.
+   * @param closeable TODO
+   * */
   public static void closeQuietly(Closeable closeable) {
     try {
       if (closeable != null) {
@@ -52,7 +54,10 @@ public static void closeQuietly(Closeable closeable) {
     }
   }
 
-  /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
+  /** Returns a hash consistent with Spark's Utils.nonNegativeHash().
+   * @param obj TODO
+   * @return TODO
+   */
   public static int nonNegativeHash(Object obj) {
     if (obj == null) { return 0; }
     int hash = obj.hashCode();
@@ -62,6 +67,8 @@ public static int nonNegativeHash(Object obj) {
   /**
    * Convert the given string to a byte buffer. The resulting buffer can be
    * converted back to the same string through {@link #bytesToString(ByteBuffer)}.
+   * @param s TODO
+   * @return TODO
    */
   public static ByteBuffer stringToBytes(String s) {
     return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
@@ -70,6 +77,8 @@ public static ByteBuffer stringToBytes(String s) {
   /**
    * Convert the given byte buffer to a string. The resulting string can be
    * converted back to the same byte buffer through {@link #stringToBytes(String)}.
+   * @param b TODO
+   * @return TODO
    */
   public static String bytesToString(ByteBuffer b) {
     return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
@@ -79,6 +88,7 @@ public static String bytesToString(ByteBuffer b) {
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
    * Throws an exception if deletion is unsuccessful.
+   * @param file TODO
    */
   public static void deleteRecursively(File file) throws IOException {
     if (file == null) { return; }
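A quick round trip through the JavaUtils converters documented above; purely illustrative:

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;

import org.apache.spark.network.util.JavaUtils;

public class JavaUtilsSketch {
  public static void main(String[] args) {
    ByteBuffer buf = JavaUtils.stringToBytes("spark");
    String back = JavaUtils.bytesToString(buf);
    System.out.println("spark".equals(back));  // true: the conversions invert each other
    JavaUtils.closeQuietly(new ByteArrayInputStream(new byte[0]));  // never throws
  }
}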
diff --git a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
index 63ca43c046525..1e8b0b3a48687 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
@@ -27,7 +27,7 @@
  * Wraps a {@link InputStream}, limiting the number of bytes which can be read.
  *
  * This code is from Guava's 14.0 source code, because there is no compatible way to
- * use this functionality in both a Guava 11 environment and a Guava >14 environment.
+ * use this functionality in both a Guava 11 environment and a Guava 14+ environment.
 */
 public final class LimitedInputStream extends FilterInputStream {
   private long left;
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 5c654a6fd6ebe..3067a4668a71f 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -40,7 +40,10 @@
  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
  */
 public class NettyUtils {
-  /** Creates a new ThreadFactory which prefixes each thread with the given name. */
+  /** Creates a new ThreadFactory which prefixes each thread with the given name.
+   * @param threadPoolPrefix TODO
+   * @return TODO
+   * */
   public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
     return new ThreadFactoryBuilder()
       .setDaemon(true)
@@ -48,7 +51,12 @@ public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
       .build();
   }
 
-  /** Creates a Netty EventLoopGroup based on the IOMode. */
+  /** Creates a Netty EventLoopGroup based on the IOMode.
+   * @param mode TODO
+   * @param numThreads TODO
+   * @param threadPrefix TODO
+   * @return TODO
+   */
   public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
     ThreadFactory threadFactory = createThreadFactory(threadPrefix);
 
@@ -62,7 +70,10 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
     }
   }
 
-  /** Returns the correct (client) SocketChannel class based on IOMode. */
+  /** Returns the correct (client) SocketChannel class based on IOMode.
+   * @param mode TODO
+   * @return TODO
+   * */
   public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
     switch (mode) {
       case NIO:
@@ -74,7 +85,10 @@ public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
     }
   }
 
-  /** Returns the correct ServerSocketChannel class based on IOMode. */
+  /** Returns the correct ServerSocketChannel class based on IOMode.
+   * @param mode TODO
+   * @return TODO
+   * */
   public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
     switch (mode) {
       case NIO:
@@ -89,6 +103,7 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
   /**
    * Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame.
    * This is used before all decoders.
+   * @return TODO
    */
   public static ByteToMessageDecoder createFrameDecoder() {
     // maxFrameLength = 2G
@@ -99,7 +114,10 @@ public static ByteToMessageDecoder createFrameDecoder() {
     return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, -8, 8);
   }
 
-  /** Returns the remote address on the channel or "<remote address>" if none exists. */
+  /** Returns the remote address on the channel or "remote address" if none exists.
+   * @param channel TODO
+   * @return TODO
+   * */
   public static String getRemoteAddress(Channel channel) {
     if (channel != null && channel.remoteAddress() != null) {
       return channel.remoteAddress().toString();
@@ -112,6 +130,10 @@ public static String getRemoteAddress(Channel channel) {
    * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
    * executor thread rather than the event loop thread. Those thread-local caches actually delay
    * the recycling of buffers, leading to larger memory usage.
+   * @param allowDirectBufs TODO
+   * @param allowCache TODO
+   * @param numCores TODO
+   * @return TODO
    */
   public static PooledByteBufAllocator createPooledByteBufAllocator(
       boolean allowDirectBufs,
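How the NettyUtils helpers above compose when building a Netty client Bootstrap; the thread count and prefix are arbitrary example values, and the sketch stops short of connecting anywhere:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;

import org.apache.spark.network.util.IOMode;
import org.apache.spark.network.util.NettyUtils;

public class NettyUtilsSketch {
  public static void main(String[] args) {
    EventLoopGroup workers = NettyUtils.createEventLoop(IOMode.NIO, 4, "example-client");
    Bootstrap bootstrap = new Bootstrap()
        .group(workers)
        .channel(NettyUtils.getClientChannelClass(IOMode.NIO));
    // ... configure a handler and connect, then release the event loop:
    workers.shutdownGracefully();
  }
}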
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 621427d8cba5e..17cab73d3205f 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -27,26 +27,38 @@ public TransportConf(ConfigProvider conf) {
     this.conf = conf;
   }
 
-  /** IO mode: nio or epoll */
+  /** IO mode: nio or epoll
+   * @return TODO
+   * */
   public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); }
 
-  /** If true, we will prefer allocating off-heap byte buffers within Netty. */
+  /** If true, we will prefer allocating off-heap byte buffers within Netty.
+   * @return TODO
+   * */
   public boolean preferDirectBufs() {
     return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
   }
 
-  /** Connect timeout in secs. Default 120 secs. */
+  /** Connect timeout in secs. Default 120 secs.
+   * @return TODO
+   * */
   public int connectionTimeoutMs() {
     return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
   }
 
-  /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
+  /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog.
+   * @return TODO
+   * */
   public int backLog() { return conf.getInt("spark.shuffle.io.backLog", -1); }
 
-  /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
+  /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores.
+   * @return TODO
+   * */
   public int serverThreads() { return conf.getInt("spark.shuffle.io.serverThreads", 0); }
 
-  /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
+  /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores.
+   * @return TODO
+   */
   public int clientThreads() { return conf.getInt("spark.shuffle.io.clientThreads", 0); }
 
   /**
@@ -55,24 +67,31 @@ public int connectionTimeoutMs() {
    * latency * network_bandwidth.
    * Assuming latency = 1ms, network_bandwidth = 10Gbps
    * buffer size should be ~ 1.25MB
+   * @return TODO
    */
   public int receiveBuf() { return conf.getInt("spark.shuffle.io.receiveBuffer", -1); }
 
-  /** Send buffer size (SO_SNDBUF). */
+  /** Send buffer size (SO_SNDBUF).
+   * @return TODO
+   */
   public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
 
-  /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
+  /** Timeout for a single round trip of SASL token exchange, in milliseconds.
+   * @return TODO
+   */
   public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); }
 
   /**
    * Max number of times we will try IO exceptions (such as connection timeouts) per request.
    * If set to 0, we will not do any retries.
+   * @return TODO
    */
   public int maxIORetries() { return conf.getInt("spark.shuffle.io.maxRetries", 3); }
 
   /**
    * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
-   * Only relevant if maxIORetries > 0.
+   * Only relevant if maxIORetries greater than 0.
+   * @return TODO
    */
   public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
 
@@ -80,6 +99,7 @@ public int connectionTimeoutMs() {
    * Minimum size of a block that we should start using memory map rather than reading in through
    * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
    * memory mapping has high overhead for blocks close to or below the page size of the OS.
+   * @return TODO
    */
   public int memoryMapBytes() {
     return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
@@ -88,6 +108,7 @@ public int memoryMapBytes() {
   /**
    * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
    * created only when data is going to be transferred. This can reduce the number of open files.
+   * @return TODO
    */
   public boolean lazyFileDescriptor() {
     return conf.getBoolean("spark.shuffle.io.lazyFD", true);
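Finally, a sketch tying ConfigProvider to the TransportConf getters documented in this file. The map-backed provider and the single override are illustrative, not part of the patch:

import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportConfSketch {
  public static void main(String[] args) {
    final Map<String, String> settings = new HashMap<String, String>();
    settings.put("spark.shuffle.io.maxRetries", "10");  // override one default

    TransportConf conf = new TransportConf(new ConfigProvider() {
      @Override
      public String get(String name) {
        // Per the ConfigProvider contract above: throw when the key is absent,
        // so TransportConf falls back to its documented defaults.
        if (!settings.containsKey(name)) { throw new NoSuchElementException(name); }
        return settings.get(name);
      }
    });

    System.out.println(conf.maxIORetries());        // 10 (overridden)
    System.out.println(conf.connectionTimeoutMs()); // 120000 (default, converted to ms)
  }
}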