From 4393ff56315bbe70beb3a119eb57df75281361a7 Mon Sep 17 00:00:00 2001 From: cryi Date: Sun, 24 Jul 2022 11:06:40 +0200 Subject: [PATCH 01/12] cycle events --- Tzkt.Api/Startup.cs | 3 + Tzkt.Api/Websocket/Hubs/DefaultHub.cs | 9 ++ .../Websocket/Parameters/CycleParameter.cs | 15 ++ .../Websocket/Processors/CyclesProcessor.cs | 150 ++++++++++++++++++ 4 files changed, 177 insertions(+) create mode 100644 Tzkt.Api/Websocket/Parameters/CycleParameter.cs create mode 100644 Tzkt.Api/Websocket/Processors/CyclesProcessor.cs diff --git a/Tzkt.Api/Startup.cs b/Tzkt.Api/Startup.cs index 8d945b743..b2613fda9 100644 --- a/Tzkt.Api/Startup.cs +++ b/Tzkt.Api/Startup.cs @@ -92,6 +92,9 @@ public void ConfigureServices(IServiceCollection services) services.AddTransient>(); services.AddTransient>(); + services.AddTransient>(); + services.AddTransient>(); + services.AddTransient>(); services.AddTransient>(); diff --git a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs index f46c9e95e..652cff095 100644 --- a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs +++ b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs @@ -9,6 +9,7 @@ namespace Tzkt.Api.Websocket.Hubs public class DefaultHub : BaseHub { readonly HeadProcessor Head; + readonly CycleProcessor Cycle; readonly BlocksProcessor Blocks; readonly OperationsProcessor Operations; readonly BigMapsProcessor BigMaps; @@ -18,6 +19,7 @@ public class DefaultHub : BaseHub public DefaultHub( HeadProcessor head, + CycleProcessor cycle, BlocksProcessor blocks, OperationsProcessor operations, BigMapsProcessor bigMaps, @@ -28,6 +30,7 @@ public DefaultHub( IConfiguration config) : base(logger, config) { Head = head; + Cycle = cycle; Blocks = blocks; Operations = operations; BigMaps = bigMaps; @@ -40,6 +43,12 @@ public Task SubscribeToHead() { return Head.Subscribe(Clients.Caller, Context.ConnectionId); } + public Task SubscribeToCycle(CycleParameter parameters) + { + parameters ??= new(); + parameters.EnsureValid(); + return Cycle.Subscribe(Clients.Caller, Context.ConnectionId, parameters); + } public Task SubscribeToBlocks() { diff --git a/Tzkt.Api/Websocket/Parameters/CycleParameter.cs b/Tzkt.Api/Websocket/Parameters/CycleParameter.cs new file mode 100644 index 000000000..c842a94dc --- /dev/null +++ b/Tzkt.Api/Websocket/Parameters/CycleParameter.cs @@ -0,0 +1,15 @@ +using Microsoft.AspNetCore.SignalR; + +namespace Tzkt.Api.Websocket +{ + public class CycleParameter + { + public int DelayBlocks { get; set; } = 2; // 2 to cover possible reorganization + + public void EnsureValid() + { + if (DelayBlocks < 2) + throw new HubException("DelayBlocks has to greater than or equal to 2."); + } + } +} diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs new file mode 100644 index 000000000..ad5c212f3 --- /dev/null +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -0,0 +1,150 @@ +using System; +using System.Linq; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +using Tzkt.Api.Repositories; +using Tzkt.Api.Services.Cache; + +namespace Tzkt.Api.Websocket.Processors +{ + public class CycleProcessor : IHubProcessor where T : Hub + { + #region static + const string CycleGroup = "cycle"; + const string CycleChannel = "cycle"; + static readonly SemaphoreSlim Sema = new(1, 1); + static readonly Dictionary> DelaySubs = new(); + #endregion + + readonly StateCache StateCache; + readonly StateRepository StateRepo; + readonly IHubContext Context; + readonly ILogger Logger; + + private int cycleStartLevel = 0; + private int lastProcessedCycle = 0; + + public CycleProcessor(StateCache cache, StateRepository repo, IHubContext hubContext, ILogger> logger) + { + StateCache = cache; + StateRepo = repo; + Context = hubContext; + Logger = logger; + } + + public async Task OnStateChanged() + { + var sendings = new List(2); + try + { + await Sema.WaitAsync(); + + if (lastProcessedCycle != StateCache.Current.Cycle) + cycleStartLevel = StateCache.Current.Level; + lastProcessedCycle = StateCache.Current.Cycle; + + // we notify only group of clients with matching delay + if (DelaySubs.TryGetValue(StateCache.Current.Level - cycleStartLevel, out var connections)) + { + foreach (var connectionId in connections) + { + sendings.Add(Context.Clients + .Client(connectionId) + .SendData(CycleChannel, StateRepo.Get(), StateCache.Current.Cycle)); + } + Logger.LogDebug("Cycle {0} sent", StateCache.Current.Cycle); + } + } + catch (Exception ex) + { + Logger.LogError("Failed to process state change: {0}", ex.Message); + } + finally + { + Sema.Release(); + #region await sendings + try + { + await Task.WhenAll(sendings); + } + catch (Exception ex) + { + // should never get here + Logger.LogError("Sendings failed: {0}", ex.Message); + } + #endregion + } + } + + public async Task Subscribe(IClientProxy client, string connectionId, CycleParameter parameter) + { + Task sending = Task.CompletedTask; + try + { + await Sema.WaitAsync(); + Logger.LogDebug("New subscription..."); + + if (!DelaySubs.TryGetValue(parameter.DelayBlocks, out var delaySub)) + { + delaySub = new(4); + DelaySubs.Add(parameter.DelayBlocks, delaySub); + } + delaySub.Add(connectionId); + + await Context.Groups.AddToGroupAsync(connectionId, CycleGroup); + sending = client.SendState(CycleChannel, StateCache.Current.Cycle); + + Logger.LogDebug("Client {0} subscribed with state {1}", connectionId, StateCache.Current.Cycle); + return StateCache.Current.Level; + } + catch (Exception ex) + { + Logger.LogError("Failed to add subscription: {0}", ex.Message); + return 0; + } + finally + { + Sema.Release(); + try + { + await sending; + } + catch (Exception ex) + { + // should never get here + Logger.LogError("Sending failed: {0}", ex.Message); + } + } + } + + public void Unsubscribe(string connectionId) + { + try + { + Sema.Wait(); + Logger.LogDebug("Remove subscription..."); + + foreach (var (key, value) in DelaySubs) + { + value.Remove(connectionId); + if (value.Count == 0) + DelaySubs.Remove(key); + } + + Logger.LogDebug("Client {0} unsubscribed", connectionId); + } + catch (Exception ex) + { + Logger.LogError("Failed to remove subscription: {0}", ex.Message); + } + finally + { + Sema.Release(); + } + } + } +} From 6a60785ad37b928337f4ee8dca7811ab4da51a13 Mon Sep 17 00:00:00 2001 From: cryi Date: Sun, 24 Jul 2022 11:15:03 +0200 Subject: [PATCH 02/12] docs --- Tzkt.Api/Swagger/WsSubscriptions.md | 38 +++++++++++++++++++++++++++ Tzkt.Api/Websocket/Hubs/DefaultHub.cs | 1 + 2 files changed, 39 insertions(+) diff --git a/Tzkt.Api/Swagger/WsSubscriptions.md b/Tzkt.Api/Swagger/WsSubscriptions.md index 1e2cb5969..f1637db72 100644 --- a/Tzkt.Api/Swagger/WsSubscriptions.md +++ b/Tzkt.Api/Swagger/WsSubscriptions.md @@ -31,6 +31,44 @@ await connection.invoke("SubscribeToHead"); --- +## SubscribeToCycle + +Sends the blockchain head every time cycle is changed. + +### Method + +`SubscribeToCycle` + +### Channel + +`cycle` + +### Parameters + +````js +{ + delayBlocks: 2, // number of blocks to delay cycle changed notification + // minimum 2, defaults to 2 +} +```` + +### Data model + +Same as in [/blocks](#operation/Blocks_GetHead) + +### State + +State contains cycle (`int`) of the last processed cycle. + +### Example + +````js +connection.on("cycle", (msg) => { console.log(msg); }); +await connection.invoke("SubscribeToCycle"); +```` + +--- + ## SubscribeToBlocks Sends blocks added to the blockchain diff --git a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs index 652cff095..454b772f3 100644 --- a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs +++ b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs @@ -43,6 +43,7 @@ public Task SubscribeToHead() { return Head.Subscribe(Clients.Caller, Context.ConnectionId); } + public Task SubscribeToCycle(CycleParameter parameters) { parameters ??= new(); From eecafcb229af06011072676003bb27d819a247ec Mon Sep 17 00:00:00 2001 From: cryi Date: Sun, 24 Jul 2022 13:53:11 +0200 Subject: [PATCH 03/12] use cycle info from CycleRepository --- .../Websocket/Processors/CyclesProcessor.cs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index ad5c212f3..5bf994061 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -21,17 +21,14 @@ public class CycleProcessor : IHubProcessor where T : Hub #endregion readonly StateCache StateCache; - readonly StateRepository StateRepo; + readonly CyclesRepository CyclesRepo; readonly IHubContext Context; readonly ILogger Logger; - private int cycleStartLevel = 0; - private int lastProcessedCycle = 0; - - public CycleProcessor(StateCache cache, StateRepository repo, IHubContext hubContext, ILogger> logger) + public CycleProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) { StateCache = cache; - StateRepo = repo; + CyclesRepo = repo; Context = hubContext; Logger = logger; } @@ -42,19 +39,17 @@ public async Task OnStateChanged() try { await Sema.WaitAsync(); - - if (lastProcessedCycle != StateCache.Current.Cycle) - cycleStartLevel = StateCache.Current.Level; - lastProcessedCycle = StateCache.Current.Cycle; + + var cycleData = await CyclesRepo.Get(StateCache.Current.Cycle, Models.Symbols.None); // we notify only group of clients with matching delay - if (DelaySubs.TryGetValue(StateCache.Current.Level - cycleStartLevel, out var connections)) + if (DelaySubs.TryGetValue(StateCache.Current.Level - cycleData.FirstLevel, out var connections)) { foreach (var connectionId in connections) { sendings.Add(Context.Clients .Client(connectionId) - .SendData(CycleChannel, StateRepo.Get(), StateCache.Current.Cycle)); + .SendData(CycleChannel, cycleData, StateCache.Current.Cycle)); } Logger.LogDebug("Cycle {0} sent", StateCache.Current.Cycle); } From cc1c8f8b8b9c6a5247d87af9bca05feb5f6f508e Mon Sep 17 00:00:00 2001 From: cryi Date: Sun, 24 Jul 2022 14:57:32 +0200 Subject: [PATCH 04/12] accept quotes --- Tzkt.Api/Swagger/WsSubscriptions.md | 2 +- Tzkt.Api/Websocket/Parameters/CycleParameter.cs | 4 +++- Tzkt.Api/Websocket/Processors/CyclesProcessor.cs | 10 ++++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/Tzkt.Api/Swagger/WsSubscriptions.md b/Tzkt.Api/Swagger/WsSubscriptions.md index f1637db72..2fee36f0e 100644 --- a/Tzkt.Api/Swagger/WsSubscriptions.md +++ b/Tzkt.Api/Swagger/WsSubscriptions.md @@ -54,7 +54,7 @@ Sends the blockchain head every time cycle is changed. ### Data model -Same as in [/blocks](#operation/Blocks_GetHead) +Same as in [/cycle](#operation/Cycles_GetByIndex) ### State diff --git a/Tzkt.Api/Websocket/Parameters/CycleParameter.cs b/Tzkt.Api/Websocket/Parameters/CycleParameter.cs index c842a94dc..e2b008825 100644 --- a/Tzkt.Api/Websocket/Parameters/CycleParameter.cs +++ b/Tzkt.Api/Websocket/Parameters/CycleParameter.cs @@ -1,10 +1,12 @@ using Microsoft.AspNetCore.SignalR; - +using Tzkt.Api.Models; + namespace Tzkt.Api.Websocket { public class CycleParameter { public int DelayBlocks { get; set; } = 2; // 2 to cover possible reorganization + public Symbols Quote { get; set; } = Symbols.None; public void EnsureValid() { diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index 5bf994061..ffcfc556c 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -8,6 +8,7 @@ using Tzkt.Api.Repositories; using Tzkt.Api.Services.Cache; +using Tzkt.Api.Models; namespace Tzkt.Api.Websocket.Processors { @@ -18,6 +19,7 @@ public class CycleProcessor : IHubProcessor where T : Hub const string CycleChannel = "cycle"; static readonly SemaphoreSlim Sema = new(1, 1); static readonly Dictionary> DelaySubs = new(); + static readonly Dictionary QuoteSubs = new(); #endregion readonly StateCache StateCache; @@ -40,16 +42,18 @@ public async Task OnStateChanged() { await Sema.WaitAsync(); - var cycleData = await CyclesRepo.Get(StateCache.Current.Cycle, Models.Symbols.None); + var cycleData = await CyclesRepo.Get(StateCache.Current.Cycle, Symbols.None); // we notify only group of clients with matching delay if (DelaySubs.TryGetValue(StateCache.Current.Level - cycleData.FirstLevel, out var connections)) { foreach (var connectionId in connections) { + var quote = QuoteSubs[connectionId]; + var cycleDataWithQuotes = quote == Symbols.None ? cycleData : await CyclesRepo.Get(StateCache.Current.Cycle, quote); sendings.Add(Context.Clients .Client(connectionId) - .SendData(CycleChannel, cycleData, StateCache.Current.Cycle)); + .SendData(CycleChannel, cycleDataWithQuotes, StateCache.Current.Cycle)); } Logger.LogDebug("Cycle {0} sent", StateCache.Current.Cycle); } @@ -88,6 +92,7 @@ public async Task Subscribe(IClientProxy client, string connectionId, Cycle delaySub = new(4); DelaySubs.Add(parameter.DelayBlocks, delaySub); } + QuoteSubs[connectionId] = parameter.Quote; delaySub.Add(connectionId); await Context.Groups.AddToGroupAsync(connectionId, CycleGroup); @@ -129,6 +134,7 @@ public void Unsubscribe(string connectionId) if (value.Count == 0) DelaySubs.Remove(key); } + QuoteSubs.Remove(connectionId); Logger.LogDebug("Client {0} unsubscribed", connectionId); } From 52eae509824a91c7ca6de002778443f96a709c04 Mon Sep 17 00:00:00 2001 From: cryi Date: Sun, 24 Jul 2022 16:30:21 +0200 Subject: [PATCH 05/12] better align to other API + quotes --- Tzkt.Api/Startup.cs | 8 +++++--- Tzkt.Api/Swagger/WsSubscriptions.md | 14 ++++++++------ Tzkt.Api/Websocket/Hubs/DefaultHub.cs | 6 +++--- .../{CycleParameter.cs => CyclesParameter.cs} | 4 +++- Tzkt.Api/Websocket/Processors/CyclesProcessor.cs | 10 +++++----- 5 files changed, 24 insertions(+), 18 deletions(-) rename Tzkt.Api/Websocket/Parameters/{CycleParameter.cs => CyclesParameter.cs} (79%) diff --git a/Tzkt.Api/Startup.cs b/Tzkt.Api/Startup.cs index b2613fda9..2f2f16cf0 100644 --- a/Tzkt.Api/Startup.cs +++ b/Tzkt.Api/Startup.cs @@ -18,7 +18,8 @@ using Tzkt.Api.Websocket.Hubs; using Tzkt.Api.Websocket.Processors; using Tzkt.Data; - +using System.Text.Json.Serialization; + namespace Tzkt.Api { public class Startup @@ -92,8 +93,8 @@ public void ConfigureServices(IServiceCollection services) services.AddTransient>(); services.AddTransient>(); - services.AddTransient>(); - services.AddTransient>(); + services.AddTransient>(); + services.AddTransient>(); services.AddTransient>(); services.AddTransient>(); @@ -126,6 +127,7 @@ public void ConfigureServices(IServiceCollection services) jsonOptions.PayloadSerializerOptions.Converters.Add(new DateTimeConverter()); jsonOptions.PayloadSerializerOptions.Converters.Add(new OperationConverter()); jsonOptions.PayloadSerializerOptions.Converters.Add(new OperationErrorConverter()); + jsonOptions.PayloadSerializerOptions.Converters.Add(new JsonStringEnumConverter()); }); } #endregion diff --git a/Tzkt.Api/Swagger/WsSubscriptions.md b/Tzkt.Api/Swagger/WsSubscriptions.md index 2fee36f0e..4659e3b5b 100644 --- a/Tzkt.Api/Swagger/WsSubscriptions.md +++ b/Tzkt.Api/Swagger/WsSubscriptions.md @@ -37,18 +37,20 @@ Sends the blockchain head every time cycle is changed. ### Method -`SubscribeToCycle` +`SubscribeToCycles` ### Channel -`cycle` +`cycles` ### Parameters ````js { - delayBlocks: 2, // number of blocks to delay cycle changed notification - // minimum 2, defaults to 2 + delayBlocks: 2, // number of blocks to delay cycle changed notification + // minimum 2, defaults to 2 + quote: "eur,usd" // comma-separated list of ticker symbols to inject historical prices into response + // defaults to "none" } ```` @@ -63,8 +65,8 @@ State contains cycle (`int`) of the last processed cycle. ### Example ````js -connection.on("cycle", (msg) => { console.log(msg); }); -await connection.invoke("SubscribeToCycle"); +connection.on("cycles", (msg) => { console.log(msg); }); +await connection.invoke("SubscribeToCycles"); ```` --- diff --git a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs index 454b772f3..c536595f8 100644 --- a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs +++ b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs @@ -9,7 +9,7 @@ namespace Tzkt.Api.Websocket.Hubs public class DefaultHub : BaseHub { readonly HeadProcessor Head; - readonly CycleProcessor Cycle; + readonly CyclesProcessor Cycle; readonly BlocksProcessor Blocks; readonly OperationsProcessor Operations; readonly BigMapsProcessor BigMaps; @@ -19,7 +19,7 @@ public class DefaultHub : BaseHub public DefaultHub( HeadProcessor head, - CycleProcessor cycle, + CyclesProcessor cycle, BlocksProcessor blocks, OperationsProcessor operations, BigMapsProcessor bigMaps, @@ -44,7 +44,7 @@ public Task SubscribeToHead() return Head.Subscribe(Clients.Caller, Context.ConnectionId); } - public Task SubscribeToCycle(CycleParameter parameters) + public Task SubscribeToCycles(CyclesParameter parameters = null) { parameters ??= new(); parameters.EnsureValid(); diff --git a/Tzkt.Api/Websocket/Parameters/CycleParameter.cs b/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs similarity index 79% rename from Tzkt.Api/Websocket/Parameters/CycleParameter.cs rename to Tzkt.Api/Websocket/Parameters/CyclesParameter.cs index e2b008825..2f7b52393 100644 --- a/Tzkt.Api/Websocket/Parameters/CycleParameter.cs +++ b/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs @@ -1,9 +1,11 @@ using Microsoft.AspNetCore.SignalR; +using Newtonsoft.Json.Converters; +using System.Text.Json.Serialization; using Tzkt.Api.Models; namespace Tzkt.Api.Websocket { - public class CycleParameter + public class CyclesParameter { public int DelayBlocks { get; set; } = 2; // 2 to cover possible reorganization public Symbols Quote { get; set; } = Symbols.None; diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index ffcfc556c..49092f49a 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -12,11 +12,11 @@ namespace Tzkt.Api.Websocket.Processors { - public class CycleProcessor : IHubProcessor where T : Hub + public class CyclesProcessor : IHubProcessor where T : Hub { #region static - const string CycleGroup = "cycle"; - const string CycleChannel = "cycle"; + const string CycleGroup = "cycles"; + const string CycleChannel = "cycles"; static readonly SemaphoreSlim Sema = new(1, 1); static readonly Dictionary> DelaySubs = new(); static readonly Dictionary QuoteSubs = new(); @@ -27,7 +27,7 @@ public class CycleProcessor : IHubProcessor where T : Hub readonly IHubContext Context; readonly ILogger Logger; - public CycleProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) + public CyclesProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) { StateCache = cache; CyclesRepo = repo; @@ -79,7 +79,7 @@ public async Task OnStateChanged() } } - public async Task Subscribe(IClientProxy client, string connectionId, CycleParameter parameter) + public async Task Subscribe(IClientProxy client, string connectionId, CyclesParameter parameter) { Task sending = Task.CompletedTask; try From 90e2bba6e5cbea47efdbed1d5f121fb60f17a3fa Mon Sep 17 00:00:00 2001 From: cryi Date: Sun, 24 Jul 2022 16:49:14 +0200 Subject: [PATCH 06/12] quote and cycle caching --- .../Websocket/Processors/CyclesProcessor.cs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index 49092f49a..66df0b4cf 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -27,6 +27,9 @@ public class CyclesProcessor : IHubProcessor where T : Hub readonly IHubContext Context; readonly ILogger Logger; + Cycle CurrentCycle = null; + Dictionary QuoteCache = new(); + public CyclesProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) { StateCache = cache; @@ -42,15 +45,25 @@ public async Task OnStateChanged() { await Sema.WaitAsync(); - var cycleData = await CyclesRepo.Get(StateCache.Current.Cycle, Symbols.None); + if (CurrentCycle == null || StateCache.Current.Level < CurrentCycle.FirstLevel || StateCache.Current.Level > CurrentCycle.LastLevel) + { + QuoteCache.Clear(); + CurrentCycle = await CyclesRepo.Get(StateCache.Current.Cycle, Symbols.None); + QuoteCache.Add(Symbols.None, CurrentCycle); + } // we notify only group of clients with matching delay - if (DelaySubs.TryGetValue(StateCache.Current.Level - cycleData.FirstLevel, out var connections)) + if (DelaySubs.TryGetValue(StateCache.Current.Level - CurrentCycle.FirstLevel, out var connections)) { foreach (var connectionId in connections) { var quote = QuoteSubs[connectionId]; - var cycleDataWithQuotes = quote == Symbols.None ? cycleData : await CyclesRepo.Get(StateCache.Current.Cycle, quote); + if (!QuoteCache.TryGetValue(quote, out var cycleDataWithQuotes)) + { + cycleDataWithQuotes = await CyclesRepo.Get(StateCache.Current.Cycle, quote); + QuoteCache.Add(quote, cycleDataWithQuotes); + } + sendings.Add(Context.Clients .Client(connectionId) .SendData(CycleChannel, cycleDataWithQuotes, StateCache.Current.Cycle)); From 2a11bac27eaac6756baaf1b58ed7bc17a1d1c332 Mon Sep 17 00:00:00 2001 From: cryi Date: Mon, 25 Jul 2022 17:53:41 +0200 Subject: [PATCH 07/12] drop quotes --- Tzkt.Api/Startup.cs | 4 +--- Tzkt.Api/Websocket/Parameters/CyclesParameter.cs | 3 +-- Tzkt.Api/Websocket/Processors/CyclesProcessor.cs | 15 +-------------- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/Tzkt.Api/Startup.cs b/Tzkt.Api/Startup.cs index 2f2f16cf0..a6a6cf7fc 100644 --- a/Tzkt.Api/Startup.cs +++ b/Tzkt.Api/Startup.cs @@ -18,8 +18,7 @@ using Tzkt.Api.Websocket.Hubs; using Tzkt.Api.Websocket.Processors; using Tzkt.Data; -using System.Text.Json.Serialization; - + namespace Tzkt.Api { public class Startup @@ -127,7 +126,6 @@ public void ConfigureServices(IServiceCollection services) jsonOptions.PayloadSerializerOptions.Converters.Add(new DateTimeConverter()); jsonOptions.PayloadSerializerOptions.Converters.Add(new OperationConverter()); jsonOptions.PayloadSerializerOptions.Converters.Add(new OperationErrorConverter()); - jsonOptions.PayloadSerializerOptions.Converters.Add(new JsonStringEnumConverter()); }); } #endregion diff --git a/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs b/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs index 2f7b52393..1409246b3 100644 --- a/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs +++ b/Tzkt.Api/Websocket/Parameters/CyclesParameter.cs @@ -8,8 +8,7 @@ namespace Tzkt.Api.Websocket public class CyclesParameter { public int DelayBlocks { get; set; } = 2; // 2 to cover possible reorganization - public Symbols Quote { get; set; } = Symbols.None; - + public void EnsureValid() { if (DelayBlocks < 2) diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index 66df0b4cf..eb46ec8f6 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -19,7 +19,6 @@ public class CyclesProcessor : IHubProcessor where T : Hub const string CycleChannel = "cycles"; static readonly SemaphoreSlim Sema = new(1, 1); static readonly Dictionary> DelaySubs = new(); - static readonly Dictionary QuoteSubs = new(); #endregion readonly StateCache StateCache; @@ -28,7 +27,6 @@ public class CyclesProcessor : IHubProcessor where T : Hub readonly ILogger Logger; Cycle CurrentCycle = null; - Dictionary QuoteCache = new(); public CyclesProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) { @@ -47,9 +45,7 @@ public async Task OnStateChanged() if (CurrentCycle == null || StateCache.Current.Level < CurrentCycle.FirstLevel || StateCache.Current.Level > CurrentCycle.LastLevel) { - QuoteCache.Clear(); CurrentCycle = await CyclesRepo.Get(StateCache.Current.Cycle, Symbols.None); - QuoteCache.Add(Symbols.None, CurrentCycle); } // we notify only group of clients with matching delay @@ -57,16 +53,9 @@ public async Task OnStateChanged() { foreach (var connectionId in connections) { - var quote = QuoteSubs[connectionId]; - if (!QuoteCache.TryGetValue(quote, out var cycleDataWithQuotes)) - { - cycleDataWithQuotes = await CyclesRepo.Get(StateCache.Current.Cycle, quote); - QuoteCache.Add(quote, cycleDataWithQuotes); - } - sendings.Add(Context.Clients .Client(connectionId) - .SendData(CycleChannel, cycleDataWithQuotes, StateCache.Current.Cycle)); + .SendData(CycleChannel, CurrentCycle, StateCache.Current.Cycle)); } Logger.LogDebug("Cycle {0} sent", StateCache.Current.Cycle); } @@ -105,7 +94,6 @@ public async Task Subscribe(IClientProxy client, string connectionId, Cycle delaySub = new(4); DelaySubs.Add(parameter.DelayBlocks, delaySub); } - QuoteSubs[connectionId] = parameter.Quote; delaySub.Add(connectionId); await Context.Groups.AddToGroupAsync(connectionId, CycleGroup); @@ -147,7 +135,6 @@ public void Unsubscribe(string connectionId) if (value.Count == 0) DelaySubs.Remove(key); } - QuoteSubs.Remove(connectionId); Logger.LogDebug("Client {0} unsubscribed", connectionId); } From 0b7ec9a3bbb6df5ed9f5e0556c229041e3a88a85 Mon Sep 17 00:00:00 2001 From: cryi Date: Mon, 25 Jul 2022 17:55:22 +0200 Subject: [PATCH 08/12] docs --- Tzkt.Api/Swagger/WsSubscriptions.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/Tzkt.Api/Swagger/WsSubscriptions.md b/Tzkt.Api/Swagger/WsSubscriptions.md index 4659e3b5b..18ac4f33b 100644 --- a/Tzkt.Api/Swagger/WsSubscriptions.md +++ b/Tzkt.Api/Swagger/WsSubscriptions.md @@ -49,8 +49,6 @@ Sends the blockchain head every time cycle is changed. { delayBlocks: 2, // number of blocks to delay cycle changed notification // minimum 2, defaults to 2 - quote: "eur,usd" // comma-separated list of ticker symbols to inject historical prices into response - // defaults to "none" } ```` From f7050412c1dabfc42b03908ff48dddc215997952 Mon Sep 17 00:00:00 2001 From: cryi Date: Mon, 25 Jul 2022 17:58:13 +0200 Subject: [PATCH 09/12] docs2 --- Tzkt.Api/Swagger/WsSubscriptions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/Tzkt.Api/Swagger/WsSubscriptions.md b/Tzkt.Api/Swagger/WsSubscriptions.md index 18ac4f33b..7545fd1ed 100644 --- a/Tzkt.Api/Swagger/WsSubscriptions.md +++ b/Tzkt.Api/Swagger/WsSubscriptions.md @@ -49,6 +49,7 @@ Sends the blockchain head every time cycle is changed. { delayBlocks: 2, // number of blocks to delay cycle changed notification // minimum 2, defaults to 2 + // delayBlocks has to be lower than number of blocks in cycle to get notifications } ```` From 98219fc47a671a8d959f85b43e37d71f3464f40e Mon Sep 17 00:00:00 2001 From: cryi Date: Mon, 25 Jul 2022 18:20:55 +0200 Subject: [PATCH 10/12] limit subs --- Tzkt.Api/Websocket/Processors/CyclesProcessor.cs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index eb46ec8f6..b065c8c80 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -19,6 +19,7 @@ public class CyclesProcessor : IHubProcessor where T : Hub const string CycleChannel = "cycles"; static readonly SemaphoreSlim Sema = new(1, 1); static readonly Dictionary> DelaySubs = new(); + HashSet Subs = new(); #endregion readonly StateCache StateCache; @@ -41,7 +42,12 @@ public async Task OnStateChanged() var sendings = new List(2); try { - await Sema.WaitAsync(); + await Sema.WaitAsync(); + if (Subs.Count == 0) + { + Logger.LogDebug("No cycles subs"); + return; + } if (CurrentCycle == null || StateCache.Current.Level < CurrentCycle.FirstLevel || StateCache.Current.Level > CurrentCycle.LastLevel) { @@ -87,6 +93,11 @@ public async Task Subscribe(IClientProxy client, string connectionId, Cycle try { await Sema.WaitAsync(); + if (Subs.Contains(connectionId)) + { + throw new HubException($"{connectionId} already subscribed."); + } + Logger.LogDebug("New subscription..."); if (!DelaySubs.TryGetValue(parameter.DelayBlocks, out var delaySub)) @@ -94,6 +105,7 @@ public async Task Subscribe(IClientProxy client, string connectionId, Cycle delaySub = new(4); DelaySubs.Add(parameter.DelayBlocks, delaySub); } + Subs.Add(connectionId); delaySub.Add(connectionId); await Context.Groups.AddToGroupAsync(connectionId, CycleGroup); @@ -134,6 +146,7 @@ public void Unsubscribe(string connectionId) value.Remove(connectionId); if (value.Count == 0) DelaySubs.Remove(key); + Subs.Remove(connectionId); } Logger.LogDebug("Client {0} unsubscribed", connectionId); From 7ef7da4dbc93836866408c7dff9af607a1ca79df Mon Sep 17 00:00:00 2001 From: cryi Date: Mon, 25 Jul 2022 18:22:50 +0200 Subject: [PATCH 11/12] static /+readonly --- Tzkt.Api/Websocket/Processors/CyclesProcessor.cs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index b065c8c80..eed10fc6e 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -19,7 +19,8 @@ public class CyclesProcessor : IHubProcessor where T : Hub const string CycleChannel = "cycles"; static readonly SemaphoreSlim Sema = new(1, 1); static readonly Dictionary> DelaySubs = new(); - HashSet Subs = new(); + static readonly HashSet Subs = new(); + static Cycle CurrentCycle = null; #endregion readonly StateCache StateCache; @@ -27,8 +28,6 @@ public class CyclesProcessor : IHubProcessor where T : Hub readonly IHubContext Context; readonly ILogger Logger; - Cycle CurrentCycle = null; - public CyclesProcessor(StateCache cache, CyclesRepository repo, IHubContext hubContext, ILogger> logger) { StateCache = cache; @@ -42,7 +41,7 @@ public async Task OnStateChanged() var sendings = new List(2); try { - await Sema.WaitAsync(); + await Sema.WaitAsync(); if (Subs.Count == 0) { Logger.LogDebug("No cycles subs"); @@ -93,9 +92,9 @@ public async Task Subscribe(IClientProxy client, string connectionId, Cycle try { await Sema.WaitAsync(); - if (Subs.Contains(connectionId)) - { - throw new HubException($"{connectionId} already subscribed."); + if (Subs.Contains(connectionId)) + { + throw new HubException($"{connectionId} already subscribed."); } Logger.LogDebug("New subscription..."); From 9945c89a5395aac6bc84387e6dd3affec5454e1c Mon Sep 17 00:00:00 2001 From: cryi Date: Mon, 25 Jul 2022 19:07:28 +0200 Subject: [PATCH 12/12] on disconnect, fix unsubscribe, hub exception etc. --- Tzkt.Api/Websocket/Hubs/DefaultHub.cs | 9 +++++---- Tzkt.Api/Websocket/Processors/CyclesProcessor.cs | 12 +++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs index c536595f8..59e485407 100644 --- a/Tzkt.Api/Websocket/Hubs/DefaultHub.cs +++ b/Tzkt.Api/Websocket/Hubs/DefaultHub.cs @@ -9,7 +9,7 @@ namespace Tzkt.Api.Websocket.Hubs public class DefaultHub : BaseHub { readonly HeadProcessor Head; - readonly CyclesProcessor Cycle; + readonly CyclesProcessor Cycles; readonly BlocksProcessor Blocks; readonly OperationsProcessor Operations; readonly BigMapsProcessor BigMaps; @@ -19,7 +19,7 @@ public class DefaultHub : BaseHub public DefaultHub( HeadProcessor head, - CyclesProcessor cycle, + CyclesProcessor cycles, BlocksProcessor blocks, OperationsProcessor operations, BigMapsProcessor bigMaps, @@ -30,7 +30,7 @@ public DefaultHub( IConfiguration config) : base(logger, config) { Head = head; - Cycle = cycle; + Cycles = cycles; Blocks = blocks; Operations = operations; BigMaps = bigMaps; @@ -48,7 +48,7 @@ public Task SubscribeToCycles(CyclesParameter parameters = null) { parameters ??= new(); parameters.EnsureValid(); - return Cycle.Subscribe(Clients.Caller, Context.ConnectionId, parameters); + return Cycles.Subscribe(Clients.Caller, Context.ConnectionId, parameters); } public Task SubscribeToBlocks() @@ -93,6 +93,7 @@ public Task SubscribeToAccounts(AccountsParameter parameters) public override Task OnDisconnectedAsync(Exception exception) { + Cycles.Unsubscribe(Context.ConnectionId); Operations.Unsubscribe(Context.ConnectionId); BigMaps.Unsubscribe(Context.ConnectionId); Balances.Unsubscribe(Context.ConnectionId); diff --git a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs index eed10fc6e..82485830e 100644 --- a/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs +++ b/Tzkt.Api/Websocket/Processors/CyclesProcessor.cs @@ -15,11 +15,10 @@ namespace Tzkt.Api.Websocket.Processors public class CyclesProcessor : IHubProcessor where T : Hub { #region static - const string CycleGroup = "cycles"; const string CycleChannel = "cycles"; static readonly SemaphoreSlim Sema = new(1, 1); static readonly Dictionary> DelaySubs = new(); - static readonly HashSet Subs = new(); + static readonly HashSet Subs = new(); static Cycle CurrentCycle = null; #endregion @@ -107,11 +106,14 @@ public async Task Subscribe(IClientProxy client, string connectionId, Cycle Subs.Add(connectionId); delaySub.Add(connectionId); - await Context.Groups.AddToGroupAsync(connectionId, CycleGroup); sending = client.SendState(CycleChannel, StateCache.Current.Cycle); Logger.LogDebug("Client {0} subscribed with state {1}", connectionId, StateCache.Current.Cycle); - return StateCache.Current.Level; + return StateCache.Current.Cycle; + } + catch (HubException ex) + { + throw; } catch (Exception ex) { @@ -145,8 +147,8 @@ public void Unsubscribe(string connectionId) value.Remove(connectionId); if (value.Count == 0) DelaySubs.Remove(key); - Subs.Remove(connectionId); } + Subs.Remove(connectionId); Logger.LogDebug("Client {0} unsubscribed", connectionId); }