Skip to content

Commit

Permalink
Expose async events
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasMH committed Dec 12, 2024
1 parent 3f213ff commit 442b4de
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
6 changes: 3 additions & 3 deletions src/ToMqttNet/IMqttConnectionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace ToMqttNet;

public interface IMqttConnectionService
{
public event EventHandler<MqttApplicationMessageReceivedEventArgs>? OnApplicationMessageReceived;
public event EventHandler<EventArgs>? OnConnect;
public event EventHandler<EventArgs>? OnDisconnect;
event Func<MqttApplicationMessageReceivedEventArgs, Task>? OnApplicationMessageReceivedAsync;
event Func<MqttClientConnectedEventArgs, Task>? OnConnectAsync;
event Func<MqttClientDisconnectedEventArgs, Task>? OnDisconnectAsync;

MqttConnectionOptions MqttOptions { get; }

Expand Down
30 changes: 18 additions & 12 deletions src/ToMqttNet/MqttConnectionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public class MqttConnectionService(
public MqttConnectionOptions MqttOptions { get; } = mqttOptions.Value;
private readonly IManagedMqttClient _mqttClient = managedMqttClient;

public event EventHandler<MqttApplicationMessageReceivedEventArgs>? OnApplicationMessageReceived;
public event EventHandler<EventArgs>? OnConnect;
public event EventHandler<EventArgs>? OnDisconnect;
public event Func<MqttApplicationMessageReceivedEventArgs, Task>? OnApplicationMessageReceivedAsync;
public event Func<MqttClientConnectedEventArgs, Task>? OnConnectAsync;
public event Func<MqttClientDisconnectedEventArgs, Task>? OnDisconnectAsync;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Expand Down Expand Up @@ -78,34 +78,40 @@ await _mqttClient.EnqueueAsync(
.WithRetainFlag()
.Build());

OnConnect?.Invoke(this, new EventArgs());
if (OnConnectAsync != null)
{
await OnConnectAsync(evnt);
}
};

_mqttClient.DisconnectedAsync += (evnt) =>
_mqttClient.DisconnectedAsync += async (evnt) =>
{
_logger.LogInformation(evnt.Exception, "Disconnected from mqtt: {reason}", evnt.Reason);
OnDisconnect?.Invoke(this, new EventArgs());
return Task.CompletedTask;
if (OnDisconnectAsync != null)
{
await OnDisconnectAsync(evnt);
}
};

_mqttClient.ApplicationMessageReceivedAsync += (evnt) =>
_mqttClient.ApplicationMessageReceivedAsync += async (evnt) =>
{
try
{
if (_logger.IsEnabled(LogLevel.Trace))
{
_logger.LogTrace("{topic}: {message}", evnt.ApplicationMessage.Topic, evnt.ApplicationMessage.ConvertPayloadToString());
}
OnApplicationMessageReceived?.Invoke(this, evnt);
if (OnApplicationMessageReceivedAsync != null)
{
await OnApplicationMessageReceivedAsync(evnt);
}
}
catch (Exception e)
{
_logger.LogWarning(e, "Failed to handle message to topic {topic}", evnt.ApplicationMessage.Topic);
_counters.IncreaseMessagesHandled(success: false);
return Task.CompletedTask;
}
_counters.IncreaseMessagesHandled(success: true);
return Task.CompletedTask;
};

_logger.LogInformation("Starting mqttclient");
Expand All @@ -128,7 +134,7 @@ public Task UnsubscribeAsync(params string[] topics)
return _mqttClient!.UnsubscribeAsync(topics);
}

private IMqttClientChannelOptions BuildChannelOptions()
private MqttClientTcpOptions BuildChannelOptions()
{
var tcpOptions = new MqttClientTcpOptions
{
Expand Down

0 comments on commit 442b4de

Please sign in to comment.