From 181381ef7ca2d662b156c79ea2f0188172751946 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Mon, 1 Feb 2021 13:22:09 -0500 Subject: [PATCH 1/4] Properly release underlying buffer before passing it to handler. --- .../webserver/tyrus/TyrusReaderSubscriber.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java index b4878648334..0951d67d2d3 100644 --- a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java +++ b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,16 +63,15 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(DataChunk item) { + // Copy underlying buffer into a ByteBuffer and release DataChunk + ByteBuffer byteBuffer = ByteBuffer.wrap(item.bytes()); + item.release(); + + // Submit buffer to Tyrus if (executorService == null) { - for (ByteBuffer byteBuffer : item.data()) { - submitBuffer(byteBuffer); - } + submitBuffer(byteBuffer); } else { - executorService.submit(() -> { - for (ByteBuffer byteBuffer : item.data()) { - submitBuffer(byteBuffer); - } - }); + executorService.submit(() -> submitBuffer(byteBuffer)); } } From a90b5aba6fe4098ec43071850f09ef91c5b7af79 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Tue, 2 Feb 2021 10:13:23 -0500 Subject: [PATCH 2/4] Releases data chunks after passing them to Tyrus without any copying. Reports an error and closes connection if Tyrus is unable to handle the data. Finally, fixed a problem related to subscription requests. Signed-off-by: Santiago Pericasgeertsen --- .../tyrus/TyrusReaderSubscriber.java | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java index 0951d67d2d3..81cf85c54fe 100644 --- a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java +++ b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java @@ -63,33 +63,55 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(DataChunk item) { - // Copy underlying buffer into a ByteBuffer and release DataChunk - ByteBuffer byteBuffer = ByteBuffer.wrap(item.bytes()); - item.release(); - - // Submit buffer to Tyrus - if (executorService == null) { - submitBuffer(byteBuffer); + if (subscription != null) { + if (executorService == null) { + submitDataChunk(item); + } else { + executorService.submit(() -> submitDataChunk(item)); + } } else { - executorService.submit(() -> submitBuffer(byteBuffer)); + item.release(); + } + } + + /** + * Submits all data in a chunk and requests one more if successful. + * + * @param item a data chunk + */ + private void submitDataChunk(DataChunk item) { + try { + for (ByteBuffer byteBuffer : item.data()) { + submitBuffer(byteBuffer); + } + } finally { + item.release(); + } + if (subscription != null) { + subscription.request(1L); } } /** - * Submits data buffer to Tyrus. Retries a few times to make sure the entire buffer - * is consumed or logs an error. + * Submits single buffer to Tyrus. Retries a few times to make sure the entire buffer + * is consumed. * * @param data Data buffer. */ private void submitBuffer(ByteBuffer data) { + // Pass all data to Tyrus spi int retries = MAX_RETRIES; while (data.remaining() > 0 && retries-- > 0) { connection.getReadHandler().handle(data); } + + // If we can't push all data to Tyrus, cancel and report problem if (retries == 0) { - LOGGER.warning("Tyrus did not consume all data buffer after " + MAX_RETRIES + " retries"); + subscription.cancel(); + subscription = null; + connection.close(new CloseReason(UNEXPECTED_CONDITION, "Tyrus did not " + + "consume all data after " + MAX_RETRIES + " retries")); } - subscription.request(1L); } @Override From 25eb9d593028f0a2d059c56bb5b30a952a558f63 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Tue, 2 Feb 2021 10:18:07 -0500 Subject: [PATCH 3/4] Removed unused logger. Signed-off-by: Santiago Pericasgeertsen --- .../java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java | 1 - 1 file changed, 1 deletion(-) diff --git a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java index 81cf85c54fe..7564c3e0ce8 100644 --- a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java +++ b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java @@ -34,7 +34,6 @@ * Class TyrusReaderSubscriber. */ public class TyrusReaderSubscriber implements Flow.Subscriber { - private static final Logger LOGGER = Logger.getLogger(TyrusSupport.class.getName()); private static final int MAX_RETRIES = 5; private static final CloseReason CONNECTION_CLOSED = new CloseReason(NORMAL_CLOSURE, "Connection closed"); From a4c10f576b069ff1289dc20bfdc5d1a6bdf5ff9c Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Tue, 2 Feb 2021 10:37:02 -0500 Subject: [PATCH 4/4] Fixed checkstyle. Signed-off-by: Santiago Pericasgeertsen --- .../java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java | 1 - 1 file changed, 1 deletion(-) diff --git a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java index 7564c3e0ce8..5a06f0f07dd 100644 --- a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java +++ b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow; -import java.util.logging.Logger; import javax.websocket.CloseReason;