Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket - cycles events #120

Merged
merged 12 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Tzkt.Api/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
using Tzkt.Api.Websocket.Hubs;
using Tzkt.Api.Websocket.Processors;
using Tzkt.Data;

using System.Text.Json.Serialization;

namespace Tzkt.Api
{
public class Startup
Expand Down Expand Up @@ -92,6 +93,9 @@ public void ConfigureServices(IServiceCollection services)
services.AddTransient<HeadProcessor<DefaultHub>>();
services.AddTransient<IHubProcessor, HeadProcessor<DefaultHub>>();

services.AddTransient<CyclesProcessor<DefaultHub>>();
services.AddTransient<IHubProcessor, CyclesProcessor<DefaultHub>>();

services.AddTransient<BlocksProcessor<DefaultHub>>();
services.AddTransient<IHubProcessor, BlocksProcessor<DefaultHub>>();

Expand Down Expand Up @@ -123,6 +127,7 @@ public void ConfigureServices(IServiceCollection services)
jsonOptions.PayloadSerializerOptions.Converters.Add(new DateTimeConverter());
jsonOptions.PayloadSerializerOptions.Converters.Add(new OperationConverter());
jsonOptions.PayloadSerializerOptions.Converters.Add(new OperationErrorConverter());
jsonOptions.PayloadSerializerOptions.Converters.Add(new JsonStringEnumConverter());
});
}
#endregion
Expand Down
40 changes: 40 additions & 0 deletions Tzkt.Api/Swagger/WsSubscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,46 @@ await connection.invoke("SubscribeToHead");

---

## SubscribeToCycle

Sends the blockchain head every time cycle is changed.

### Method

`SubscribeToCycles`

### Channel

`cycles`

### Parameters

````js
{
delayBlocks: 2, // number of blocks to delay cycle changed notification
// minimum 2, defaults to 2
quote: "eur,usd" // comma-separated list of ticker symbols to inject historical prices into response
// defaults to "none"
}
````

### Data model

Same as in [/cycle](#operation/Cycles_GetByIndex)

### State

State contains cycle (`int`) of the last processed cycle.

### Example

````js
connection.on("cycles", (msg) => { console.log(msg); });
await connection.invoke("SubscribeToCycles");
````

---

## SubscribeToBlocks

Sends blocks added to the blockchain
Expand Down
10 changes: 10 additions & 0 deletions Tzkt.Api/Websocket/Hubs/DefaultHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Tzkt.Api.Websocket.Hubs
public class DefaultHub : BaseHub
{
readonly HeadProcessor<DefaultHub> Head;
readonly CyclesProcessor<DefaultHub> Cycle;
readonly BlocksProcessor<DefaultHub> Blocks;
readonly OperationsProcessor<DefaultHub> Operations;
readonly BigMapsProcessor<DefaultHub> BigMaps;
Expand All @@ -18,6 +19,7 @@ public class DefaultHub : BaseHub

public DefaultHub(
HeadProcessor<DefaultHub> head,
CyclesProcessor<DefaultHub> cycle,
BlocksProcessor<DefaultHub> blocks,
OperationsProcessor<DefaultHub> operations,
BigMapsProcessor<DefaultHub> bigMaps,
Expand All @@ -28,6 +30,7 @@ public DefaultHub(
IConfiguration config) : base(logger, config)
{
Head = head;
Cycle = cycle;
Blocks = blocks;
Operations = operations;
BigMaps = bigMaps;
Expand All @@ -41,6 +44,13 @@ public Task<int> SubscribeToHead()
return Head.Subscribe(Clients.Caller, Context.ConnectionId);
}

public Task<int> SubscribeToCycles(CyclesParameter parameters = null)
{
parameters ??= new();
parameters.EnsureValid();
return Cycle.Subscribe(Clients.Caller, Context.ConnectionId, parameters);
}

public Task<int> SubscribeToBlocks()
{
return Blocks.Subscribe(Clients.Caller, Context.ConnectionId);
Expand Down
19 changes: 19 additions & 0 deletions Tzkt.Api/Websocket/Parameters/CyclesParameter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Microsoft.AspNetCore.SignalR;
using Newtonsoft.Json.Converters;
using System.Text.Json.Serialization;
using Tzkt.Api.Models;

namespace Tzkt.Api.Websocket
{
public class CyclesParameter
{
public int DelayBlocks { get; set; } = 2; // 2 to cover possible reorganization
public Symbols Quote { get; set; } = Symbols.None;

public void EnsureValid()
{
if (DelayBlocks < 2)
throw new HubException("DelayBlocks has to greater than or equal to 2.");
}
}
}
151 changes: 151 additions & 0 deletions Tzkt.Api/Websocket/Processors/CyclesProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;

using Tzkt.Api.Repositories;
using Tzkt.Api.Services.Cache;
using Tzkt.Api.Models;

namespace Tzkt.Api.Websocket.Processors
{
public class CyclesProcessor<T> : IHubProcessor where T : Hub
{
#region static
const string CycleGroup = "cycles";
const string CycleChannel = "cycles";
static readonly SemaphoreSlim Sema = new(1, 1);
static readonly Dictionary<int, HashSet<string>> DelaySubs = new();
static readonly Dictionary<string, Symbols> QuoteSubs = new();
#endregion

readonly StateCache StateCache;
readonly CyclesRepository CyclesRepo;
readonly IHubContext<T> Context;
readonly ILogger Logger;

public CyclesProcessor(StateCache cache, CyclesRepository repo, IHubContext<T> hubContext, ILogger<CyclesProcessor<T>> logger)
{
StateCache = cache;
CyclesRepo = repo;
Context = hubContext;
Logger = logger;
}

public async Task OnStateChanged()
{
var sendings = new List<Task>(2);
try
{
await Sema.WaitAsync();

var cycleData = await CyclesRepo.Get(StateCache.Current.Cycle, Symbols.None);
Groxan marked this conversation as resolved.
Show resolved Hide resolved

// we notify only group of clients with matching delay
if (DelaySubs.TryGetValue(StateCache.Current.Level - cycleData.FirstLevel, out var connections))
{
foreach (var connectionId in connections)
{
var quote = QuoteSubs[connectionId];
var cycleDataWithQuotes = quote == Symbols.None ? cycleData : await CyclesRepo.Get(StateCache.Current.Cycle, quote);
Groxan marked this conversation as resolved.
Show resolved Hide resolved
sendings.Add(Context.Clients
.Client(connectionId)
.SendData(CycleChannel, cycleDataWithQuotes, StateCache.Current.Cycle));
}
Logger.LogDebug("Cycle {0} sent", StateCache.Current.Cycle);
}
}
catch (Exception ex)
{
Logger.LogError("Failed to process state change: {0}", ex.Message);
}
finally
{
Sema.Release();
#region await sendings
try
{
await Task.WhenAll(sendings);
}
catch (Exception ex)
{
// should never get here
Logger.LogError("Sendings failed: {0}", ex.Message);
}
#endregion
}
}

public async Task<int> Subscribe(IClientProxy client, string connectionId, CyclesParameter parameter)
{
Task sending = Task.CompletedTask;
try
{
await Sema.WaitAsync();
Logger.LogDebug("New subscription...");

if (!DelaySubs.TryGetValue(parameter.DelayBlocks, out var delaySub))
{
delaySub = new(4);
DelaySubs.Add(parameter.DelayBlocks, delaySub);
}
QuoteSubs[connectionId] = parameter.Quote;
delaySub.Add(connectionId);

await Context.Groups.AddToGroupAsync(connectionId, CycleGroup);
sending = client.SendState(CycleChannel, StateCache.Current.Cycle);

Logger.LogDebug("Client {0} subscribed with state {1}", connectionId, StateCache.Current.Cycle);
return StateCache.Current.Level;
}
catch (Exception ex)
{
Logger.LogError("Failed to add subscription: {0}", ex.Message);
return 0;
}
finally
{
Sema.Release();
try
{
await sending;
}
catch (Exception ex)
{
// should never get here
Logger.LogError("Sending failed: {0}", ex.Message);
}
}
}

public void Unsubscribe(string connectionId)
{
try
{
Sema.Wait();
Logger.LogDebug("Remove subscription...");

foreach (var (key, value) in DelaySubs)
{
value.Remove(connectionId);
if (value.Count == 0)
DelaySubs.Remove(key);
}
QuoteSubs.Remove(connectionId);

Logger.LogDebug("Client {0} unsubscribed", connectionId);
}
catch (Exception ex)
{
Logger.LogError("Failed to remove subscription: {0}", ex.Message);
}
finally
{
Sema.Release();
}
}
}
}