Skip to content

Getting Started (Websockets)

Nikita Moshkalov edited this page Apr 7, 2021 · 3 revisions

Here is production code that was sent to us by @CLTanuki for hosting a websocket server.

Some parts have been cut out of this listing, but we have been using this implementation for 18 months. The main logic is in this line: string returnString = await JsonRpcProcessor.Process(message, connectionId);

/// <summary>
/// Hosts a WebSocket endpoint on top of <see cref="HttpListener"/> and forwards every
/// incoming text message that contains "method" to <c>JsonRpcProcessor.Process</c>,
/// sending the non-empty result back on the same socket.
/// </summary>
/// <remarks>
/// All state is static: constructing an instance starts the listener and the accept loop,
/// and <see cref="Dispose"/> cancels the loop and stops the listener.
/// </remarks>
internal class RPCServer : IDisposable
{
    // Body returned (with status 403) to plain HTTP requests that are not WebSocket upgrades.
    private const string HttpNotAllowedBody = "<HTML><BODY><p>HTTP NOT ALLOWED</p></BODY></HTML>";

    private static CancellationTokenSource m_cancellation;
    private static HttpListener m_listener;

    private static readonly string Url = $"http://{Settings.Default.Host}:{Settings.Default.Port}/";

    // Guards every access to Clients made by this class. The accept loop adds entries,
    // each connection task removes its own entry, and the notify methods read them,
    // all potentially on different threads.
    private static readonly object m_clientsLock = new object();

    /// <summary>
    /// Connected clients keyed by connection id. Item1 is the accepted WebSocket context;
    /// Item2 is the id of the configuration the connection was authorized for
    /// (null until <see cref="TryAuthorizeConnection"/> succeeds).
    /// </summary>
    /// <remarks>
    /// Kept as a public <see cref="Dictionary{TKey,TValue}"/> field for compatibility.
    /// Code outside this class that touches it directly is not synchronized with the
    /// internal lock.
    /// </remarks>
    public static Dictionary<Guid, Tuple<HttpListenerWebSocketContext, Guid?>> Clients = new Dictionary<Guid, Tuple<HttpListenerWebSocketContext, Guid?>>();

    // Instantiated once at type initialization; nothing in this class reads the array.
    // NOTE(review): presumably constructing the services registers their RPC handlers - confirm.
    static readonly object[] Services =
    {
        new DirectoryService(),
        new OrderService(),
        new CommonService(),
        new AuthService(),
        new SettingsService(),
        new GuestService()
    };

    /// <summary>
    /// Starts the HTTP listener on <c>Url</c> and launches the background accept loop.
    /// </summary>
    public RPCServer()
    {
        m_listener = new HttpListener();
        m_listener.Prefixes.Add(Url);
        m_listener.Start();

        m_cancellation = new CancellationTokenSource();

        // Capture locals so the background task never dereferences the static fields,
        // which Dispose sets to null.
        HttpListener listener = m_listener;
        CancellationToken token = m_cancellation.Token;
        _ = Task.Run(() => AcceptWebSocketClientsAsync(listener, token));
    }

    #region Private Methods

    /// <summary>
    /// Accepts incoming requests until cancellation is requested or the listener stops.
    /// WebSocket upgrades are registered in <see cref="Clients"/> and handed to
    /// <see cref="HandleConnectionAsync"/>; any other request is answered with 403.
    /// </summary>
    /// <param name="listener">A started listener to accept requests from.</param>
    /// <param name="token">Token that ends the loop and is passed on to each connection.</param>
    private static async Task AcceptWebSocketClientsAsync(HttpListener listener, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            HttpListenerContext context;
            try
            {
                context = await listener.GetContextAsync().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // A stopped listener fails every GetContextAsync call immediately;
                // leave the loop instead of spinning on the same exception.
                if (token.IsCancellationRequested || !listener.IsListening)
                {
                    break;
                }

                System.Diagnostics.Trace.TraceError("RPCServer: failed to accept a request: {0}", ex);
                continue;
            }

            try
            {
                if (!context.Request.IsWebSocketRequest)
                {
                    RejectHttpRequest(context.Response);
                    continue;
                }

                var ws = await context.AcceptWebSocketAsync(null, new TimeSpan(0, 0, 3)).ConfigureAwait(false);
                if (ws == null)
                {
                    continue;
                }

                Guid connectionId = Guid.NewGuid();
                lock (m_clientsLock)
                {
                    Clients.Add(connectionId, new Tuple<HttpListenerWebSocketContext, Guid?>(ws, null));
                }

                // Fire and forget: HandleConnectionAsync handles its own exceptions and cleanup.
                _ = Task.Run(() => HandleConnectionAsync(ws.WebSocket, token, connectionId));
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError("RPCServer: failed to process an incoming request: {0}", ex);

                // Release the connection of a request that could not be served or upgraded.
                try
                {
                    context.Response.Abort();
                }
                catch
                {
                    // Nothing more can be done for this request.
                }
            }
        }
    }

    /// <summary>
    /// Answers a non-WebSocket request with status 403 and a short HTML body, then closes the response.
    /// </summary>
    /// <param name="response">Response object of the rejected request.</param>
    private static void RejectHttpRequest(HttpListenerResponse response)
    {
        byte[] body = Encoding.UTF8.GetBytes(HttpNotAllowedBody);

        response.ContentLength64 = body.Length;
        response.StatusCode = 403;

        using (Stream output = response.OutputStream)
        {
            output.Write(body, 0, body.Length);
        }

        response.Close();
    }

    /// <summary>
    /// Serves one WebSocket connection: reads messages while the socket is open, passes those
    /// containing "method" to <c>JsonRpcProcessor.Process</c> and sends back any non-empty reply.
    /// The connection is always removed from <see cref="Clients"/> and the socket disposed on exit.
    /// </summary>
    /// <param name="ws">The accepted socket.</param>
    /// <param name="cancellation">Token that interrupts receiving and sending.</param>
    /// <param name="connectionId">Key of this connection in <see cref="Clients"/>.</param>
    private static async Task HandleConnectionAsync(WebSocket ws, CancellationToken cancellation, Guid connectionId)
    {
        try
        {
            while (ws.State == WebSocketState.Open && !cancellation.IsCancellationRequested)
            {
                string message = await ReadString(ws, cancellation).ConfigureAwait(false);

                if (!message.Contains("method"))
                {
                    continue;
                }

                string returnString = await JsonRpcProcessor.Process(message, connectionId);
                if (string.IsNullOrEmpty(returnString) || ws.State != WebSocketState.Open)
                {
                    continue;
                }

                ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(returnString));
                await ws.SendAsync(outputBuffer, WebSocketMessageType.Text, true, cancellation).ConfigureAwait(false);
            }

            // CloseAsync is only valid while the close handshake can still be performed.
            if (ws.State == WebSocketState.Open || ws.State == WebSocketState.CloseReceived)
            {
                await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Done", CancellationToken.None).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Server shutdown interrupted a pending receive or send; cleanup happens in finally.
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError("RPCServer: connection {0} failed: {1}", connectionId, ex);

            try
            {
                await ws.CloseAsync(WebSocketCloseStatus.InternalServerError, "Done", CancellationToken.None).ConfigureAwait(false);
            }
            catch
            {
                // The socket is already unusable; there is nothing left to close.
            }
        }
        finally
        {
            lock (m_clientsLock)
            {
                Clients.Remove(connectionId);
            }

            ws.Dispose();
        }
    }

    /// <summary>
    /// Receives one complete message (all fragments up to EndOfMessage) and decodes it as UTF-8.
    /// </summary>
    /// <param name="ws">Socket to read from.</param>
    /// <param name="cancellation">Token passed to each receive call; defaults to none.</param>
    /// <returns>The decoded message text.</returns>
    private static async Task<String> ReadString(WebSocket ws, CancellationToken cancellation = default(CancellationToken))
    {
        ArraySegment<Byte> buffer = new ArraySegment<byte>(new Byte[8192]);

        using (var ms = new MemoryStream())
        {
            WebSocketReceiveResult result;
            do
            {
                result = await ws.ReceiveAsync(buffer, cancellation).ConfigureAwait(false);
                ms.Write(buffer.Array, buffer.Offset, result.Count);
            }
            while (!result.EndOfMessage);

            ms.Seek(0, SeekOrigin.Begin);

            using (var reader = new StreamReader(ms, Encoding.UTF8))
            {
                return reader.ReadToEnd();
            }
        }
    }

    /// <summary>
    /// Sends <paramref name="payload"/> as one text message if the socket is open.
    /// Failures are traced and swallowed so that one broken client does not abort a broadcast.
    /// </summary>
    /// <remarks>
    /// NOTE(review): a notification and a reply from HandleConnectionAsync can target the same
    /// socket at the same time; sends are not serialized here - confirm whether a per-socket gate is needed.
    /// </remarks>
    private static async Task TrySendNotificationAsync(WebSocket socket, ArraySegment<byte> payload)
    {
        try
        {
            if (socket.State != WebSocketState.Open)
            {
                return;
            }

            await socket.SendAsync(payload, WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceWarning("RPCServer: failed to send a notification: {0}", ex);
        }
    }

    /// <summary>
    /// Serializes a <see cref="JsonNotification"/> for the given method and parameters to UTF-8 bytes.
    /// </summary>
    private static ArraySegment<byte> BuildNotificationPayload(string rpcMethod, object rpcParams)
    {
        JsonNotification request = new JsonNotification
        {
            Method = rpcMethod,
            Params = rpcParams
        };

        string notification = JsonConvert.SerializeObject(request);
        return new ArraySegment<byte>(Encoding.UTF8.GetBytes(notification));
    }

    #endregion

    /// <summary>
    /// Marks a connection as authorized for the configuration whose id equals <paramref name="sstId"/>.
    /// </summary>
    /// <param name="connectionId">Connection to authorize.</param>
    /// <param name="sstId">Id looked up in <c>ConfigStorage.SstConfigs</c>.</param>
    /// <param name="mac">Not used by this method.</param>
    /// <param name="states">Not used by this method.</param>
    /// <returns>
    /// The matching configuration, or null when the configuration or the connection is unknown.
    /// </returns>
    public static SSTConfig TryAuthorizeConnection(Guid connectionId, Guid sstId, string mac, WorkingState[] states)
    {
        var config = ConfigStorage.SstConfigs.FirstOrDefault(c => c.Id == sstId);
        if (config == null)
        {
            return null;
        }

        // Read-modify-write of the entry must be atomic with respect to add/remove.
        lock (m_clientsLock)
        {
            if (!Clients.TryGetValue(connectionId, out var client))
            {
                return null;
            }

            Clients[connectionId] = new Tuple<HttpListenerWebSocketContext, Guid?>(client.Item1, config.Id);
        }

        return config;
    }

    /// <summary>
    /// Sends a JSON-RPC notification to every connected client whose socket is open.
    /// </summary>
    /// <param name="rpcMethod">Value of the notification's "method" member.</param>
    /// <param name="rpcParams">Value of the "params" member; omitted from the JSON when null.</param>
    public static async Task Notify(string rpcMethod, object rpcParams)
    {
        // Encode once; the same bytes go to every client.
        ArraySegment<byte> payload = BuildNotificationPayload(rpcMethod, rpcParams);

        // Snapshot under the lock so connections can come and go while we are sending.
        WebSocket[] sockets;
        lock (m_clientsLock)
        {
            sockets = Clients.Values.Select(c => c.Item1.WebSocket).ToArray();
        }

        foreach (WebSocket socket in sockets)
        {
            await TrySendNotificationAsync(socket, payload).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Sends a JSON-RPC notification to a single client, if it is connected and its socket is open.
    /// </summary>
    /// <param name="clientId">Connection id of the target client.</param>
    /// <param name="rpcMethod">Value of the notification's "method" member.</param>
    /// <param name="rpcParams">Value of the "params" member; omitted from the JSON when null.</param>
    public static async Task NotifyClient(Guid clientId, string rpcMethod, object rpcParams)
    {
        ArraySegment<byte> payload = BuildNotificationPayload(rpcMethod, rpcParams);

        Tuple<HttpListenerWebSocketContext, Guid?> client;
        lock (m_clientsLock)
        {
            if (!Clients.TryGetValue(clientId, out client))
            {
                return;
            }
        }

        await TrySendNotificationAsync(client.Item1.WebSocket, payload).ConfigureAwait(false);
    }

    /// <summary>
    /// Cancels the accept loop and all connection loops, then stops and closes the listener.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        HttpListener listener = m_listener;
        CancellationTokenSource cancellation = m_cancellation;
        if (listener == null || cancellation == null)
        {
            return;
        }

        try
        {
            // Cancel first so the accept loop sees the token when GetContextAsync fails on Stop.
            cancellation.Cancel();

            listener.Stop();
            listener.Close();

            m_listener = null;
            m_cancellation = null;
        }
        catch (Exception ex)
        {
            // Dispose must not throw; record the failure instead.
            System.Diagnostics.Trace.TraceError("RPCServer: error while shutting down: {0}", ex);
        }
    }
}

/// <summary>
/// Serialization model for a notification message: a "jsonrpc" version marker,
/// a "method" name and optional "params".
/// </summary>
internal class JsonNotification
{
    /// <summary>Creates an empty notification; set <see cref="Method"/> and <see cref="Params"/> afterwards.</summary>
    public JsonNotification()
    {
    }

    /// <summary>Protocol version marker; always the string "2.0".</summary>
    [JsonProperty("jsonrpc")]
    public string JsonRpc
    {
        get { return "2.0"; }
    }

    /// <summary>Name of the method, written as "method".</summary>
    [JsonProperty("method")]
    public string Method { get; set; }

    /// <summary>Arguments written as "params"; the member is left out of the JSON when this is null.</summary>
    [JsonProperty("params", NullValueHandling = NullValueHandling.Ignore)]
    public object Params { get; set; }
}