Skip to content

Commit

Permalink
Listen metadata DB triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxan committed Aug 2, 2021
1 parent 29578bd commit c5e6a12
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 88 deletions.
19 changes: 2 additions & 17 deletions Tzkt.Api/Controllers/MetadataController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,7 @@ public async Task<ActionResult<IEnumerable<ObjectMetadata>>> UpdateAccountMetada
if (metadata.Any(x => !Regex.IsMatch(x.Key, "^(tz1|tz2|tz3|KT1)[0-9A-Za-z]{33}$")))
return BadRequest("Invalid account address");

var res = await Metadata.UpdateAccountMetadata(metadata);

#region update cached metadata
var addresses = res.Select(x => x.Key).ToList();
await Accounts.ReloadMetadata(addresses);
await Aliases.Reload(addresses);
#endregion

return Ok(res);
return Ok(await Metadata.UpdateAccountMetadata(metadata));
}
catch (JsonException ex)
{
Expand Down Expand Up @@ -221,14 +213,7 @@ public async Task<ActionResult<IEnumerable<ObjectMetadata>>> UpdateSoftwareMetad
if (metadata.Any(x => !Regex.IsMatch(x.Key, "^[0-9a-f]{8}$")))
return BadRequest("Invalid software short hash");

var res = await Metadata.UpdateSoftwareMetadata(metadata);

#region update cached metadata
var hashes = res.Select(x => x.Key).ToList();
await Software.Reload(hashes);
#endregion

return Ok(res);
return Ok(await Metadata.UpdateSoftwareMetadata(metadata));
}
catch (JsonException ex)
{
Expand Down
2 changes: 1 addition & 1 deletion Tzkt.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) =>
appConfig.Sources.Clear();
appConfig.AddJsonFile("appsettings.json", true);
appConfig.AddJsonFile($"appsettings.{host.HostingEnvironment.EnvironmentName}.json", true);
appConfig.AddEnvironmentVariables("TZKT_");
appConfig.AddEnvironmentVariables();
appConfig.AddEnvironmentVariables("TZKT_API_");
appConfig.AddCommandLine(args);
})
Expand Down
14 changes: 3 additions & 11 deletions Tzkt.Api/Services/Cache/Accounts/AccountsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,10 @@ public async Task<Alias> GetAliasAsync(int id)
return (await GetAsync(id)).Info;
}

public async Task ReloadMetadata(List<string> addresses)
public void UpdateMetadata(string address, string json)
{
using var db = GetConnection();
var rows = await db.QueryAsync(
@"SELECT ""Id"", ""Metadata"" FROM ""Accounts"" WHERE ""Address"" = ANY(@addresses::character(36)[])",
new { addresses });

foreach (var row in rows)
{
var user = await GetAsync((int)row.Id);
user.Metadata = AccountMetadata.Parse((string)row.Metadata);
}
if (TryGetSafe(address, out var account))
account.Metadata = AccountMetadata.Parse(json);
}
#endregion

Expand Down
35 changes: 18 additions & 17 deletions Tzkt.Api/Services/Cache/Aliases/AliasesCache.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -27,29 +28,29 @@ public AliasesCache(IConfiguration config, ILogger<AliasesCache> logger) : base(
logger.LogInformation("Loaded {1} aliases", Aliases.Count);
}

public async Task Reload(List<string> addresses)
public void UpdateMetadata(string address, string json)
{
using var db = GetConnection();
var rows = await db.QueryAsync<Alias>(
$@"{SelectQuery} WHERE ""Address"" = ANY(@addresses::character(36)[])",
new { addresses });
string name = null;
if (json != null)
{
using var doc = JsonDocument.Parse(json);
if (doc.RootElement.TryGetProperty("alias", out var alias))
name = alias.GetString();
}

lock (this)
{
foreach (var row in rows)
if (name != null)
{
if (row.Name != null)
{
var alias = Aliases.FirstOrDefault(x => x.Address == row.Address);
if (alias == null)
Aliases.Add(row);
else
alias.Name = row.Name;
}
var alias = Aliases.FirstOrDefault(x => x.Address == address);
if (alias == null)
Aliases.Add(new Alias { Address = address, Name = name });
else
{
Aliases.RemoveAll(x => x.Address == row.Address);
}
alias.Name = name;
}
else
{
Aliases.RemoveAll(x => x.Address == address);
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions Tzkt.Api/Services/Cache/Software/SoftwareCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,13 @@ public SoftwareCache(TimeCache time, IConfiguration config, ILogger<SoftwareCach
Logger.LogDebug("Loaded {0} software", Software.Count);
}

public async Task Reload(List<string> hashes)
public void UpdateMetadata(string shortHash)
{
using var db = GetConnection();
var rows = await db.QueryAsync($@"{SelectQuery} WHERE ""ShortHash"" = ANY(@hashes::character(8)[])", new { hashes });

lock (this)
{
foreach (var row in rows)
Software[(int)row.Id] = Parse(row);
using var db = GetConnection();
var row = db.QueryFirstOrDefault($@"{SelectQuery} WHERE ""ShortHash"" = @shortHash::character(8)", new { shortHash });
if (row != null) Software[(int)row.Id] = Parse(row);
}
}

Expand Down
134 changes: 99 additions & 35 deletions Tzkt.Api/Services/Sync/StateListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,37 @@ namespace Tzkt.Api.Services.Sync
{
public class StateListener : BackgroundService
{
#region static
const string StateTrigger = "state_changed";
const string AccountMetadataTrigger = "account_metadata_changed";
const string SoftwareMetadataTrigger = "software_metadata_changed";
//const string ProposalMetadataTrigger = "proposal_metadata_changed";
//const string ProtocolMetadataTrigger = "protocol_metadata_changed";
#endregion

readonly string ConnectionString;

readonly StateCache State;
readonly BigMapsCache BigMaps;
readonly AccountsCache Accounts;
readonly AliasesCache Aliases;
readonly ProtocolsCache Protocols;
readonly SoftwareCache Software;
readonly QuotesCache Quotes;
readonly TimeCache Times;
readonly HomeService Home;
readonly IEnumerable<IHubProcessor> Processors;
readonly ILogger Logger;

Task Notifying = Task.CompletedTask;
readonly List<(int Level, string Hash)> Changes = new List<(int, string)>(4);
Task StateNotifying = Task.CompletedTask;
readonly List<(int Level, string Hash)> StateChanges = new(4);

public StateListener(
StateCache state,
BigMapsCache bigMaps,
AccountsCache accounts,
AliasesCache aliases,
SoftwareCache software,
ProtocolsCache protocols,
QuotesCache quotes,
TimeCache times,
Expand All @@ -49,7 +61,9 @@ public StateListener(
State = state;
BigMaps = bigMaps;
Accounts = accounts;
Aliases = aliases;
Protocols = protocols;
Software = software;
Quotes = quotes;
Times = times;
Home = home;
Expand All @@ -61,10 +75,10 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
Logger.LogInformation("State listener started");
Logger.LogInformation("DB listener started");

using var db = new NpgsqlConnection(ConnectionString);
db.Notification += OnStateChanged;
db.Notification += OnNotification;

while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -73,35 +87,40 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
if (db.State != ConnectionState.Open)
{
await db.OpenAsync(cancellationToken);
await db.ExecuteAsync("LISTEN state_changed;");
Logger.LogInformation("State listener connected");
await db.ExecuteAsync($@"
LISTEN {StateTrigger};
LISTEN {AccountMetadataTrigger};
LISTEN {SoftwareMetadataTrigger};");
//LISTEN { ProposalMetadataTrigger};
//LISTEN { ProtocolMetadataTrigger};
Logger.LogInformation("Db listener connected");
}
await db.WaitAsync(cancellationToken);
}
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested) { break; }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { break; }
catch (Exception ex)
{
Logger.LogError("State listener disconnected: {0}", ex.Message);
Logger.LogError("DB listener disconnected: {0}", ex.Message);
await Task.Delay(1000, cancellationToken);
}
}

db.Notification -= OnStateChanged;
db.Notification -= OnNotification;
}
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (Exception ex)
{
Logger.LogCritical($"State listener crashed: {ex.Message}");
Logger.LogCritical($"DB listener crashed: {ex.Message}");
}
finally
{
Logger.LogWarning("State listener stopped");
Logger.LogWarning("DB listener stopped");
}
}

private void OnStateChanged(object sender, NpgsqlNotificationEventArgs e)
private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
{
Logger.LogDebug("Received {1} notification with payload {2}", e.Channel, e.Payload);

Expand All @@ -111,27 +130,34 @@ private void OnStateChanged(object sender, NpgsqlNotificationEventArgs e)
return;
}

var data = e.Payload.Split(':', StringSplitOptions.RemoveEmptyEntries);
if (data.Length != 2 || !int.TryParse(data[0], out var level) || data[1].Length != 51)
if (e.Channel == StateTrigger)
{
Logger.LogCritical("Invalid trigger payload {1}", e.Payload);
return;
}
var data = e.Payload.Split(':', StringSplitOptions.RemoveEmptyEntries);
if (data.Length != 2 || !int.TryParse(data[0], out var level) || data[1].Length != 51)
{
Logger.LogCritical("Invalid trigger payload {1}", e.Payload);
return;
}

lock (Changes)
{
Changes.Add((level, data[1]));
lock (StateChanges)
{
StateChanges.Add((level, data[1]));

if (Notifying.IsCompleted)
Notifying = NotifyAsync(); // async run
if (StateNotifying.IsCompleted)
StateNotifying = NotifyStateAsync(); // async run
}
}
else
{
NotifyMetadata(e.Channel, e.Payload);
}
}

private async Task NotifyAsync()
async Task NotifyStateAsync()
{
try
{
Logger.LogDebug("Processing notification...");
Logger.LogDebug("Processing state notification...");

#region update state
RawState newState;
Expand All @@ -143,21 +169,21 @@ private async Task NotifyAsync()
if (attempts++ > 32)
{
// should never get here, but to make sure there are no infinite loops...
Logger.LogCritical("Failed to reach state equal to trigger's payload '{1}'", Changes[^1].Hash);
Logger.LogCritical("Failed to reach state equal to trigger's payload '{1}'", StateChanges[^1].Hash);
return;
}

newState = await State.LoadAsync();
lock (Changes)
lock (StateChanges)
{
if (newState.Hash != Changes[^1].Hash)
if (newState.Hash != StateChanges[^1].Hash)
{
Logger.LogDebug("Lost sync. Retrying...");
continue;
}

changes = Changes.ToList();
Changes.Clear();
changes = StateChanges.ToList();
StateChanges.Clear();
break;
}
}
Expand All @@ -182,24 +208,62 @@ private async Task NotifyAsync()
_ = Home.UpdateAsync();
#endregion

Logger.LogDebug("Notification processed");
Logger.LogDebug("State notification processed");

lock (Changes)
lock (StateChanges)
{
if (Changes.Count > 0)
if (StateChanges.Count > 0)
{
Logger.LogDebug("Handle pending notification");
Notifying = NotifyAsync(); // async run
Logger.LogDebug("Handle pending state notification");
StateNotifying = NotifyStateAsync(); // async run
}
else
{
Notifying = Task.CompletedTask;
StateNotifying = Task.CompletedTask;
}
}
}
catch (Exception ex)
{
Logger.LogError("Failed to process notification: {1}", ex.Message);
Logger.LogError("Failed to process state notification: {1}", ex.Message);
}
}

void NotifyMetadata(string channel, string payload)
{
try
{
Logger.LogDebug("Processing metadata notification...");

var divider = payload.IndexOf(':');
if (divider == -1)
{
Logger.LogError("Invalid metadata notification payload");
return;
}

var key = payload[0..divider];
var value = payload[(divider + 1)..];
if (value == string.Empty) value = null;

switch (channel)
{
case AccountMetadataTrigger:
Accounts.UpdateMetadata(key, value);
Aliases.UpdateMetadata(key, value);
break;
case SoftwareMetadataTrigger:
Software.UpdateMetadata(key);
break;
default:
break;
}

Logger.LogDebug("Metadata notification processed");
}
catch (Exception ex)
{
Logger.LogError("Failed to process metadata notification: {1}", ex.Message);
}
}
}
Expand Down
Loading

0 comments on commit c5e6a12

Please sign in to comment.