diff --git a/iothub/device/samples/getting started/MessageReceiveSample/MessageReceiveSample.cs b/iothub/device/samples/getting started/MessageReceiveSample/MessageReceiveSample.cs index eb5e3fa25d..5aa9a21db8 100644 --- a/iothub/device/samples/getting started/MessageReceiveSample/MessageReceiveSample.cs +++ b/iothub/device/samples/getting started/MessageReceiveSample/MessageReceiveSample.cs @@ -22,6 +22,8 @@ public class MessageReceiveSample private readonly TimeSpan? _maxRunTime; private readonly DeviceClient _deviceClient; private readonly TransportType _transportType; + private SemaphoreSlim _processMessageSemaphore; + private CancellationTokenSource _cts; public MessageReceiveSample(DeviceClient deviceClient, TransportType transportType, TimeSpan? maxRunTime) { @@ -32,13 +34,17 @@ public MessageReceiveSample(DeviceClient deviceClient, TransportType transportTy public async Task RunSampleAsync() { - using var cts = _maxRunTime.HasValue + _cts = _maxRunTime.HasValue ? new CancellationTokenSource(_maxRunTime.Value) : new CancellationTokenSource(); + + // Semaphore to ensure sequential execution of received C2D messages. + _processMessageSemaphore = new SemaphoreSlim(1, 1); + Console.CancelKeyPress += (sender, eventArgs) => { eventArgs.Cancel = true; - cts.Cancel(); + _cts.Cancel(); Console.WriteLine("Sample execution cancellation requested; will exit."); }; Console.WriteLine($"{DateTime.Now}> Press Control+C at any time to quit the sample."); @@ -46,7 +52,7 @@ public async Task RunSampleAsync() // First receive C2D messages using the polling ReceiveAsync(). Console.WriteLine($"\n{DateTime.Now}> Device waiting for C2D messages from the hub..."); Console.WriteLine($"{DateTime.Now}> Use the Azure Portal IoT hub blade or Azure IoT Explorer to send a message to this device."); - await ReceiveC2dMessagesPollingAndCompleteAsync(cts.Token); + await ReceiveC2dMessagesPollingAndCompleteAsync(_cts.Token); if (_transportType != TransportType.Http1) { @@ -61,15 +67,19 @@ public async Task RunSampleAsync() try { - await Task.Delay(-1, cts.Token); + await Task.Delay(-1, _cts.Token); } catch (TaskCanceledException) { // Done running. } - - // Now unsubscibe from receiving the callback. - await _deviceClient.SetReceiveMessageHandlerAsync(null, null); + finally + { + // Now unsubscibe from receiving the callback. + await _deviceClient.SetReceiveMessageHandlerAsync(null, null); + _processMessageSemaphore.Dispose(); + _cts.Dispose(); + } } } @@ -104,13 +114,21 @@ private async Task ReceiveC2dMessagesPollingAndCompleteAsync(CancellationToken c private async Task OnC2dMessageReceivedAsync(Message receivedMessage, object _) { - Console.WriteLine($"{DateTime.Now}> C2D message callback - message received with Id={receivedMessage.MessageId}."); - PrintMessage(receivedMessage); - - await _deviceClient.CompleteAsync(receivedMessage); - Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}."); + try + { + // Use a semaphore to ensure C2D messages are processed in order - a requirement of IoT hub. + await _processMessageSemaphore.WaitAsync(_cts.Token).ConfigureAwait(false); + Console.WriteLine($"{DateTime.Now}> C2D message callback - message received with Id={receivedMessage.MessageId}."); + PrintMessage(receivedMessage); - receivedMessage.Dispose(); + await _deviceClient.CompleteAsync(receivedMessage); + Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}."); + } + finally + { + receivedMessage.Dispose(); + _processMessageSemaphore.Release(); + } } private static void PrintMessage(Message receivedMessage) @@ -123,8 +141,8 @@ private static void PrintMessage(Message receivedMessage) { formattedMessage.AppendLine($"\tProperty: key={prop.Key}, value={prop.Value}"); } - // System properties can be accessed using their respective accessors, e.g. DeliveryCount. - formattedMessage.AppendLine($"\tDelivery count: {receivedMessage.DeliveryCount}"); + // System properties can be accessed using their respective accessors, e.g. ContentType. + formattedMessage.AppendLine($"\tContent type: {receivedMessage.ContentType}"); Console.WriteLine($"{DateTime.Now}> {formattedMessage}"); }