From 71df3b57eef08040203337f6d83ff28adc86d7c7 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 30 Apr 2020 09:15:49 +1000 Subject: [PATCH] Issue #4824 - add configuration on RemoteEndpoint for maxOutgoingFrames Signed-off-by: Lachlan Roberts --- .../jetty/websocket/api/RemoteEndpoint.java | 18 ++++++ .../common/WebSocketRemoteEndpoint.java | 59 +++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index 45a8fcf47800..37dcec929481 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -141,6 +141,24 @@ public interface RemoteEndpoint */ void setBatchMode(BatchMode mode); + /** + * Set the maximum number of frames which allowed to be waiting to be sent at any one time. + * The default value is -1, this indicates there is no limit on how many frames can be + * queued to be sent by the implementation. + * + * @param maxOutgoingFrames the max number of frames. + */ + void setMaxOutgoingFrames(int maxOutgoingFrames); + + /** + * Get the maximum number of frames which allowed to be waiting to be sent at any one time. + * The default value is -1, this indicates there is no limit on how many frames can be + * queued to be sent by the implementation. + * + * @return the max number of frames. + */ + int getMaxOutgoingFrames(); + /** * Get the InetSocketAddress for the established connection. * diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index c53d71cffa3c..aa827f3754a8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -81,7 +81,9 @@ public void writeFailed(Throwable x) private final OutgoingFrames outgoing; private final AtomicInteger msgState = new AtomicInteger(); private final BlockingWriteCallback blocker = new BlockingWriteCallback(); + private final AtomicInteger numOutgoingFrames = new AtomicInteger(); private volatile BatchMode batchMode; + private int maxNumOutgoingFrames = -1; public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing) { @@ -303,6 +305,19 @@ public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback) BatchMode batchMode = BatchMode.OFF; if (frame.isDataFrame()) batchMode = getBatchMode(); + + if (maxNumOutgoingFrames > 0 && frame.isDataFrame()) + { + // Increase the number of outgoing frames, will be decremented when callback is completed. + int outgoingFrames = numOutgoingFrames.incrementAndGet(); + callback = from(callback, numOutgoingFrames::decrementAndGet); + if (outgoingFrames > maxNumOutgoingFrames) + { + callback.writeFailed(new IOException("Exceeded max outgoing frames: " + outgoingFrames + ">" + maxNumOutgoingFrames)); + return; + } + } + outgoing.outgoingFrame(frame, callback, batchMode); } @@ -439,6 +454,18 @@ public void setBatchMode(BatchMode batchMode) this.batchMode = batchMode; } + @Override + public void setMaxOutgoingFrames(int maxOutgoingFrames) + { + this.maxNumOutgoingFrames = maxOutgoingFrames; + } + + @Override + public int getMaxOutgoingFrames() + { + return maxNumOutgoingFrames; + } + @Override public void flush() throws IOException { @@ -459,4 +486,36 @@ public String toString() { return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode()); } + + private static WriteCallback from(WriteCallback callback, Runnable completed) + { + return new WriteCallback() + { + @Override + public void writeFailed(Throwable x) + { + try + { + callback.writeFailed(x); + } + finally + { + completed.run(); + } + } + + @Override + public void writeSuccess() + { + try + { + callback.writeSuccess(); + } + finally + { + completed.run(); + } + } + }; + } }