diff --git a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java index 9e791a2513fa..77f74f6ba1e4 100644 --- a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java +++ b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java @@ -93,6 +93,7 @@ public void dispose() throws Exception { assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L)); assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L)); + assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L)); assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L)); } @@ -101,6 +102,7 @@ public void dispose() throws Exception LeakTrackingByteBufferPool pool = (LeakTrackingByteBufferPool)clientBufferPool; assertThat("Client BufferPool - leaked acquires", pool.getLeakedAcquires(), Matchers.is(0L)); assertThat("Client BufferPool - leaked releases", pool.getLeakedReleases(), Matchers.is(0L)); + assertThat("Client BufferPool - leaked removes", pool.getLeakedRemoves(), Matchers.is(0L)); assertThat("Client BufferPool - unreleased", pool.getLeakedResources(), Matchers.is(0L)); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index 9c79b6062243..93ffc90c3ac9 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -436,7 +436,7 @@ private boolean isStale() // It's an application frame; is the stream gone already? if (stream == null) return true; - return stream.isReset(); + return stream.isResetOrFailed(); } private boolean isProtocolFrame(Frame frame) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index b8c5f31eb656..d1eeb9a8f33c 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -239,7 +239,7 @@ public void onData(DataFrame frame) public void onData(final DataFrame frame, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); int streamId = frame.getStreamId(); IStream stream = getStream(streamId); @@ -259,7 +259,7 @@ public void onData(final DataFrame frame, Callback callback) else { if (LOG.isDebugEnabled()) - LOG.debug("Stream #{} not found", streamId); + LOG.debug("Stream #{} not found on {}", streamId, this); // We must enlarge the session flow control window, // otherwise other requests will be stalled. flowControl.onDataConsumed(this, null, flowControlLength); @@ -297,14 +297,14 @@ protected boolean isRemoteStreamClosed(int streamId) public void onPriority(PriorityFrame frame) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); } @Override public void onReset(ResetFrame frame) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); int streamId = frame.getStreamId(); IStream stream = getStream(streamId); @@ -336,7 +336,7 @@ public void onSettings(SettingsFrame frame) public void onSettings(SettingsFrame frame, boolean reply) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); if (frame.isReply()) return; @@ -420,7 +420,7 @@ public void onSettings(SettingsFrame frame, boolean reply) public void onPing(PingFrame frame) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); if (frame.isReply()) { @@ -454,7 +454,7 @@ public void onPing(PingFrame frame) public void onGoAway(final GoAwayFrame frame) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.REMOTELY_CLOSED)) { @@ -473,7 +473,7 @@ public void onGoAway(final GoAwayFrame frame) public void onWindowUpdate(WindowUpdateFrame frame) { if (LOG.isDebugEnabled()) - LOG.debug("Received {}", frame); + LOG.debug("Received {} on {}", frame, this); int streamId = frame.getStreamId(); int windowDelta = frame.getWindowDelta(); @@ -528,7 +528,9 @@ public void onWindowUpdate(IStream stream, WindowUpdateFrame frame) public void onStreamFailure(int streamId, int error, String reason) { Callback callback = new ResetCallback(streamId, error, Callback.NOOP); - Throwable failure = toFailure("Stream failure", error, reason); + Throwable failure = toFailure(error, reason); + if (LOG.isDebugEnabled()) + LOG.debug("Stream #{} failure {}", streamId, this, failure); onStreamFailure(streamId, error, reason, failure, callback); } @@ -549,12 +551,16 @@ public void onConnectionFailure(int error, String reason) protected void onConnectionFailure(int error, String reason, Callback callback) { - Throwable failure = toFailure("Session failure", error, reason); + Throwable failure = toFailure(error, reason); + if (LOG.isDebugEnabled()) + LOG.debug("Session failure {}", this, failure); onFailure(error, reason, failure, new FailureCallback(error, reason, callback)); } protected void abort(Throwable failure) { + if (LOG.isDebugEnabled()) + LOG.debug("Session abort {}", this, failure); onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure)); } @@ -574,7 +580,9 @@ private void onClose(GoAwayFrame frame, Callback callback) { int error = frame.getError(); String reason = frame.tryConvertPayload(); - Throwable failure = toFailure("Session close", error, reason); + Throwable failure = toFailure(error, reason); + if (LOG.isDebugEnabled()) + LOG.debug("Session close {}", this, failure); Collection streams = getStreams(); int count = streams.size(); Callback countCallback = new CountingCallback(callback, count + 1); @@ -585,9 +593,9 @@ private void onClose(GoAwayFrame frame, Callback callback) notifyClose(this, frame, countCallback); } - private Throwable toFailure(String message, int error, String reason) + private Throwable toFailure(int error, String reason) { - return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason)); + return new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason)); } @Override @@ -777,7 +785,7 @@ public void data(IStream stream, Callback callback, DataFrame frame) private void frame(HTTP2Flusher.Entry entry, boolean flush) { if (LOG.isDebugEnabled()) - LOG.debug("{} {}", flush ? "Sending" : "Queueing", entry.frame); + LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this); // Ping frames are prepended to process them as soon as possible. boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); if (queued && flush) @@ -873,7 +881,7 @@ public void removeStream(IStream stream) onStreamClosed(stream); flowControl.onStreamDestroyed(stream); if (LOG.isDebugEnabled()) - LOG.debug("Removed {} {}", stream.isLocal() ? "local" : "remote", stream); + LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 4e2903212ac6..e13c7352e49c 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -42,6 +42,7 @@ import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; +import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.MathUtils; @@ -61,7 +62,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private final AtomicReference attachment = new AtomicReference<>(); private final AtomicReference> attributes = new AtomicReference<>(); private final AtomicReference closeState = new AtomicReference<>(CloseState.NOT_CLOSED); - private final AtomicReference writing = new AtomicReference<>(); private final AtomicInteger sendWindow = new AtomicInteger(); private final AtomicInteger recvWindow = new AtomicInteger(); private final long timeStamp = System.nanoTime(); @@ -69,9 +69,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private final int streamId; private final MetaData.Request request; private final boolean local; + private Callback sendCallback; + private Throwable failure; private boolean localReset; - private Listener listener; private boolean remoteReset; + private Listener listener; private long dataLength; private long dataDemand; private boolean dataInitial; @@ -141,17 +143,31 @@ public void data(DataFrame frame, Callback callback) @Override public void reset(ResetFrame frame, Callback callback) { - if (isReset()) - return; - localReset = true; + synchronized (this) + { + if (isReset()) + return; + localReset = true; + failure = new EOFException("reset"); + } session.frames(this, callback, frame, Frame.EMPTY_ARRAY); } private boolean startWrite(Callback callback) { - if (writing.compareAndSet(null, callback)) - return true; - callback.failed(new WritePendingException()); + Throwable failure; + synchronized (this) + { + failure = this.failure; + if (failure == null && sendCallback == null) + { + sendCallback = callback; + return true; + } + } + if (failure == null) + failure = new WritePendingException(); + callback.failed(failure); return false; } @@ -176,7 +192,27 @@ public Object removeAttribute(String key) @Override public boolean isReset() { - return localReset || remoteReset; + synchronized (this) + { + return localReset || remoteReset; + } + } + + private boolean isFailed() + { + synchronized (this) + { + return failure != null; + } + } + + @Override + public boolean isResetOrFailed() + { + synchronized (this) + { + return isReset() || isFailed(); + } } @Override @@ -440,7 +476,11 @@ private long demand() private void onReset(ResetFrame frame, Callback callback) { - remoteReset = true; + synchronized (this) + { + remoteReset = true; + failure = new EofException("reset"); + } close(); session.removeStream(this); notifyReset(this, frame, callback); @@ -461,8 +501,12 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback) private void onFailure(FailureFrame frame, Callback callback) { - // Don't close or remove the stream, as the listener may - // want to use it, for example to send a RST_STREAM frame. + synchronized (this) + { + failure = frame.getFailure(); + } + close(); + session.removeStream(this); notifyFailure(this, frame, callback); } @@ -645,7 +689,12 @@ public void failed(Throwable x) private Callback endWrite() { - return writing.getAndSet(null); + synchronized (this) + { + Callback callback = sendCallback; + sendCallback = null; + return callback; + } } private void notifyNewStream(Stream stream) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 893cfdbdd044..a2f58724ca0d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -114,4 +114,11 @@ public interface IStream extends Stream, Closeable * @see #isClosed() */ boolean isRemotelyClosed(); + + /** + * @return whether this stream has been reset (locally or remotely) or has been failed + * @see #isReset() + * @see Listener#onFailure(Stream, int, String, Throwable, Callback) + */ + boolean isResetOrFailed(); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java index 5997c00d199a..a04c81c42b71 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java @@ -149,7 +149,7 @@ protected boolean parseHeader(ByteBuffer buffer) return false; if (LOG.isDebugEnabled()) - LOG.debug("Parsed {} frame header from {}", headerParser, buffer); + LOG.debug("Parsed {} frame header from {}@{}", headerParser, buffer, Integer.toHexString(buffer.hashCode())); if (headerParser.getLength() > getMaxFrameLength()) return connectionFailure(buffer, ErrorCode.FRAME_SIZE_ERROR, "invalid_frame_length"); @@ -199,7 +199,7 @@ protected boolean parseBody(ByteBuffer buffer) return false; } if (LOG.isDebugEnabled()) - LOG.debug("Parsed {} frame body from {}", FrameType.from(type), buffer); + LOG.debug("Parsed {} frame body from {}@{}", FrameType.from(type), buffer, Integer.toHexString(buffer.hashCode())); reset(); return true; } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 2df824cf904f..e08993de7467 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -330,7 +330,7 @@ private void sendTrailersFrame(MetaData metaData, Callback callback) public void onStreamFailure(Throwable failure) { - transportCallback.failed(failure); + transportCallback.abort(failure); } public boolean onStreamTimeout(Throwable failure) @@ -408,17 +408,6 @@ public void abort(Throwable failure) * being reset, or the connection being closed *
  • an asynchronous idle timeout
  • * - *

    The last 2 cases may happen during a send, when the frames - * are being generated in the flusher. - * In such cases, this class must avoid that the nested callback is notified - * while the frame generation is in progress, because the nested callback - * may modify other states (such as clearing the {@code HttpOutput._buffer}) - * that are accessed during frame generation.

    - *

    The solution implemented in this class works by splitting the send - * operation in 3 parts: {@code pre-send}, {@code send} and {@code post-send}. - * Asynchronous state changes happening during {@code send} are stored - * and only executed in {@code post-send}, therefore never interfering - * with frame generation.

    * * @see State */ @@ -442,14 +431,14 @@ private void send(Callback callback, boolean commit, Consumer sendFram { Throwable failure = sending(callback, commit); if (failure == null) - { sendFrame.accept(this); - pending(); - } else - { callback.failed(failure); - } + } + + private void abort(Throwable failure) + { + failed(failure); } private Throwable sending(Callback callback, boolean commit) @@ -477,58 +466,6 @@ private Throwable sending(Callback callback, boolean commit) } } - private void pending() - { - Callback callback; - boolean commit; - Throwable failure; - synchronized (this) - { - switch (_state) - { - case SENDING: - { - // The send has not completed the callback yet, - // wait for succeeded() or failed() to be called. - _state = State.PENDING; - return; - } - case SUCCEEDING: - { - // The send already completed successfully, but the - // call to succeeded() was delayed, so call it now. - callback = _callback; - commit = _commit; - failure = null; - reset(null); - break; - } - case FAILING: - { - // The send already completed with a failure, but - // the call to failed() was delayed, so call it now. - callback = _callback; - commit = _commit; - failure = _failure; - reset(failure); - break; - } - default: - { - callback = _callback; - commit = _commit; - failure = new IllegalStateException("Invalid transport state: " + _state); - reset(failure); - break; - } - } - } - if (failure == null) - succeed(callback, commit); - else - fail(callback, commit, failure); - } - @Override public void succeeded() { @@ -536,30 +473,21 @@ public void succeeded() boolean commit; synchronized (this) { - switch (_state) + if (_state != State.SENDING) { - case SENDING: - { - _state = State.SUCCEEDING; - // Succeeding the callback will be done in postSend(). - return; - } - case PENDING: - { - callback = _callback; - commit = _commit; - reset(null); - break; - } - default: - { - // This thread lost the race to succeed the current - // send, as other threads likely already failed it. - return; - } + // This thread lost the race to succeed the current + // send, as other threads likely already failed it. + return; } + callback = _callback; + commit = _commit; + reset(null); } - succeed(callback, commit); + if (LOG.isDebugEnabled()) + LOG.debug("HTTP2 Response #{}/{} {} success", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + commit ? "commit" : "flush"); + callback.succeeded(); } @Override @@ -569,104 +497,37 @@ public void failed(Throwable failure) boolean commit; synchronized (this) { - switch (_state) + if (_state != State.SENDING) { - case SENDING: - { - _state = State.FAILING; - _failure = failure; - // Failing the callback will be done in postSend(). - return; - } - case IDLE: - case PENDING: - { - callback = _callback; - commit = _commit; - reset(failure); - break; - } - default: - { - // This thread lost the race to fail the current send, - // as other threads already succeeded or failed it. - return; - } + reset(failure); + return; } + callback = _callback; + commit = _commit; + reset(failure); } - fail(callback, commit, failure); + if (LOG.isDebugEnabled()) + LOG.debug("HTTP2 Response #{}/{} {} failure", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + commit ? "commit" : "flush", + failure); + callback.failed(failure); } private boolean idleTimeout(Throwable failure) { - Callback callback; - boolean timeout; + Callback callback = null; synchronized (this) { - switch (_state) + // Ignore idle timeouts if not writing, + // as the application may be suspended. + if (_state == State.SENDING) { - case PENDING: - { - // The send was started but idle timed out, fail it. - callback = _callback; - timeout = true; - reset(failure); - break; - } - case IDLE: - // The application may be suspended, ignore the idle timeout. - case SENDING: - // A send has been started at the same time of an idle timeout; - // Ignore the idle timeout and let the write continue normally. - case SUCCEEDING: - case FAILING: - // An idle timeout during these transient states is ignored. - case FAILED: - // Already failed, ignore the idle timeout. - { - callback = null; - timeout = false; - break; - } - default: - { - // Should not happen, but just in case. - callback = _callback; - if (callback == null) - callback = Callback.NOOP; - timeout = true; - failure = new IllegalStateException("Invalid transport state: " + _state, failure); - reset(failure); - break; - } + callback = _callback; + reset(failure); } } - idleTimeout(callback, timeout, failure); - return timeout; - } - - private void succeed(Callback callback, boolean commit) - { - if (LOG.isDebugEnabled()) - LOG.debug("HTTP2 Response #{}/{} {} success", - stream.getId(), Integer.toHexString(stream.getSession().hashCode()), - commit ? "commit" : "flush"); - callback.succeeded(); - } - - private void fail(Callback callback, boolean commit, Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug("HTTP2 Response #{}/{} {} failure", - stream.getId(), Integer.toHexString(stream.getSession().hashCode()), - commit ? "commit" : "flush", - failure); - if (callback != null) - callback.failed(failure); - } - - private void idleTimeout(Callback callback, boolean timeout, Throwable failure) - { + boolean timeout = callback != null; if (LOG.isDebugEnabled()) LOG.debug("HTTP2 Response #{}/{} idle timeout {}", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), @@ -674,6 +535,18 @@ private void idleTimeout(Callback callback, boolean timeout, Throwable failure) failure); if (timeout) callback.failed(failure); + return timeout; + } + + @Override + public InvocationType getInvocationType() + { + Callback callback; + synchronized (this) + { + callback = _callback; + } + return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); } } @@ -686,67 +559,12 @@ private enum State { /** *

    No send initiated or in progress.

    - *

    Next states could be:

    - *
      - *
    • {@link #SENDING}, when {@link TransportCallback#send(Callback, boolean, Consumer)} - * is called by the transport to initiate a send
    • - *
    • {@link #FAILED}, when {@link TransportCallback#failed(Throwable)} - * is called by an asynchronous failure
    • - *
    */ IDLE, /** - *

    A send is initiated; the nested callback in {@link TransportCallback} - * cannot be notified while in this state.

    - *

    Next states could be:

    - *
      - *
    • {@link #SUCCEEDING}, when {@link TransportCallback#succeeded()} - * is called synchronously because the send succeeded
    • - *
    • {@link #FAILING}, when {@link TransportCallback#failed(Throwable)} - * is called synchronously because the send failed
    • - *
    • {@link #PENDING}, when {@link TransportCallback#pending()} - * is called before the send completes
    • - *
    + *

    A send is initiated and possibly in progress.

    */ SENDING, - /** - *

    A send was initiated and is now pending, waiting for the {@link TransportCallback} - * to be notified of success or failure.

    - *

    Next states could be:

    - *
      - *
    • {@link #IDLE}, when {@link TransportCallback#succeeded()} - * is called because the send succeeded
    • - *
    • {@link #FAILED}, when {@link TransportCallback#failed(Throwable)} - * is called because either the send failed, or an asynchronous failure happened
    • - *
    - */ - PENDING, - /** - *

    A send was initiated and succeeded, but {@link TransportCallback#pending()} - * has not been called yet.

    - *

    This state indicates that the success actions (such as notifying the - * {@link TransportCallback} nested callback) must be performed when - * {@link TransportCallback#pending()} is called.

    - *

    Next states could be:

    - *
      - *
    • {@link #IDLE}, when {@link TransportCallback#pending()} - * is called
    • - *
    - */ - SUCCEEDING, - /** - *

    A send was initiated and failed, but {@link TransportCallback#pending()} - * has not been called yet.

    - *

    This state indicates that the failure actions (such as notifying the - * {@link TransportCallback} nested callback) must be performed when - * {@link TransportCallback#pending()} is called.

    - *

    Next states could be:

    - *
      - *
    • {@link #FAILED}, when {@link TransportCallback#pending()} - * is called
    • - *
    - */ - FAILING, /** *

    The terminal state indicating failure of the send.

    */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 06e64fd94067..307c627d90a8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -57,6 +57,18 @@ public interface ByteBufferPool */ public void release(ByteBuffer buffer); + /** + *

    Removes a {@link ByteBuffer} that was previously obtained with {@link #acquire(int, boolean)}.

    + *

    The buffer will not be available for further reuse.

    + * + * @param buffer the buffer to remove + * @see #acquire(int, boolean) + * @see #release(ByteBuffer) + */ + default void remove(ByteBuffer buffer) + { + } + /** *

    Creates a new ByteBuffer of the given capacity and the given directness.

    * diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java index d61dce79115b..4d85393d6ec9 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java @@ -23,10 +23,13 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.LeakDetector; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@ManagedObject public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements ByteBufferPool { private static final Logger LOG = LoggerFactory.getLogger(LeakTrackingByteBufferPool.class); @@ -47,11 +50,11 @@ protected void leaked(LeakInfo leakInfo) } }; - private static final boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY"); - private final ByteBufferPool delegate; - private final AtomicLong leakedReleases = new AtomicLong(0); private final AtomicLong leakedAcquires = new AtomicLong(0); + private final AtomicLong leakedReleases = new AtomicLong(0); + private final AtomicLong leakedRemoves = new AtomicLong(0); private final AtomicLong leaked = new AtomicLong(0); + private final ByteBufferPool delegate; public LeakTrackingByteBufferPool(ByteBufferPool delegate) { @@ -64,12 +67,12 @@ public LeakTrackingByteBufferPool(ByteBufferPool delegate) public ByteBuffer acquire(int size, boolean direct) { ByteBuffer buffer = delegate.acquire(size, direct); - boolean leaked = leakDetector.acquired(buffer); - if (NOISY || !leaked) + boolean acquired = leakDetector.acquired(buffer); + if (!acquired) { leakedAcquires.incrementAndGet(); - LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"), - new Throwable("LeakStack.Acquire")); + if (LOG.isDebugEnabled()) + LOG.debug("ByteBuffer leaked acquire for id {}", leakDetector.id(buffer), new Throwable("acquire")); } return buffer; } @@ -79,16 +82,36 @@ public void release(ByteBuffer buffer) { if (buffer == null) return; - boolean leaked = leakDetector.released(buffer); - if (NOISY || !leaked) + boolean released = leakDetector.released(buffer); + if (!released) { leakedReleases.incrementAndGet(); - LOG.info(String.format("ByteBuffer release %s leaked.released=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"), new Throwable( - "LeakStack.Release")); + if (LOG.isDebugEnabled()) + LOG.debug("ByteBuffer leaked release for id {}", leakDetector.id(buffer), new Throwable("release")); } delegate.release(buffer); } + @Override + public void remove(ByteBuffer buffer) + { + if (buffer == null) + return; + boolean released = leakDetector.released(buffer); + if (!released) + { + leakedRemoves.incrementAndGet(); + if (LOG.isDebugEnabled()) + LOG.debug("ByteBuffer leaked remove for id {}", leakDetector.id(buffer), new Throwable("remove")); + } + delegate.remove(buffer); + } + + /** + * Clears the tracking data returned by {@link #getLeakedAcquires()}, + * {@link #getLeakedReleases()}, {@link #getLeakedResources()}. + */ + @ManagedAttribute("Clears the tracking data") public void clearTracking() { leakedAcquires.set(0); @@ -96,24 +119,36 @@ public void clearTracking() } /** - * @return count of BufferPool.acquire() calls that detected a leak + * @return count of ByteBufferPool.acquire() calls that detected a leak */ + @ManagedAttribute("The number of acquires that produced a leak") public long getLeakedAcquires() { return leakedAcquires.get(); } /** - * @return count of BufferPool.release() calls that detected a leak + * @return count of ByteBufferPool.release() calls that detected a leak */ + @ManagedAttribute("The number of releases that produced a leak") public long getLeakedReleases() { return leakedReleases.get(); } + /** + * @return count of ByteBufferPool.remove() calls that detected a leak + */ + @ManagedAttribute("The number of removes that produced a leak") + public long getLeakedRemoves() + { + return leakedRemoves.get(); + } + /** * @return count of resources that were acquired but not released */ + @ManagedAttribute("The number of resources that were leaked") public long getLeakedResources() { return leaked.get(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 42e5455963c0..abf611551d54 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -504,7 +504,7 @@ public boolean handle() // TODO that is done. // Set a close callback on the HttpOutput to make it an async callback - _response.completeOutput(Callback.from(_state::completed)); + _response.completeOutput(Callback.from(() -> _state.completed(null), _state::completed)); break; } @@ -644,7 +644,7 @@ public void sendResponseAndComplete() { _request.setHandled(true); _state.completing(); - sendResponse(null, _response.getHttpOutput().getBuffer(), true, Callback.from(_state::completed)); + sendResponse(null, _response.getHttpOutput().getBuffer(), true, Callback.from(() -> _state.completed(null), _state::completed)); } catch (Throwable x) { @@ -1250,7 +1250,7 @@ public void failed(final Throwable x) @Override public void succeeded() { - _response.getHttpOutput().completed(); + _response.getHttpOutput().completed(null); super.failed(x); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index fe7b18eae59b..52bf1ab29040 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -948,7 +948,7 @@ protected void completing() } } - protected void completed() + protected void completed(Throwable failure) { final List aListeners; final AsyncContextEvent event; @@ -981,7 +981,7 @@ protected void completed() } // release any aggregate buffer from a closing flush - _channel.getResponse().getHttpOutput().completed(); + _channel.getResponse().getHttpOutput().completed(failure); if (event != null) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f5c9c07ad8b3..3101ed88bbee 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -36,6 +36,7 @@ import javax.servlet.WriteListener; import org.eclipse.jetty.http.HttpContent; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -288,7 +289,7 @@ private void onWriteComplete(boolean last, Throwable failure) _state = State.CLOSED; closedCallback = _closedCallback; _closedCallback = null; - releaseBuffer(); + releaseBuffer(failure); wake = updateApiState(failure); } else if (_state == State.CLOSE) @@ -470,12 +471,12 @@ public void complete(Callback callback) /** * Called to indicate that the request cycle has been completed. */ - public void completed() + public void completed(Throwable failure) { synchronized (_channelState) { _state = State.CLOSED; - releaseBuffer(); + releaseBuffer(failure); } } @@ -614,11 +615,15 @@ private ByteBuffer acquireBuffer() return _aggregate; } - private void releaseBuffer() + private void releaseBuffer(Throwable failure) { if (_aggregate != null) { - _channel.getConnector().getByteBufferPool().release(_aggregate); + ByteBufferPool bufferPool = _channel.getConnector().getByteBufferPool(); + if (failure == null) + bufferPool.release(_aggregate); + else + bufferPool.remove(_aggregate); _aggregate = null; } } @@ -1353,7 +1358,7 @@ public void recycle() _commitSize = config.getOutputAggregationSize(); if (_commitSize > _bufferSize) _commitSize = _bufferSize; - releaseBuffer(); + releaseBuffer(null); _written = 0; _writeListener = null; _onError = null; diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java index f03b5570f11e..193516795bd6 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java @@ -117,6 +117,7 @@ public void testIterative(Transport transport) throws Exception LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool; assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L)); assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L)); + assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L)); assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L)); } @@ -126,6 +127,7 @@ public void testIterative(Transport transport) throws Exception LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool; assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L)); assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L)); + assertThat("Client BufferPool - leaked removes", clientBufferPool.getLeakedRemoves(), Matchers.is(0L)); assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L)); }