Skip to content

Commit

Permalink
Complete received C2D message within semaphore - Device Client sample (
Browse files Browse the repository at this point in the history
…#3198)

* Update MessageReceiveSample.cs

* Update MessageReceiveSample.cs

* added cancellation token to semaphore.

* cr changes - added try/finally blocks
  • Loading branch information
tmahmood-microsoft authored Mar 29, 2023
1 parent e06fcf5 commit 6f2033b
Showing 1 changed file with 33 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class MessageReceiveSample
private readonly TimeSpan? _maxRunTime;
private readonly DeviceClient _deviceClient;
private readonly TransportType _transportType;
private SemaphoreSlim _processMessageSemaphore;
private CancellationTokenSource _cts;

public MessageReceiveSample(DeviceClient deviceClient, TransportType transportType, TimeSpan? maxRunTime)
{
Expand All @@ -32,21 +34,25 @@ public MessageReceiveSample(DeviceClient deviceClient, TransportType transportTy

public async Task RunSampleAsync()
{
using var cts = _maxRunTime.HasValue
_cts = _maxRunTime.HasValue
? new CancellationTokenSource(_maxRunTime.Value)
: new CancellationTokenSource();

// Semaphore to ensure sequential execution of received C2D messages.
_processMessageSemaphore = new SemaphoreSlim(1, 1);

Console.CancelKeyPress += (sender, eventArgs) =>
{
eventArgs.Cancel = true;
cts.Cancel();
_cts.Cancel();
Console.WriteLine("Sample execution cancellation requested; will exit.");
};
Console.WriteLine($"{DateTime.Now}> Press Control+C at any time to quit the sample.");

// First receive C2D messages using the polling ReceiveAsync().
Console.WriteLine($"\n{DateTime.Now}> Device waiting for C2D messages from the hub...");
Console.WriteLine($"{DateTime.Now}> Use the Azure Portal IoT hub blade or Azure IoT Explorer to send a message to this device.");
await ReceiveC2dMessagesPollingAndCompleteAsync(cts.Token);
await ReceiveC2dMessagesPollingAndCompleteAsync(_cts.Token);

if (_transportType != TransportType.Http1)
{
Expand All @@ -61,15 +67,19 @@ public async Task RunSampleAsync()

try
{
await Task.Delay(-1, cts.Token);
await Task.Delay(-1, _cts.Token);
}
catch (TaskCanceledException)
{
// Done running.
}

// Now unsubscibe from receiving the callback.
await _deviceClient.SetReceiveMessageHandlerAsync(null, null);
finally
{
// Now unsubscibe from receiving the callback.
await _deviceClient.SetReceiveMessageHandlerAsync(null, null);
_processMessageSemaphore.Dispose();
_cts.Dispose();
}
}
}

Expand Down Expand Up @@ -104,13 +114,21 @@ private async Task ReceiveC2dMessagesPollingAndCompleteAsync(CancellationToken c

private async Task OnC2dMessageReceivedAsync(Message receivedMessage, object _)
{
Console.WriteLine($"{DateTime.Now}> C2D message callback - message received with Id={receivedMessage.MessageId}.");
PrintMessage(receivedMessage);

await _deviceClient.CompleteAsync(receivedMessage);
Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}.");
try
{
// Use a semaphore to ensure C2D messages are processed in order - a requirement of IoT hub.
await _processMessageSemaphore.WaitAsync(_cts.Token).ConfigureAwait(false);
Console.WriteLine($"{DateTime.Now}> C2D message callback - message received with Id={receivedMessage.MessageId}.");
PrintMessage(receivedMessage);

receivedMessage.Dispose();
await _deviceClient.CompleteAsync(receivedMessage);
Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}.");
}
finally
{
receivedMessage.Dispose();
_processMessageSemaphore.Release();
}
}

private static void PrintMessage(Message receivedMessage)
Expand All @@ -123,8 +141,8 @@ private static void PrintMessage(Message receivedMessage)
{
formattedMessage.AppendLine($"\tProperty: key={prop.Key}, value={prop.Value}");
}
// System properties can be accessed using their respective accessors, e.g. DeliveryCount.
formattedMessage.AppendLine($"\tDelivery count: {receivedMessage.DeliveryCount}");
// System properties can be accessed using their respective accessors, e.g. ContentType.
formattedMessage.AppendLine($"\tContent type: {receivedMessage.ContentType}");

Console.WriteLine($"{DateTime.Now}> {formattedMessage}");
}
Expand Down

0 comments on commit 6f2033b

Please sign in to comment.