diff --git a/Tzkt.Api/Controllers/MetadataController.cs b/Tzkt.Api/Controllers/MetadataController.cs index 009f20490..8ca602768 100644 --- a/Tzkt.Api/Controllers/MetadataController.cs +++ b/Tzkt.Api/Controllers/MetadataController.cs @@ -73,15 +73,7 @@ public async Task>> UpdateAccountMetada if (metadata.Any(x => !Regex.IsMatch(x.Key, "^(tz1|tz2|tz3|KT1)[0-9A-Za-z]{33}$"))) return BadRequest("Invalid account address"); - var res = await Metadata.UpdateAccountMetadata(metadata); - - #region update cached metadata - var addresses = res.Select(x => x.Key).ToList(); - await Accounts.ReloadMetadata(addresses); - await Aliases.Reload(addresses); - #endregion - - return Ok(res); + return Ok(await Metadata.UpdateAccountMetadata(metadata)); } catch (JsonException ex) { @@ -221,14 +213,7 @@ public async Task>> UpdateSoftwareMetad if (metadata.Any(x => !Regex.IsMatch(x.Key, "^[0-9a-f]{8}$"))) return BadRequest("Invalid software short hash"); - var res = await Metadata.UpdateSoftwareMetadata(metadata); - - #region update cached metadata - var hashes = res.Select(x => x.Key).ToList(); - await Software.Reload(hashes); - #endregion - - return Ok(res); + return Ok(await Metadata.UpdateSoftwareMetadata(metadata)); } catch (JsonException ex) { diff --git a/Tzkt.Api/Program.cs b/Tzkt.Api/Program.cs index 440a9bdff..b8723858b 100644 --- a/Tzkt.Api/Program.cs +++ b/Tzkt.Api/Program.cs @@ -30,7 +30,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) => appConfig.Sources.Clear(); appConfig.AddJsonFile("appsettings.json", true); appConfig.AddJsonFile($"appsettings.{host.HostingEnvironment.EnvironmentName}.json", true); - appConfig.AddEnvironmentVariables("TZKT_"); + appConfig.AddEnvironmentVariables(); appConfig.AddEnvironmentVariables("TZKT_API_"); appConfig.AddCommandLine(args); }) diff --git a/Tzkt.Api/Services/Cache/Accounts/AccountsCache.cs b/Tzkt.Api/Services/Cache/Accounts/AccountsCache.cs index 99d1e1e99..f19c5a492 100644 --- a/Tzkt.Api/Services/Cache/Accounts/AccountsCache.cs +++ b/Tzkt.Api/Services/Cache/Accounts/AccountsCache.cs @@ -126,18 +126,10 @@ public async Task GetAliasAsync(int id) return (await GetAsync(id)).Info; } - public async Task ReloadMetadata(List addresses) + public void UpdateMetadata(string address, string json) { - using var db = GetConnection(); - var rows = await db.QueryAsync( - @"SELECT ""Id"", ""Metadata"" FROM ""Accounts"" WHERE ""Address"" = ANY(@addresses::character(36)[])", - new { addresses }); - - foreach (var row in rows) - { - var user = await GetAsync((int)row.Id); - user.Metadata = AccountMetadata.Parse((string)row.Metadata); - } + if (TryGetSafe(address, out var account)) + account.Metadata = AccountMetadata.Parse(json); } #endregion diff --git a/Tzkt.Api/Services/Cache/Aliases/AliasesCache.cs b/Tzkt.Api/Services/Cache/Aliases/AliasesCache.cs index d2a5f512b..ef7bd5c9c 100644 --- a/Tzkt.Api/Services/Cache/Aliases/AliasesCache.cs +++ b/Tzkt.Api/Services/Cache/Aliases/AliasesCache.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; @@ -27,29 +28,29 @@ public AliasesCache(IConfiguration config, ILogger logger) : base( logger.LogInformation("Loaded {1} aliases", Aliases.Count); } - public async Task Reload(List addresses) + public void UpdateMetadata(string address, string json) { - using var db = GetConnection(); - var rows = await db.QueryAsync( - $@"{SelectQuery} WHERE ""Address"" = ANY(@addresses::character(36)[])", - new { addresses }); + string name = null; + if (json != null) + { + using var doc = JsonDocument.Parse(json); + if (doc.RootElement.TryGetProperty("alias", out var alias)) + name = alias.GetString(); + } lock (this) { - foreach (var row in rows) + if (name != null) { - if (row.Name != null) - { - var alias = Aliases.FirstOrDefault(x => x.Address == row.Address); - if (alias == null) - Aliases.Add(row); - else - alias.Name = row.Name; - } + var alias = Aliases.FirstOrDefault(x => x.Address == address); + if (alias == null) + Aliases.Add(new Alias { Address = address, Name = name }); else - { - Aliases.RemoveAll(x => x.Address == row.Address); - } + alias.Name = name; + } + else + { + Aliases.RemoveAll(x => x.Address == address); } } } diff --git a/Tzkt.Api/Services/Cache/Software/SoftwareCache.cs b/Tzkt.Api/Services/Cache/Software/SoftwareCache.cs index 227980189..baa86d711 100644 --- a/Tzkt.Api/Services/Cache/Software/SoftwareCache.cs +++ b/Tzkt.Api/Services/Cache/Software/SoftwareCache.cs @@ -57,15 +57,13 @@ public SoftwareCache(TimeCache time, IConfiguration config, ILogger hashes) + public void UpdateMetadata(string shortHash) { - using var db = GetConnection(); - var rows = await db.QueryAsync($@"{SelectQuery} WHERE ""ShortHash"" = ANY(@hashes::character(8)[])", new { hashes }); - lock (this) { - foreach (var row in rows) - Software[(int)row.Id] = Parse(row); + using var db = GetConnection(); + var row = db.QueryFirstOrDefault($@"{SelectQuery} WHERE ""ShortHash"" = @shortHash::character(8)", new { shortHash }); + if (row != null) Software[(int)row.Id] = Parse(row); } } diff --git a/Tzkt.Api/Services/Sync/StateListener.cs b/Tzkt.Api/Services/Sync/StateListener.cs index 708d65ec4..70243255f 100644 --- a/Tzkt.Api/Services/Sync/StateListener.cs +++ b/Tzkt.Api/Services/Sync/StateListener.cs @@ -17,25 +17,37 @@ namespace Tzkt.Api.Services.Sync { public class StateListener : BackgroundService { + #region static + const string StateTrigger = "state_changed"; + const string AccountMetadataTrigger = "account_metadata_changed"; + const string SoftwareMetadataTrigger = "software_metadata_changed"; + //const string ProposalMetadataTrigger = "proposal_metadata_changed"; + //const string ProtocolMetadataTrigger = "protocol_metadata_changed"; + #endregion + readonly string ConnectionString; readonly StateCache State; readonly BigMapsCache BigMaps; readonly AccountsCache Accounts; + readonly AliasesCache Aliases; readonly ProtocolsCache Protocols; + readonly SoftwareCache Software; readonly QuotesCache Quotes; readonly TimeCache Times; readonly HomeService Home; readonly IEnumerable Processors; readonly ILogger Logger; - Task Notifying = Task.CompletedTask; - readonly List<(int Level, string Hash)> Changes = new List<(int, string)>(4); + Task StateNotifying = Task.CompletedTask; + readonly List<(int Level, string Hash)> StateChanges = new(4); public StateListener( StateCache state, BigMapsCache bigMaps, AccountsCache accounts, + AliasesCache aliases, + SoftwareCache software, ProtocolsCache protocols, QuotesCache quotes, TimeCache times, @@ -49,7 +61,9 @@ public StateListener( State = state; BigMaps = bigMaps; Accounts = accounts; + Aliases = aliases; Protocols = protocols; + Software = software; Quotes = quotes; Times = times; Home = home; @@ -61,10 +75,10 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) { try { - Logger.LogInformation("State listener started"); + Logger.LogInformation("DB listener started"); using var db = new NpgsqlConnection(ConnectionString); - db.Notification += OnStateChanged; + db.Notification += OnNotification; while (!cancellationToken.IsCancellationRequested) { @@ -73,8 +87,13 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) if (db.State != ConnectionState.Open) { await db.OpenAsync(cancellationToken); - await db.ExecuteAsync("LISTEN state_changed;"); - Logger.LogInformation("State listener connected"); + await db.ExecuteAsync($@" + LISTEN {StateTrigger}; + LISTEN {AccountMetadataTrigger}; + LISTEN {SoftwareMetadataTrigger};"); + //LISTEN { ProposalMetadataTrigger}; + //LISTEN { ProtocolMetadataTrigger}; + Logger.LogInformation("Db listener connected"); } await db.WaitAsync(cancellationToken); } @@ -82,26 +101,26 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { break; } catch (Exception ex) { - Logger.LogError("State listener disconnected: {0}", ex.Message); + Logger.LogError("DB listener disconnected: {0}", ex.Message); await Task.Delay(1000, cancellationToken); } } - db.Notification -= OnStateChanged; + db.Notification -= OnNotification; } catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested) { } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { } catch (Exception ex) { - Logger.LogCritical($"State listener crashed: {ex.Message}"); + Logger.LogCritical($"DB listener crashed: {ex.Message}"); } finally { - Logger.LogWarning("State listener stopped"); + Logger.LogWarning("DB listener stopped"); } } - private void OnStateChanged(object sender, NpgsqlNotificationEventArgs e) + private void OnNotification(object sender, NpgsqlNotificationEventArgs e) { Logger.LogDebug("Received {1} notification with payload {2}", e.Channel, e.Payload); @@ -111,27 +130,34 @@ private void OnStateChanged(object sender, NpgsqlNotificationEventArgs e) return; } - var data = e.Payload.Split(':', StringSplitOptions.RemoveEmptyEntries); - if (data.Length != 2 || !int.TryParse(data[0], out var level) || data[1].Length != 51) + if (e.Channel == StateTrigger) { - Logger.LogCritical("Invalid trigger payload {1}", e.Payload); - return; - } + var data = e.Payload.Split(':', StringSplitOptions.RemoveEmptyEntries); + if (data.Length != 2 || !int.TryParse(data[0], out var level) || data[1].Length != 51) + { + Logger.LogCritical("Invalid trigger payload {1}", e.Payload); + return; + } - lock (Changes) - { - Changes.Add((level, data[1])); + lock (StateChanges) + { + StateChanges.Add((level, data[1])); - if (Notifying.IsCompleted) - Notifying = NotifyAsync(); // async run + if (StateNotifying.IsCompleted) + StateNotifying = NotifyStateAsync(); // async run + } + } + else + { + NotifyMetadata(e.Channel, e.Payload); } } - private async Task NotifyAsync() + async Task NotifyStateAsync() { try { - Logger.LogDebug("Processing notification..."); + Logger.LogDebug("Processing state notification..."); #region update state RawState newState; @@ -143,21 +169,21 @@ private async Task NotifyAsync() if (attempts++ > 32) { // should never get here, but to make sure there are no infinite loops... - Logger.LogCritical("Failed to reach state equal to trigger's payload '{1}'", Changes[^1].Hash); + Logger.LogCritical("Failed to reach state equal to trigger's payload '{1}'", StateChanges[^1].Hash); return; } newState = await State.LoadAsync(); - lock (Changes) + lock (StateChanges) { - if (newState.Hash != Changes[^1].Hash) + if (newState.Hash != StateChanges[^1].Hash) { Logger.LogDebug("Lost sync. Retrying..."); continue; } - changes = Changes.ToList(); - Changes.Clear(); + changes = StateChanges.ToList(); + StateChanges.Clear(); break; } } @@ -182,24 +208,62 @@ private async Task NotifyAsync() _ = Home.UpdateAsync(); #endregion - Logger.LogDebug("Notification processed"); + Logger.LogDebug("State notification processed"); - lock (Changes) + lock (StateChanges) { - if (Changes.Count > 0) + if (StateChanges.Count > 0) { - Logger.LogDebug("Handle pending notification"); - Notifying = NotifyAsync(); // async run + Logger.LogDebug("Handle pending state notification"); + StateNotifying = NotifyStateAsync(); // async run } else { - Notifying = Task.CompletedTask; + StateNotifying = Task.CompletedTask; } } } catch (Exception ex) { - Logger.LogError("Failed to process notification: {1}", ex.Message); + Logger.LogError("Failed to process state notification: {1}", ex.Message); + } + } + + void NotifyMetadata(string channel, string payload) + { + try + { + Logger.LogDebug("Processing metadata notification..."); + + var divider = payload.IndexOf(':'); + if (divider == -1) + { + Logger.LogError("Invalid metadata notification payload"); + return; + } + + var key = payload[0..divider]; + var value = payload[(divider + 1)..]; + if (value == string.Empty) value = null; + + switch (channel) + { + case AccountMetadataTrigger: + Accounts.UpdateMetadata(key, value); + Aliases.UpdateMetadata(key, value); + break; + case SoftwareMetadataTrigger: + Software.UpdateMetadata(key); + break; + default: + break; + } + + Logger.LogDebug("Metadata notification processed"); + } + catch (Exception ex) + { + Logger.LogError("Failed to process metadata notification: {1}", ex.Message); } } } diff --git a/Tzkt.Data/Migrations/20210714125513_Triggers.cs b/Tzkt.Data/Migrations/20210714125513_Triggers.cs index 0fd36a733..7bd468cc9 100644 --- a/Tzkt.Data/Migrations/20210714125513_Triggers.cs +++ b/Tzkt.Data/Migrations/20210714125513_Triggers.cs @@ -56,24 +56,28 @@ FOR EACH ROW CREATE TRIGGER account_metadata_changed AFTER UPDATE OF ""Metadata"" ON ""Accounts"" FOR EACH ROW + WHEN (OLD.""Metadata"" IS DISTINCT FROM NEW.""Metadata"") EXECUTE PROCEDURE notify_account_metadata();"); migrationBuilder.Sql(@" CREATE TRIGGER proposal_metadata_changed AFTER UPDATE OF ""Metadata"" ON ""Proposals"" FOR EACH ROW + WHEN (OLD.""Metadata"" IS DISTINCT FROM NEW.""Metadata"") EXECUTE PROCEDURE notify_proposal_metadata();"); migrationBuilder.Sql(@" CREATE TRIGGER protocol_metadata_changed AFTER UPDATE OF ""Metadata"" ON ""Protocols"" FOR EACH ROW + WHEN (OLD.""Metadata"" IS DISTINCT FROM NEW.""Metadata"") EXECUTE PROCEDURE notify_protocol_metadata();"); migrationBuilder.Sql(@" CREATE TRIGGER software_metadata_changed AFTER UPDATE OF ""Metadata"" ON ""Software"" FOR EACH ROW + WHEN (OLD.""Metadata"" IS DISTINCT FROM NEW.""Metadata"") EXECUTE PROCEDURE notify_software_metadata();"); } diff --git a/Tzkt.Sync/Program.cs b/Tzkt.Sync/Program.cs index 4125136be..15894ea95 100644 --- a/Tzkt.Sync/Program.cs +++ b/Tzkt.Sync/Program.cs @@ -27,7 +27,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) => appConfig.Sources.Clear(); appConfig.AddJsonFile("appsettings.json", true); appConfig.AddJsonFile($"appsettings.{host.HostingEnvironment.EnvironmentName}.json", true); - appConfig.AddEnvironmentVariables("TZKT_"); + appConfig.AddEnvironmentVariables(); appConfig.AddEnvironmentVariables("TZKT_SYNC_"); appConfig.AddCommandLine(args); })