Skip to content

Commit

Permalink
Issue eclipse-ee4j#2016 Better logging, reliable locking, variable na…
Browse files Browse the repository at this point in the history
…mes, but ...

- in this state grizzly:
  - fails TrailersTest (grizzly-http2; race condition, passes if I use stepping)
  - passes TCK tests with Payara (websocket)
  - still fails h2spec with Payara (race condition, different, but related issue)

Signed-off-by: David Matejcek <dmatej@seznam.cz>
  • Loading branch information
dmatej committed Nov 19, 2020
1 parent 8a616be commit 3aa8453
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* @author Alexey Stashok
*/
class DefaultOutputSink implements StreamOutputSink {
private static final Logger LOGGER = Grizzly.logger(StreamOutputSink.class);
private static final Logger LOGGER = Grizzly.logger(DefaultOutputSink.class);

private static final int MAX_OUTPUT_QUEUE_SIZE = 65536;

Expand Down Expand Up @@ -165,15 +165,9 @@ public void onPeerWindowUpdate(final int delta) throws Http2StreamException {
final Source resource = outputQueueRecord.resource;

final HttpTrailer currentTrailer = outputQueueRecord.trailer;
final MessageCloner<Buffer> messageCloner = outputQueueRecord.cloner;

if (currentTrailer != null) {
try {
outputQueueRecord = null;
sendTrailers(completionHandler, messageCloner, currentTrailer);
} catch (IOException ex) {
LOGGER.log(WARNING, "Error sending trailers.", ex);
}
outputQueueRecord = null;
sendTrailers(completionHandler, currentTrailer);
return;
}

Expand Down Expand Up @@ -221,7 +215,6 @@ public void onPeerWindowUpdate(final int delta) throws Http2StreamException {
}
}
}

/**
* Send an {@link HttpPacket} to the {@link Http2Stream}.
*
Expand All @@ -238,27 +231,42 @@ public synchronized void writeDownStream(final HttpPacket httpPacket, final Filt
final CompletionHandler<WriteResult> completionHandler, final MessageCloner<Buffer> messageCloner)
throws IOException {
assert ctx != null;

assertReady();
final OutputQueueRecord next = writeDownStream0(httpPacket, ctx, completionHandler, messageCloner);
if (next != null) {
addOutputQueueRecord(next);
}
}


private OutputQueueRecord writeDownStream0(final HttpPacket httpPacket,
final FilterChainContext ctx,
final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner)
throws IOException {

final HttpHeader httpHeader = stream.getOutputHttpHeader();
final HttpContent httpContent = HttpContent.isContent(httpPacket) ? (HttpContent) httpPacket : null;

List<Http2Frame> headerFrames = null;
OutputQueueRecord outputQueueRecord = null;

try { // try-finally block to release deflater lock if needed
boolean sendTrailers = false;
boolean lockedByMe = false;
try {
// try-finally block to release deflater lock if needed
boolean isLast = httpContent != null && httpContent.isLast();
final boolean isTrailer = HttpTrailer.isTrailer(httpContent);

// If HTTP header hasn't been committed - commit it
if (!httpHeader.isCommitted()) {
// do we expect any HTTP payload?
final boolean isNoPayload = !httpHeader.isExpectContent()
final boolean dontSendPayload = !httpHeader.isExpectContent()
|| (httpContent != null && httpContent.isLast() && !httpContent.getContent().hasRemaining());
LOGGER.finest(() -> "Header not committed yet; dontSendPayload=" + dontSendPayload);

http2Session.getDeflaterLock().lock();
lockedByMe = true;
final boolean logging = NetLogger.isActive();
final Map<String, String> capture = logging ? new HashMap<>() : null;
headerFrames = http2Session.encodeHttpHeaderAsHeaderFrames(ctx, httpHeader, stream.getId(), isNoPayload, null, capture);
headerFrames = http2Session.encodeHttpHeaderAsHeaderFrames(
ctx, httpHeader, stream.getId(), dontSendPayload, null, capture);
if (logging) {
for (Http2Frame http2Frame : headerFrames) {
if (http2Frame.getType() == PushPromiseFrame.TYPE) {
Expand All @@ -267,7 +275,7 @@ public synchronized void writeDownStream(final HttpPacket httpPacket, final Filt
}
}
}
stream.onSndHeaders(isNoPayload);
stream.onSndHeaders(dontSendPayload);

// 100-Continue block
if (!httpHeader.isRequest()) {
Expand All @@ -277,75 +285,61 @@ public synchronized void writeDownStream(final HttpPacket httpPacket, final Filt
response.getHeaders().clear();
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(headerFrames, completionHandler, messageCloner, false);
return;
LOGGER.finest("Acknowledgement has been sent.");
return null;
}
}

httpHeader.setCommitted(true);

if (isNoPayload || httpContent == null) {
// if we don't expect any HTTP payload, mark this frame as
// last and return
if (dontSendPayload || httpContent == null) {
// if we don't expect any HTTP payload, mark this frame as last and return
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(headerFrames, completionHandler, messageCloner, isNoPayload);
return;
flushToConnectionOutputSink(headerFrames, completionHandler, messageCloner, dontSendPayload);
sendTrailers = false;
LOGGER.finest(() -> "Nothing to send; dontSendPayload=" + dontSendPayload);
return null;
}
}

// if there is nothing to write - return
if (httpContent == null) {
return;
sendTrailers = false;
LOGGER.finest("Nothing to send, httpContent is null.");
return null;
}

http2Session.handlerFilter.onHttpContentEncoded(httpContent, ctx);

boolean isLast = httpContent.isLast();
final boolean isTrailer = HttpTrailer.isTrailer(httpContent);
Buffer data = httpContent.getContent();
final int dataSize = data.remaining();

unflushedWritesCounter.incrementAndGet();
final FlushCompletionHandler flushCompletionHandler = new FlushCompletionHandler(completionHandler);

boolean isDataCloned = false;

final boolean isZeroSizeData = dataSize == 0;
final int spaceToReserve = isZeroSizeData ? ZERO_QUEUE_RECORD_SIZE : dataSize;
boolean isDataCloned = false;

// Check if output queue is not empty - add new element
if (reserveWriteQueueSpace(spaceToReserve) > spaceToReserve) {
final int spaceReserved = reserveWriteQueueSpace(spaceToReserve);
LOGGER.finest(() -> "Bytes reserved: " + spaceReserved + ", was requested: " + spaceToReserve);
if (spaceReserved > spaceToReserve) {
// if the queue is not empty - the headers should have been sent
assert headerFrames == null;

if (messageCloner != null) {
data = messageCloner.clone(http2Session.getConnection(), data);
isDataCloned = true;
}

if (isTrailer) {
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream).createBufferSource(data),
flushCompletionHandler,
(HttpTrailer) httpContent,
false);
} else {
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream).createBufferSource(data),
flushCompletionHandler, isLast, isZeroSizeData);
}

outputQueue.offer(outputQueueRecord);
final OutputQueueRecord record = createOutputQueueRecord(data, httpContent, flushCompletionHandler,
isLast, isZeroSizeData);
outputQueue.offer(record);

// check if our element wasn't forgotten (async)
if (outputQueue.size() != spaceToReserve || !outputQueue.remove(outputQueueRecord)) {
// if not - send trailers and return
if (isLast && isTrailer) {
sendTrailers(completionHandler, messageCloner, (HttpTrailer) httpContent);
}
return;
if (outputQueue.size() != spaceToReserve || !outputQueue.remove(record)) {
sendTrailers = false;
LOGGER.finest("In some weird condition. FIXME... why are we removing what we added in previous if/else?");
return null;
}

outputQueueRecord = null;
}

// our element is first in the output queue
Expand All @@ -354,9 +348,13 @@ public synchronized void writeDownStream(final HttpPacket httpPacket, final Filt
// check if output record's buffer is fitting into window size
// if not - split it into 2 parts: part to send, part to keep in the queue
final int fitWindowLen = checkOutputWindow(remaining);
LOGGER.finest(() -> "Remaining: " + remaining + ", fitWindowLen: " + fitWindowLen);

// if there is a chunk to store
if (fitWindowLen < remaining) {
final OutputQueueRecord outputQueueRecord;
if (fitWindowLen >= remaining) {
outputQueueRecord = null;
} else {
// if there is a chunk to store
if (!isDataCloned && messageCloner != null) {
data = messageCloner.clone(http2Session.getConnection(), data);
isDataCloned = true;
Expand All @@ -365,9 +363,8 @@ public synchronized void writeDownStream(final HttpPacket httpPacket, final Filt
final Buffer dataChunkToStore = splitOutputBufferIfNeeded(data, fitWindowLen);

// Create output record for the chunk to be stored
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream).createBufferSource(dataChunkToStore),
flushCompletionHandler, isLast, isZeroSizeData);
outputQueueRecord = createOutputQueueRecord(dataChunkToStore, null, flushCompletionHandler, isLast,
isZeroSizeData);

// reset completion handler and isLast for the current chunk
isLast = false;
Expand All @@ -377,34 +374,36 @@ public synchronized void writeDownStream(final HttpPacket httpPacket, final Filt
final Buffer dataToSend = prepareDataToSend(outputQueueRecord == null, isLast, data, isZeroSizeData);
if (headerFrames != null || dataToSend != null) {
// if another part of data is stored in the queue -
// we have to increase CompletionHandler counter to avoid
// premature notification
// we have to increase CompletionHandler counter to avoid premature notification
if (outputQueueRecord != null) {
outputQueueRecord.incChunksCounter();
}

flushToConnectionOutputSink(headerFrames, dataToSend, flushCompletionHandler,
isDataCloned ? null : messageCloner, isLast && !isTrailer);
}

if (isLast) {
if (isTrailer) {
sendTrailers(completionHandler, messageCloner, (HttpTrailer) httpContent);
}
return;
}

LOGGER.finest("isLast=" + isLast + "; isTrailer=" + isTrailer);
sendTrailers = isLast && isTrailer;
return isLast ? null : outputQueueRecord;
} finally {
// if it is locked by this thread, it was locked in the first lines of this try block
if (http2Session.getDeflaterLock().isHeldByCurrentThread()) {
LOGGER.finest("sendTrailers=" + sendTrailers);
if (sendTrailers) {
sendTrailers(completionHandler, (HttpTrailer) httpContent);
}
if (lockedByMe) {
http2Session.getDeflaterLock().unlock();
}
}
}


if (outputQueueRecord == null) {
return;
private OutputQueueRecord createOutputQueueRecord(final Buffer data, final HttpContent httpContent,
final FlushCompletionHandler flushCompletionHandler, boolean isLast, final boolean isZeroSizeData) {
final Source bufferSource = Source.factory(stream).createBufferSource(data);
if (httpContent instanceof HttpTrailer) {
return new OutputQueueRecord(bufferSource, flushCompletionHandler, (HttpTrailer) httpContent, false);
}
addOutputQueueRecord(outputQueueRecord);
return new OutputQueueRecord(bufferSource, flushCompletionHandler, isLast, isZeroSizeData);
}


Expand Down Expand Up @@ -485,29 +484,29 @@ private Buffer splitOutputBufferIfNeeded(final Buffer buffer, final int length)
private void flushToConnectionOutputSink(final List<Http2Frame> headerFrames,
final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner,
final boolean isLast) {
final boolean sendFIN) {
final FlushCompletionHandler flushCompletionHandler = new FlushCompletionHandler(completionHandler);
flushToConnectionOutputSink(headerFrames, null, flushCompletionHandler, messageCloner, isLast);
flushToConnectionOutputSink(headerFrames, null, flushCompletionHandler, messageCloner, sendFIN);
}

private void flushToConnectionOutputSink(
final Buffer data,
final FlushCompletionHandler flushCompletionHandler,
final boolean isLast) {
flushToConnectionOutputSink(null, data, flushCompletionHandler, null, isLast);
final boolean sendFIN) {
flushToConnectionOutputSink(null, data, flushCompletionHandler, null, sendFIN);
}

private void flushToConnectionOutputSink(
final List<Http2Frame> headerFrames,
final Buffer data,
final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner,
final boolean isLast) {
final boolean sendFIN) {

http2Session.getOutputSink().writeDataDownStream(
stream, headerFrames, data, completionHandler, messageCloner, isLast);
stream, headerFrames, data, completionHandler, messageCloner, sendFIN);

if (isLast) {
if (sendFIN) {
terminate(OUT_FIN_TERMINATION);
}
}
Expand Down Expand Up @@ -595,13 +594,8 @@ private void addOutputQueueRecord(OutputQueueRecord outputQueueRecord) throws Ht
// if we can send the output record now - do that
final FlushCompletionHandler chunkedCompletionHandler = outputQueueRecord.chunkedCompletionHandler;
final HttpTrailer currentTrailer = outputQueueRecord.trailer;
final MessageCloner<Buffer> messageCloner = outputQueueRecord.cloner;
if (currentTrailer != null) {
try {
sendTrailers(chunkedCompletionHandler, messageCloner, currentTrailer);
} catch (IOException ex) {
LOGGER.log(WARNING, "Error sending trailers.", ex);
}
sendTrailers(chunkedCompletionHandler, currentTrailer);
return;
}

Expand Down Expand Up @@ -660,9 +654,7 @@ private void releaseWriteQueueSpace(final int justSentBytes, final boolean isAto
}
}

private void sendTrailers(final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner, final HttpTrailer httpContent)
throws IOException {
private void sendTrailers(final CompletionHandler<WriteResult> completionHandler, final HttpTrailer httpContent) {
http2Session.getDeflaterLock().lock();
try {
final boolean logging = NetLogger.isActive();
Expand All @@ -679,24 +671,25 @@ private void sendTrailers(final CompletionHandler<WriteResult> completionHandler
}
}
}
flushToConnectionOutputSink(trailerFrames, completionHandler, null, true);
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(trailerFrames, completionHandler, messageCloner, true);
close();
} catch (IOException ex) {
LOGGER.log(WARNING, "Error sending trailers.", ex);
} finally {
close();
LOGGER.finest("Sending trailers finished, unlocking the deflater lock ...");
http2Session.getDeflaterLock().unlock();
}
}

private static class OutputQueueRecord extends AsyncQueueRecord<WriteResult> {
private final HttpTrailer trailer;
private final boolean isZeroSizeData;

private Source resource;
private FlushCompletionHandler chunkedCompletionHandler;

private HttpTrailer trailer;
private MessageCloner<Buffer> cloner;

private boolean isLast;

private final boolean isZeroSizeData;

public OutputQueueRecord(final Source resource, final FlushCompletionHandler completionHandler,
final boolean isLast, final boolean isZeroSizeData) {
Expand All @@ -705,6 +698,7 @@ public OutputQueueRecord(final Source resource, final FlushCompletionHandler com
this.resource = resource;
this.chunkedCompletionHandler = completionHandler;
this.isLast = isLast;
this.trailer = null;
this.isZeroSizeData = isZeroSizeData;
}

Expand Down
Loading

0 comments on commit 3aa8453

Please sign in to comment.