Skip to content

Commit

Permalink
Issue #4047 Graceful Write
Browse files Browse the repository at this point in the history
Added test to reproduce issue
Fixed bug from #2772 where output was shutdown on DONE without checking for END.
Fixed aggregation logic to aggregate last write if aggregation already started.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Sep 17, 2019
1 parent e013c24 commit df4a21c
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public enum Result
FLUSH, // The buffers previously generated should be flushed
CONTINUE, // Continue generating the message
SHUTDOWN_OUT, // Need EOF to be signaled
DONE // Message generation complete
DONE // Generation complete
}

// other statics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,9 @@ public Action process() throws Exception
{
HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
LOG.debug("generate: {} for {} ({},{},{})@{}",
result,
this,
BufferUtil.toSummaryString(_header),
BufferUtil.toSummaryString(_content),
_lastContent,
Expand Down Expand Up @@ -827,7 +827,7 @@ public Action process() throws Exception
case DONE:
{
// If shutdown after commit, we can still close here.
if (getConnector().isShutdown())
if (_generator.isEnd() && getConnector().isShutdown())
_shutdownOut = true;

return Action.SUCCEEDED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,17 +591,16 @@ public void write(byte[] b, int off, int len) throws IOException
// handle blocking write

// Should we aggregate?
int capacity = getBufferSize();
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
if (len <= _commitSize && !(last && len > BufferUtil.space(_aggregate)))
{
acquireBuffer();

// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);

// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
if (!last && filled == len && !BufferUtil.isFull(_aggregate))
return;

// adjust offset/length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ protected void doShutdown(List<Future<Void>> futures) throws MultiException
// tell the graceful handlers that we are shutting down
Handler[] gracefuls = getChildHandlersByClass(Graceful.class);
if (futures == null)
{
if (gracefuls.length == 0)
return;
futures = new ArrayList<>(gracefuls.length);
}
for (Handler graceful : gracefuls)
{
futures.add(((Graceful)graceful).shutdown());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -166,6 +168,67 @@ public void testGracefulTimeout() throws Exception
client.close();
}


/**
* Test completed writes during shutdown do not close output
* @throws Exception on test failure
*/
@Test
public void testWriteDuringShutdown() throws Exception
{
Server server = new Server();
server.setStopTimeout(1000);

ServerConnector connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);

ABHandler handler = new ABHandler();
StatisticsHandler stats = new StatisticsHandler();
server.setHandler(stats);
stats.setHandler(handler);

server.start();

Thread stopper = new Thread(() ->
{
try
{
handler.latchA.await();
server.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
});

final int port = connector.getLocalPort();
try(Socket client = new Socket("127.0.0.1", port))
{
client.getOutputStream().write((
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"\r\n"
).getBytes());
client.getOutputStream().flush();

stopper.start();

while (!connector.isShutdown())
Thread.sleep(10);

handler.latchB.countDown();

String response = IO.toString(client.getInputStream());
assertThat(response, startsWith("HTTP/1.1 200 "));
assertThat(response, containsString("Content-Length: 2"));
assertThat(response, containsString("Connection: close"));
assertThat(response, endsWith("ab"));
}
stopper.join();
}

/**
* Test of standard graceful timeout mechanism when a block request does
* complete. Note that even though the request completes after 100ms, the
Expand Down Expand Up @@ -736,6 +799,30 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}

static class ABHandler extends AbstractHandler
{
final CountDownLatch latchA = new CountDownLatch(1);
final CountDownLatch latchB = new CountDownLatch(1);

@Override
public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setContentLength(2);
response.getOutputStream().write("a".getBytes());
try
{
latchA.countDown();
latchB.await();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
response.flushBuffer();
response.getOutputStream().write("b".getBytes());
}
}

static class TestHandler extends AbstractHandler
{
final CountDownLatch latch = new CountDownLatch(1);
Expand Down

0 comments on commit df4a21c

Please sign in to comment.