From 43e64a034134c6852a901f3a223bde2f9f54f040 Mon Sep 17 00:00:00 2001 From: James Frowen Date: Mon, 23 Sep 2024 20:13:34 +0100 Subject: [PATCH] starting on text frames support --- source/Runtime/Common/EventType.cs | 1 + source/Runtime/Common/Message.cs | 4 +-- source/Runtime/Common/MessageProcessor.cs | 8 ++--- source/Runtime/Common/ReceiveLoop.cs | 17 +++++---- source/Runtime/Server/SimpleWebServer.cs | 10 ++++++ tests/Runtime/Server/ReceiveMessageTest.cs | 23 ++++++++++++ tests/Runtime/Server/SendMessageTest.cs | 41 ++++++++++++++++++---- tests/Runtime/TestInstances.cs | 11 ++++-- tests/node~/ReceiveManyMessagesText.js | 34 ++++++++++++++++++ tests/node~/SendManyMessagesText.js | 20 +++++++++++ 10 files changed, 149 insertions(+), 20 deletions(-) create mode 100644 tests/node~/ReceiveManyMessagesText.js create mode 100644 tests/node~/SendManyMessagesText.js diff --git a/source/Runtime/Common/EventType.cs b/source/Runtime/Common/EventType.cs index 482185d..409a793 100644 --- a/source/Runtime/Common/EventType.cs +++ b/source/Runtime/Common/EventType.cs @@ -4,6 +4,7 @@ public enum EventType { Connected, Data, + Text, Disconnected, Error } diff --git a/source/Runtime/Common/Message.cs b/source/Runtime/Common/Message.cs index 9613423..8f469cd 100644 --- a/source/Runtime/Common/Message.cs +++ b/source/Runtime/Common/Message.cs @@ -2,7 +2,7 @@ namespace JamesFrowen.SimpleWeb { - public struct Message + public readonly struct Message { public readonly int connId; public readonly EventType type; @@ -32,7 +32,7 @@ public Message(int connId, EventType type) : this() this.type = type; } - public Message(int connId, ArrayBuffer data) : this() + public Message(int connId, ArrayBuffer data, EventType type) : this() { this.connId = connId; type = EventType.Data; diff --git a/source/Runtime/Common/MessageProcessor.cs b/source/Runtime/Common/MessageProcessor.cs index d5fb48f..6e3dca3 100644 --- a/source/Runtime/Common/MessageProcessor.cs +++ b/source/Runtime/Common/MessageProcessor.cs @@ -157,16 +157,16 @@ static void ThrowIfBadOpCode(OpCode opcode, bool finished, bool opCodeContinuati } else if (!finished) { - // Fragmented message, should be binary - if (opcode == OpCode.binary) + // Fragmented message, should be binary or text + if (opcode == OpCode.binary || opcode == OpCode.text) return; - throw new InvalidDataException("Expected opcode to be binary"); + throw new InvalidDataException("Expected opcode to be binary or text"); } else { // Normal message, should be binary, text, or close - if (opcode == OpCode.binary || opcode == OpCode.close) + if (opcode == OpCode.binary || opcode == OpCode.text || opcode == OpCode.close) return; throw new InvalidDataException($"Unexpected opcode {opcode}"); diff --git a/source/Runtime/Common/ReceiveLoop.cs b/source/Runtime/Common/ReceiveLoop.cs index 97dd8c2..b300ba0 100644 --- a/source/Runtime/Common/ReceiveLoop.cs +++ b/source/Runtime/Common/ReceiveLoop.cs @@ -124,12 +124,17 @@ static void ReadOneMessage(Config config, byte[] buffer) { switch (header.opcode) { + case OpCode.text: + HandleArrayMessage(config, buffer, msgOffset, header.payloadLength, EventType.Text); + break; case OpCode.binary: - HandleArrayMessage(config, buffer, msgOffset, header.payloadLength); + HandleArrayMessage(config, buffer, msgOffset, header.payloadLength, EventType.Data); break; case OpCode.close: HandleCloseMessage(config, buffer, msgOffset, header.payloadLength); break; + default: + throw new InvalidDataException($"Unexpected opcode {header.opcode}"); } } else @@ -138,6 +143,7 @@ static void ReadOneMessage(Config config, byte[] buffer) Queue fragments = new Queue(); fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength)); int totalSize = header.payloadLength; + OpCode opcode = header.opcode; while (!header.finished) { @@ -151,7 +157,6 @@ static void ReadOneMessage(Config config, byte[] buffer) MessageProcessor.ThrowIfMsgLengthTooLong(totalSize, maxMessageSize); } - ArrayBuffer msg = bufferPool.Take(totalSize); msg.count = 0; while (fragments.Count > 0) @@ -166,8 +171,8 @@ static void ReadOneMessage(Config config, byte[] buffer) // dump after mask off Log.DumpBuffer($"Message", msg); - - queue.Enqueue(new Message(conn.connId, msg)); + EventType type = opcode == OpCode.binary ? EventType.Data : EventType.Text; + queue.Enqueue(new Message(conn.connId, msg, type)); } } @@ -209,7 +214,7 @@ static Header ReadHeader(Config config, byte[] buffer, bool opCodeContinuation = return header; } - static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int payloadLength) + static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int payloadLength, EventType type) { (Connection conn, int _, bool expectMask, ConcurrentQueue queue, BufferPool bufferPool) = config; @@ -218,7 +223,7 @@ static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int // dump after mask off Log.DumpBuffer($"Message", arrayBuffer); - queue.Enqueue(new Message(conn.connId, arrayBuffer)); + queue.Enqueue(new Message(conn.connId, arrayBuffer, type)); } static ArrayBuffer CopyMessageToBuffer(BufferPool bufferPool, bool expectMask, byte[] buffer, int msgOffset, int payloadLength) diff --git a/source/Runtime/Server/SimpleWebServer.cs b/source/Runtime/Server/SimpleWebServer.cs index a626fd0..cf4fbe9 100644 --- a/source/Runtime/Server/SimpleWebServer.cs +++ b/source/Runtime/Server/SimpleWebServer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using UnityEngine; namespace JamesFrowen.SimpleWeb @@ -10,6 +11,7 @@ public class SimpleWebServer public event Action onConnect; public event Action onDisconnect; public event Action> onData; + public event Action onText; public event Action onError; readonly int maxMessagesPerTick; @@ -133,8 +135,16 @@ public void ProcessMessageQueue(MonoBehaviour behaviour) break; case EventType.Data: onData?.Invoke(next.connId, next.data.ToSegment()); + break; + case EventType.Text: + if (onText != null) + { + string messageText = Encoding.UTF8.GetString(next.data.array, 0, next.data.count); + onText.Invoke(next.connId, messageText); + } next.data.Release(); break; + case EventType.Disconnected: onDisconnect?.Invoke(next.connId); break; diff --git a/tests/Runtime/Server/ReceiveMessageTest.cs b/tests/Runtime/Server/ReceiveMessageTest.cs index 6aeea50..dba7fca 100644 --- a/tests/Runtime/Server/ReceiveMessageTest.cs +++ b/tests/Runtime/Server/ReceiveMessageTest.cs @@ -74,6 +74,29 @@ public IEnumerator ReceiveManyArrays() } } + [UnityTest] + public IEnumerator ReceiveManyTextMessages() + { + // Don't worry about result, run will timeout by itself + _ = RunNode.RunAsync("SendManyTextMessages.js"); + + yield return server.WaitForConnection; + + // Wait for messages + yield return new WaitForSeconds(0.5f); + const int expectedCount = 100; + + Assert.That(server.onData, Has.Count.EqualTo(expectedCount), $"Should have {expectedCount} messages"); + + for (int i = 0; i < expectedCount; i++) + { + (int connId, string data) = server.onData[i]; + + Assert.That(connId, Is.EqualTo(1), "Conn id should be 1"); + + Assert.That(data, Is.EqualTo($"Message {i}"), "Data should match the sent message"); + } + } [UnityTest] public IEnumerator ReceiveAlmostLargeArrays() diff --git a/tests/Runtime/Server/SendMessageTest.cs b/tests/Runtime/Server/SendMessageTest.cs index 98c7d09..e5ab001 100644 --- a/tests/Runtime/Server/SendMessageTest.cs +++ b/tests/Runtime/Server/SendMessageTest.cs @@ -23,7 +23,7 @@ public IEnumerator SendOne() yield return server.WaitForConnection; byte[] bytes = new byte[] { 1, 2, 3, 4, 5 }; - var segment = new ArraySegment(bytes); + ArraySegment segment = new ArraySegment(bytes); server.ServerSend(new List { 1 }, Channels.DefaultReliable, segment); @@ -61,7 +61,7 @@ public IEnumerator SendDifferentSizes(int msgSize) yield return server.WaitForConnection; byte[] bytes = Enumerable.Range(1, msgSize).Select(x => (byte)x).ToArray(); - var segment = new ArraySegment(bytes); + ArraySegment segment = new ArraySegment(bytes); server.ServerSend(new List { 1 }, Channels.DefaultReliable, segment); @@ -89,11 +89,11 @@ public IEnumerator SendMany() for (int i = 0; i < messageCount; i++) { - var writer = new NetworkWriter(); + NetworkWriter writer = new NetworkWriter(); writer.WriteByte((byte)i); writer.WriteInt32(100); - var segment = writer.ToArraySegment(); + ArraySegment segment = writer.ToArraySegment(); server.ServerSend(new List { 1 }, Channels.DefaultReliable, segment); } @@ -113,12 +113,41 @@ public IEnumerator SendMany() result.AssetErrors(); } + [UnityTest] + public IEnumerator SendManyTextMessages() + { + Task task = RunNode.RunAsync("ReceiveManyMessagesText.js", 5_000); + + yield return server.WaitForConnection; + const int messageCount = 100; + + for (int i = 0; i < messageCount; i++) + { + string message = $"Message {i}"; + server.ServerSendText(new List { 1 }, message); + } + + yield return new WaitForSeconds(1); + server.ServerDisconnect(1); + + yield return new WaitUntil(() => task.IsCompleted); + + RunNode.Result result = task.Result; + + IEnumerable expected = Enumerable.Range(0, messageCount).Select(i => $"Received text message: Message {i}"); + + result.AssetTimeout(false); + result.AssetOutput(expected.ToArray()); + result.AssetErrors(); + + } + [UnityTest] public IEnumerator ErrorWhenMessageTooBig() { yield return null; - var segment = new ArraySegment(new byte[70_000]); + ArraySegment segment = new ArraySegment(new byte[70_000]); LogAssert.Expect(LogType.Error, "ERROR: Message greater than max size"); server.ServerSend(new List { 1 }, Channels.DefaultReliable, segment); @@ -129,7 +158,7 @@ public IEnumerator ErrorWhenMessageTooSmall() { yield return null; - var segment = new ArraySegment(); + ArraySegment segment = new ArraySegment(); LogAssert.Expect(LogType.Error, "ERROR: Message count was zero"); server.ServerSend(new List { 1 }, Channels.DefaultReliable, segment); diff --git a/tests/Runtime/TestInstances.cs b/tests/Runtime/TestInstances.cs index 49413a8..8feb44f 100644 --- a/tests/Runtime/TestInstances.cs +++ b/tests/Runtime/TestInstances.cs @@ -51,6 +51,13 @@ public void ServerSend(System.Collections.Generic.List connectionIds, int c ServerSend(id, channelId, segment); } } + public void ServerSendText(System.Collections.Generic.List connectionIds, int channelId, string message) + { + foreach (int id in connectionIds) + { + ServerSend(id, channelId, message); + } + } } public class ClientTestInstance : SimpleWebTransport, NeedInitTestInstance { @@ -214,7 +221,7 @@ public override void ClientConnect(string hostname) return; } - var builder = new UriBuilder + UriBuilder builder = new UriBuilder { Scheme = GetClientScheme(), Host = hostname, @@ -353,7 +360,7 @@ public override string ServerGetClientAddress(int connectionId) public override Uri ServerUri() { - var builder = new UriBuilder + UriBuilder builder = new UriBuilder { Scheme = GetServerScheme(), Host = Dns.GetHostName(), diff --git a/tests/node~/ReceiveManyMessagesText.js b/tests/node~/ReceiveManyMessagesText.js new file mode 100644 index 0000000..ed8abdc --- /dev/null +++ b/tests/node~/ReceiveManyMessagesText.js @@ -0,0 +1,34 @@ +const WebSocket = require("websocket").w3cwebsocket; + +// Create WebSocket connection. +const webSocket = new WebSocket("ws://localhost:7776/"); + +webSocket.addEventListener('error', function (event) { + console.error('Socket Error', event); +}); + +// Use ping to keep connection alive +const pingInterval = 1000; +webSocket.addEventListener('open', function (event) { + setInterval(() => { + var buffer = new ArrayBuffer(4); + var view = new Uint8Array(buffer); + for (let i = 0; i < view.length; i++) { + view[i] = i + 10; + } + webSocket.send(buffer); + }, pingInterval); +}); + +webSocket.addEventListener('message', function (event) { + var message = event.data; + if (typeof message === 'string') { + console.log(`Received text message: ${message}`); + } else { + console.error("Message not a string"); + } +}); + +webSocket.addEventListener('close', function (event) { + process.exit(0); +}); diff --git a/tests/node~/SendManyMessagesText.js b/tests/node~/SendManyMessagesText.js new file mode 100644 index 0000000..c90eb6d --- /dev/null +++ b/tests/node~/SendManyMessagesText.js @@ -0,0 +1,20 @@ +const WebSocket = require("websocket").w3cwebsocket; + +// Create WebSocket connection. +const webSocket = new WebSocket("ws://localhost:7776/"); + +webSocket.addEventListener('error', function (event) { + console.error('Socket Error', event); +}); + +// Connection opened +webSocket.addEventListener('open', function (event) { + // send 100 text messages as fast as possible + for (let i = 0; i < 100; i++) { + webSocket.send(`Message ${i}`); + } +}); + +webSocket.addEventListener('close', function (event) { + process.exit(0); +});