diff --git a/Tzkt.Api/Startup.cs b/Tzkt.Api/Startup.cs index 8d945b743..a6a6cf7fc 100644 --- a/Tzkt.Api/Startup.cs +++ b/Tzkt.Api/Startup.cs @@ -92,6 +92,9 @@ public void ConfigureServices(IServiceCollection services) services.AddTransient>(); services.AddTransient>(); + services.AddTransient>(); + services.AddTransient>(); + services.AddTransient>(); services.AddTransient>(); diff --git a/Tzkt.Api/Swagger/WsSubscriptions.md b/Tzkt.Api/Swagger/WsSubscriptions.md index 1e2cb5969..7545fd1ed 100644 --- a/Tzkt.Api/Swagger/WsSubscriptions.md +++ b/Tzkt.Api/Swagger/WsSubscriptions.md @@ -31,6 +31,45 @@ await connection.invoke("SubscribeToHead"); --- +## SubscribeToCycle + +Sends the blockchain head every time cycle is changed. + +### Method + +`SubscribeToCycles` + +### Channel + +`cycles` + +### Parameters + +````js +{ + delayBlocks: 2, // number of blocks to delay cycle changed notification + // minimum 2, defaults to 2 + // delayBlocks has to be lower than number of blocks in cycle to get notifications +} +```` + +### Data model + +Same as in [/cycle](#operation/Cycles_GetByIndex) + +### State + +State contains cycle (`int`) of the last processed cycle. + +### Example + +````js +connection.on("cycles", (msg) => { console.log(msg); }); +await connection.invoke("SubscribeToCycles"); +```` + +--- + ## SubscribeToBlocks Sends blocks added to the blockchain diff --git a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs index f46c9e95e..59e485407 100644 --- a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs +++ b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs @@ -9,6 +9,7 @@ namespace Tzkt.Api.Websocket.Hubs public class DefaultHub : BaseHub { readonly HeadProcessor Head; + readonly CyclesProcessor Cycles; readonly BlocksProcessor Blocks; readonly OperationsProcessor Operations; readonly BigMapsProcessor BigMaps; @@ -18,6 +19,7 @@ public class DefaultHub : BaseHub public DefaultHub( HeadProcessor head, + CyclesProcessor cycles, BlocksProcessor blocks, OperationsProcessor operations, BigMapsProcessor bigMaps, @@ -28,6 +30,7 @@ public DefaultHub( IConfiguration config) : base(logger, config) { Head = head; + Cycles = cycles; Blocks = blocks; Operations = operations; BigMaps = bigMaps; @@ -41,6 +44,13 @@ public Task SubscribeToHead() return Head.Subscribe(Clients.Caller, Context.ConnectionId); } + public Task SubscribeToCycles(CyclesParameter parameters = null) + { + parameters ??= new(); + parameters.EnsureValid(); + return Cycles.Subscribe(Clients.Caller, Context.ConnectionId, parameters); + } + public Task SubscribeToBlocks() { return Blocks.Subscribe(Clients.Caller, Context.ConnectionId); @@ -83,6 +93,7 @@ public Task SubscribeToAccounts(AccountsParameter parameters) public override Task OnDisconnectedAsync(Exception exception) { + Cycles.Unsubscribe(Context.ConnectionId); Operations.Unsubscribe(Context.ConnectionId); BigMaps.Unsubscribe(Context.ConnectionId); Balances.Unsubscribe(Context.ConnectionId); diff --git a/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs b/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs new file mode 100644 index 000000000..1409246b3 --- /dev/null +++ b/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs @@ -0,0 +1,18 @@ +using Microsoft.AspNetCore.SignalR; +using Newtonsoft.Json.Converters; +using System.Text.Json.Serialization; +using Tzkt.Api.Models; + +namespace Tzkt.Api.Websocket +{ + public class CyclesParameter + { + public int DelayBlocks { get; set; } = 2; // 2 to cover possible reorganization + + public void EnsureValid() + { + if (DelayBlocks < 2) + throw new HubException("DelayBlocks has to greater than or equal to 2."); + } + } +} diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs new file mode 100644 index 000000000..82485830e --- /dev/null +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -0,0 +1,165 @@ +using System; +using System.Linq; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +using Tzkt.Api.Repositories; +using Tzkt.Api.Services.Cache; +using Tzkt.Api.Models; + +namespace Tzkt.Api.Websocket.Processors +{ + public class CyclesProcessor : IHubProcessor where T : Hub + { + #region static + const string CycleChannel = "cycles"; + static readonly SemaphoreSlim Sema = new(1, 1); + static readonly Dictionary> DelaySubs = new(); + static readonly HashSet Subs = new(); + static Cycle CurrentCycle = null; + #endregion + + readonly StateCache StateCache; + readonly CyclesRepository CyclesRepo; + readonly IHubContext Context; + readonly ILogger Logger; + + public CyclesProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) + { + StateCache = cache; + CyclesRepo = repo; + Context = hubContext; + Logger = logger; + } + + public async Task OnStateChanged() + { + var sendings = new List(2); + try + { + await Sema.WaitAsync(); + if (Subs.Count == 0) + { + Logger.LogDebug("No cycles subs"); + return; + } + + if (CurrentCycle == null || StateCache.Current.Level < CurrentCycle.FirstLevel || StateCache.Current.Level > CurrentCycle.LastLevel) + { + CurrentCycle = await CyclesRepo.Get(StateCache.Current.Cycle, Symbols.None); + } + + // we notify only group of clients with matching delay + if (DelaySubs.TryGetValue(StateCache.Current.Level - CurrentCycle.FirstLevel, out var connections)) + { + foreach (var connectionId in connections) + { + sendings.Add(Context.Clients + .Client(connectionId) + .SendData(CycleChannel, CurrentCycle, StateCache.Current.Cycle)); + } + Logger.LogDebug("Cycle {0} sent", StateCache.Current.Cycle); + } + } + catch (Exception ex) + { + Logger.LogError("Failed to process state change: {0}", ex.Message); + } + finally + { + Sema.Release(); + #region await sendings + try + { + await Task.WhenAll(sendings); + } + catch (Exception ex) + { + // should never get here + Logger.LogError("Sendings failed: {0}", ex.Message); + } + #endregion + } + } + + public async Task Subscribe(IClientProxy client, string connectionId, CyclesParameter parameter) + { + Task sending = Task.CompletedTask; + try + { + await Sema.WaitAsync(); + if (Subs.Contains(connectionId)) + { + throw new HubException($"{connectionId} already subscribed."); + } + + Logger.LogDebug("New subscription..."); + + if (!DelaySubs.TryGetValue(parameter.DelayBlocks, out var delaySub)) + { + delaySub = new(4); + DelaySubs.Add(parameter.DelayBlocks, delaySub); + } + Subs.Add(connectionId); + delaySub.Add(connectionId); + + sending = client.SendState(CycleChannel, StateCache.Current.Cycle); + + Logger.LogDebug("Client {0} subscribed with state {1}", connectionId, StateCache.Current.Cycle); + return StateCache.Current.Cycle; + } + catch (HubException ex) + { + throw; + } + catch (Exception ex) + { + Logger.LogError("Failed to add subscription: {0}", ex.Message); + return 0; + } + finally + { + Sema.Release(); + try + { + await sending; + } + catch (Exception ex) + { + // should never get here + Logger.LogError("Sending failed: {0}", ex.Message); + } + } + } + + public void Unsubscribe(string connectionId) + { + try + { + Sema.Wait(); + Logger.LogDebug("Remove subscription..."); + + foreach (var (key, value) in DelaySubs) + { + value.Remove(connectionId); + if (value.Count == 0) + DelaySubs.Remove(key); + } + Subs.Remove(connectionId); + + Logger.LogDebug("Client {0} unsubscribed", connectionId); + } + catch (Exception ex) + { + Logger.LogError("Failed to remove subscription: {0}", ex.Message); + } + finally + { + Sema.Release(); + } + } + } +}