diff --git a/Program.cs b/Program.cs index bb2b311..58f5053 100644 --- a/Program.cs +++ b/Program.cs @@ -18,9 +18,6 @@ * You should have received a copy of the GNU Affero General Public License * along with Xibo. If not, see . */ -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using xibo_xmr; IHost host = Host.CreateDefaultBuilder(args) diff --git a/Worker.cs b/Worker.cs index 28071dd..f2d21aa 100644 --- a/Worker.cs +++ b/Worker.cs @@ -19,11 +19,8 @@ * along with Xibo. If not, see . */ using System.Collections.Concurrent; -using System.Linq.Expressions; using System.Text; using ConcurrentPriorityQueue.Core; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NetMQ; using NetMQ.Sockets; @@ -38,6 +35,8 @@ public class Worker : BackgroundService private readonly ConcurrentPriorityQueue _queue; + private readonly BlockingCollection _relayQueue; + private int _sentCount = 0; public Worker(ILogger logger, IOptions settings) @@ -45,6 +44,7 @@ public Worker(ILogger logger, IOptions settings) _logger = logger; _settings = settings.Value; _queue = new(); + _relayQueue = new(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -69,12 +69,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) // messages arriving from the CMS and adds them to the queue with the right QoS // 2. Set up a Publisher (PUB) socket bound to `pubOn` which processes the queue // 3. Set up a periodic timer which sends a heartbeat message (H) every 30 seconds + // 4. Handle relay if set // ------- - await Task.WhenAll( + List tasks = new() + { Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, ResponderAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default), Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, PublisherAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default), Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, HeartbeatAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default) - ); + }; + + // Do we relay? + if (!string.IsNullOrEmpty(_settings.relayOn)) + { + tasks.Add(Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, RelayAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)); + } + + // Await all + await Task.WhenAll(tasks); // Must call clean up at the end NetMQConfig.Cleanup(); @@ -111,6 +122,16 @@ async Task ResponderAsync(CancellationToken stoppingToken) } else { + // Relay + if (!string.IsNullOrEmpty(_settings.relayOn)) + { + bool relayResult = _relayQueue.TryAdd(message); + if (!relayResult) + { + _logger.LogError("Failed to add message to the relay queue"); + } + } + // Decode the message try { @@ -235,6 +256,27 @@ async Task HeartbeatAsync(CancellationToken stoppingToken) } } + async Task RelayAsync(CancellationToken stoppingToken) + { + using var relaySocket = new RequestSocket(_settings.relayOn); + + while (!stoppingToken.IsCancellationRequested) + { + await Task.Run(() => { + bool result = _relayQueue.TryTake(out string message, -1, stoppingToken); + if (result && !string.IsNullOrEmpty(message)) + { + _logger.LogDebug("Relay message"); + bool sendResult = relaySocket.TrySendFrame(message); + if (!sendResult) + { + _logger.LogError("Unable to relay message"); + } + } + }, stoppingToken); + } + } + private static Dictionary NewStats() { return new() diff --git a/ZmqSettings.cs b/ZmqSettings.cs index 793ede6..a5ad6aa 100644 --- a/ZmqSettings.cs +++ b/ZmqSettings.cs @@ -28,4 +28,5 @@ public class ZmqSettings public bool ipv6RespSupport { get; set; } public bool ipv6PubSupport { get; set; } public int? pubSendTimeoutMs { get; set; } + public string? relayOn {get; set; } } diff --git a/appsettings.json b/appsettings.json index 1f8a0fa..889a0cc 100644 --- a/appsettings.json +++ b/appsettings.json @@ -1,7 +1,7 @@ { "Logging": { "LogLevel": { - "Default": "Debug", + "Default": "Information", "Microsoft.Hosting.Lifetime": "Information" }, "Console": { @@ -20,6 +20,7 @@ "queueSize": 10, "ipv6RespSupport": false, "ipv6PubSupport": false, - "pubSendTimeoutMs": 500 + "pubSendTimeoutMs": 500, + "relayOn": "" } }