diff --git a/SharedBuildProperties.props b/SharedBuildProperties.props index 3faf967..647661d 100644 --- a/SharedBuildProperties.props +++ b/SharedBuildProperties.props @@ -2,7 +2,7 @@ xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> Solnet.Mango - 5.0.4.2 + 5.0.4.3 Copyright 2022 © blockmountain blockmountain blockmountain diff --git a/Solnet.Mango.Examples/HistoricalDataExample.cs b/Solnet.Mango.Examples/HistoricalDataExample.cs index af5f42b..284df6d 100644 --- a/Solnet.Mango.Examples/HistoricalDataExample.cs +++ b/Solnet.Mango.Examples/HistoricalDataExample.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Console; using Solnet.Mango.Historical; +using Solnet.Mango.Models.Events; using System; using System.Collections.Generic; using System.Linq; @@ -28,7 +29,7 @@ public HistoricalDataExample() .SetMinimumLevel(LogLevel.Debug); }).CreateLogger(); - _mangoHistoricalDataService = new MangoHistoricalDataService( new MangoHistoricalDataServiceConfig + _mangoHistoricalDataService = new MangoHistoricalDataService(new MangoHistoricalDataServiceConfig { MangoGroup = "mainnet.1", }, _logger); @@ -36,20 +37,45 @@ public HistoricalDataExample() public void Run() { - var spotStats = _mangoHistoricalDataService.GetMarginLendingStats(); - var perpStats = _mangoHistoricalDataService.GetPerpStats(); + //var spotStats = _mangoHistoricalDataService.GetMarginLendingStats(); - var fundingRate = _mangoHistoricalDataService.GetHistoricalFundingRates("SOL-PERP"); + //var perpStats = _mangoHistoricalDataService.GetPerpStats(); - var volumeInfo = _mangoHistoricalDataService.GetVolume("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi"); + //var fundingRate = _mangoHistoricalDataService.GetHistoricalFundingRates("SOL-PERP"); - var recentTrades = _mangoHistoricalDataService.GetRecentTrades("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi"); + //var volumeInfo = _mangoHistoricalDataService.GetVolume("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi"); - var recentPerpTrades = _mangoHistoricalDataService.GetPerpTrades("CGcrpkxyx92vjyQApsr1jTN6M5PeERKSEaH1zskzccRG"); + //var recentTrades = _mangoHistoricalDataService.GetRecentTrades("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi"); - var openOrders = _mangoHistoricalDataService.GetOpenOrders("DBZUDrcXEPNdLaNJZ973w1joCnsa1k4a8hUFVvgCuzGf"); + //var recentPerpTrades = _mangoHistoricalDataService.GetPerpTrades("CGcrpkxyx92vjyQApsr1jTN6M5PeERKSEaH1zskzccRG"); + + //var openOrders = _mangoHistoricalDataService.GetOpenOrders("DBZUDrcXEPNdLaNJZ973w1joCnsa1k4a8hUFVvgCuzGf"); + + _mangoHistoricalDataService.ConnectionStateChanged += _mangoHistoricalDataService_ConnectionStateChanged; + + _mangoHistoricalDataService.SubscribeFillsAsync( + (snapshot) => + { + foreach (var evt in snapshot.DecodedEvents) + { + Console.WriteLine($"{snapshot.Market} - {evt.Price} {evt.Quantity}"); + } + }, (evt) => + { + Console.WriteLine($"{evt.Market} - {evt.DecodedEvent.Price} {evt.DecodedEvent.Quantity}"); + }); + + + Task.Delay(60_000).Wait(); + + _mangoHistoricalDataService.UnsubscribeFills(); Console.ReadLine(); } + + private void _mangoHistoricalDataService_ConnectionStateChanged(object sender, System.Net.WebSockets.WebSocketState e) + { + _logger?.LogDebug(new EventId(), $"Web Socket State changed to {e}"); + } } } diff --git a/Solnet.Mango.Historical/IMangoHistoricalDataService.cs b/Solnet.Mango.Historical/IMangoHistoricalDataService.cs index 1cb8463..2de48a0 100644 --- a/Solnet.Mango.Historical/IMangoHistoricalDataService.cs +++ b/Solnet.Mango.Historical/IMangoHistoricalDataService.cs @@ -3,6 +3,7 @@ using Solnet.Mango.Historical.Models; using System; using System.Collections.Generic; +using System.Net.WebSockets; using System.Threading.Tasks; namespace Solnet.Mango.Historical @@ -12,6 +13,42 @@ namespace Solnet.Mango.Historical /// public interface IMangoHistoricalDataService { + /// + /// The web socket connection state. + /// + public WebSocketState State { get; } + + /// + /// Subscribe to the feed of fill events. + /// + /// An action that is called to receive the fills snapshot upon connection. + /// An action that is called whenever there is a new fill event. + void SubscribeFills(Action snapshotAction, Action eventAction); + + /// + /// Subscribe to the feed of fill events. This is an asynchronous operation. + /// + /// An action that is called to receive the fills snapshot upon connection. + /// An action that is called whenever there is a new fill event. + /// A task which performs the action. + Task SubscribeFillsAsync(Action snapshotAction, Action eventAction); + + /// + /// Unsubscribes to the feed of fill events and disconnects from the web socket server. + /// + void UnsubscribeFills(); + + /// + /// Unsubscribes to the feed of fill events and disconnects from the web socket server. This is an asynchronous operation. + /// + /// A task which performs the action. + Task UnsubscribeFillsAsync(); + + /// + /// An event raised whenever the web socket connection state changes. + /// + public event EventHandler ConnectionStateChanged; + /// /// Gets the margin lending stats. This is an asynchronous operation. /// @@ -123,6 +160,5 @@ public interface IMangoHistoricalDataService /// TvSymbolInfo GetSymbol(string symbol); - } } diff --git a/Solnet.Mango.Historical/MangoHistoricalDataService.cs b/Solnet.Mango.Historical/MangoHistoricalDataService.cs index 257fc2c..69c8fb2 100644 --- a/Solnet.Mango.Historical/MangoHistoricalDataService.cs +++ b/Solnet.Mango.Historical/MangoHistoricalDataService.cs @@ -2,12 +2,17 @@ using BlockMountain.TradingView.Models; using Microsoft.Extensions.Logging; using Solnet.Mango.Historical.Models; +using Solnet.Mango.Models.Events; using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net.Http; +using System.Net.WebSockets; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; +using System.Threading; using System.Threading.Tasks; namespace Solnet.Mango.Historical @@ -32,6 +37,11 @@ public class MangoHistoricalDataService : IMangoHistoricalDataService /// private static readonly string EventHistoryApiCandlesBaseUrl = "https://event-history-api-candles.herokuapp.com/"; + /// + /// The base url for the mango fills service. + /// + private static readonly string MangoFillsBaseUrl = "ws://api.mngo.cloud:8080"; + /// /// The logger. /// @@ -47,6 +57,16 @@ public class MangoHistoricalDataService : IMangoHistoricalDataService /// private HttpClient _httpClient; + /// + /// The websocket client. + /// + private ClientWebSocket _webSocket; + + /// + /// + /// + private CancellationTokenSource _cancellationTokenSource; + /// /// The json serializer options. /// @@ -63,13 +83,16 @@ public class MangoHistoricalDataService : IMangoHistoricalDataService /// The config. /// A logger instance. /// An http client. + /// A web socket client. /// A trading view provider. public MangoHistoricalDataService(MangoHistoricalDataServiceConfig config, ILogger logger = null, - HttpClient httpClient = default, ITradingViewProvider tradingViewProvider = null) + HttpClient httpClient = default, ClientWebSocket webSocket = default, ITradingViewProvider tradingViewProvider = null) { _logger = logger; _config = config; _httpClient = httpClient ?? new HttpClient(); + _webSocket = webSocket ?? new ClientWebSocket(); + _cancellationTokenSource = new CancellationTokenSource(); _jsonSerializerOptions = new JsonSerializerOptions() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase, @@ -266,5 +289,142 @@ private async Task HandleResponse(HttpResponseMessage message) _logger?.LogInformation(new EventId(0, "REC"), $"Result: {data}"); return JsonSerializer.Deserialize(data, _jsonSerializerOptions); } + + /// + public void SubscribeFills(Action snapshotAction, Action eventAction) + => SubscribeFillsAsync(snapshotAction, eventAction).Wait(); + + /// + public async Task SubscribeFillsAsync(Action snapshotAction, Action eventAction) + { + await _webSocket.ConnectAsync(new Uri(MangoFillsBaseUrl), _cancellationTokenSource.Token); + ConnectionStateChanged?.Invoke(this, State); + + Memory data; + + while(_webSocket.State == WebSocketState.Open) + { + try + { + data = await ReadMessage(_cancellationTokenSource.Token); + + Console.WriteLine($"{Encoding.UTF8.GetString(data.ToArray())}"); + var t = HandleMessage(data); + + if (t == -1) continue; + + switch (t) + { + case 0: + var snapshot = JsonSerializer.Deserialize(data.Span, _jsonSerializerOptions); + snapshot.DecodedEvents = new(snapshot.Events.Count); + foreach(var e in snapshot.Events) + { + snapshot.DecodedEvents.Add(FillEvent.Deserialize(Convert.FromBase64String(e))); + } + snapshotAction(snapshot); + break; + case 1: + var evt = JsonSerializer.Deserialize(data.Span, _jsonSerializerOptions); + evt.DecodedEvent = FillEvent.Deserialize(Convert.FromBase64String(evt.Event)); + eventAction(evt); + break; + } + } + catch (Exception e) + { + _logger?.LogDebug(new EventId(), e, "Exception trying to read next message."); + } + } + _logger?.LogDebug(new EventId(), $"Stopped reading messages. Web Socket State changed to {_webSocket.State}"); + ConnectionStateChanged?.Invoke(this, State); + } + + /// + public void UnsubscribeFills() => UnsubscribeFillsAsync().Wait(); + + /// + public async Task UnsubscribeFillsAsync() + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "User requested closure.", _cancellationTokenSource.Token); + ConnectionStateChanged?.Invoke(this, State); + } + + /// + public event EventHandler ConnectionStateChanged; + + /// + public WebSocketState State => _webSocket.State; + + private async Task> ReadMessage(CancellationToken cancellationToken = default) + { + var buffer = new byte[32768]; + Memory mem = new(buffer); + ValueWebSocketReceiveResult result = await _webSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false); + int count = result.Count; + + if (result.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken); + } + else + { + if (!result.EndOfMessage) + { + MemoryStream ms = new (); + ms.Write(mem.Span); + + + while (!result.EndOfMessage) + { + result = await _webSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false); + ms.Write(mem[..result.Count].Span); + count += result.Count; + } + + mem = new Memory(ms.ToArray()); + } + else + { + mem = mem[..count]; + } + + return mem; + } + + return null; + } + + private int HandleMessage(Memory data) + { + Utf8JsonReader jsonReader = new Utf8JsonReader(data.Span); + jsonReader.Read(); + + if (_logger?.IsEnabled(LogLevel.Information) ?? false) + { + var str = Encoding.UTF8.GetString(data.Span); + _logger?.LogInformation($"[Received] {str}"); + } + + while (jsonReader.Read()) + { + switch (jsonReader.TokenType) + { + case JsonTokenType.PropertyName: + var prop = jsonReader.GetString(); + if (prop == "events") + { + return 0; + } + else if(prop == "event") + { + return 1; + } + break; + } + } + + return -1; + } } } diff --git a/Solnet.Mango.Historical/Models/FillsEvent.cs b/Solnet.Mango.Historical/Models/FillsEvent.cs new file mode 100644 index 0000000..7975a48 --- /dev/null +++ b/Solnet.Mango.Historical/Models/FillsEvent.cs @@ -0,0 +1,51 @@ +using Solnet.Mango.Models.Events; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Solnet.Mango.Historical.Models +{ + /// + /// Represents a fills event from the websocket connection to the Mango Fills Service. + /// For more information: https://docs.mango.markets/development-resources/client-libraries/fills-websocket-feed + /// + public class FillsEvent + { + /// + /// The event. + /// + public string Event { get; set; } + + /// + /// The decoded event. + /// + public FillEvent DecodedEvent { get; set; } + + /// + /// The market name. + /// + public string Market { get; set; } + + /// + /// The market's event queue. + /// + public string Queue { get; set; } + + /// + /// The status of the event. + /// + public string Status { get; set; } + + /// + /// The slot of the event. + /// + public ulong Slot { get; set; } + + /// + /// The write version. + /// + public int WriteVersion { get; set; } + } +} diff --git a/Solnet.Mango.Historical/Models/FillsSnapshot.cs b/Solnet.Mango.Historical/Models/FillsSnapshot.cs new file mode 100644 index 0000000..58482f7 --- /dev/null +++ b/Solnet.Mango.Historical/Models/FillsSnapshot.cs @@ -0,0 +1,37 @@ +using Solnet.Mango.Models.Events; +using System.Collections.Generic; + +namespace Solnet.Mango.Historical.Models +{ + /// + /// Represents a fills snapshot from the websocket connection to the Mango Fills Service. + /// For more information: https://docs.mango.markets/development-resources/client-libraries/fills-websocket-feed + /// + public class FillsSnapshot + { + /// + /// The list of events in the snapshot. + /// + public List Events { get; set; } + + /// + /// The decoded events. + /// + public List DecodedEvents { get; set; } + + /// + /// The market of the snapshot. + /// + public string Market { get; set; } + + /// + /// The slot of the snapshot. + /// + public ulong Slot { get; set; } + + /// + /// The write version. + /// + public int WriteVersion { get; set; } + } +}