Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

starting on text frames support #15

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/Runtime/Common/EventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public enum EventType
{
Connected,
Data,
Text,
Disconnected,
Error
}
Expand Down
4 changes: 2 additions & 2 deletions source/Runtime/Common/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace JamesFrowen.SimpleWeb
{
public struct Message
public readonly struct Message
{
public readonly int connId;
public readonly EventType type;
Expand Down Expand Up @@ -32,7 +32,7 @@ public Message(int connId, EventType type) : this()
this.type = type;
}

public Message(int connId, ArrayBuffer data) : this()
public Message(int connId, ArrayBuffer data, EventType type) : this()
{
this.connId = connId;
type = EventType.Data;
Expand Down
8 changes: 4 additions & 4 deletions source/Runtime/Common/MessageProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,16 @@ static void ThrowIfBadOpCode(OpCode opcode, bool finished, bool opCodeContinuati
}
else if (!finished)
{
// Fragmented message, should be binary
if (opcode == OpCode.binary)
// Fragmented message, should be binary or text
if (opcode == OpCode.binary || opcode == OpCode.text)
return;

throw new InvalidDataException("Expected opcode to be binary");
throw new InvalidDataException("Expected opcode to be binary or text");
}
else
{
// Normal message, should be binary, text, or close
if (opcode == OpCode.binary || opcode == OpCode.close)
if (opcode == OpCode.binary || opcode == OpCode.text || opcode == OpCode.close)
return;

throw new InvalidDataException($"Unexpected opcode {opcode}");
Expand Down
17 changes: 11 additions & 6 deletions source/Runtime/Common/ReceiveLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ static void ReadOneMessage(Config config, byte[] buffer)
{
switch (header.opcode)
{
case OpCode.text:
HandleArrayMessage(config, buffer, msgOffset, header.payloadLength, EventType.Text);
break;
case OpCode.binary:
HandleArrayMessage(config, buffer, msgOffset, header.payloadLength);
HandleArrayMessage(config, buffer, msgOffset, header.payloadLength, EventType.Data);
break;
case OpCode.close:
HandleCloseMessage(config, buffer, msgOffset, header.payloadLength);
break;
default:
throw new InvalidDataException($"Unexpected opcode {header.opcode}");
}
}
else
Expand All @@ -138,6 +143,7 @@ static void ReadOneMessage(Config config, byte[] buffer)
Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>();
fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength));
int totalSize = header.payloadLength;
OpCode opcode = header.opcode;

while (!header.finished)
{
Expand All @@ -151,7 +157,6 @@ static void ReadOneMessage(Config config, byte[] buffer)
MessageProcessor.ThrowIfMsgLengthTooLong(totalSize, maxMessageSize);
}


ArrayBuffer msg = bufferPool.Take(totalSize);
msg.count = 0;
while (fragments.Count > 0)
Expand All @@ -166,8 +171,8 @@ static void ReadOneMessage(Config config, byte[] buffer)

// dump after mask off
Log.DumpBuffer($"Message", msg);

queue.Enqueue(new Message(conn.connId, msg));
EventType type = opcode == OpCode.binary ? EventType.Data : EventType.Text;
queue.Enqueue(new Message(conn.connId, msg, type));
}
}

Expand Down Expand Up @@ -209,7 +214,7 @@ static Header ReadHeader(Config config, byte[] buffer, bool opCodeContinuation =
return header;
}

static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int payloadLength)
static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int payloadLength, EventType type)
{
(Connection conn, int _, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool) = config;

Expand All @@ -218,7 +223,7 @@ static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int
// dump after mask off
Log.DumpBuffer($"Message", arrayBuffer);

queue.Enqueue(new Message(conn.connId, arrayBuffer));
queue.Enqueue(new Message(conn.connId, arrayBuffer, type));
}

static ArrayBuffer CopyMessageToBuffer(BufferPool bufferPool, bool expectMask, byte[] buffer, int msgOffset, int payloadLength)
Expand Down
10 changes: 10 additions & 0 deletions source/Runtime/Server/SimpleWebServer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UnityEngine;

namespace JamesFrowen.SimpleWeb
Expand All @@ -10,6 +11,7 @@ public class SimpleWebServer
public event Action<int> onConnect;
public event Action<int> onDisconnect;
public event Action<int, ArraySegment<byte>> onData;
public event Action<int, string> onText;
public event Action<int, Exception> onError;

readonly int maxMessagesPerTick;
Expand Down Expand Up @@ -133,8 +135,16 @@ public void ProcessMessageQueue(MonoBehaviour behaviour)
break;
case EventType.Data:
onData?.Invoke(next.connId, next.data.ToSegment());
break;
case EventType.Text:
if (onText != null)
{
string messageText = Encoding.UTF8.GetString(next.data.array, 0, next.data.count);
onText.Invoke(next.connId, messageText);
}
next.data.Release();
break;

case EventType.Disconnected:
onDisconnect?.Invoke(next.connId);
break;
Expand Down
23 changes: 23 additions & 0 deletions tests/Runtime/Server/ReceiveMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ public IEnumerator ReceiveManyArrays()
}
}

[UnityTest]
public IEnumerator ReceiveManyTextMessages()
{
// Don't worry about result, run will timeout by itself
_ = RunNode.RunAsync("SendManyTextMessages.js");

yield return server.WaitForConnection;

// Wait for messages
yield return new WaitForSeconds(0.5f);
const int expectedCount = 100;

Assert.That(server.onData, Has.Count.EqualTo(expectedCount), $"Should have {expectedCount} messages");

for (int i = 0; i < expectedCount; i++)
{
(int connId, string data) = server.onData[i];

Assert.That(connId, Is.EqualTo(1), "Conn id should be 1");

Assert.That(data, Is.EqualTo($"Message {i}"), "Data should match the sent message");
}
}

[UnityTest]
public IEnumerator ReceiveAlmostLargeArrays()
Expand Down
41 changes: 35 additions & 6 deletions tests/Runtime/Server/SendMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public IEnumerator SendOne()
yield return server.WaitForConnection;

byte[] bytes = new byte[] { 1, 2, 3, 4, 5 };
var segment = new ArraySegment<byte>(bytes);
ArraySegment<byte> segment = new ArraySegment<byte>(bytes);

server.ServerSend(new List<int> { 1 }, Channels.DefaultReliable, segment);

Expand Down Expand Up @@ -61,7 +61,7 @@ public IEnumerator SendDifferentSizes(int msgSize)
yield return server.WaitForConnection;

byte[] bytes = Enumerable.Range(1, msgSize).Select(x => (byte)x).ToArray();
var segment = new ArraySegment<byte>(bytes);
ArraySegment<byte> segment = new ArraySegment<byte>(bytes);

server.ServerSend(new List<int> { 1 }, Channels.DefaultReliable, segment);

Expand Down Expand Up @@ -89,11 +89,11 @@ public IEnumerator SendMany()

for (int i = 0; i < messageCount; i++)
{
var writer = new NetworkWriter();
NetworkWriter writer = new NetworkWriter();
writer.WriteByte((byte)i);
writer.WriteInt32(100);

var segment = writer.ToArraySegment();
ArraySegment<byte> segment = writer.ToArraySegment();

server.ServerSend(new List<int> { 1 }, Channels.DefaultReliable, segment);
}
Expand All @@ -113,12 +113,41 @@ public IEnumerator SendMany()
result.AssetErrors();
}

[UnityTest]
public IEnumerator SendManyTextMessages()
{
Task<RunNode.Result> task = RunNode.RunAsync("ReceiveManyMessagesText.js", 5_000);

yield return server.WaitForConnection;
const int messageCount = 100;

for (int i = 0; i < messageCount; i++)
{
string message = $"Message {i}";
server.ServerSendText(new List<int> { 1 }, message);
}

yield return new WaitForSeconds(1);
server.ServerDisconnect(1);

yield return new WaitUntil(() => task.IsCompleted);

RunNode.Result result = task.Result;

IEnumerable<string> expected = Enumerable.Range(0, messageCount).Select(i => $"Received text message: Message {i}");

result.AssetTimeout(false);
result.AssetOutput(expected.ToArray());
result.AssetErrors();

}

[UnityTest]
public IEnumerator ErrorWhenMessageTooBig()
{
yield return null;

var segment = new ArraySegment<byte>(new byte[70_000]);
ArraySegment<byte> segment = new ArraySegment<byte>(new byte[70_000]);

LogAssert.Expect(LogType.Error, "ERROR: <color=red>Message greater than max size</color>");
server.ServerSend(new List<int> { 1 }, Channels.DefaultReliable, segment);
Expand All @@ -129,7 +158,7 @@ public IEnumerator ErrorWhenMessageTooSmall()
{
yield return null;

var segment = new ArraySegment<byte>();
ArraySegment<byte> segment = new ArraySegment<byte>();

LogAssert.Expect(LogType.Error, "ERROR: <color=red>Message count was zero</color>");
server.ServerSend(new List<int> { 1 }, Channels.DefaultReliable, segment);
Expand Down
11 changes: 9 additions & 2 deletions tests/Runtime/TestInstances.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public void ServerSend(System.Collections.Generic.List<int> connectionIds, int c
ServerSend(id, channelId, segment);
}
}
public void ServerSendText(System.Collections.Generic.List<int> connectionIds, int channelId, string message)
{
foreach (int id in connectionIds)
{
ServerSend(id, channelId, message);
}
}
}
public class ClientTestInstance : SimpleWebTransport, NeedInitTestInstance
{
Expand Down Expand Up @@ -214,7 +221,7 @@ public override void ClientConnect(string hostname)
return;
}

var builder = new UriBuilder
UriBuilder builder = new UriBuilder
{
Scheme = GetClientScheme(),
Host = hostname,
Expand Down Expand Up @@ -353,7 +360,7 @@ public override string ServerGetClientAddress(int connectionId)

public override Uri ServerUri()
{
var builder = new UriBuilder
UriBuilder builder = new UriBuilder
{
Scheme = GetServerScheme(),
Host = Dns.GetHostName(),
Expand Down
34 changes: 34 additions & 0 deletions tests/node~/ReceiveManyMessagesText.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const WebSocket = require("websocket").w3cwebsocket;

// Create WebSocket connection.
const webSocket = new WebSocket("ws://localhost:7776/");

webSocket.addEventListener('error', function (event) {
console.error('Socket Error', event);
});

// Use ping to keep connection alive
const pingInterval = 1000;
webSocket.addEventListener('open', function (event) {
setInterval(() => {
var buffer = new ArrayBuffer(4);
var view = new Uint8Array(buffer);
for (let i = 0; i < view.length; i++) {
view[i] = i + 10;
}
webSocket.send(buffer);
}, pingInterval);
});

webSocket.addEventListener('message', function (event) {
var message = event.data;
if (typeof message === 'string') {
console.log(`Received text message: ${message}`);
} else {
console.error("Message not a string");
}
});

webSocket.addEventListener('close', function (event) {
process.exit(0);
});
20 changes: 20 additions & 0 deletions tests/node~/SendManyMessagesText.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const WebSocket = require("websocket").w3cwebsocket;

// Create WebSocket connection.
const webSocket = new WebSocket("ws://localhost:7776/");

webSocket.addEventListener('error', function (event) {
console.error('Socket Error', event);
});

// Connection opened
webSocket.addEventListener('open', function (event) {
// send 100 text messages as fast as possible
for (let i = 0; i < 100; i++) {
webSocket.send(`Message ${i}`);
}
});

webSocket.addEventListener('close', function (event) {
process.exit(0);
});