diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs index d43f2b14168..f65ce7e7e62 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs @@ -36,7 +36,6 @@ public class MessageStore : IMessageStore readonly CleanupProcessor messagesCleaner; readonly ICheckpointStore checkpointStore; readonly IStoreProvider storeProvider; - readonly long messageCount = 0; TimeSpan timeToLive; public MessageStore(IStoreProvider storeProvider, ICheckpointStore checkpointStore, TimeSpan timeToLive, bool checkEntireQueueOnCleanup = false) @@ -114,7 +113,7 @@ await this.messageEntityStore.PutOrUpdate( using (MetricsV0.SequentialStoreLatency(endpointId)) { long offset = await sequentialStore.Append(new MessageRef(edgeMessageId, timeToLive)); - Events.MessageAdded(offset, edgeMessageId, endpointId, this.messageCount); + Events.MessageAdded(offset, edgeMessageId, endpointId); return new MessageWithOffset(message, offset); } } @@ -261,6 +260,7 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup) { foreach (KeyValuePair> endpointSequentialStore in this.messageStore.endpointSequentialStores) { + var messageQueueId = endpointSequentialStore.Key; try { if (this.cancellationTokenSource.IsCancellationRequested) @@ -268,21 +268,30 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup) return; } - Events.CleanupTaskStarted(endpointSequentialStore.Key); - CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(endpointSequentialStore.Key, CancellationToken.None); + var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId); + Events.CleanupTaskStarted(messageQueueId); + CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None); ISequentialStore sequentialStore = endpointSequentialStore.Value; - Events.CleanupCheckpointState(endpointSequentialStore.Key, checkpointData); + Events.CleanupCheckpointState(messageQueueId, checkpointData); int cleanupEntityStoreCount = 0; + // If checkEntireQueueOnCleanup is set to false, we only peek the head, message counts is tailOffset-headOffset+1 + // otherwise count while iterating over the queue. + var headOffset = 0L; + var tailOffset = sequentialStore.GetTailOffset(CancellationToken.None); + var messageCount = 0L; + async Task DeleteMessageCallback(long offset, MessageRef messageRef) { - var enqueuedTime = DateTime.UtcNow - messageRef.TimeStamp; - if (checkpointData.Offset < offset && - enqueuedTime < messageRef.TimeToLive) + var expiry = messageRef.TimeStamp + messageRef.TimeToLive; + if (offset > checkpointData.Offset && expiry > DateTime.UtcNow) { + // message is not sent and not expired, increase message counts + messageCount++; return false; } + headOffset = Math.Max(headOffset, offset); bool deleteMessage = false; // Decrement ref count. @@ -305,9 +314,9 @@ async Task DeleteMessageCallback(long offset, MessageRef messageRef) if (deleteMessage) { - if (checkpointData.Offset < offset && enqueuedTime >= messageRef.TimeToLive) + if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow) { - this.expiredCounter.Increment(1, new[] { "ttl_expiry", message?.Message.GetSenderId(), message?.Message.GetOutput(), bool.TrueString } ); + this.expiredCounter.Increment(1, new[] { "ttl_expiry", message?.Message.GetSenderId(), message?.Message.GetOutput(), bool.TrueString }); } await this.messageStore.messageEntityStore.Remove(messageRef.EdgeMessageId); @@ -340,7 +349,7 @@ async Task DeleteMessageCallback(long offset, MessageRef messageRef) } } - offset = offset + CleanupBatchSize; + offset += CleanupBatchSize; } while (batch.Any()); } @@ -350,16 +359,20 @@ async Task DeleteMessageCallback(long offset, MessageRef messageRef) { cleanupCount++; } + + messageCount = tailOffset - headOffset + 1; } + // update Metrics for message counts + Checkpointer.Metrics.QueueLength.Set(messageCount, new[] { endpointId, priority.ToString() }); totalCleanupCount += cleanupCount; totalCleanupStoreCount += cleanupEntityStoreCount; - Events.CleanupCompleted(endpointSequentialStore.Key, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount); + Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount); await Task.Delay(MinCleanupSleepTime, this.cancellationTokenSource.Token); } catch (Exception ex) { - Events.ErrorCleaningMessagesForEndpoint(ex, endpointSequentialStore.Key); + Events.ErrorCleaningMessagesForEndpoint(ex, messageQueueId); } } @@ -471,12 +484,12 @@ internal static void CleanupCheckpointState(string endpointId, CheckpointData ch Log.LogDebug((int)EventIds.CleanupCheckpointState, Invariant($"Checkpoint for endpoint {endpointId} is {checkpointData.Offset}")); } - internal static void MessageAdded(long offset, string edgeMessageId, string endpointId, long messageCount) + internal static void MessageAdded(long offset, string edgeMessageId, string endpointId) { // Print only after every 1000th message to avoid flooding logs. if (offset % 1000 == 0) { - Log.LogDebug((int)EventIds.MessageAdded, Invariant($"Added message {edgeMessageId} to store for {endpointId} at offset {offset} - messageCount = {messageCount}")); + Log.LogDebug((int)EventIds.MessageAdded, Invariant($"Added message {edgeMessageId} to store for {endpointId} at offset {offset}.")); } } } diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/MessageQueueIdHelper.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/MessageQueueIdHelper.cs new file mode 100644 index 00000000000..9e96a9d651d --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/MessageQueueIdHelper.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Azure.Devices.Routing.Core +{ + using System; + + public static class MessageQueueIdHelper + { + public const string MessageQueueIdDelimiter = "_Pri"; + + // The actual ID for the underlying store is of string format: _Pri + // We need to maintain backwards compatibility for existing sequential stores that don't have the "_Pri" suffix. + // We use the default priority (2,000,000,000) for this, which means the store ID is just the endpoint ID. + public static string GetMessageQueueId(string endpointId, uint priority) => priority == RouteFactory.DefaultPriority ? endpointId : $"{endpointId}{MessageQueueIdDelimiter}{priority}"; + + public static (string, uint) ParseMessageQueueId(string messageQueueId) + { + var idx = messageQueueId.LastIndexOf(MessageQueueIdDelimiter); + if (idx < 0) + { + return (messageQueueId, RouteFactory.DefaultPriority); + } + + var endpointId = messageQueueId.Substring(0, idx); + var priority = messageQueueId.Substring(idx + MessageQueueIdDelimiter.Length); + if (uint.TryParse(priority, out var priorityNum)) + { + return (endpointId, priorityNum); + } + else + { + return (messageQueueId, RouteFactory.DefaultPriority); + } + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs index d9f5d47fe7d..200b3f5d270 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/checkpointers/Checkpointer.cs @@ -196,9 +196,9 @@ static string GetContextString(Checkpointer checkpointer) } } - static class Metrics + public static class Metrics { - static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge( + public static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge( "queue_length", "Number of messages pending to be processed for the endpoint", new List { "endpoint", "priority", MetricsConstants.MsTelemetry }); diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs index dad0fdbeb43..04f5e2b2313 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs @@ -1,153 +1,153 @@ -// Copyright (c) Microsoft. All rights reserved. -namespace Microsoft.Azure.Devices.Routing.Core.Endpoints -{ - using System; +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Routing.Core.Endpoints +{ + using System; using System.Collections.Generic; using System.Collections.Immutable; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using App.Metrics; - using App.Metrics.Counter; - using App.Metrics.Timer; - using Microsoft.Azure.Devices.Edge.Util; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using App.Metrics; + using App.Metrics.Counter; + using App.Metrics.Timer; + using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Concurrency; using Microsoft.Azure.Devices.Edge.Util.Metrics; - using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine; - using Microsoft.Extensions.Logging; - using Nito.AsyncEx; - using static System.FormattableString; - using AsyncLock = Microsoft.Azure.Devices.Edge.Util.Concurrency.AsyncLock; - - public class StoringAsyncEndpointExecutor : IEndpointExecutor - { - readonly AtomicBoolean closed = new AtomicBoolean(); - readonly IMessageStore messageStore; - readonly Task sendMessageTask; - readonly AsyncManualResetEvent hasMessagesInQueue = new AsyncManualResetEvent(true); - readonly AsyncEndpointExecutorOptions options; - readonly CancellationTokenSource cts = new CancellationTokenSource(); - readonly ICheckpointerFactory checkpointerFactory; - readonly EndpointExecutorConfig config; - AtomicReference> prioritiesToFsms; - EndpointExecutorFsm lastUsedFsm; - - public StoringAsyncEndpointExecutor( - Endpoint endpoint, - ICheckpointerFactory checkpointerFactory, - EndpointExecutorConfig config, - AsyncEndpointExecutorOptions options, - IMessageStore messageStore) - { - this.Endpoint = Preconditions.CheckNotNull(endpoint); - this.checkpointerFactory = Preconditions.CheckNotNull(checkpointerFactory); - this.config = Preconditions.CheckNotNull(config); - this.options = Preconditions.CheckNotNull(options); - this.messageStore = messageStore; - this.sendMessageTask = Task.Run(this.SendMessagesPump); + using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine; + using Microsoft.Extensions.Logging; + using Nito.AsyncEx; + using static System.FormattableString; + using AsyncLock = Microsoft.Azure.Devices.Edge.Util.Concurrency.AsyncLock; + + public class StoringAsyncEndpointExecutor : IEndpointExecutor + { + readonly AtomicBoolean closed = new AtomicBoolean(); + readonly IMessageStore messageStore; + readonly Task sendMessageTask; + readonly AsyncManualResetEvent hasMessagesInQueue = new AsyncManualResetEvent(true); + readonly AsyncEndpointExecutorOptions options; + readonly CancellationTokenSource cts = new CancellationTokenSource(); + readonly ICheckpointerFactory checkpointerFactory; + readonly EndpointExecutorConfig config; + AtomicReference> prioritiesToFsms; + EndpointExecutorFsm lastUsedFsm; + + public StoringAsyncEndpointExecutor( + Endpoint endpoint, + ICheckpointerFactory checkpointerFactory, + EndpointExecutorConfig config, + AsyncEndpointExecutorOptions options, + IMessageStore messageStore) + { + this.Endpoint = Preconditions.CheckNotNull(endpoint); + this.checkpointerFactory = Preconditions.CheckNotNull(checkpointerFactory); + this.config = Preconditions.CheckNotNull(config); + this.options = Preconditions.CheckNotNull(options); + this.messageStore = messageStore; + this.sendMessageTask = Task.Run(this.SendMessagesPump); this.prioritiesToFsms = new AtomicReference>(ImmutableDictionary.Empty); - } - - public Endpoint Endpoint { get; } - - public EndpointExecutorStatus Status => this.lastUsedFsm.Status; - - public async Task Invoke(IMessage message, uint priority, uint timeToLiveSecs) - { - try - { + } + + public Endpoint Endpoint { get; } + + public EndpointExecutorStatus Status => this.lastUsedFsm.Status; + + public async Task Invoke(IMessage message, uint priority, uint timeToLiveSecs) + { + try + { if (this.closed) { throw new InvalidOperationException($"Endpoint executor for endpoint {this.Endpoint} is closed."); - } - - using (MetricsV0.StoreLatency(this.Endpoint.Id)) - { - // Get the checkpointer corresponding to the queue for this priority - ImmutableDictionary snapshot = this.prioritiesToFsms; + } + + using (MetricsV0.StoreLatency(this.Endpoint.Id)) + { + // Get the checkpointer corresponding to the queue for this priority + ImmutableDictionary snapshot = this.prioritiesToFsms; ICheckpointer checkpointer = snapshot[priority].Checkpointer; - IMessage storedMessage = await this.messageStore.Add(GetMessageQueueId(this.Endpoint.Id, priority), message, timeToLiveSecs); - checkpointer.Propose(storedMessage); - Events.AddMessageSuccess(this, storedMessage.Offset, priority, timeToLiveSecs); - } - - this.hasMessagesInQueue.Set(); - MetricsV0.StoredCountIncrement(this.Endpoint.Id, priority); - } - catch (Exception ex) - { - Routing.UserMetricLogger.LogIngressFailureMetric(1, this.Endpoint.IotHubName, message, "storage_failure"); - Events.AddMessageFailure(this, ex); - throw; - } - } - - public void Dispose() => this.Dispose(true); - - public async Task CloseAsync() - { - Events.Close(this); - - try - { - if (!this.closed.GetAndSet(true)) - { - this.cts.Cancel(); + IMessage storedMessage = await this.messageStore.Add(MessageQueueIdHelper.GetMessageQueueId(this.Endpoint.Id, priority), message, timeToLiveSecs); + checkpointer.Propose(storedMessage); + Events.AddMessageSuccess(this, storedMessage.Offset, priority, timeToLiveSecs); + } + + this.hasMessagesInQueue.Set(); + MetricsV0.StoredCountIncrement(this.Endpoint.Id, priority); + } + catch (Exception ex) + { + Routing.UserMetricLogger.LogIngressFailureMetric(1, this.Endpoint.IotHubName, message, "storage_failure"); + Events.AddMessageFailure(this, ex); + throw; + } + } + + public void Dispose() => this.Dispose(true); + + public async Task CloseAsync() + { + Events.Close(this); + + try + { + if (!this.closed.GetAndSet(true)) + { + this.cts.Cancel(); // Require to close all FSMs to complete currently executing command if any in order to unblock sendMessageTask. ImmutableDictionary snapshot = this.prioritiesToFsms; foreach (EndpointExecutorFsm fsm in snapshot.Values) { await fsm.CloseAsync(); - } - - await (this.messageStore?.RemoveEndpoint(this.Endpoint.Id) ?? Task.CompletedTask); - await (this.sendMessageTask ?? Task.CompletedTask); - } - - Events.CloseSuccess(this); - } - catch (Exception ex) - { - Events.CloseFailure(this, ex); - throw; - } - } - - public async Task SetEndpoint(Endpoint newEndpoint, IList priorities) - { - Events.SetEndpoint(this); - - try - { - Preconditions.CheckNotNull(newEndpoint); - Preconditions.CheckArgument(newEndpoint.Id.Equals(this.Endpoint.Id), $"Can only set new endpoint with same id. Given {newEndpoint.Id}, expected {this.Endpoint.Id}"); - Preconditions.CheckNotNull(priorities); - Preconditions.CheckArgument(priorities.Count != 0); - - if (this.closed) - { - throw new InvalidOperationException($"Endpoint executor for endpoint {this.Endpoint} is closed."); - } - - await this.UpdatePriorities(priorities, Option.Some(newEndpoint)); - Events.SetEndpointSuccess(this); - } - catch (Exception ex) - { - Events.SetEndpointFailure(this, ex); - throw; - } - } - + } + + await (this.messageStore?.RemoveEndpoint(this.Endpoint.Id) ?? Task.CompletedTask); + await (this.sendMessageTask ?? Task.CompletedTask); + } + + Events.CloseSuccess(this); + } + catch (Exception ex) + { + Events.CloseFailure(this, ex); + throw; + } + } + + public async Task SetEndpoint(Endpoint newEndpoint, IList priorities) + { + Events.SetEndpoint(this); + + try + { + Preconditions.CheckNotNull(newEndpoint); + Preconditions.CheckArgument(newEndpoint.Id.Equals(this.Endpoint.Id), $"Can only set new endpoint with same id. Given {newEndpoint.Id}, expected {this.Endpoint.Id}"); + Preconditions.CheckNotNull(priorities); + Preconditions.CheckArgument(priorities.Count != 0); + + if (this.closed) + { + throw new InvalidOperationException($"Endpoint executor for endpoint {this.Endpoint} is closed."); + } + + await this.UpdatePriorities(priorities, Option.Some(newEndpoint)); + Events.SetEndpointSuccess(this); + } + catch (Exception ex) + { + Events.SetEndpointFailure(this, ex); + throw; + } + } + public async Task UpdatePriorities(IList priorities, Option newEndpoint) { Preconditions.CheckArgument(priorities.Count > 0); Events.UpdatePriorities(this, priorities); - if (this.closed) - { - throw new InvalidOperationException($"Endpoint executor for endpoint {this.Endpoint} is closed."); + if (this.closed) + { + throw new InvalidOperationException($"Endpoint executor for endpoint {this.Endpoint} is closed."); } // Update priorities by merging the new ones with the existing. @@ -159,7 +159,7 @@ public async Task UpdatePriorities(IList priorities, Option newE { if (!updatedSnapshot.ContainsKey(priority)) { - string id = GetMessageQueueId(this.Endpoint.Id, priority); + string id = MessageQueueIdHelper.GetMessageQueueId(this.Endpoint.Id, priority); // Create a message queue in the store for every priority we have await this.messageStore.AddEndpoint(id); @@ -191,31 +191,12 @@ public async Task UpdatePriorities(IList priorities, Option newE { Events.UpdatePrioritiesFailure(this, updatedSnapshot.Keys.ToList()); } - } - - static string GetMessageQueueId(string endpointId, uint priority) - { - if (priority == RouteFactory.DefaultPriority) - { - // We need to maintain backwards compatibility - // for existing sequential stores that don't - // have the "_Pri" suffix. We use the default - // priority (2,000,000,000) for this, which means - // the store ID is just the endpoint ID. - return endpointId; - } - else + } + + async Task SendMessagesPump() + { + try { - // The actual ID for the underlying store is of string format: - // _Pri - return endpointId + "_Pri" + priority.ToString(); - } - } - - async Task SendMessagesPump() - { - try - { Events.StartSendMessagesPump(this); int batchSize = this.options.BatchSize * this.Endpoint.FanOutFactor; @@ -224,7 +205,7 @@ async Task SendMessagesPump() var messageProviderPairs = new Dictionary(); // Outer loop to maintain the message pump until the executor shuts down - while (!this.cts.IsCancellationRequested) + while (!this.cts.IsCancellationRequested) { try { @@ -254,7 +235,7 @@ async Task SendMessagesPump() { // Create and cache a new pair for the message provider // so we can reuse it every loop - pair.Item1 = this.messageStore.GetMessageIterator(GetMessageQueueId(this.Endpoint.Id, priority), fsm.Checkpointer.Offset + 1); + pair.Item1 = this.messageStore.GetMessageIterator(MessageQueueIdHelper.GetMessageQueueId(this.Endpoint.Id, priority), fsm.Checkpointer.Offset + 1); pair.Item2 = new StoreMessagesProvider(pair.Item1, batchSize); messageProviderPairs.Add(priority, pair); } @@ -301,166 +282,166 @@ async Task SendMessagesPump() catch (Exception ex) { Events.SendMessagesPumpFailure(this, ex); - } - } - - async Task ProcessMessages(IMessage[] messages, EndpointExecutorFsm fsm) - { - SendMessage command = Commands.SendMessage(messages); - await fsm.RunAsync(command); - await command.Completion; - } - - void Dispose(bool disposing) - { - if (disposing) - { - this.cts.Dispose(); - ImmutableDictionary snapshot = this.prioritiesToFsms; - this.prioritiesToFsms.CompareAndSet(snapshot, ImmutableDictionary.Empty); + } + } + + async Task ProcessMessages(IMessage[] messages, EndpointExecutorFsm fsm) + { + SendMessage command = Commands.SendMessage(messages); + await fsm.RunAsync(command); + await command.Completion; + } + + void Dispose(bool disposing) + { + if (disposing) + { + this.cts.Dispose(); + ImmutableDictionary snapshot = this.prioritiesToFsms; + this.prioritiesToFsms.CompareAndSet(snapshot, ImmutableDictionary.Empty); foreach (KeyValuePair entry in snapshot) { var fsm = entry.Value; fsm.Dispose(); - } - } - } - - // This class is used to prefetch messages from the store before they are needed. - // As soon as the previous batch is consumed, the next batch is fetched. - // A pump is started as soon as the object is created, and it keeps the messages list populated. - internal class StoreMessagesProvider - { - readonly IMessageIterator iterator; - readonly int batchSize; - readonly AsyncLock stateLock = new AsyncLock(); - Task> getMessagesTask; - - public StoreMessagesProvider(IMessageIterator iterator, int batchSize) - { - this.iterator = iterator; - this.batchSize = batchSize; - this.getMessagesTask = Task.Run(this.GetMessagesFromStore); - } - - public async Task GetMessages() - { - using (await this.stateLock.LockAsync()) - { - var messages = await this.getMessagesTask; - if (messages.Count == 0) - { - messages = await this.GetMessagesFromStore(); - } - else - { - this.getMessagesTask = Task.Run(this.GetMessagesFromStore); - } - - return messages.ToArray(); - } - } - - async Task> GetMessagesFromStore() - { - var messagesList = new List(); - while (messagesList.Count < this.batchSize) - { - int curBatchSize = this.batchSize - messagesList.Count; - IList messages = (await this.iterator.GetNext(curBatchSize)).ToList(); - if (!messages.Any()) - { - break; - } - - messagesList.AddRange(messages); - } - - return messagesList; - } - } - - static class Events - { - const int IdStart = Routing.EventIds.StoringAsyncEndpointExecutor; - static readonly ILogger Log = Routing.LoggerFactory.CreateLogger(); - - enum EventIds - { - AddMessageSuccess = IdStart, - StartSendMessagesPump, - SendMessagesError, - ProcessMessagesSuccess, - SendMessagesPumpFailure, - ProcessingMessages, - SetEndpoint, - SetEndpointSuccess, - SetEndpointFailure, + } + } + } + + // This class is used to prefetch messages from the store before they are needed. + // As soon as the previous batch is consumed, the next batch is fetched. + // A pump is started as soon as the object is created, and it keeps the messages list populated. + internal class StoreMessagesProvider + { + readonly IMessageIterator iterator; + readonly int batchSize; + readonly AsyncLock stateLock = new AsyncLock(); + Task> getMessagesTask; + + public StoreMessagesProvider(IMessageIterator iterator, int batchSize) + { + this.iterator = iterator; + this.batchSize = batchSize; + this.getMessagesTask = Task.Run(this.GetMessagesFromStore); + } + + public async Task GetMessages() + { + using (await this.stateLock.LockAsync()) + { + var messages = await this.getMessagesTask; + if (messages.Count == 0) + { + messages = await this.GetMessagesFromStore(); + } + else + { + this.getMessagesTask = Task.Run(this.GetMessagesFromStore); + } + + return messages.ToArray(); + } + } + + async Task> GetMessagesFromStore() + { + var messagesList = new List(); + while (messagesList.Count < this.batchSize) + { + int curBatchSize = this.batchSize - messagesList.Count; + IList messages = (await this.iterator.GetNext(curBatchSize)).ToList(); + if (!messages.Any()) + { + break; + } + + messagesList.AddRange(messages); + } + + return messagesList; + } + } + + static class Events + { + const int IdStart = Routing.EventIds.StoringAsyncEndpointExecutor; + static readonly ILogger Log = Routing.LoggerFactory.CreateLogger(); + + enum EventIds + { + AddMessageSuccess = IdStart, + StartSendMessagesPump, + SendMessagesError, + ProcessMessagesSuccess, + SendMessagesPumpFailure, + ProcessingMessages, + SetEndpoint, + SetEndpointSuccess, + SetEndpointFailure, UpdatePriorities, - UpdatePrioritiesSuccess, - UpdatePrioritiesFailure, - Close, - CloseSuccess, - CloseFailure, - ErrorInPopulatePump - } - + UpdatePrioritiesSuccess, + UpdatePrioritiesFailure, + Close, + CloseSuccess, + CloseFailure, + ErrorInPopulatePump + } + public static void AddMessageSuccess(StoringAsyncEndpointExecutor executor, long offset, uint priority, uint timeToLiveSecs) { Log.LogDebug((int)EventIds.AddMessageSuccess, $"[AddMessageSuccess] Successfully added message to store for EndpointId: {executor.Endpoint.Id}, Message offset: {offset}, Priority: {priority}, TTL: {timeToLiveSecs}"); - } - - public static void AddMessageFailure(StoringAsyncEndpointExecutor executor, Exception ex) - { - Log.LogError((int)EventIds.AddMessageSuccess, ex, $"[AddMessageFailure] Error adding added message to store for EndpointId: {executor.Endpoint.Id}"); - } - - public static void StartSendMessagesPump(StoringAsyncEndpointExecutor executor) - { - Log.LogInformation((int)EventIds.StartSendMessagesPump, $"[StartSendMessagesPump] Starting pump to send stored messages to EndpointId: {executor.Endpoint.Id}."); - } - - public static void SendMessagesError(StoringAsyncEndpointExecutor executor, Exception ex) - { - Log.LogWarning((int)EventIds.SendMessagesError, ex, $"[SendMessageError] Error sending message batch to endpoint to IPL head for EndpointId: {executor.Endpoint.Id}."); - } - + } + + public static void AddMessageFailure(StoringAsyncEndpointExecutor executor, Exception ex) + { + Log.LogError((int)EventIds.AddMessageSuccess, ex, $"[AddMessageFailure] Error adding added message to store for EndpointId: {executor.Endpoint.Id}"); + } + + public static void StartSendMessagesPump(StoringAsyncEndpointExecutor executor) + { + Log.LogInformation((int)EventIds.StartSendMessagesPump, $"[StartSendMessagesPump] Starting pump to send stored messages to EndpointId: {executor.Endpoint.Id}."); + } + + public static void SendMessagesError(StoringAsyncEndpointExecutor executor, Exception ex) + { + Log.LogWarning((int)EventIds.SendMessagesError, ex, $"[SendMessageError] Error sending message batch to endpoint to IPL head for EndpointId: {executor.Endpoint.Id}."); + } + public static void SendMessagesSuccess(StoringAsyncEndpointExecutor executor, ICollection messages, uint priority) { if (messages.Count > 0) { Log.LogDebug((int)EventIds.ProcessMessagesSuccess, Invariant($"[ProcessMessagesSuccess] Successfully processed {messages.Count} messages for EndpointId: {executor.Endpoint.Id}, Priority: {priority}.")); } - } - + } + public static void ProcessingMessages(StoringAsyncEndpointExecutor executor, ICollection messages, uint priority) { if (messages.Count > 0) { Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"[ProcessingMessages] Processing {messages.Count} messages for EndpointId: {executor.Endpoint.Id}, Priority: {priority}")); } - } - - public static void SendMessagesPumpFailure(StoringAsyncEndpointExecutor executor, Exception ex) - { - Log.LogCritical((int)EventIds.SendMessagesPumpFailure, ex, $"[SendMessagesPumpFailure] Unable to start pump to send stored messages for EndpointId: {executor.Endpoint.Id}."); - } - - public static void SetEndpoint(StoringAsyncEndpointExecutor executor) - { - Log.LogInformation((int)EventIds.SetEndpoint, "[SetEndpoint] Set endpoint began. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); - } - - public static void SetEndpointSuccess(StoringAsyncEndpointExecutor executor) - { - Log.LogInformation((int)EventIds.SetEndpointSuccess, "[SetEndpointSuccess] Set endpoint succeeded. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); - } - - public static void SetEndpointFailure(StoringAsyncEndpointExecutor executor, Exception ex) - { - Log.LogError((int)EventIds.SetEndpointFailure, ex, "[SetEndpointFailure] Set endpoint failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); - } - + } + + public static void SendMessagesPumpFailure(StoringAsyncEndpointExecutor executor, Exception ex) + { + Log.LogCritical((int)EventIds.SendMessagesPumpFailure, ex, $"[SendMessagesPumpFailure] Unable to start pump to send stored messages for EndpointId: {executor.Endpoint.Id}."); + } + + public static void SetEndpoint(StoringAsyncEndpointExecutor executor) + { + Log.LogInformation((int)EventIds.SetEndpoint, "[SetEndpoint] Set endpoint began. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); + } + + public static void SetEndpointSuccess(StoringAsyncEndpointExecutor executor) + { + Log.LogInformation((int)EventIds.SetEndpointSuccess, "[SetEndpointSuccess] Set endpoint succeeded. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); + } + + public static void SetEndpointFailure(StoringAsyncEndpointExecutor executor, Exception ex) + { + Log.LogError((int)EventIds.SetEndpointFailure, ex, "[SetEndpointFailure] Set endpoint failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); + } + public static void UpdatePriorities(StoringAsyncEndpointExecutor executor, IList priorities) { Log.LogInformation((int)EventIds.UpdatePriorities, $"[UpdatePriorities] Update priorities begin for EndpointId: {executor.Endpoint.Id}, EndpointName: {executor.Endpoint.Name}, Incoming Priorities: {priorities}"); @@ -469,71 +450,71 @@ public static void UpdatePriorities(StoringAsyncEndpointExecutor executor, IList public static void UpdatePrioritiesSuccess(StoringAsyncEndpointExecutor executor, IList priorities) { Log.LogInformation((int)EventIds.UpdatePrioritiesSuccess, $"[UpdatePrioritiesSuccess] Update priorities succeeded EndpointId: {executor.Endpoint.Id}, EndpointName: {executor.Endpoint.Name}, New Priorities: {priorities}"); - } - + } + public static void UpdatePrioritiesFailure(StoringAsyncEndpointExecutor executor, IList priorities) { Log.LogError((int)EventIds.UpdatePrioritiesFailure, $"[UpdatePrioritiesSuccess] Update priorities failed EndpointId: {executor.Endpoint.Id}, EndpointName: {executor.Endpoint.Name}, New Priorities: {priorities}"); - } - - public static void Close(StoringAsyncEndpointExecutor executor) - { - Log.LogInformation((int)EventIds.Close, "[Close] Close began. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); - } - - public static void CloseSuccess(StoringAsyncEndpointExecutor executor) - { - Log.LogInformation((int)EventIds.CloseSuccess, "[CloseSuccess] Close succeeded. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); - } - - public static void CloseFailure(StoringAsyncEndpointExecutor executor, Exception ex) - { - Log.LogError((int)EventIds.CloseFailure, ex, "[CloseFailure] Close failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); - } - - public static void ErrorInPopulatePump(Exception ex) - { - Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump"); - } - } - - static class MetricsV0 - { - static readonly CounterOptions EndpointMessageStoredCountOptions = new CounterOptions - { - Name = "EndpointMessageStoredCount", - MeasurementUnit = Unit.Events - }; - - static readonly CounterOptions EndpointMessageDrainedCountOptions = new CounterOptions - { - Name = "EndpointMessageDrainedCount", - MeasurementUnit = Unit.Events - }; - - static readonly TimerOptions EndpointMessageLatencyOptions = new TimerOptions - { - Name = "EndpointMessageStoredLatencyMs", - MeasurementUnit = Unit.None, - DurationUnit = TimeUnit.Milliseconds, - RateUnit = TimeUnit.Seconds - }; - + } + + public static void Close(StoringAsyncEndpointExecutor executor) + { + Log.LogInformation((int)EventIds.Close, "[Close] Close began. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); + } + + public static void CloseSuccess(StoringAsyncEndpointExecutor executor) + { + Log.LogInformation((int)EventIds.CloseSuccess, "[CloseSuccess] Close succeeded. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); + } + + public static void CloseFailure(StoringAsyncEndpointExecutor executor, Exception ex) + { + Log.LogError((int)EventIds.CloseFailure, ex, "[CloseFailure] Close failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name); + } + + public static void ErrorInPopulatePump(Exception ex) + { + Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump"); + } + } + + static class MetricsV0 + { + static readonly CounterOptions EndpointMessageStoredCountOptions = new CounterOptions + { + Name = "EndpointMessageStoredCount", + MeasurementUnit = Unit.Events + }; + + static readonly CounterOptions EndpointMessageDrainedCountOptions = new CounterOptions + { + Name = "EndpointMessageDrainedCount", + MeasurementUnit = Unit.Events + }; + + static readonly TimerOptions EndpointMessageLatencyOptions = new TimerOptions + { + Name = "EndpointMessageStoredLatencyMs", + MeasurementUnit = Unit.None, + DurationUnit = TimeUnit.Milliseconds, + RateUnit = TimeUnit.Seconds + }; + public static void StoredCountIncrement(string identity, uint priority) => Edge.Util.Metrics.MetricsV0.CountIncrement(GetTagsWithPriority(identity, priority), EndpointMessageStoredCountOptions, 1); - public static void DrainedCountIncrement(string identity, long amount, uint priority) => Edge.Util.Metrics.MetricsV0.CountIncrement(GetTagsWithPriority(identity, priority), EndpointMessageDrainedCountOptions, amount); - - public static IDisposable StoreLatency(string identity) => Edge.Util.Metrics.MetricsV0.Latency(GetTags(identity), EndpointMessageLatencyOptions); - - internal static MetricTags GetTags(string id) - { - return new MetricTags("EndpointId", id); - } - + public static void DrainedCountIncrement(string identity, long amount, uint priority) => Edge.Util.Metrics.MetricsV0.CountIncrement(GetTagsWithPriority(identity, priority), EndpointMessageDrainedCountOptions, amount); + + public static IDisposable StoreLatency(string identity) => Edge.Util.Metrics.MetricsV0.Latency(GetTags(identity), EndpointMessageLatencyOptions); + + internal static MetricTags GetTags(string id) + { + return new MetricTags("EndpointId", id); + } + internal static MetricTags GetTagsWithPriority(string id, uint priority) { return new MetricTags(new string[] { "EndpointId", "priority" }, new string[] { id, priority.ToString() }); - } - } - } -} + } + } + } +} diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs index 04fdba4ea6a..f90c32a1544 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/DeviceMessageHandlerTest.cs @@ -323,6 +323,7 @@ public async Task MessageCompletionMismatchedResponseTest() await deviceMessageHandler.ProcessMessageFeedbackAsync(Guid.NewGuid().ToString(), FeedbackStatus.Complete); + await Task.Delay(TimeSpan.FromSeconds(1)); Assert.False(sendMessageTask.IsCompleted); } @@ -389,6 +390,8 @@ public async Task X509DeviceCanSendMessageTest() // send message to x509 device Task sendMessageTask = deviceMessageHandler.SendMessageAsync(message, "input1"); await deviceMessageHandler.ProcessMessageFeedbackAsync(lockToken, FeedbackStatus.Complete); + + await Task.Delay(TimeSpan.FromSeconds(1)); Assert.True(messageReceived); Assert.True(sendMessageTask.IsCompletedSuccessfully); } diff --git a/edge-hub/test/Microsoft.Azure.Devices.Routing.Core.Test/MessageQueueIdHelperTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Routing.Core.Test/MessageQueueIdHelperTest.cs new file mode 100644 index 00000000000..679f64acaa4 --- /dev/null +++ b/edge-hub/test/Microsoft.Azure.Devices.Routing.Core.Test/MessageQueueIdHelperTest.cs @@ -0,0 +1,102 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Routing.Core.Test +{ + using System; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Xunit; + + [Unit] + public class MessageQueueIdHelperTest + { + [Fact] + public void TestGetMessageQueueIdWithDefaultPriority() + { + // Message queue with default priority should remain as endpoint id + var endpointId = Guid.NewGuid().ToString(); + var messageQueueId = MessageQueueIdHelper.GetMessageQueueId(endpointId, Core.RouteFactory.DefaultPriority); + Assert.Equal(endpointId, messageQueueId); + } + + [Fact] + public void TestGetMessageQueueId() + { + // Message queue with non-default priority should combain endpoint id and priority with MessageQueueIdHelper.MessageQueueIdDelimiter + var endpointId = "abc"; + uint priority = 1234; + var expectMessageQueueId = "abc_Pri1234"; + + var messageQueueId = MessageQueueIdHelper.GetMessageQueueId(endpointId, priority); + + Assert.Equal(expectMessageQueueId, messageQueueId); + } + + [Fact] + public void TestGetMessageQueueIdWithNonDefaultPriority() + { + // Message queue with non-default priority should combain endpoint id and priority with MessageQueueIdHelper.MessageQueueIdDelimiter + var endpointId = Guid.NewGuid().ToString(); + uint priority = 1234; + var expectMessageQueueId = $"{endpointId}{MessageQueueIdHelper.MessageQueueIdDelimiter}{priority}"; + + var messageQueueId = MessageQueueIdHelper.GetMessageQueueId(endpointId, priority); + + Assert.Equal(expectMessageQueueId, messageQueueId); + } + + [Fact] + public void TestParseMessageQueueIdWithoutPriority() + { + // Message queue id without priority should return default priority + var expectedEndpointId = Guid.NewGuid().ToString(); + var messageQueueId = expectedEndpointId; + var expectedPriority = Core.RouteFactory.DefaultPriority; + + var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId); + + Assert.Equal(expectedEndpointId, endpointId); + Assert.Equal(expectedPriority, priority); + } + + [Fact] + public void TestParseMessageQueueIdWithPriority() + { + // Message queue id with priority should return its priority + var expectedEndpointId = Guid.NewGuid().ToString(); + uint expectedPriority = 1234; + var messageQueueId = $"{expectedEndpointId}{MessageQueueIdHelper.MessageQueueIdDelimiter}{expectedPriority}"; + + var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId); + + Assert.Equal(expectedEndpointId, endpointId); + Assert.Equal(expectedPriority, priority); + } + + [Fact] + public void TestParseMessageQueueIdWithPriorityAndDelimiterInEndpointId() + { + // Endpoint id with delimiter should be ignored + var expectedEndpointId = $"{Guid.NewGuid()}{MessageQueueIdHelper.MessageQueueIdDelimiter}4321"; + uint expectedPriority = 1234; + var messageQueueId = $"{expectedEndpointId}{MessageQueueIdHelper.MessageQueueIdDelimiter}{expectedPriority}"; + + var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId); + + Assert.Equal(expectedEndpointId, endpointId); + Assert.Equal(expectedPriority, priority); + } + + [Fact] + public void TestParseMessageQueueIdWithDefaultPriorityAndInvalidSuffixInEndpointId() + { + // Endpoint id with delimiter should be ignored + var expectedEndpointId = $"{Guid.NewGuid()}{MessageQueueIdHelper.MessageQueueIdDelimiter}4321a"; + var messageQueueId = expectedEndpointId; + uint expectedPriority = Core.RouteFactory.DefaultPriority; + + var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId); + + Assert.Equal(expectedEndpointId, endpointId); + Assert.Equal(expectedPriority, priority); + } + } +} diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs index eb13c8ef571..3e2920df87a 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/ISequentialStore.cs @@ -23,6 +23,8 @@ public interface ISequentialStore : IDisposable long GetHeadOffset(CancellationToken cancellationToken); + long GetTailOffset(CancellationToken cancellationToken); + Task Append(T item, CancellationToken cancellationToken); Task RemoveOffset(Func> predicate, long offset, CancellationToken cancellationToken); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs index aa628ef467c..7ea65aa2b56 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Storage/SequentialStore.cs @@ -60,6 +60,11 @@ public long GetHeadOffset(CancellationToken cancellationToken) return this.headOffset; } + public long GetTailOffset(CancellationToken _) + { + return this.tailOffset; + } + public async Task Append(T item, CancellationToken cancellationToken) { using (await this.tailLockObject.LockAsync(cancellationToken))