Skip to content

Commit

Permalink
Issue eclipse-ee4j#2016 Minor syntax changes
Browse files Browse the repository at this point in the history
Signed-off-by: David Matejcek <dmatej@seznam.cz>
  • Loading branch information
dmatej committed Nov 19, 2020
1 parent d129068 commit 40b5c80
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ public final class TaskQueue<E extends AsyncQueueRecord> {
*/
private final Queue<E> queue;

private static final AtomicReferenceFieldUpdater<TaskQueue, AsyncQueueRecord> currentElementUpdater = AtomicReferenceFieldUpdater
.newUpdater(TaskQueue.class, AsyncQueueRecord.class, "currentElement");
private static final AtomicReferenceFieldUpdater<TaskQueue, AsyncQueueRecord> currentElementUpdater
= AtomicReferenceFieldUpdater.newUpdater(TaskQueue.class, AsyncQueueRecord.class, "currentElement");
private volatile E currentElement;

private static final AtomicIntegerFieldUpdater<TaskQueue> spaceInBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, "spaceInBytes");
private static final AtomicIntegerFieldUpdater<TaskQueue> spaceInBytesUpdater
= AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, "spaceInBytes");
private volatile int spaceInBytes;

private final MutableMaxQueueSize maxQueueSizeHolder;

private static final AtomicIntegerFieldUpdater<TaskQueue> writeHandlersCounterUpdater = AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class,
"writeHandlersCounter");
private static final AtomicIntegerFieldUpdater<TaskQueue> writeHandlersCounterUpdater
= AtomicIntegerFieldUpdater.newUpdater(TaskQueue.class, "writeHandlersCounter");
private volatile int writeHandlersCounter;
protected final Queue<WriteHandler> writeHandlersQueue = new ConcurrentLinkedQueue<>();
// ------------------------------------------------------------ Constructors
Expand Down Expand Up @@ -84,14 +85,15 @@ public int size() {
@SuppressWarnings("unchecked")
public E poll() {
E current = (E) currentElementUpdater.getAndSet(this, null);
return current != null ? current : queue.poll();
return current == null ? queue.poll() : current;
}

/**
* Get the current processing task, if the current in not set, take the task from the queue. Note: after this operation
* call, the current element could be removed from the queue using
* {@link #setCurrentElement(org.glassfish.grizzly.asyncqueue.AsyncQueueRecord)} and passing <tt>null</tt> as a
* parameter, this is a little bit more optimal alternative to {@link #poll()}.
* Get the current processing task, if the current in not set, take the task from the queue.
* <p>
* Note: after this operation call, the current element could be removed from the queue using
* {@link #setCurrentElement(org.glassfish.grizzly.asyncqueue.AsyncQueueRecord)}
* and passing <tt>null</tt> as a parameter, this is a little bit more optimal alternative to {@link #poll()}.
*
* @return the current processing task
*/
Expand Down Expand Up @@ -155,7 +157,7 @@ public int spaceInBytes() {

/**
* Get the queue of tasks, which will be processed asynchronously
*
*
* @return the queue of tasks, which will be processed asynchronously
*/
public Queue<E> getQueue() {
Expand Down Expand Up @@ -209,21 +211,21 @@ private void checkWriteHandlerOnClose(final WriteHandler writeHandler) {
writeHandler.onError(new IOException("Connection is closed"));
}
}
// ------------------------------------------------------- Protected Methods

/**
* Notifies processing the queue by write handlers.
*/
public void doNotify() {
if (maxQueueSizeHolder == null || writeHandlersCounter == 0) {
return;
}

final int maxQueueSize = maxQueueSizeHolder.getMaxQueueSize();

while (spaceInBytes() < maxQueueSize) {
WriteHandler writeHandler = pollWriteHandler();
final WriteHandler writeHandler = pollWriteHandler();
if (writeHandler == null) {
return;
}

try {
writeHandler.onWritePossible();
} catch (Throwable e) {
Expand All @@ -234,7 +236,7 @@ public void doNotify() {

/**
* Set current task element.
*
*
* @param task current element.
*/
public void setCurrentElement(final E task) {
Expand All @@ -260,7 +262,7 @@ public boolean compareAndSetCurrentElement(final E expected, final E newValue) {

/**
* Remove the task from queue.
*
*
* @param task the task to remove.
* @return <tt>true</tt> if tasked was removed, or <tt>false</tt> otherwise.
*/
Expand Down Expand Up @@ -336,8 +338,6 @@ private WriteHandler pollWriteHandler() {
return null;
}

// ----------------------------------------------------------- Nested Classes

public interface MutableMaxQueueSize {
int getMaxQueueSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public final class SSLConnectionContext {
}

final ByteBufferArray outputByteBufferArray = ByteBufferArray.create();

final ByteBufferArray inputByteBufferArray = ByteBufferArray.create();

private Buffer lastOutputBuffer;
Expand Down Expand Up @@ -225,11 +224,9 @@ Buffer wrapAll(final Buffer input, final Allocator allocator) throws SSLExceptio
if (input.hasRemaining()) {
do {
result = wrap(input, inputArray, inputArraySize, null, allocator);

if (result.isError()) {
throw result.getError();
}

final Buffer newOutput = result.getOutput();
newOutput.trim();

Expand Down Expand Up @@ -272,13 +269,11 @@ private SslResult wrap(final Buffer input, final ByteBuffer[] inputArray, final
}

final Status status = sslEngineResult.getStatus();

if (status == Status.CLOSED) {
return new SslResult(output, new SSLException("SSLEngine is CLOSED"));
}

final boolean isOverflow = status == Status.BUFFER_OVERFLOW;

if (allocator != null && isOverflow) {
updateBufferSizes();
output = ensureBufferSize(output, netBufferSize, allocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,11 @@ public void notifyCanWrite(final WriteHandler handler) {

if (isNonBlockingWriteGuaranteed || canWrite()) {
final Reentrant reentrant = Reentrant.getWriteReentrant();
if (!reentrant.isMaxReentrantsReached()) {
notifyWritePossible();
} else {
if (reentrant.isMaxReentrantsReached()) {
notifyWritePossibleAsync();
} else {
notifyWritePossible();
}

return;
}

Expand Down Expand Up @@ -847,7 +846,6 @@ protected Executor getThreadPool() {
/**
* Notify WriteHandler asynchronously
*/
@SuppressWarnings("unchecked")
private void notifyWritePossibleAsync() {
if (writePossibleRunnable == null) {
writePossibleRunnable = new Runnable() {
Expand Down Expand Up @@ -1156,8 +1154,10 @@ private void flushBinaryBuffersIfNeeded() throws IOException {
}

private void notifyCommit() throws IOException {
for (int i = 0, len = lifeCycleListeners.size(); i < len; i++) {
lifeCycleListeners.get(i).onCommit();
// the collection is not synchronized and may be accessed in parallel
final LifeCycleListener[] array = lifeCycleListeners.toArray(new LifeCycleListener[lifeCycleListeners.size()]);
for (LifeCycleListener lifeCycleListener : array) {
lifeCycleListener.onCommit();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class DefaultInputBuffer implements StreamInputBuffer {

private static final long NULL_CONTENT_LENGTH = Long.MIN_VALUE;

private static final AtomicIntegerFieldUpdater<DefaultInputBuffer> inputQueueSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultInputBuffer.class,
"inputQueueSize");
@SuppressWarnings("unused")
private static final AtomicIntegerFieldUpdater<DefaultInputBuffer> inputQueueSizeUpdater
= AtomicIntegerFieldUpdater.newUpdater(DefaultInputBuffer.class, "inputQueueSize");

private volatile int inputQueueSize;

private final BlockingQueue<InputElement> inputQueue = new LinkedTransferQueue<>();
Expand All @@ -59,8 +59,8 @@ class DefaultInputBuffer implements StreamInputBuffer {

// the termination flag. When is not null contains the reason why input was terminated.
// when the flag is not null - poll0() will return -1.
private static final AtomicReferenceFieldUpdater<DefaultInputBuffer, Termination> closeFlagUpdater = AtomicReferenceFieldUpdater
.newUpdater(DefaultInputBuffer.class, Termination.class, "closeFlag");
private static final AtomicReferenceFieldUpdater<DefaultInputBuffer, Termination> closeFlagUpdater
= AtomicReferenceFieldUpdater.newUpdater(DefaultInputBuffer.class, Termination.class, "closeFlag");
@SuppressWarnings("unused")
private volatile Termination closeFlag;

Expand Down Expand Up @@ -93,7 +93,6 @@ public void onReadEventComplete() {
// If input stream has been terminated - send error message upstream
if (isClosed()) {
http2Session.sendMessageUpstream(stream, buildBrokenHttpContent(new EOFException(closeFlag.getDescription())));

return;
}

Expand Down Expand Up @@ -262,17 +261,16 @@ private Buffer poll0() throws IOException {
if (inputElement == null) {
// timeout expired
throw new IOException("Blocking read timeout");
} else {
// Due to asynchronous inputQueueSize update - the inputQueueSizeNow may be < 0.
// It means the inputQueueSize.getAndSet(0); above, may unintentionally increase the counter.
// So, once we read a Buffer - we have to properly restore the counter value.
// Normally it had to be inputQueueSize.decrementAndGet(); , but we have to
// take into account fact described above.
inputQueueSizeUpdater.addAndGet(this, inputQueueSizeNow - 1);

checkEOF(inputElement);
buffer = inputElement.toBuffer();
}
// Due to asynchronous inputQueueSize update - the inputQueueSizeNow may be < 0.
// It means the inputQueueSize.getAndSet(0); above, may unintentionally increase the counter.
// So, once we read a Buffer - we have to properly restore the counter value.
// Normally it had to be inputQueueSize.decrementAndGet(); , but we have to
// take into account fact described above.
inputQueueSizeUpdater.addAndGet(this, inputQueueSizeNow - 1);

checkEOF(inputElement);
buffer = inputElement.toBuffer();
} else if (inputQueueSizeNow == 1) {
// if there is one element available
inputElement = inputQueue.poll();
Expand Down Expand Up @@ -375,14 +373,15 @@ public boolean isClosed() {

/**
* Checks if the passed InputElement is input buffer EOF element.
*
*
* @param inputElement the {@link InputElement} to check EOF status against.
*/
private void checkEOF(final InputElement inputElement) {
// first of all it has to be the last element
if (inputElement.isLast) {

final Termination termination = !inputElement.isService ? IN_FIN_TERMINATION : (Termination) inputElement.content;
final Termination termination = inputElement.isService
? (Termination) inputElement.content : IN_FIN_TERMINATION;

if (closeFlagUpdater.compareAndSet(this, null, termination)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public int getLocalMaxFramePayloadSize() {

/**
* Sets the maximum allowed HTTP2 frame size.
*
*
* @param localMaxFramePayloadSize the maximum allowed HTTP2 frame size
*/
public void setLocalMaxFramePayloadSize(final int localMaxFramePayloadSize) {
Expand Down Expand Up @@ -480,10 +480,10 @@ private void processSettingsFrame(final Http2Session http2Session, final FilterC
} finally {
frame.recycle();
}

}

void applySettings(final Http2Session http2Session, final SettingsFrame settingsFrame) throws Http2SessionException, Http2StreamException {
void applySettings(final Http2Session http2Session, final SettingsFrame settingsFrame)
throws Http2SessionException, Http2StreamException {

for (int i = 0, numberOfSettings = settingsFrame.getNumberOfSettings(); i < numberOfSettings; i++) {
final SettingsFrame.Setting setting = settingsFrame.getSettingByIndex(i);
Expand Down Expand Up @@ -730,7 +730,7 @@ boolean checkIfHttp2StreamChain(final FilterChainContext ctx) throws IOException

/**
* Creates {@link Http2Session} with pre-configured initial-windows-size and max-concurrent-streams
*
*
* @param connection the TCP {@link Connection}
* @param isServer flag indicating whether this connection is server side or not.
* @return {@link Http2Session}
Expand Down Expand Up @@ -841,7 +841,7 @@ protected final Http2Session obtainHttp2Session(final FilterChainContext context
final Http2Session obtainHttp2Session(final Http2State http2State, final FilterChainContext context, final boolean isUpStream) {
final Connection connection = context.getConnection();

Http2Session http2Session = http2State != null ? http2State.getHttp2Session() : null;
Http2Session http2Session = http2State == null ? null : http2State.getHttp2Session();

if (http2Session == null) {
http2Session = Http2Session.get(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,6 @@ public List<Http2Frame> parse(final Http2Session http2Session, final FrameParsin
}

return parsingResult.frameList();

// // ------------ ERROR processing block -----------------------------
// final Buffer sndBuffer;
// final GoAwayFrame goAwayFrame =
// GoAwayFrame.builder()
// .errorCode(error.getErrorCode())
// .build();
// sndBuffer = goAwayFrame.toBuffer(http2State.getHttp2Session());
//
// // send last message and close the connection
// ctx.write(sndBuffer);
// connection.closeSilently();
//
// return ctx.getStopAction();
}

public Buffer serializeAndRecycle(final Http2Session http2Session, final Http2Frame frame) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,7 @@ private void close() {

@Override
public void failed(final Throwable throwable) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Unable to write GOAWAY. Terminating session.", throwable);
}
LOGGER.log(Level.WARNING, "Unable to write GOAWAY. Terminating session.", throwable);
close();
}

Expand All @@ -600,9 +598,7 @@ public void completed(final WriteResult result) {

@Override
public void cancelled() {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "GOAWAY write cancelled. Terminating session.");
}
LOGGER.log(Level.FINE, "GOAWAY write cancelled. Terminating session.");
close();
}
}, null);
Expand Down
Loading

0 comments on commit 40b5c80

Please sign in to comment.