Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #4462 - Prevent jetty 10 WebSocket close deadlocks #4472

Merged
merged 25 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
18850d2
Issue #4462 - Replicate problems from WS close deadlock with testcases
lachlan-roberts Jan 10, 2020
4349ec3
Issue #4462 - do not use SharedBlockingCallback for jetty WS close
lachlan-roberts Jan 10, 2020
de903ec
add missing licence header
lachlan-roberts Jan 10, 2020
151fa25
Ignore any closes after the initial websocket close.
lachlan-roberts Jan 12, 2020
c9fd77c
Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-…
lachlan-roberts Jan 12, 2020
755cdce
subsequent closes are prevented now with WSCoreSession close
lachlan-roberts Jan 13, 2020
7aba546
Use new class BlockingCallback instead of SharedBlockingCallback
lachlan-roberts Jan 13, 2020
f07bba9
add licence header
lachlan-roberts Jan 13, 2020
979b288
Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-…
joakime Jan 13, 2020
61a316f
Move prevention of duplicate closes back to the jetty and javax APIs
lachlan-roberts Jan 13, 2020
4514158
attempt to fix long build time and fix test cases
lachlan-roberts Jan 14, 2020
0b0a03a
add timeout to BlockingCallback for WS use this is idleTimeout + 1000ms
lachlan-roberts Jan 15, 2020
103d6ea
Core throws ClosedChannelException instead of ISE if send after closed
lachlan-roberts Jan 16, 2020
645f0eb
update testing to test close from separate thread after hard close
lachlan-roberts Jan 16, 2020
58f61de
fix tests
lachlan-roberts Jan 16, 2020
ffee8bf
do not add 1000ms to idleTimeout if it is not greater than 0
lachlan-roberts Jan 16, 2020
b3c214d
javax SessionTracker guards for thrown IOExceptions from close
lachlan-roberts Jan 16, 2020
4b72768
improve testing
lachlan-roberts Jan 20, 2020
702edbe
changes from review
lachlan-roberts Jan 20, 2020
d70e51b
Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-…
lachlan-roberts Jan 21, 2020
bfe1fc0
fix checkstyle violation in WebSocketCloseTest
lachlan-roberts Jan 21, 2020
c595144
remove test in WebSocketCloseTest now in FlushTest
lachlan-roberts Jan 21, 2020
e4477a1
combine BlockingCallback with FutureCallback
lachlan-roberts Jan 28, 2020
968dc4b
changes from review
lachlan-roberts Jan 28, 2020
c2728ed
add test to cement behavior for double normal close
lachlan-roberts Jan 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingCallback implements Callback
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
private FutureCallback callback = new FutureCallback();
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
private final long timeout;

public BlockingCallback()
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
this(-1);
}

public BlockingCallback(long timeout)
{
this.timeout = timeout;
}

@Override
public void succeeded()
{
callback.succeeded();
}

@Override
public void failed(Throwable x)
{
callback.failed(x);
}

public void block() throws IOException
{
try
{
if (timeout > 0)
callback.get(timeout, TimeUnit.MILLISECONDS);
else
callback.get();
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
}
catch (InterruptedException e)
{
InterruptedIOException exception = new InterruptedIOException();
exception.initCause(e);
throw exception;
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof RuntimeException)
throw (RuntimeException)cause;
else if (cause instanceof IOException)
throw (IOException)cause;
else
throw new IOException(cause);
}
catch (TimeoutException e)
{
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,19 @@ public boolean isOutputOpen()
@Override
public void flush(Callback callback)
{
callback.succeeded();
}

@Override
public void close(Callback callback)
{
callback.succeeded();
}

@Override
public void close(int statusCode, String reason, Callback callback)
{
callback.succeeded();
}

@Override
Expand All @@ -420,6 +423,7 @@ public void demand(long n)
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
callback.succeeded();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ public boolean onEof()
}
}

public boolean onOutgoingFrame(Frame frame) throws ProtocolException
public boolean onOutgoingFrame(Frame frame) throws Exception
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();

synchronized (this)
{
if (!isOutputOpen())
throw new IllegalStateException(_sessionState.toString());
throw new ClosedChannelException();

if (opcode == OpCode.CLOSE)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import javax.websocket.EncodeException;
import javax.websocket.RemoteEndpoint;

import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame;
Expand Down Expand Up @@ -65,10 +65,10 @@ public void sendBinary(ByteBuffer data) throws IOException
{
LOG.debug("sendBinary({})", BufferUtil.toDetailString(data));
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
}

lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
BlockingCallback b = newBlockingCallback();
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
b.block();
}

@Override
Expand All @@ -79,37 +79,36 @@ public void sendBinary(ByteBuffer partialByte, boolean isLast) throws IOExceptio
{
LOG.debug("sendBinary({},{})", BufferUtil.toDetailString(partialByte), isLast);
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())

Frame frame;
switch (messageType)
{
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.BINARY);
break;
case OpCode.TEXT:
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
case OpCode.BINARY:
frame = new Frame(OpCode.CONTINUATION);
break;
default:
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
}

frame.setPayload(partialByte);
frame.setFin(isLast);
sendFrame(frame, b, false);
case -1:
// New message!
frame = new Frame(OpCode.BINARY);
break;
case OpCode.TEXT:
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
case OpCode.BINARY:
frame = new Frame(OpCode.CONTINUATION);
break;
default:
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
}

frame.setPayload(partialByte);
frame.setFin(isLast);
BlockingCallback b = newBlockingCallback();
sendFrame(frame, b, false);
b.block();
}

@Override
public void sendObject(Object data) throws IOException, EncodeException
{
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
super.sendObject(data, b);
}
BlockingCallback b = newBlockingCallback();
super.sendObject(data, b);
b.block();
}

@Override
Expand All @@ -120,10 +119,11 @@ public void sendText(String text) throws IOException
{
LOG.debug("sendText({})", TextUtil.hint(text));
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
}


BlockingCallback b = newBlockingCallback();
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
b.block();
}

@Override
Expand All @@ -134,27 +134,32 @@ public void sendText(String partialMessage, boolean isLast) throws IOException
{
LOG.debug("sendText({},{})", TextUtil.hint(partialMessage), isLast);
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())

Frame frame;
switch (messageType)
{
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.TEXT);
break;
case OpCode.TEXT:
frame = new Frame(OpCode.CONTINUATION);
break;
case OpCode.BINARY:
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
default:
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
}

frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
frame.setFin(isLast);
sendFrame(frame, b, false);
case -1:
// New message!
frame = new Frame(OpCode.TEXT);
break;
case OpCode.TEXT:
frame = new Frame(OpCode.CONTINUATION);
break;
case OpCode.BINARY:
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
default:
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
}

frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
frame.setFin(isLast);
BlockingCallback b = newBlockingCallback();
sendFrame(frame, b, false);
b.block();
}

private BlockingCallback newBlockingCallback()
{
return new BlockingCallback(getIdleTimeout() + 1000);
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import javax.websocket.Encoder;
import javax.websocket.SendHandler;

import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame;
Expand Down Expand Up @@ -66,10 +66,9 @@ protected MessageOutputStream newMessageOutputStream()
@Override
public void flushBatch() throws IOException
{
try (SharedBlockingCallback.Blocker blocker = session.getBlocking().acquire())
{
coreSession.flush(blocker);
}
BlockingCallback b = newBlockingCallback();
coreSession.flush(b);
b.block();
}

@Override
Expand Down Expand Up @@ -227,24 +226,22 @@ public void sendObject(Object data, Callback callback) throws IOException, Encod
public void sendPing(ByteBuffer data) throws IOException, IllegalArgumentException
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
}
// TODO: is this supposed to be a blocking call?
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
sendFrame(new Frame(OpCode.PING).setPayload(data), Callback.NOOP, batch);

BlockingCallback b = newBlockingCallback();
sendFrame(new Frame(OpCode.PING).setPayload(data), b, batch);
b.block();
}

@Override
public void sendPong(ByteBuffer data) throws IOException, IllegalArgumentException
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
}
// TODO: is this supposed to be a blocking call?
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
sendFrame(new Frame(OpCode.PONG).setPayload(data), Callback.NOOP, batch);

BlockingCallback b = newBlockingCallback();
sendFrame(new Frame(OpCode.PONG).setPayload(data), b, batch);
b.block();
}

protected void assertMessageNotNull(Object data)
Expand All @@ -262,4 +259,9 @@ protected void assertSendHandlerNotNull(SendHandler handler)
throw new IllegalArgumentException("SendHandler cannot be null");
}
}

private BlockingCallback newBlockingCallback()
{
return new BlockingCallback(getIdleTimeout() + 1000);
}
}
Loading