Skip to content

Commit

Permalink
Adds fills websocket feed (#100)
Browse files Browse the repository at this point in the history
* Adds the decoded events to the fills feed (+1 squashed commits)

Squashed commits:

[5189ee7] WIP: Adds fills websocket feed

(cherry picked from commit 0c7bb97916347684a16d25dd49452ef4d128dcc6)

* Version bump
  • Loading branch information
hoakbuilds authored Apr 25, 2022
1 parent 1d0ee68 commit d6af986
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 11 deletions.
2 changes: 1 addition & 1 deletion SharedBuildProperties.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Product>Solnet.Mango</Product>
<Version>5.0.4.2</Version>
<Version>5.0.4.3</Version>
<Copyright>Copyright 2022 &#169; blockmountain</Copyright>
<Authors>blockmountain</Authors>
<PublisherName>blockmountain</PublisherName>
Expand Down
42 changes: 34 additions & 8 deletions Solnet.Mango.Examples/HistoricalDataExample.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using Solnet.Mango.Historical;
using Solnet.Mango.Models.Events;
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -28,28 +29,53 @@ public HistoricalDataExample()
.SetMinimumLevel(LogLevel.Debug);
}).CreateLogger<IMangoHistoricalDataService>();

_mangoHistoricalDataService = new MangoHistoricalDataService( new MangoHistoricalDataServiceConfig
_mangoHistoricalDataService = new MangoHistoricalDataService(new MangoHistoricalDataServiceConfig
{
MangoGroup = "mainnet.1",
}, _logger);
}

public void Run()
{
var spotStats = _mangoHistoricalDataService.GetMarginLendingStats();
var perpStats = _mangoHistoricalDataService.GetPerpStats();
//var spotStats = _mangoHistoricalDataService.GetMarginLendingStats();

var fundingRate = _mangoHistoricalDataService.GetHistoricalFundingRates("SOL-PERP");
//var perpStats = _mangoHistoricalDataService.GetPerpStats();

var volumeInfo = _mangoHistoricalDataService.GetVolume("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi");
//var fundingRate = _mangoHistoricalDataService.GetHistoricalFundingRates("SOL-PERP");

var recentTrades = _mangoHistoricalDataService.GetRecentTrades("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi");
//var volumeInfo = _mangoHistoricalDataService.GetVolume("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi");

var recentPerpTrades = _mangoHistoricalDataService.GetPerpTrades("CGcrpkxyx92vjyQApsr1jTN6M5PeERKSEaH1zskzccRG");
//var recentTrades = _mangoHistoricalDataService.GetRecentTrades("2TgaaVoHgnSeEtXvWTx13zQeTf4hYWAMEiMQdcG6EwHi");

var openOrders = _mangoHistoricalDataService.GetOpenOrders("DBZUDrcXEPNdLaNJZ973w1joCnsa1k4a8hUFVvgCuzGf");
//var recentPerpTrades = _mangoHistoricalDataService.GetPerpTrades("CGcrpkxyx92vjyQApsr1jTN6M5PeERKSEaH1zskzccRG");

//var openOrders = _mangoHistoricalDataService.GetOpenOrders("DBZUDrcXEPNdLaNJZ973w1joCnsa1k4a8hUFVvgCuzGf");

_mangoHistoricalDataService.ConnectionStateChanged += _mangoHistoricalDataService_ConnectionStateChanged;

_mangoHistoricalDataService.SubscribeFillsAsync(
(snapshot) =>
{
foreach (var evt in snapshot.DecodedEvents)
{
Console.WriteLine($"{snapshot.Market} - {evt.Price} {evt.Quantity}");
}
}, (evt) =>
{
Console.WriteLine($"{evt.Market} - {evt.DecodedEvent.Price} {evt.DecodedEvent.Quantity}");
});


Task.Delay(60_000).Wait();

_mangoHistoricalDataService.UnsubscribeFills();

Console.ReadLine();
}

private void _mangoHistoricalDataService_ConnectionStateChanged(object sender, System.Net.WebSockets.WebSocketState e)
{
_logger?.LogDebug(new EventId(), $"Web Socket State changed to {e}");
}
}
}
38 changes: 37 additions & 1 deletion Solnet.Mango.Historical/IMangoHistoricalDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Solnet.Mango.Historical.Models;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace Solnet.Mango.Historical
Expand All @@ -12,6 +13,42 @@ namespace Solnet.Mango.Historical
/// </summary>
public interface IMangoHistoricalDataService
{
/// <summary>
/// The web socket connection state.
/// </summary>
public WebSocketState State { get; }

/// <summary>
/// Subscribe to the feed of fill events.
/// </summary>
/// <param name="snapshotAction">An action that is called to receive the fills snapshot upon connection.</param>
/// <param name="eventAction">An action that is called whenever there is a new fill event.</param>
void SubscribeFills(Action<FillsSnapshot> snapshotAction, Action<FillsEvent> eventAction);

/// <summary>
/// Subscribe to the feed of fill events. This is an asynchronous operation.
/// </summary>
/// <param name="snapshotAction">An action that is called to receive the fills snapshot upon connection.</param>
/// <param name="eventAction">An action that is called whenever there is a new fill event.</param>
/// <returns>A task which performs the action.</returns>
Task SubscribeFillsAsync(Action<FillsSnapshot> snapshotAction, Action<FillsEvent> eventAction);

/// <summary>
/// Unsubscribes to the feed of fill events and disconnects from the web socket server.
/// </summary>
void UnsubscribeFills();

/// <summary>
/// Unsubscribes to the feed of fill events and disconnects from the web socket server. This is an asynchronous operation.
/// </summary>
/// <returns>A task which performs the action.</returns>
Task UnsubscribeFillsAsync();

/// <summary>
/// An event raised whenever the web socket connection state changes.
/// </summary>
public event EventHandler<WebSocketState> ConnectionStateChanged;

/// <summary>
/// Gets the margin lending stats. This is an asynchronous operation.
/// </summary>
Expand Down Expand Up @@ -123,6 +160,5 @@ public interface IMangoHistoricalDataService

/// <inheritdoc cref="TradingViewProvider.GetSymbol(string)"/>
TvSymbolInfo GetSymbol(string symbol);

}
}
162 changes: 161 additions & 1 deletion Solnet.Mango.Historical/MangoHistoricalDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
using BlockMountain.TradingView.Models;
using Microsoft.Extensions.Logging;
using Solnet.Mango.Historical.Models;
using Solnet.Mango.Models.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

namespace Solnet.Mango.Historical
Expand All @@ -32,6 +37,11 @@ public class MangoHistoricalDataService : IMangoHistoricalDataService
/// </summary>
private static readonly string EventHistoryApiCandlesBaseUrl = "https://event-history-api-candles.herokuapp.com/";

/// <summary>
/// The base url for the mango fills service.
/// </summary>
private static readonly string MangoFillsBaseUrl = "ws://api.mngo.cloud:8080";

/// <summary>
/// The logger.
/// </summary>
Expand All @@ -47,6 +57,16 @@ public class MangoHistoricalDataService : IMangoHistoricalDataService
/// </summary>
private HttpClient _httpClient;

/// <summary>
/// The websocket client.
/// </summary>
private ClientWebSocket _webSocket;

/// <summary>
///
/// </summary>
private CancellationTokenSource _cancellationTokenSource;

/// <summary>
/// The json serializer options.
/// </summary>
Expand All @@ -63,13 +83,16 @@ public class MangoHistoricalDataService : IMangoHistoricalDataService
/// <param name="config">The config.</param>
/// <param name="logger">A logger instance.</param>
/// <param name="httpClient">An http client.</param>
/// <param name="webSocket">A web socket client.</param>
/// <param name="tradingViewProvider">A trading view provider.</param>
public MangoHistoricalDataService(MangoHistoricalDataServiceConfig config, ILogger logger = null,
HttpClient httpClient = default, ITradingViewProvider tradingViewProvider = null)
HttpClient httpClient = default, ClientWebSocket webSocket = default, ITradingViewProvider tradingViewProvider = null)
{
_logger = logger;
_config = config;
_httpClient = httpClient ?? new HttpClient();
_webSocket = webSocket ?? new ClientWebSocket();
_cancellationTokenSource = new CancellationTokenSource();
_jsonSerializerOptions = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
Expand Down Expand Up @@ -266,5 +289,142 @@ private async Task<T> HandleResponse<T>(HttpResponseMessage message)
_logger?.LogInformation(new EventId(0, "REC"), $"Result: {data}");
return JsonSerializer.Deserialize<T>(data, _jsonSerializerOptions);
}

/// <inheritdoc cref="IMangoHistoricalDataService.SubscribeFillsAsync"/>
public void SubscribeFills(Action<FillsSnapshot> snapshotAction, Action<FillsEvent> eventAction)
=> SubscribeFillsAsync(snapshotAction, eventAction).Wait();

/// <inheritdoc cref="IMangoHistoricalDataService.SubscribeFillsAsync"/>
public async Task SubscribeFillsAsync(Action<FillsSnapshot> snapshotAction, Action<FillsEvent> eventAction)
{
await _webSocket.ConnectAsync(new Uri(MangoFillsBaseUrl), _cancellationTokenSource.Token);
ConnectionStateChanged?.Invoke(this, State);

Memory<byte> data;

while(_webSocket.State == WebSocketState.Open)
{
try
{
data = await ReadMessage(_cancellationTokenSource.Token);

Console.WriteLine($"{Encoding.UTF8.GetString(data.ToArray())}");
var t = HandleMessage(data);

if (t == -1) continue;

switch (t)
{
case 0:
var snapshot = JsonSerializer.Deserialize<FillsSnapshot>(data.Span, _jsonSerializerOptions);
snapshot.DecodedEvents = new(snapshot.Events.Count);
foreach(var e in snapshot.Events)
{
snapshot.DecodedEvents.Add(FillEvent.Deserialize(Convert.FromBase64String(e)));
}
snapshotAction(snapshot);
break;
case 1:
var evt = JsonSerializer.Deserialize<FillsEvent>(data.Span, _jsonSerializerOptions);
evt.DecodedEvent = FillEvent.Deserialize(Convert.FromBase64String(evt.Event));
eventAction(evt);
break;
}
}
catch (Exception e)
{
_logger?.LogDebug(new EventId(), e, "Exception trying to read next message.");
}
}
_logger?.LogDebug(new EventId(), $"Stopped reading messages. Web Socket State changed to {_webSocket.State}");
ConnectionStateChanged?.Invoke(this, State);
}

/// <inheritdoc cref="IMangoHistoricalDataService.UnsubscribeFills"/>
public void UnsubscribeFills() => UnsubscribeFillsAsync().Wait();

/// <inheritdoc cref="IMangoHistoricalDataService.UnsubscribeFillsAsync"/>
public async Task UnsubscribeFillsAsync()
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "User requested closure.", _cancellationTokenSource.Token);
ConnectionStateChanged?.Invoke(this, State);
}

/// <inheritdoc cref="IMangoHistoricalDataService.ConnectionStateChanged"/>
public event EventHandler<WebSocketState> ConnectionStateChanged;

/// <inheritdoc cref="IMangoHistoricalDataService.State"/>
public WebSocketState State => _webSocket.State;

private async Task<Memory<byte>> ReadMessage(CancellationToken cancellationToken = default)
{
var buffer = new byte[32768];
Memory<byte> mem = new(buffer);
ValueWebSocketReceiveResult result = await _webSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false);
int count = result.Count;

if (result.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken);
}
else
{
if (!result.EndOfMessage)
{
MemoryStream ms = new ();
ms.Write(mem.Span);


while (!result.EndOfMessage)
{
result = await _webSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false);
ms.Write(mem[..result.Count].Span);
count += result.Count;
}

mem = new Memory<byte>(ms.ToArray());
}
else
{
mem = mem[..count];
}

return mem;
}

return null;
}

private int HandleMessage(Memory<byte> data)
{
Utf8JsonReader jsonReader = new Utf8JsonReader(data.Span);
jsonReader.Read();

if (_logger?.IsEnabled(LogLevel.Information) ?? false)
{
var str = Encoding.UTF8.GetString(data.Span);
_logger?.LogInformation($"[Received] {str}");
}

while (jsonReader.Read())
{
switch (jsonReader.TokenType)
{
case JsonTokenType.PropertyName:
var prop = jsonReader.GetString();
if (prop == "events")
{
return 0;
}
else if(prop == "event")
{
return 1;
}
break;
}
}

return -1;
}
}
}
51 changes: 51 additions & 0 deletions Solnet.Mango.Historical/Models/FillsEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Solnet.Mango.Models.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Solnet.Mango.Historical.Models
{
/// <summary>
/// Represents a fills event from the websocket connection to the Mango Fills Service.
/// <remarks>For more information: https://docs.mango.markets/development-resources/client-libraries/fills-websocket-feed</remarks>
/// </summary>
public class FillsEvent
{
/// <summary>
/// The event.
/// </summary>
public string Event { get; set; }

/// <summary>
/// The decoded event.
/// </summary>
public FillEvent DecodedEvent { get; set; }

/// <summary>
/// The market name.
/// </summary>
public string Market { get; set; }

/// <summary>
/// The market's event queue.
/// </summary>
public string Queue { get; set; }

/// <summary>
/// The status of the event.
/// </summary>
public string Status { get; set; }

/// <summary>
/// The slot of the event.
/// </summary>
public ulong Slot { get; set; }

/// <summary>
/// The write version.
/// </summary>
public int WriteVersion { get; set; }
}
}
Loading

0 comments on commit d6af986

Please sign in to comment.