From 40b5c803a9286c7a5da52b178787d6d24fb7e4a1 Mon Sep 17 00:00:00 2001 From: David Matejcek Date: Thu, 19 Nov 2020 23:35:50 +0100 Subject: [PATCH] Issue #2016 Minor syntax changes Signed-off-by: David Matejcek --- .../grizzly/asyncqueue/TaskQueue.java | 38 ++++++++-------- .../grizzly/ssl/SSLConnectionContext.java | 5 --- .../grizzly/http/io/OutputBuffer.java | 14 +++--- .../grizzly/http2/DefaultInputBuffer.java | 35 ++++++++------- .../grizzly/http2/Http2BaseFilter.java | 10 ++--- .../grizzly/http2/Http2FrameCodec.java | 14 ------ .../glassfish/grizzly/http2/Http2Session.java | 8 +--- .../grizzly/http2/Http2SessionOutputSink.java | 43 +++++++------------ .../glassfish/grizzly/http2/Http2Stream.java | 18 ++++---- 9 files changed, 75 insertions(+), 110 deletions(-) diff --git a/modules/grizzly/src/main/java/org/glassfish/grizzly/asyncqueue/TaskQueue.java b/modules/grizzly/src/main/java/org/glassfish/grizzly/asyncqueue/TaskQueue.java index 41c6d60545..f0092bc1be 100644 --- a/modules/grizzly/src/main/java/org/glassfish/grizzly/asyncqueue/TaskQueue.java +++ b/modules/grizzly/src/main/java/org/glassfish/grizzly/asyncqueue/TaskQueue.java @@ -39,17 +39,18 @@ public final class TaskQueue { */ private final Queue queue; - private static final AtomicReferenceFieldUpdater currentElementUpdater = AtomicReferenceFieldUpdater - .newUpdater(TaskQueue.class, AsyncQueueRecord.class, "currentElement"); + private static final AtomicReferenceFieldUpdater currentElementUpdater + = AtomicReferenceFieldUpdater.newUpdater(TaskQueue.class, AsyncQueueRecord.class, "currentElement"); private volatile E currentElement; - private static final AtomicIntegerFieldUpdater spaceInBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, "spaceInBytes"); + private static final AtomicIntegerFieldUpdater spaceInBytesUpdater + = AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, "spaceInBytes"); private volatile int spaceInBytes; private final MutableMaxQueueSize maxQueueSizeHolder; - private static final AtomicIntegerFieldUpdater writeHandlersCounterUpdater = AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, - "writeHandlersCounter"); + private static final AtomicIntegerFieldUpdater writeHandlersCounterUpdater + = AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, "writeHandlersCounter"); private volatile int writeHandlersCounter; protected final Queue writeHandlersQueue = new ConcurrentLinkedQueue<>(); // ------------------------------------------------------------ Constructors @@ -84,14 +85,15 @@ public int size() { @SuppressWarnings("unchecked") public E poll() { E current = (E) currentElementUpdater.getAndSet(this, null); - return current != null ? current : queue.poll(); + return current == null ? queue.poll() : current; } /** - * Get the current processing task, if the current in not set, take the task from the queue. Note: after this operation - * call, the current element could be removed from the queue using - * {@link #setCurrentElement(org.glassfish.grizzly.asyncqueue.AsyncQueueRecord)} and passing null as a - * parameter, this is a little bit more optimal alternative to {@link #poll()}. + * Get the current processing task, if the current in not set, take the task from the queue. + *

+ * Note: after this operation call, the current element could be removed from the queue using + * {@link #setCurrentElement(org.glassfish.grizzly.asyncqueue.AsyncQueueRecord)} + * and passing null as a parameter, this is a little bit more optimal alternative to {@link #poll()}. * * @return the current processing task */ @@ -155,7 +157,7 @@ public int spaceInBytes() { /** * Get the queue of tasks, which will be processed asynchronously - * + * * @return the queue of tasks, which will be processed asynchronously */ public Queue getQueue() { @@ -209,21 +211,21 @@ private void checkWriteHandlerOnClose(final WriteHandler writeHandler) { writeHandler.onError(new IOException("Connection is closed")); } } - // ------------------------------------------------------- Protected Methods + /** + * Notifies processing the queue by write handlers. + */ public void doNotify() { if (maxQueueSizeHolder == null || writeHandlersCounter == 0) { return; } final int maxQueueSize = maxQueueSizeHolder.getMaxQueueSize(); - while (spaceInBytes() < maxQueueSize) { - WriteHandler writeHandler = pollWriteHandler(); + final WriteHandler writeHandler = pollWriteHandler(); if (writeHandler == null) { return; } - try { writeHandler.onWritePossible(); } catch (Throwable e) { @@ -234,7 +236,7 @@ public void doNotify() { /** * Set current task element. - * + * * @param task current element. */ public void setCurrentElement(final E task) { @@ -260,7 +262,7 @@ public boolean compareAndSetCurrentElement(final E expected, final E newValue) { /** * Remove the task from queue. - * + * * @param task the task to remove. * @return true if tasked was removed, or false otherwise. */ @@ -336,8 +338,6 @@ private WriteHandler pollWriteHandler() { return null; } - // ----------------------------------------------------------- Nested Classes - public interface MutableMaxQueueSize { int getMaxQueueSize(); } diff --git a/modules/grizzly/src/main/java/org/glassfish/grizzly/ssl/SSLConnectionContext.java b/modules/grizzly/src/main/java/org/glassfish/grizzly/ssl/SSLConnectionContext.java index 10f9a06889..a92c40cde6 100644 --- a/modules/grizzly/src/main/java/org/glassfish/grizzly/ssl/SSLConnectionContext.java +++ b/modules/grizzly/src/main/java/org/glassfish/grizzly/ssl/SSLConnectionContext.java @@ -61,7 +61,6 @@ public final class SSLConnectionContext { } final ByteBufferArray outputByteBufferArray = ByteBufferArray.create(); - final ByteBufferArray inputByteBufferArray = ByteBufferArray.create(); private Buffer lastOutputBuffer; @@ -225,11 +224,9 @@ Buffer wrapAll(final Buffer input, final Allocator allocator) throws SSLExceptio if (input.hasRemaining()) { do { result = wrap(input, inputArray, inputArraySize, null, allocator); - if (result.isError()) { throw result.getError(); } - final Buffer newOutput = result.getOutput(); newOutput.trim(); @@ -272,13 +269,11 @@ private SslResult wrap(final Buffer input, final ByteBuffer[] inputArray, final } final Status status = sslEngineResult.getStatus(); - if (status == Status.CLOSED) { return new SslResult(output, new SSLException("SSLEngine is CLOSED")); } final boolean isOverflow = status == Status.BUFFER_OVERFLOW; - if (allocator != null && isOverflow) { updateBufferSizes(); output = ensureBufferSize(output, netBufferSize, allocator); diff --git a/modules/http/src/main/java/org/glassfish/grizzly/http/io/OutputBuffer.java b/modules/http/src/main/java/org/glassfish/grizzly/http/io/OutputBuffer.java index 1e30f5583c..c1acfee4e0 100644 --- a/modules/http/src/main/java/org/glassfish/grizzly/http/io/OutputBuffer.java +++ b/modules/http/src/main/java/org/glassfish/grizzly/http/io/OutputBuffer.java @@ -800,12 +800,11 @@ public void notifyCanWrite(final WriteHandler handler) { if (isNonBlockingWriteGuaranteed || canWrite()) { final Reentrant reentrant = Reentrant.getWriteReentrant(); - if (!reentrant.isMaxReentrantsReached()) { - notifyWritePossible(); - } else { + if (reentrant.isMaxReentrantsReached()) { notifyWritePossibleAsync(); + } else { + notifyWritePossible(); } - return; } @@ -847,7 +846,6 @@ protected Executor getThreadPool() { /** * Notify WriteHandler asynchronously */ - @SuppressWarnings("unchecked") private void notifyWritePossibleAsync() { if (writePossibleRunnable == null) { writePossibleRunnable = new Runnable() { @@ -1156,8 +1154,10 @@ private void flushBinaryBuffersIfNeeded() throws IOException { } private void notifyCommit() throws IOException { - for (int i = 0, len = lifeCycleListeners.size(); i < len; i++) { - lifeCycleListeners.get(i).onCommit(); + // the collection is not synchronized and may be accessed in parallel + final LifeCycleListener[] array = lifeCycleListeners.toArray(new LifeCycleListener[lifeCycleListeners.size()]); + for (LifeCycleListener lifeCycleListener : array) { + lifeCycleListener.onCommit(); } } diff --git a/modules/http2/src/main/java/org/glassfish/grizzly/http2/DefaultInputBuffer.java b/modules/http2/src/main/java/org/glassfish/grizzly/http2/DefaultInputBuffer.java index c87c448cd2..9e13cb78ef 100644 --- a/modules/http2/src/main/java/org/glassfish/grizzly/http2/DefaultInputBuffer.java +++ b/modules/http2/src/main/java/org/glassfish/grizzly/http2/DefaultInputBuffer.java @@ -47,9 +47,9 @@ class DefaultInputBuffer implements StreamInputBuffer { private static final long NULL_CONTENT_LENGTH = Long.MIN_VALUE; - private static final AtomicIntegerFieldUpdater inputQueueSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultInputBuffer.class, - "inputQueueSize"); - @SuppressWarnings("unused") + private static final AtomicIntegerFieldUpdater inputQueueSizeUpdater + = AtomicIntegerFieldUpdater.newUpdater(DefaultInputBuffer.class, "inputQueueSize"); + private volatile int inputQueueSize; private final BlockingQueue inputQueue = new LinkedTransferQueue<>(); @@ -59,8 +59,8 @@ class DefaultInputBuffer implements StreamInputBuffer { // the termination flag. When is not null contains the reason why input was terminated. // when the flag is not null - poll0() will return -1. - private static final AtomicReferenceFieldUpdater closeFlagUpdater = AtomicReferenceFieldUpdater - .newUpdater(DefaultInputBuffer.class, Termination.class, "closeFlag"); + private static final AtomicReferenceFieldUpdater closeFlagUpdater + = AtomicReferenceFieldUpdater.newUpdater(DefaultInputBuffer.class, Termination.class, "closeFlag"); @SuppressWarnings("unused") private volatile Termination closeFlag; @@ -93,7 +93,6 @@ public void onReadEventComplete() { // If input stream has been terminated - send error message upstream if (isClosed()) { http2Session.sendMessageUpstream(stream, buildBrokenHttpContent(new EOFException(closeFlag.getDescription()))); - return; } @@ -262,17 +261,16 @@ private Buffer poll0() throws IOException { if (inputElement == null) { // timeout expired throw new IOException("Blocking read timeout"); - } else { - // Due to asynchronous inputQueueSize update - the inputQueueSizeNow may be < 0. - // It means the inputQueueSize.getAndSet(0); above, may unintentionally increase the counter. - // So, once we read a Buffer - we have to properly restore the counter value. - // Normally it had to be inputQueueSize.decrementAndGet(); , but we have to - // take into account fact described above. - inputQueueSizeUpdater.addAndGet(this, inputQueueSizeNow - 1); - - checkEOF(inputElement); - buffer = inputElement.toBuffer(); } + // Due to asynchronous inputQueueSize update - the inputQueueSizeNow may be < 0. + // It means the inputQueueSize.getAndSet(0); above, may unintentionally increase the counter. + // So, once we read a Buffer - we have to properly restore the counter value. + // Normally it had to be inputQueueSize.decrementAndGet(); , but we have to + // take into account fact described above. + inputQueueSizeUpdater.addAndGet(this, inputQueueSizeNow - 1); + + checkEOF(inputElement); + buffer = inputElement.toBuffer(); } else if (inputQueueSizeNow == 1) { // if there is one element available inputElement = inputQueue.poll(); @@ -375,14 +373,15 @@ public boolean isClosed() { /** * Checks if the passed InputElement is input buffer EOF element. - * + * * @param inputElement the {@link InputElement} to check EOF status against. */ private void checkEOF(final InputElement inputElement) { // first of all it has to be the last element if (inputElement.isLast) { - final Termination termination = !inputElement.isService ? IN_FIN_TERMINATION : (Termination) inputElement.content; + final Termination termination = inputElement.isService + ? (Termination) inputElement.content : IN_FIN_TERMINATION; if (closeFlagUpdater.compareAndSet(this, null, termination)) { diff --git a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2BaseFilter.java b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2BaseFilter.java index 66a0df917e..adcb93fbf5 100644 --- a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2BaseFilter.java +++ b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2BaseFilter.java @@ -119,7 +119,7 @@ public int getLocalMaxFramePayloadSize() { /** * Sets the maximum allowed HTTP2 frame size. - * + * * @param localMaxFramePayloadSize the maximum allowed HTTP2 frame size */ public void setLocalMaxFramePayloadSize(final int localMaxFramePayloadSize) { @@ -480,10 +480,10 @@ private void processSettingsFrame(final Http2Session http2Session, final FilterC } finally { frame.recycle(); } - } - void applySettings(final Http2Session http2Session, final SettingsFrame settingsFrame) throws Http2SessionException, Http2StreamException { + void applySettings(final Http2Session http2Session, final SettingsFrame settingsFrame) + throws Http2SessionException, Http2StreamException { for (int i = 0, numberOfSettings = settingsFrame.getNumberOfSettings(); i < numberOfSettings; i++) { final SettingsFrame.Setting setting = settingsFrame.getSettingByIndex(i); @@ -730,7 +730,7 @@ boolean checkIfHttp2StreamChain(final FilterChainContext ctx) throws IOException /** * Creates {@link Http2Session} with pre-configured initial-windows-size and max-concurrent-streams - * + * * @param connection the TCP {@link Connection} * @param isServer flag indicating whether this connection is server side or not. * @return {@link Http2Session} @@ -841,7 +841,7 @@ protected final Http2Session obtainHttp2Session(final FilterChainContext context final Http2Session obtainHttp2Session(final Http2State http2State, final FilterChainContext context, final boolean isUpStream) { final Connection connection = context.getConnection(); - Http2Session http2Session = http2State != null ? http2State.getHttp2Session() : null; + Http2Session http2Session = http2State == null ? null : http2State.getHttp2Session(); if (http2Session == null) { http2Session = Http2Session.get(connection); diff --git a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2FrameCodec.java b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2FrameCodec.java index a26903492d..23edab2003 100644 --- a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2FrameCodec.java +++ b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2FrameCodec.java @@ -71,20 +71,6 @@ public List parse(final Http2Session http2Session, final FrameParsin } return parsingResult.frameList(); - -// // ------------ ERROR processing block ----------------------------- -// final Buffer sndBuffer; -// final GoAwayFrame goAwayFrame = -// GoAwayFrame.builder() -// .errorCode(error.getErrorCode()) -// .build(); -// sndBuffer = goAwayFrame.toBuffer(http2State.getHttp2Session()); -// -// // send last message and close the connection -// ctx.write(sndBuffer); -// connection.closeSilently(); -// -// return ctx.getStopAction(); } public Buffer serializeAndRecycle(final Http2Session http2Session, final Http2Frame frame) { diff --git a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Session.java b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Session.java index f17a423ebb..d4031a2db3 100644 --- a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Session.java +++ b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Session.java @@ -587,9 +587,7 @@ private void close() { @Override public void failed(final Throwable throwable) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.log(Level.FINE, "Unable to write GOAWAY. Terminating session.", throwable); - } + LOGGER.log(Level.WARNING, "Unable to write GOAWAY. Terminating session.", throwable); close(); } @@ -600,9 +598,7 @@ public void completed(final WriteResult result) { @Override public void cancelled() { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.log(Level.FINE, "GOAWAY write cancelled. Terminating session."); - } + LOGGER.log(Level.FINE, "GOAWAY write cancelled. Terminating session."); close(); } }, null); diff --git a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2SessionOutputSink.java b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2SessionOutputSink.java index 086f2ad89d..62d866a7c9 100644 --- a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2SessionOutputSink.java +++ b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2SessionOutputSink.java @@ -48,7 +48,6 @@ public class Http2SessionOutputSink { protected final Http2Session http2Session; private static final Logger LOGGER = Grizzly.logger(Http2SessionOutputSink.class); - private static final Level LOGGER_LEVEL = Level.FINE; private static final int MAX_FRAME_PAYLOAD_SIZE = 16383; private static final int MAX_OUTPUT_QUEUE_SIZE = 65536; @@ -127,10 +126,9 @@ protected void onPeerWindowUpdate(final int delta) throws Http2SessionException throw new Http2SessionException(ErrorCode.FLOW_CONTROL_ERROR, "Session flow-control window overflow."); } final int newWindowSize = availConnectionWindowSize.addAndGet(delta); - if (LOGGER.isLoggable(LOGGER_LEVEL)) { - LOGGER.log(LOGGER_LEVEL, "Http2Session. Expand connection window size by {0} bytes. Current connection window size is: {1}", - new Object[] { delta, newWindowSize }); - } + LOGGER.log(Level.FINE, + "Http2Session. Expand connection window size by {0} bytes. Current connection window size is: {1}", + new Object[] {delta, newWindowSize}); flushOutputQueue(); } @@ -171,7 +169,6 @@ protected void writeDataDownStream(final Http2Stream stream, final List writeCompletionHandler = null; int writeCompletionHandlerBytes = 0; - int bytesToTransfer = 0; int queueSizeToFree = 0; - AggrCompletionHandler completionHandlers = null; - // gather all available output data frames while (availWindowSize > bytesToTransfer && queueSize > queueSizeToFree) { final Http2OutputQueueRecord record = outputQueue.poll(); - if (record == null) { - // keep this warning for now - // should be reported when null record is spotted - LOGGER.log(Level.WARNING, "UNEXPECTED NULL RECORD. Queue-size: {0} " + "tmpcnt={1} byteToTransfer={2} queueSizeToFree={3} queueSize={4}", - new Object[] { outputQueue.size(), tmpcnt, bytesToTransfer, queueSizeToFree, queueSize }); + // keep this warning for now - should be reported when null record is spotted + LOGGER.log(Level.WARNING, "UNEXPECTED NULL RECORD. Queue-size: {0} " + + "byteToTransfer={1} queueSizeToFree={2} queueSize={3}", + new Object[]{outputQueue.size(), bytesToTransfer, queueSizeToFree, queueSize}); } assert record != null; - final int serializedBytes = record.serializeTo(tmpFramesList, Math.min(MAX_FRAME_PAYLOAD_SIZE, availWindowSize - bytesToTransfer)); + final int serializedBytes = record.serializeTo(tmpFramesList, + Math.min(MAX_FRAME_PAYLOAD_SIZE, availWindowSize - bytesToTransfer)); bytesToTransfer += serializedBytes; queueSizeToFree += serializedBytes; @@ -264,12 +256,10 @@ assert record != null; outputQueue.releaseSpace(queueSizeToFree); - needToNotify = true; - if (LOGGER.isLoggable(LOGGER_LEVEL)) { - LOGGER.log(LOGGER_LEVEL, "Http2Session. Shrink connection window size by {0} bytes. Current connection window size is: {1}", - new Object[] { bytesToTransfer, newWindowSize }); - } - + needToNotifyQueueManagement = true; + LOGGER.log(Level.FINE, + "Http2Session. Shrink connection window size by {0} bytes. Current connection window size is: {1}", + new Object[] {bytesToTransfer, newWindowSize}); } // release the writer lock, so other thread can start to write @@ -278,10 +268,9 @@ assert record != null; // we don't want this thread to write all the time - so give more // time for another thread to start writing LockSupport.parkNanos(backoffDelay++); - tmpcnt++; } - if (needToNotify) { + if (needToNotifyQueueManagement) { outputQueue.doNotify(); } } diff --git a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Stream.java b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Stream.java index b3a75b8f9b..46d5de1365 100644 --- a/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Stream.java +++ b/modules/http2/src/main/java/org/glassfish/grizzly/http2/Http2Stream.java @@ -75,8 +75,8 @@ public enum State { static final int UPGRADE_STREAM_ID = 1; - private static final Attribute HTTP_RQST_HTTP2_STREAM_ATTR = AttributeBuilder.DEFAULT_ATTRIBUTE_BUILDER - .createAttribute("http2.request.stream"); + private static final Attribute HTTP_RQST_HTTP2_STREAM_ATTR + = AttributeBuilder.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("http2.request.stream"); State state = State.IDLE; @@ -94,13 +94,14 @@ public enum State { final StreamOutputSink outputSink; // number of bytes reported to be read, but still unacked to the peer - static final AtomicIntegerFieldUpdater unackedReadBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(Http2Stream.class, "unackedReadBytes"); + static final AtomicIntegerFieldUpdater unackedReadBytesUpdater + = AtomicIntegerFieldUpdater.newUpdater(Http2Stream.class, "unackedReadBytes"); @SuppressWarnings("unused") private volatile int unackedReadBytes; // closeReasonRef, "null" value means the connection is open. - private static final AtomicReferenceFieldUpdater closeReasonUpdater = AtomicReferenceFieldUpdater.newUpdater(Http2Stream.class, - CloseReason.class, "closeReason"); + private static final AtomicReferenceFieldUpdater closeReasonUpdater + = AtomicReferenceFieldUpdater.newUpdater(Http2Stream.class, CloseReason.class, "closeReason"); @SuppressWarnings("unused") private volatile CloseReason closeReason; @@ -108,8 +109,8 @@ public enum State { private final Queue closeListeners = new ConcurrentLinkedQueue<>(); - private static final AtomicIntegerFieldUpdater completeFinalizationCounterUpdater = AtomicIntegerFieldUpdater.newUpdater(Http2Stream.class, - "completeFinalizationCounter"); + private static final AtomicIntegerFieldUpdater completeFinalizationCounterUpdater + = AtomicIntegerFieldUpdater.newUpdater(Http2Stream.class, "completeFinalizationCounter"); @SuppressWarnings("unused") private volatile int completeFinalizationCounter; @@ -179,7 +180,6 @@ protected Http2Stream(final Http2Session http2Session, final HttpRequestPacket r this.exclusive = false; inputBuffer = http2Session.isServer() ? new UpgradeInputBuffer(this) : new DefaultInputBuffer(this); - outputSink = http2Session.isServer() ? new DefaultOutputSink(this) : new UpgradeOutputSink(http2Session); HTTP_RQST_HTTP2_STREAM_ATTR.set(request, this); @@ -318,7 +318,7 @@ public void closeSilently() { /** * {@inheritDoc} - * + * * @deprecated please use {@link #close()} with the following * {@link GrizzlyFuture#addCompletionHandler(org.glassfish.grizzly.CompletionHandler)} call */