forked from jetty/jetty.project
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue jetty#3170 - WebSocket Proxy PoC
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
- Loading branch information
1 parent
83b53f1
commit c1e3e57
Showing
3 changed files
with
329 additions
and
0 deletions.
There are no files selected for viewing
100 changes: 100 additions & 0 deletions
100
...ebsocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package org.eclipse.jetty.websocket.core.proxy; | ||
|
||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.eclipse.jetty.util.BlockingArrayQueue; | ||
import org.eclipse.jetty.util.BufferUtil; | ||
import org.eclipse.jetty.util.Callback; | ||
import org.eclipse.jetty.websocket.core.CloseStatus; | ||
import org.eclipse.jetty.websocket.core.Frame; | ||
import org.eclipse.jetty.websocket.core.FrameHandler; | ||
import org.eclipse.jetty.websocket.core.OpCode; | ||
|
||
class BasicFrameHandler implements FrameHandler | ||
{ | ||
protected String name; | ||
protected CoreSession session; | ||
protected CountDownLatch closed = new CountDownLatch(1); | ||
|
||
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>(); | ||
|
||
|
||
public BasicFrameHandler(String name) | ||
{ | ||
this.name = "[" + name + "]"; | ||
} | ||
|
||
@Override | ||
public void onOpen(CoreSession coreSession, Callback callback) | ||
{ | ||
session = coreSession; | ||
|
||
System.err.println(name + " onOpen(): " + session); | ||
callback.succeeded(); | ||
} | ||
|
||
@Override | ||
public void onFrame(Frame frame, Callback callback) | ||
{ | ||
System.err.println(name + " onFrame(): " + frame); | ||
receivedFrames.offer(Frame.copy(frame)); | ||
callback.succeeded(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable cause, Callback callback) | ||
{ | ||
System.err.println(name + " onError(): " + cause); | ||
cause.printStackTrace(); | ||
callback.succeeded(); | ||
} | ||
|
||
@Override | ||
public void onClosed(CloseStatus closeStatus, Callback callback) | ||
{ | ||
System.err.println(name + " onClosed(): " + closeStatus); | ||
closed.countDown(); | ||
callback.succeeded(); | ||
} | ||
|
||
public void sendText(String message) | ||
{ | ||
Frame textFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer(message)); | ||
session.sendFrame(textFrame, Callback.NOOP, false); | ||
} | ||
|
||
public void close() throws InterruptedException | ||
{ | ||
session.close(CloseStatus.NORMAL, "standard close", Callback.NOOP); | ||
awaitClose(); | ||
} | ||
|
||
public void awaitClose() throws InterruptedException | ||
{ | ||
closed.await(5, TimeUnit.SECONDS); | ||
} | ||
|
||
|
||
public static class EchoHandler extends BasicFrameHandler | ||
{ | ||
public EchoHandler(String name) | ||
{ | ||
super(name); | ||
} | ||
|
||
@Override | ||
public void onFrame(Frame frame, Callback callback) | ||
{ | ||
System.err.println(name + " onFrame(): " + frame); | ||
|
||
if (frame.isDataFrame()) | ||
session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false); | ||
else | ||
callback.succeeded(); | ||
|
||
receivedFrames.offer(Frame.copy(frame)); | ||
} | ||
} | ||
} |
155 changes: 155 additions & 0 deletions
155
...ebsocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
package org.eclipse.jetty.websocket.core.proxy; | ||
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import org.eclipse.jetty.util.Callback; | ||
import org.eclipse.jetty.websocket.core.CloseStatus; | ||
import org.eclipse.jetty.websocket.core.Frame; | ||
import org.eclipse.jetty.websocket.core.FrameHandler; | ||
import org.eclipse.jetty.websocket.core.OpCode; | ||
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; | ||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; | ||
|
||
class ProxyFrameHandler implements FrameHandler | ||
{ | ||
String name = "[PROXY_SERVER]"; | ||
|
||
URI serverUri; | ||
WebSocketCoreClient client = new WebSocketCoreClient(); | ||
|
||
CoreSession clientSession; | ||
volatile CoreSession serverSession; | ||
|
||
|
||
AtomicReference<Callback> closeFrameCallback = new AtomicReference<>(); | ||
|
||
public ProxyFrameHandler() | ||
{ | ||
try | ||
{ | ||
serverUri = new URI("ws://localhost:8080/server"); | ||
client.start(); | ||
} | ||
catch (Exception e) | ||
{ | ||
e.printStackTrace(); | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void onOpen(CoreSession coreSession, Callback callback) | ||
{ | ||
System.err.println(name + " onOpen: " + coreSession); | ||
clientSession = coreSession; | ||
|
||
try | ||
{ | ||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, new ProxyFrameHandlerClient()); | ||
client.connect(upgradeRequest).whenComplete((s,t)->{ | ||
if (t != null) | ||
{ | ||
callback.failed(t); | ||
} | ||
else | ||
{ | ||
serverSession = s; | ||
callback.succeeded(); | ||
} | ||
}); | ||
} | ||
catch (IOException e) | ||
{ | ||
e.printStackTrace(); | ||
clientSession.close(CloseStatus.SERVER_ERROR, "proxy failed to connect to server", Callback.NOOP); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFrame(Frame frame, Callback callback) | ||
{ | ||
System.err.println(name + " onFrame(): " + frame); | ||
onFrame(serverSession, frame, callback); | ||
} | ||
|
||
private void onFrame(CoreSession session, Frame frame, Callback callback) | ||
{ | ||
if (frame.getOpCode() == OpCode.CLOSE) | ||
{ | ||
|
||
Callback closeCallback = Callback.NOOP; | ||
|
||
// If we have already received a close frame then we can succeed both callbacks | ||
if (!closeFrameCallback.compareAndSet(null, callback)) | ||
{ | ||
closeCallback = Callback.from(()-> | ||
{ | ||
closeFrameCallback.get().succeeded(); | ||
callback.succeeded(); | ||
}, (t)-> | ||
{ | ||
closeFrameCallback.get().failed(t); | ||
callback.failed(t); | ||
}); | ||
} | ||
|
||
session.sendFrame(frame, closeCallback, false); | ||
return; | ||
} | ||
else | ||
{ | ||
session.sendFrame(Frame.copy(frame), callback, false); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable cause, Callback callback) | ||
{ | ||
System.err.println(name + " onError(): " + cause); | ||
cause.printStackTrace(); | ||
callback.succeeded(); | ||
} | ||
|
||
@Override | ||
public void onClosed(CloseStatus closeStatus, Callback callback) | ||
{ | ||
System.err.println(name + " onClosed(): " + closeStatus); | ||
callback.succeeded(); | ||
} | ||
|
||
class ProxyFrameHandlerClient implements FrameHandler | ||
{ | ||
String name = "[PROXY_CLIENT]"; | ||
|
||
@Override | ||
public void onOpen(CoreSession coreSession, Callback callback) | ||
{ | ||
serverSession = coreSession; | ||
callback.succeeded(); | ||
} | ||
|
||
@Override | ||
public void onFrame(Frame frame, Callback callback) | ||
{ | ||
System.err.println(name + " onFrame(): " + frame); | ||
ProxyFrameHandler.this.onFrame(clientSession, frame, callback); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable cause, Callback callback) | ||
{ | ||
System.err.println(name + " onError(): " + cause); | ||
cause.printStackTrace(); | ||
callback.succeeded(); | ||
} | ||
|
||
@Override | ||
public void onClosed(CloseStatus closeStatus, Callback callback) | ||
{ | ||
System.err.println(name + " onClosed(): " + closeStatus); | ||
callback.succeeded(); | ||
} | ||
} | ||
} |
74 changes: 74 additions & 0 deletions
74
...bsocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package org.eclipse.jetty.websocket.core.proxy; | ||
|
||
import java.net.URI; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.eclipse.jetty.server.Server; | ||
import org.eclipse.jetty.server.ServerConnector; | ||
import org.eclipse.jetty.server.handler.ContextHandler; | ||
import org.eclipse.jetty.server.handler.HandlerList; | ||
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession; | ||
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; | ||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; | ||
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; | ||
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class WebSocketProxyTest | ||
{ | ||
Server _server; | ||
WebSocketCoreClient _client; | ||
|
||
|
||
@BeforeEach | ||
public void start() throws Exception | ||
{ | ||
_server = new Server(); | ||
ServerConnector connector = new ServerConnector(_server); | ||
connector.setPort(8080); | ||
_server.addConnector(connector); | ||
|
||
HandlerList handlers = new HandlerList(); | ||
|
||
ContextHandler serverContext = new ContextHandler("/server"); | ||
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> new BasicFrameHandler.EchoHandler("SERVER")); | ||
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator); | ||
serverContext.setHandler(upgradeHandler); | ||
handlers.addHandler(serverContext); | ||
|
||
ContextHandler proxyContext = new ContextHandler("/proxy"); | ||
negotiator = WebSocketNegotiator.from((negotiation) -> new ProxyFrameHandler()); | ||
upgradeHandler = new WebSocketUpgradeHandler(negotiator); | ||
proxyContext.setHandler(upgradeHandler); | ||
handlers.addHandler(proxyContext); | ||
|
||
_server.setHandler(handlers); | ||
_server.start(); | ||
|
||
_client = new WebSocketCoreClient(); | ||
_client.start(); | ||
} | ||
|
||
@AfterEach | ||
public void stop() throws Exception | ||
{ | ||
_client.stop(); | ||
_server.stop(); | ||
} | ||
|
||
|
||
@Test | ||
public void testHello() throws Exception | ||
{ | ||
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT"); | ||
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy"), clientHandler); | ||
|
||
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest); | ||
response.get(5, TimeUnit.SECONDS); | ||
clientHandler.sendText("hello world"); | ||
clientHandler.close(); | ||
} | ||
} |