diff --git a/src/ToMqttNet/IMqttConnectionService.cs b/src/ToMqttNet/IMqttConnectionService.cs index 70f29f8..556a705 100644 --- a/src/ToMqttNet/IMqttConnectionService.cs +++ b/src/ToMqttNet/IMqttConnectionService.cs @@ -6,9 +6,9 @@ namespace ToMqttNet; public interface IMqttConnectionService { - public event EventHandler? OnApplicationMessageReceived; - public event EventHandler? OnConnect; - public event EventHandler? OnDisconnect; + event Func? OnApplicationMessageReceivedAsync; + event Func? OnConnectAsync; + event Func? OnDisconnectAsync; MqttConnectionOptions MqttOptions { get; } diff --git a/src/ToMqttNet/MqttConnectionService.cs b/src/ToMqttNet/MqttConnectionService.cs index 23a84a0..368614a 100644 --- a/src/ToMqttNet/MqttConnectionService.cs +++ b/src/ToMqttNet/MqttConnectionService.cs @@ -27,9 +27,9 @@ public class MqttConnectionService( public MqttConnectionOptions MqttOptions { get; } = mqttOptions.Value; private readonly IManagedMqttClient _mqttClient = managedMqttClient; - public event EventHandler? OnApplicationMessageReceived; - public event EventHandler? OnConnect; - public event EventHandler? OnDisconnect; + public event Func? OnApplicationMessageReceivedAsync; + public event Func? OnConnectAsync; + public event Func? OnDisconnectAsync; protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -78,17 +78,22 @@ await _mqttClient.EnqueueAsync( .WithRetainFlag() .Build()); - OnConnect?.Invoke(this, new EventArgs()); + if (OnConnectAsync != null) + { + await OnConnectAsync(evnt); + } }; - _mqttClient.DisconnectedAsync += (evnt) => + _mqttClient.DisconnectedAsync += async (evnt) => { _logger.LogInformation(evnt.Exception, "Disconnected from mqtt: {reason}", evnt.Reason); - OnDisconnect?.Invoke(this, new EventArgs()); - return Task.CompletedTask; + if (OnDisconnectAsync != null) + { + await OnDisconnectAsync(evnt); + } }; - _mqttClient.ApplicationMessageReceivedAsync += (evnt) => + _mqttClient.ApplicationMessageReceivedAsync += async (evnt) => { try { @@ -96,16 +101,17 @@ await _mqttClient.EnqueueAsync( { _logger.LogTrace("{topic}: {message}", evnt.ApplicationMessage.Topic, evnt.ApplicationMessage.ConvertPayloadToString()); } - OnApplicationMessageReceived?.Invoke(this, evnt); + if (OnApplicationMessageReceivedAsync != null) + { + await OnApplicationMessageReceivedAsync(evnt); + } } catch (Exception e) { _logger.LogWarning(e, "Failed to handle message to topic {topic}", evnt.ApplicationMessage.Topic); _counters.IncreaseMessagesHandled(success: false); - return Task.CompletedTask; } _counters.IncreaseMessagesHandled(success: true); - return Task.CompletedTask; }; _logger.LogInformation("Starting mqttclient"); @@ -128,7 +134,7 @@ public Task UnsubscribeAsync(params string[] topics) return _mqttClient!.UnsubscribeAsync(topics); } - private IMqttClientChannelOptions BuildChannelOptions() + private MqttClientTcpOptions BuildChannelOptions() { var tcpOptions = new MqttClientTcpOptions {