Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PAYARA-3031 Fix HTTP/2 Trailer Issue #13

Merged
merged 4 commits into from
Nov 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package org.glassfish.grizzly.http2;

import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
Expand Down Expand Up @@ -117,10 +120,8 @@ public void notifyWritePossible(final WriteHandler writeHandler) {
private void assertReady() throws IOException {
// if the last frame (fin flag == 1) has been queued already - throw an IOException
if (isTerminated()) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Terminated!!! id={0} description={1}",
new Object[]{stream.getId(), terminationFlag.getDescription()});
}
LOGGER.log(FINE, "Terminated!!! id={0} description={1}",
new Object[]{stream.getId(), terminationFlag.getDescription()});
throw new IOException(terminationFlag.getDescription());
} else if (isLastFrameQueued) {
throw new IOException("Write beyond end of stream");
Expand All @@ -145,10 +146,10 @@ public void onPeerWindowUpdate(final int delta) throws Http2StreamException {
}
// update the available window size
availStreamWindowSize.addAndGet(delta);

// try to write until window limit allows
while (isWantToWrite() &&
!outputQueue.isEmpty()) {
while ((isWantToWrite() && !outputQueue.isEmpty())
|| (outputQueue.peek() != null && outputQueue.peek().trailer != null)) {

// pick up the first output record in the queue
OutputQueueRecord outputQueueRecord = outputQueue.poll();
Expand All @@ -171,14 +172,26 @@ public void onPeerWindowUpdate(final int delta) throws Http2StreamException {
boolean isLast = outputQueueRecord.isLast;
final boolean isZeroSizeData = outputQueueRecord.isZeroSizeData;
final Source resource = outputQueueRecord.resource;

final HttpTrailer currentTrailer = outputQueueRecord.trailer;
final MessageCloner messageCloner = outputQueueRecord.cloner;

if (currentTrailer != null) {
try {
outputQueueRecord = null;
sendTrailers(completionHandler, messageCloner, currentTrailer);
} catch (IOException ex) {
LOGGER.log(WARNING, "Error sending trailers.", ex);
}
return;
}

// check if output record's buffer is fitting into window size
// if not - split it into 2 parts: part to send, part to keep in the queue
final int bytesToSend = checkOutputWindow(resource.remaining());
final Buffer dataChunkToSend = resource.read(bytesToSend);
final boolean hasRemaining = resource.hasRemaining();


// if there is a chunk to store
if (hasRemaining) {
// Create output record for the chunk to be stored
Expand Down Expand Up @@ -251,8 +264,8 @@ public synchronized void writeDownStream(final HttpPacket httpPacket,
List<Http2Frame> headerFrames = null;
OutputQueueRecord outputQueueRecord = null;

boolean isDeflaterLocked = false;
final ReentrantLock deflatorLock = http2Session.getDeflaterLock();

try { // try-finally block to release deflater lock if needed

// If HTTP header hasn't been committed - commit it
Expand All @@ -263,8 +276,7 @@ public synchronized void writeDownStream(final HttpPacket httpPacket,
!httpContent.getContent().hasRemaining());

// !!!!! LOCK the deflater
isDeflaterLocked = true;
http2Session.getDeflaterLock().lock();
deflatorLock.lock();
final boolean logging = NetLogger.isActive();
final Map<String,String> capture = ((logging) ? new HashMap<>() : null);
headerFrames = http2Session.encodeHttpHeaderAsHeaderFrames(
Expand Down Expand Up @@ -319,15 +331,6 @@ public synchronized void writeDownStream(final HttpPacket httpPacket,
Buffer data = httpContent.getContent();
final int dataSize = data.remaining();

if (isLast && dataSize == 0) {
if (isTrailer) {
// !!!!! LOCK the deflater
isDeflaterLocked = true;
sendTrailers(completionHandler, messageCloner, (HttpTrailer) httpContent);
}
close();
return;
}

unflushedWritesCounter.incrementAndGet();
final FlushCompletionHandler flushCompletionHandler =
Expand All @@ -349,10 +352,19 @@ public synchronized void writeDownStream(final HttpPacket httpPacket,
isDataCloned = true;
}

outputQueueRecord = new OutputQueueRecord(
Source.factory(stream)
.createBufferSource(data),
flushCompletionHandler, isLast, isZeroSizeData);
if (isTrailer) {
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream)
.createBufferSource(data),
flushCompletionHandler,
(HttpTrailer) httpContent,
false);
} else {
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream)
.createBufferSource(data),
flushCompletionHandler, isLast, isZeroSizeData);
}

outputQueue.offer(outputQueueRecord);

Expand All @@ -367,7 +379,6 @@ public synchronized void writeDownStream(final HttpPacket httpPacket,
}

// our element is first in the output queue

final int remaining = data.remaining();

// check if output record's buffer is fitting into window size
Expand Down Expand Up @@ -428,16 +439,14 @@ public synchronized void writeDownStream(final HttpPacket httpPacket,
if (isLast) {
if (isTrailer) {
// !!!!! LOCK the deflater
isDeflaterLocked = true;
sendTrailers(completionHandler, messageCloner, (HttpTrailer) httpContent);
}
close();
return;
}

} finally {
if (isDeflaterLocked) {
http2Session.getDeflaterLock().unlock();
if (deflatorLock.isHeldByCurrentThread()) {
deflatorLock.unlock();
}
}

Expand Down Expand Up @@ -603,7 +612,7 @@ private void addOutputQueueRecord(OutputQueueRecord outputQueueRecord)
throws Http2StreamException {

do { // Make sure current outputQueueRecord is not forgotten

// set the outputQueueRecord as the current
outputQueue.setCurrentElement(outputQueueRecord);

Expand All @@ -612,10 +621,20 @@ private void addOutputQueueRecord(OutputQueueRecord outputQueueRecord)
outputQueue.compareAndSetCurrentElement(outputQueueRecord, null)) {

// if we can send the output record now - do that

final FlushCompletionHandler chunkedCompletionHandler =
outputQueueRecord.chunkedCompletionHandler;

final HttpTrailer currentTrailer = outputQueueRecord.trailer;
final MessageCloner messageCloner = outputQueueRecord.cloner;
if (currentTrailer != null) {
try {
sendTrailers(chunkedCompletionHandler, messageCloner, currentTrailer);
} catch (IOException ex) {
LOGGER.log(WARNING, "Error sending trailers.", ex);
}
return;
}

boolean isLast = outputQueueRecord.isLast;
final boolean isZeroSizeData = outputQueueRecord.isZeroSizeData;

Expand All @@ -624,7 +643,6 @@ private void addOutputQueueRecord(OutputQueueRecord outputQueueRecord)
final int fitWindowLen = checkOutputWindow(currentResource.remaining());
final Buffer dataChunkToSend = currentResource.read(fitWindowLen);


// if there is a chunk to store
if (currentResource.hasRemaining()) {
// Create output record for the chunk to be stored
Expand Down Expand Up @@ -688,7 +706,6 @@ private void sendTrailers(final CompletionHandler<WriteResult> completionHandler
final MessageCloner<Buffer> messageCloner,
final HttpTrailer httpContent)
throws IOException {
http2Session.getDeflaterLock().lock();
final boolean logging = NetLogger.isActive();
final Map<String,String> capture = ((logging) ? new HashMap<>() : null);
List<Http2Frame> trailerFrames =
Expand All @@ -707,11 +724,15 @@ private void sendTrailers(final CompletionHandler<WriteResult> completionHandler
flushToConnectionOutputSink(trailerFrames, null,
new FlushCompletionHandler(completionHandler),
messageCloner, true);
close();
}

private static class OutputQueueRecord extends AsyncQueueRecord<WriteResult> {
private Source resource;
private FlushCompletionHandler chunkedCompletionHandler;

private HttpTrailer trailer;
private MessageCloner cloner;

private boolean isLast;

Expand All @@ -727,6 +748,18 @@ public OutputQueueRecord(final Source resource,
this.isLast = isLast;
this.isZeroSizeData = isZeroSizeData;
}

public OutputQueueRecord(final Source resource,
final FlushCompletionHandler completionHandler,
final HttpTrailer trailer, final boolean isZeroDataSize) {
super(null, null, null);

this.resource = resource;
this.chunkedCompletionHandler = completionHandler;
this.isLast = true;
this.trailer = trailer;
this.isZeroSizeData = isZeroDataSize;
}

private void incChunksCounter() {
if (chunkedCompletionHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private ParsingResult parseFrame(final Http2Session http2Session,

final int len = http2Session.getFrameSize(buffer);

if (len > http2Session.getLocalMaxFramePayloadSize() + Http2Frame.FRAME_HEADER_SIZE) {
if (len > http2Session.getPeerMaxFramePayloadSize() + Http2Frame.FRAME_HEADER_SIZE) {

http2Session.onOversizedFrame(buffer);

Expand Down