Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocketChannel bug fixes, cleanup and javadoc updates for FrameHandler #3432

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,31 @@ public void failed(Throwable x)
};
}

/**
* Create a nested callback which always fails the nested callback on completion.
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
* will be added to this cause as a suppressed exception.
* @return a new callback.
*/
static Callback from(Callback callback, Throwable cause)
{
return new Callback()
{
@Override
public void succeeded()
{
callback.failed(cause);
}

@Override
public void failed(Throwable x)
{
cause.addSuppressed(x);
callback.failed(cause);
}
};
}

class Completing implements Callback
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRe
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request, UpgradeListener listener) throws IOException
{
JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(this, coreClient, request, toUri, websocket);
upgradeRequest.addListener(listener);
if (listener != null)
upgradeRequest.addListener(listener);
coreClient.connect(upgradeRequest);
return upgradeRequest.getFutureSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ public interface FrameHandler extends IncomingFrames
/**
* Async notification that Connection is being opened.
* <p>
* FrameHandler can write during this call, but will not receive frames until
* the onOpen() completes.
* FrameHandler can write during this call, but can not receive frames until the callback is succeeded.
* </p>
* <p>
* If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if
* not demanding, or can now call {@link CoreSession#demand(long)} to receive frames if demanding.
* If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and
*the connection will be closed. <br>
* </p>
*
* @param coreSession the channel associated with this connection.
Expand All @@ -81,9 +86,8 @@ public interface FrameHandler extends IncomingFrames
* sequentially to satisfy all outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}.
* Control and Data frames are passed to this method.
* Control frames that require a response (eg PING and CLOSE) may be responded to by the
* the handler, but if an appropriate response is not sent once the callback is succeeded,
* then a response will be generated and sent.
* Close frames may be responded to by the handler, but if an appropriate close response is not
* sent once the callback is succeeded, then a response close will be generated and sent.
*
* @param frame the raw frame
* @param callback the callback to indicate success in processing frame (or failure)
Expand All @@ -93,7 +97,8 @@ public interface FrameHandler extends IncomingFrames
/**
* An error has occurred or been detected in websocket-core and being reported to FrameHandler.
* A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
* derived from the error.
* derived from the error. This will not be called more than once, {@link #onClosed(CloseStatus, Callback)}
* will be called on the callback completion.
*
* @param cause the reason for the error
* @param callback the callback to indicate success in processing (or failure)
Expand All @@ -105,6 +110,7 @@ public interface FrameHandler extends IncomingFrames
* <p>
* The connection is now closed, no reading or writing is possible anymore.
* Implementations of FrameHandler can cleanup their resources for this connection now.
* This method will be called only once.
* </p>
*
* @param closeStatus the close status received from remote, or in the case of abnormal closure from local.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,11 @@ public void processConnectionError(Throwable cause, Callback callback)
CloseStatus closeStatus = abnormalCloseStatusFor(cause);

if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, NOOP);
close(closeStatus, callback);
else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback);
else
callback.failed(cause);
}

/**
Expand Down Expand Up @@ -428,7 +430,7 @@ public void onOpen()
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED");

Callback openCallback = Callback.from(()->
Callback openCallback = Callback.from(()->
{
channelState.onOpen();
if (!demanding)
Expand All @@ -450,6 +452,11 @@ public void onOpen()
catch (Throwable t)
{
openCallback.failed(t);

/* This is double handling of the exception but we need to do this because we have two separate
mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback
and the CompletableFuture require the exception. */
throw new RuntimeException(t);
}

}
Expand Down Expand Up @@ -481,9 +488,9 @@ public void onFrame(Frame frame, Callback callback)
{
assertValidIncoming(frame);
}
catch (Throwable ex)
catch (Throwable t)
{
callback.failed(ex);
callback.failed(t);
return;
}

Expand All @@ -497,9 +504,9 @@ public void sendFrame(Frame frame, Callback callback, boolean batch)
{
assertValidOutgoing(frame);
}
catch (Throwable ex)
catch (Throwable t)
{
callback.failed(ex);
callback.failed(t);
return;
}

Expand All @@ -517,13 +524,7 @@ public void sendFrame(Frame frame, Callback callback, boolean batch)

Callback closeConnectionCallback = Callback.from(
()->closeConnection(cause, channelState.getCloseStatus(), callback),
x->closeConnection(cause, channelState.getCloseStatus(), Callback.from(
()-> callback.failed(x),
x2->
{
x.addSuppressed(x2);
callback.failed(x);
})));
t->closeConnection(cause, channelState.getCloseStatus(), Callback.from(callback, t)));

flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
}
Expand All @@ -534,24 +535,18 @@ public void sendFrame(Frame frame, Callback callback, boolean batch)
}
flusher.iterate();
}
catch (Throwable ex)
catch (Throwable t)
{
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus))
closeConnection(null, closeStatus, Callback.from(
()->callback.failed(ex),
x2->
{
ex.addSuppressed(x2);
callback.failed(ex);
}));
closeConnection(AbnormalCloseStatus.getCause(closeStatus), closeStatus, Callback.from(callback, t));
else
callback.failed(ex);
callback.failed(t);
}
else
callback.failed(ex);
callback.failed(t);
}
}

Expand Down