Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,31 @@ public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
* Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
* a new Client. Bootstraps will be executed synchronously, and must run successfully in order
* to create a Client.
* @param bootstraps TODO
* @return TODO
*/
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
return new TransportClientFactory(this, bootstraps);
}

/**
* @return TODO
*/
public TransportClientFactory createClientFactory() {
return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
}

/** Create a server which will attempt to bind to a specific port. */
/** Create a server which will attempt to bind to a specific port.
* @param port TODO
* @return TODO
* */
public TransportServer createServer(int port) {
return new TransportServer(this, port);
}

/** Creates a new server, binding to any available ephemeral port. */
/** Creates a new server, binding to any available ephemeral port.
* @return TODO
* */
public TransportServer createServer() {
return new TransportServer(this, 0);
}
Expand All @@ -96,6 +106,8 @@ public TransportServer createServer() {
* has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
* response messages.
*
* @param channel TODO
*
* @return Returns the created TransportChannelHandler, which includes a TransportClient that can
* be used to communicate on this channel. The TransportClient is directly associated with a
* ChannelHandler to ensure all users of the same channel get the same TransportClient object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@
*/
public abstract class ManagedBuffer {

/** Number of bytes of the data. */
/** Number of bytes of the data.
* @return TODO
* */
public abstract long size();

/**
* Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
* returned ByteBuffer should not affect the content of this buffer.
* @return TODO
* @throws java.io.IOException TODO
*/
// TODO: Deprecate this, usage may require expensive memory mapping or allocation.
public abstract ByteBuffer nioByteBuffer() throws IOException;
Expand All @@ -50,22 +54,28 @@ public abstract class ManagedBuffer {
* Exposes this buffer's data as an InputStream. The underlying implementation does not
* necessarily check for the length of bytes read, so the caller is responsible for making sure
* it does not go over the limit.
* @return TODO
* @throws java.io.IOException TODO
*/
public abstract InputStream createInputStream() throws IOException;

/**
* Increment the reference count by one if applicable.
* @return TODO
*/
public abstract ManagedBuffer retain();

/**
* If applicable, decrement the reference count by one and deallocates the buffer if the
* reference count reaches zero.
* @return TODO
*/
public abstract ManagedBuffer release();

/**
* Convert the buffer into an Netty object, used to write the data out.
* @return TODO
* @throws java.io.IOException TODO
*/
public abstract Object convertToNetty() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface ChunkReceivedCallback {
* The given buffer will initially have a refcount of 1, but will be release()'d as soon as this
* call returns. You must therefore either retain() the buffer or copy its contents before
* returning.
* @param chunkIndex TODO
* @param buffer TODO
*/
void onSuccess(int chunkIndex, ManagedBuffer buffer);

Expand All @@ -42,6 +44,8 @@ public interface ChunkReceivedCallback {
*
* After receiving a failure, the stream may or may not be valid. The client should not assume
* that the server's side of the stream has been closed.
* @param chunkIndex TODO
* @param e TODO
*/
void onFailure(int chunkIndex, Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
* failure.
*/
public interface RpcResponseCallback {
/** Successful serialized result from server. */
/** Successful serialized result from server.
* @param response TODO
* */
void onSuccess(byte[] response);

/** Exception either propagated from server or raised on client side. */
/** Exception either propagated from server or raised on client side.
* @param e TODO
* */
void onFailure(Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* to perform this setup.
*
* For example, a typical workflow might be:
* client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
* client.sendRPC(new OpenFile("/foo")) -- returns StreamId = 100
* client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
* client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
* ...
Expand All @@ -70,11 +70,18 @@ public class TransportClient implements Closeable {
private final Channel channel;
private final TransportResponseHandler handler;

/**
* @param channel TODO
* @param handler TODO
*/
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel);
this.handler = Preconditions.checkNotNull(handler);
}

/**
* @return TODO
*/
public boolean isActive() {
return channel.isOpen() || channel.isActive();
}
Expand Down Expand Up @@ -132,6 +139,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
/**
* Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
* with the server's response or upon any failure.
* @param message TODO
* @param callback TODO
*/
public void sendRpc(byte[] message, final RpcResponseCallback callback) {
final String serverAddr = NettyUtils.getRemoteAddress(channel);
Expand Down Expand Up @@ -167,6 +176,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
/**
* Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
* a specified timeout for a response.
* @param message TODO
* @param timeoutMs TODO
* @return TODO
*/
public byte[] sendRpcSync(byte[] message, long timeoutMs) {
final SettableFuture<byte[]> result = SettableFuture.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
* the JVM itself.
*/
public interface TransportClientBootstrap {
/** Performs the bootstrapping operation, throwing an exception on failure. */
/** Performs the bootstrapping operation, throwing an exception on failure.
* @param client TODO
* */
public void doBootstrap(TransportClient client) throws RuntimeException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public TransportClientFactory(
* This blocks until a connection is successfully established and fully bootstrapped.
*
* Concurrency: This method is safe to call from multiple threads.
* @param remoteHost TODO
* @param remotePort TODO
* @return TODO
* @throws java.io.IOException TODO
*/
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ public void handle(ResponseMessage message) {
}
}

/** Returns total number of outstanding requests (fetch requests + rpcs) */
/** Returns total number of outstanding requests (fetch requests + rpcs)
* @return TODO
* */
@VisibleForTesting
public int numOutstandingRequests() {
return outstandingFetches.size() + outstandingRpcs.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public void encode(ByteBuf buf) {
streamChunkId.encode(buf);
}

/** Decoding uses the given ByteBuf as our data, and will retain() it. */
/** Decoding uses the given ByteBuf as our data, and will retain() it.
* @param buf TODO
* @return TODO
* */
public static ChunkFetchSuccess decode(ByteBuf buf) {
StreamChunkId streamChunkId = StreamChunkId.decode(buf);
buf.retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
* Additionally, when adding a new Encodable Message, add it to {@link Message.Type}.
*/
public interface Encodable {
/** Number of bytes of the encoded form of this object. */
/** Number of bytes of the encoded form of this object.
* @return TODO
* */
int encodedLength();

/**
* Serializes this object by writing into the given ByteBuf.
* This method must write exactly encodedLength() bytes.
* @param buf TODO
*/
void encode(ByteBuf buf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

/** An on-the-wire transmittable message. */
public interface Message extends Encodable {
/** Used to identify this request type. */
/** Used to identify this request type.
* @return TODO
* */
Type type();

/** Preceding every serialized Message is its type, which allows us to deserialize it. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@
* Channel.)
*/
public abstract class MessageHandler<T extends Message> {
/** Handles the receipt of a single message. */
/**
* Handles the receipt of a single message.
* @param message TODO
*/
public abstract void handle(T message);

/** Invoked when an exception was caught on the Channel. */
/**
* Invoked when an exception was caught on the Channel.
* @param cause TODO
*/
public abstract void exceptionCaught(Throwable cause);

/** Invoked when the channel this MessageHandler is on has been unregistered. */
/**
* Invoked when the channel this MessageHandler is on has been unregistered.
*/
public abstract void channelUnregistered();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ public NoOpRpcHandler() {
streamManager = new OneForOneStreamManager();
}

/**
* @param client A channel client which enables the handler to make requests back to the sender
* of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
* @param callback Callback which should be invoked exactly once upon success or failure of the
*/
@Override
public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
throw new UnsupportedOperationException("Cannot handle messages");
}

/**
* @return TODO
*/
@Override
public StreamManager getStreamManager() { return streamManager; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.spark.network.buffer.ManagedBuffer;

/**
* StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually
* StreamManager which allows registration of an Iterator of ManagedBuffer, which are individually
* fetched as chunks by the client. Each registered buffer is one chunk.
*/
public class OneForOneStreamManager extends StreamManager {
Expand Down Expand Up @@ -95,6 +95,8 @@ public void connectionTerminated(long streamId) {
* callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
* client connection is closed before the iterator is fully drained, then the remaining buffers
* will all be release()'d.
* @param buffers TODO
* @return TODO
*/
public long registerStream(Iterator<ManagedBuffer> buffers) {
long myStreamId = nextStreamId.getAndIncrement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ public abstract void receive(
/**
* Returns the StreamManager which contains the state about which streams are currently being
* fetched by a TransportClient.
* @return TODO
*/
public abstract StreamManager getStreamManager();

/**
* Invoked when the connection associated with the given client has been invalidated.
* No further requests will come from this client.
* @param client TODO
*/
public void connectionTerminated(TransportClient client) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public abstract class StreamManager {
*
* @param streamId id of a stream that has been previously registered with the StreamManager.
* @param chunkIndex 0-indexed chunk of the stream that's requested
* @return TODO
*/
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

/**
* @param streamId TODO
* Indicates that the TCP connection that was tied to the given stream has been terminated. After
* this occurs, we are guaranteed not to read from the stream again, so any state can be cleaned
* up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;

/** Creates a TransportServer that binds to the given port, or to any available if 0. */
/** Creates a TransportServer that binds to the given port, or to any available if 0.
* @param context TODO
* @param portToBind TODO
* */
public TransportServer(TransportContext context, int portToBind) {
this.context = context;
this.conf = context.getConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
* Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
*/
public abstract class ConfigProvider {
/** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
/** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist.
* @param name TODO
* @return TODO
* */
public abstract String get(String name);

public String get(String name, String defaultValue) {
Expand Down
Loading