Skip to content

Commit

Permalink
Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbordet committed Jul 3, 2020
2 parents 42d5db3 + ae43b70 commit 7d8e56b
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void dispose() throws Exception
{
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}

Expand All @@ -101,6 +102,7 @@ public void dispose() throws Exception
LeakTrackingByteBufferPool pool = (LeakTrackingByteBufferPool)clientBufferPool;
assertThat("Client BufferPool - leaked acquires", pool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", pool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", pool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", pool.getLeakedResources(), Matchers.is(0L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ private boolean isStale()
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void onData(DataFrame frame)
public void onData(final DataFrame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);

int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
Expand All @@ -259,7 +259,7 @@ public void onData(final DataFrame frame, Callback callback)
else
{
if (LOG.isDebugEnabled())
LOG.debug("Stream #{} not found", streamId);
LOG.debug("Stream #{} not found on {}", streamId, this);
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
Expand Down Expand Up @@ -297,14 +297,14 @@ protected boolean isRemoteStreamClosed(int streamId)
public void onPriority(PriorityFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);
}

@Override
public void onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);

int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
Expand Down Expand Up @@ -336,7 +336,7 @@ public void onSettings(SettingsFrame frame)
public void onSettings(SettingsFrame frame, boolean reply)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);

if (frame.isReply())
return;
Expand Down Expand Up @@ -420,7 +420,7 @@ public void onSettings(SettingsFrame frame, boolean reply)
public void onPing(PingFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);

if (frame.isReply())
{
Expand Down Expand Up @@ -454,7 +454,7 @@ public void onPing(PingFrame frame)
public void onGoAway(final GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);

if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.REMOTELY_CLOSED))
{
Expand All @@ -473,7 +473,7 @@ public void onGoAway(final GoAwayFrame frame)
public void onWindowUpdate(WindowUpdateFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
LOG.debug("Received {} on {}", frame, this);

int streamId = frame.getStreamId();
int windowDelta = frame.getWindowDelta();
Expand Down Expand Up @@ -528,7 +528,9 @@ public void onWindowUpdate(IStream stream, WindowUpdateFrame frame)
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
Throwable failure = toFailure("Stream failure", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Stream #{} failure {}", streamId, this, failure);
onStreamFailure(streamId, error, reason, failure, callback);
}

Expand All @@ -549,12 +551,16 @@ public void onConnectionFailure(int error, String reason)

protected void onConnectionFailure(int error, String reason, Callback callback)
{
Throwable failure = toFailure("Session failure", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Session failure {}", this, failure);
onFailure(error, reason, failure, new FailureCallback(error, reason, callback));
}

protected void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Session abort {}", this, failure);
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
}

Expand All @@ -574,7 +580,9 @@ private void onClose(GoAwayFrame frame, Callback callback)
{
int error = frame.getError();
String reason = frame.tryConvertPayload();
Throwable failure = toFailure("Session close", error, reason);
Throwable failure = toFailure(error, reason);
if (LOG.isDebugEnabled())
LOG.debug("Session close {}", this, failure);
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
Expand All @@ -585,9 +593,9 @@ private void onClose(GoAwayFrame frame, Callback callback)
notifyClose(this, frame, countCallback);
}

private Throwable toFailure(String message, int error, String reason)
private Throwable toFailure(int error, String reason)
{
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
return new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason));
}

@Override
Expand Down Expand Up @@ -777,7 +785,7 @@ public void data(IStream stream, Callback callback, DataFrame frame)
private void frame(HTTP2Flusher.Entry entry, boolean flush)
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", flush ? "Sending" : "Queueing", entry.frame);
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this);
// Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush)
Expand Down Expand Up @@ -873,7 +881,7 @@ public void removeStream(IStream stream)
onStreamClosed(stream);
flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled())
LOG.debug("Removed {} {}", stream.isLocal() ? "local" : "remote", stream);
LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MathUtils;
Expand All @@ -61,17 +62,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicReference<Callback> writing = new AtomicReference<>();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final long timeStamp = System.nanoTime();
private final ISession session;
private final int streamId;
private final MetaData.Request request;
private final boolean local;
private Callback sendCallback;
private Throwable failure;
private boolean localReset;
private Listener listener;
private boolean remoteReset;
private Listener listener;
private long dataLength;
private long dataDemand;
private boolean dataInitial;
Expand Down Expand Up @@ -141,17 +143,31 @@ public void data(DataFrame frame, Callback callback)
@Override
public void reset(ResetFrame frame, Callback callback)
{
if (isReset())
return;
localReset = true;
synchronized (this)
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
}
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
}

private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
callback.failed(new WritePendingException());
Throwable failure;
synchronized (this)
{
failure = this.failure;
if (failure == null && sendCallback == null)
{
sendCallback = callback;
return true;
}
}
if (failure == null)
failure = new WritePendingException();
callback.failed(failure);
return false;
}

Expand All @@ -176,7 +192,27 @@ public Object removeAttribute(String key)
@Override
public boolean isReset()
{
return localReset || remoteReset;
synchronized (this)
{
return localReset || remoteReset;
}
}

private boolean isFailed()
{
synchronized (this)
{
return failure != null;
}
}

@Override
public boolean isResetOrFailed()
{
synchronized (this)
{
return isReset() || isFailed();
}
}

@Override
Expand Down Expand Up @@ -440,7 +476,11 @@ private long demand()

private void onReset(ResetFrame frame, Callback callback)
{
remoteReset = true;
synchronized (this)
{
remoteReset = true;
failure = new EofException("reset");
}
close();
session.removeStream(this);
notifyReset(this, frame, callback);
Expand All @@ -461,8 +501,12 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)

private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
synchronized (this)
{
failure = frame.getFailure();
}
close();
session.removeStream(this);
notifyFailure(this, frame, callback);
}

Expand Down Expand Up @@ -645,7 +689,12 @@ public void failed(Throwable x)

private Callback endWrite()
{
return writing.getAndSet(null);
synchronized (this)
{
Callback callback = sendCallback;
sendCallback = null;
return callback;
}
}

private void notifyNewStream(Stream stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,11 @@ public interface IStream extends Stream, Closeable
* @see #isClosed()
*/
boolean isRemotelyClosed();

/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected boolean parseHeader(ByteBuffer buffer)
return false;

if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame header from {}", headerParser, buffer);
LOG.debug("Parsed {} frame header from {}@{}", headerParser, buffer, Integer.toHexString(buffer.hashCode()));

if (headerParser.getLength() > getMaxFrameLength())
return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR, "invalid_frame_length");
Expand Down Expand Up @@ -199,7 +199,7 @@ protected boolean parseBody(ByteBuffer buffer)
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(type), buffer);
LOG.debug("Parsed {} frame body from {}@{}", FrameType.from(type), buffer, Integer.toHexString(buffer.hashCode()));
reset();
return true;
}
Expand Down
Loading

0 comments on commit 7d8e56b

Please sign in to comment.