Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.util.NettyUtils;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

/**
* Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
Expand Down Expand Up @@ -135,9 +135,10 @@ public void fetchChunk(
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}

final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
Expand All @@ -148,11 +149,13 @@ public void fetchChunk(
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
timeTaken);
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
serverAddr, future.cause());
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
Expand All @@ -173,9 +176,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
* @param callback Object to call with the stream data.
*/
public void stream(final String streamId, final StreamCallback callback) {
final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}

// Need to synchronize here so that the callback is added to the queue and the RPC is
// written to the socket atomically, so that callbacks are called in the right order
Expand All @@ -188,11 +192,13 @@ public void stream(final String streamId, final StreamCallback callback) {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
timeTaken);
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
serverAddr, future.cause());
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
Expand All @@ -215,9 +221,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
* @return The RPC's id.
*/
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
logger.trace("Sending RPC to {}", serverAddr);
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);
Expand All @@ -228,10 +235,12 @@ public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
serverAddr, future.cause());
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public TransportClient createUnmanagedClient(String remoteHost, int remotePort)

/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
logger.debug("Creating new connection to " + address);
logger.debug("Creating new connection to {}", address);

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.server.MessageHandler;
import org.apache.spark.network.util.NettyUtils;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportFrameDecoder;

/**
Expand Down Expand Up @@ -122,7 +122,7 @@ public void channelActive() {
@Override
public void channelInactive() {
if (numOutstandingRequests() > 0) {
String remoteAddress = NettyUtils.getRemoteAddress(channel);
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
Expand All @@ -132,7 +132,7 @@ public void channelInactive() {
@Override
public void exceptionCaught(Throwable cause) {
if (numOutstandingRequests() > 0) {
String remoteAddress = NettyUtils.getRemoteAddress(channel);
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
failOutstandingRequests(cause);
Expand All @@ -141,13 +141,12 @@ public void exceptionCaught(Throwable cause) {

@Override
public void handle(ResponseMessage message) throws Exception {
String remoteAddress = NettyUtils.getRemoteAddress(channel);
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
resp.streamChunkId, remoteAddress);
resp.streamChunkId, getRemoteAddress(channel));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
Expand All @@ -159,7 +158,7 @@ public void handle(ResponseMessage message) throws Exception {
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
resp.streamChunkId, remoteAddress, resp.errorString);
resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
Expand All @@ -170,7 +169,7 @@ public void handle(ResponseMessage message) throws Exception {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
resp.requestId, remoteAddress, resp.body().size());
resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
Expand All @@ -184,7 +183,7 @@ public void handle(ResponseMessage message) throws Exception {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
resp.requestId, remoteAddress, resp.errorString);
resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
Message.Type msgType = Message.Type.decode(in);
Message decoded = decode(msgType, in);
assert decoded.type() == msgType;
logger.trace("Received message " + msgType + ": " + decoded);
logger.trace("Received message {}: {}", msgType, decoded);
out.add(decoded);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.ResponseMessage;
import org.apache.spark.network.util.NettyUtils;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

/**
* The single Transport-level Channel handler which is used for delegating requests to the
Expand Down Expand Up @@ -76,7 +76,7 @@ public TransportClient getClient() {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
cause);
requestHandler.exceptionCaught(cause);
responseHandler.exceptionCaught(cause);
Expand Down Expand Up @@ -139,7 +139,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
if (responseHandler.numOutstandingRequests() > 0) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network.server;

import java.net.SocketAddress;
import java.nio.ByteBuffer;

import com.google.common.base.Throwables;
Expand All @@ -42,7 +43,7 @@
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.util.NettyUtils;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

/**
* A handler that processes requests from clients and writes chunk data back. Each handler is
Expand Down Expand Up @@ -114,9 +115,9 @@ public void handle(RequestMessage request) {
}

private void processFetchRequest(final ChunkFetchRequest req) {
final String client = NettyUtils.getRemoteAddress(channel);

logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
}

ManagedBuffer buf;
try {
Expand All @@ -125,7 +126,7 @@ private void processFetchRequest(final ChunkFetchRequest req) {
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format(
"Error opening block %s for request from %s", req.streamChunkId, client), e);
"Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
Expand All @@ -134,13 +135,12 @@ private void processFetchRequest(final ChunkFetchRequest req) {
}

private void processStreamRequest(final StreamRequest req) {
final String client = NettyUtils.getRemoteAddress(channel);
ManagedBuffer buf;
try {
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
logger.error(String.format(
"Error opening stream %s for request from %s", req.streamId, client), e);
"Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
Expand Down Expand Up @@ -189,13 +189,13 @@ private void processOneWayMessage(OneWayMessage req) {
* it will be logged and the channel closed.
*/
private void respond(final Encodable result) {
final String remoteAddress = channel.remoteAddress().toString();
final SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
channelFuture.syncUninterruptibly();

port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
logger.debug("Shuffle server started on port: {}", port);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private void deleteExecutorDirs(String[] dirs) {
for (String localDir : dirs) {
try {
JavaUtils.deleteRecursively(new File(localDir));
logger.debug("Successfully cleaned up directory: " + localDir);
logger.debug("Successfully cleaned up directory: {}", localDir);
} catch (Exception e) {
logger.error("Failed to delete directory: " + localDir, e);
}
Expand Down