Skip to content

Commit

Permalink
[Fix 8419918]: fixed message counts metrics (Azure#3980)
Browse files Browse the repository at this point in the history
* fixed message count metrics
  • Loading branch information
davilu authored Nov 19, 2020
1 parent d3defcc commit 43b735c
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 374 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class MessageStore : IMessageStore
readonly CleanupProcessor messagesCleaner;
readonly ICheckpointStore checkpointStore;
readonly IStoreProvider storeProvider;
readonly long messageCount = 0;
TimeSpan timeToLive;

public MessageStore(IStoreProvider storeProvider, ICheckpointStore checkpointStore, TimeSpan timeToLive, bool checkEntireQueueOnCleanup = false)
Expand Down Expand Up @@ -114,7 +113,7 @@ await this.messageEntityStore.PutOrUpdate(
using (MetricsV0.SequentialStoreLatency(endpointId))
{
long offset = await sequentialStore.Append(new MessageRef(edgeMessageId, timeToLive));
Events.MessageAdded(offset, edgeMessageId, endpointId, this.messageCount);
Events.MessageAdded(offset, edgeMessageId, endpointId);
return new MessageWithOffset(message, offset);
}
}
Expand Down Expand Up @@ -261,28 +260,38 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
{
foreach (KeyValuePair<string, ISequentialStore<MessageRef>> endpointSequentialStore in this.messageStore.endpointSequentialStores)
{
var messageQueueId = endpointSequentialStore.Key;
try
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}

Events.CleanupTaskStarted(endpointSequentialStore.Key);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(endpointSequentialStore.Key, CancellationToken.None);
var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId);
Events.CleanupTaskStarted(messageQueueId);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None);
ISequentialStore<MessageRef> sequentialStore = endpointSequentialStore.Value;
Events.CleanupCheckpointState(endpointSequentialStore.Key, checkpointData);
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;

// If checkEntireQueueOnCleanup is set to false, we only peek the head, message counts is tailOffset-headOffset+1
// otherwise count while iterating over the queue.
var headOffset = 0L;
var tailOffset = sequentialStore.GetTailOffset(CancellationToken.None);
var messageCount = 0L;

async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var enqueuedTime = DateTime.UtcNow - messageRef.TimeStamp;
if (checkpointData.Offset < offset &&
enqueuedTime < messageRef.TimeToLive)
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
if (offset > checkpointData.Offset && expiry > DateTime.UtcNow)
{
// message is not sent and not expired, increase message counts
messageCount++;
return false;
}

headOffset = Math.Max(headOffset, offset);
bool deleteMessage = false;

// Decrement ref count.
Expand All @@ -305,9 +314,9 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)

if (deleteMessage)
{
if (checkpointData.Offset < offset && enqueuedTime >= messageRef.TimeToLive)
if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow)
{
this.expiredCounter.Increment(1, new[] { "ttl_expiry", message?.Message.GetSenderId(), message?.Message.GetOutput(), bool.TrueString } );
this.expiredCounter.Increment(1, new[] { "ttl_expiry", message?.Message.GetSenderId(), message?.Message.GetOutput(), bool.TrueString });
}

await this.messageStore.messageEntityStore.Remove(messageRef.EdgeMessageId);
Expand Down Expand Up @@ -340,7 +349,7 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
}
}

offset = offset + CleanupBatchSize;
offset += CleanupBatchSize;
}
while (batch.Any());
}
Expand All @@ -350,16 +359,20 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
cleanupCount++;
}

messageCount = tailOffset - headOffset + 1;
}

// update Metrics for message counts
Checkpointer.Metrics.QueueLength.Set(messageCount, new[] { endpointId, priority.ToString() });
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(endpointSequentialStore.Key, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
await Task.Delay(MinCleanupSleepTime, this.cancellationTokenSource.Token);
}
catch (Exception ex)
{
Events.ErrorCleaningMessagesForEndpoint(ex, endpointSequentialStore.Key);
Events.ErrorCleaningMessagesForEndpoint(ex, messageQueueId);
}
}

Expand Down Expand Up @@ -471,12 +484,12 @@ internal static void CleanupCheckpointState(string endpointId, CheckpointData ch
Log.LogDebug((int)EventIds.CleanupCheckpointState, Invariant($"Checkpoint for endpoint {endpointId} is {checkpointData.Offset}"));
}

internal static void MessageAdded(long offset, string edgeMessageId, string endpointId, long messageCount)
internal static void MessageAdded(long offset, string edgeMessageId, string endpointId)
{
// Print only after every 1000th message to avoid flooding logs.
if (offset % 1000 == 0)
{
Log.LogDebug((int)EventIds.MessageAdded, Invariant($"Added message {edgeMessageId} to store for {endpointId} at offset {offset} - messageCount = {messageCount}"));
Log.LogDebug((int)EventIds.MessageAdded, Invariant($"Added message {edgeMessageId} to store for {endpointId} at offset {offset}."));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Routing.Core
{
using System;

public static class MessageQueueIdHelper
{
public const string MessageQueueIdDelimiter = "_Pri";

// The actual ID for the underlying store is of string format: <endpointId>_Pri<priority>
// We need to maintain backwards compatibility for existing sequential stores that don't have the "_Pri<x>" suffix.
// We use the default priority (2,000,000,000) for this, which means the store ID is just the endpoint ID.
public static string GetMessageQueueId(string endpointId, uint priority) => priority == RouteFactory.DefaultPriority ? endpointId : $"{endpointId}{MessageQueueIdDelimiter}{priority}";

public static (string, uint) ParseMessageQueueId(string messageQueueId)
{
var idx = messageQueueId.LastIndexOf(MessageQueueIdDelimiter);
if (idx < 0)
{
return (messageQueueId, RouteFactory.DefaultPriority);
}

var endpointId = messageQueueId.Substring(0, idx);
var priority = messageQueueId.Substring(idx + MessageQueueIdDelimiter.Length);
if (uint.TryParse(priority, out var priorityNum))
{
return (endpointId, priorityNum);
}
else
{
return (messageQueueId, RouteFactory.DefaultPriority);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ static string GetContextString(Checkpointer checkpointer)
}
}

static class Metrics
public static class Metrics
{
static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge(
public static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge(
"queue_length",
"Number of messages pending to be processed for the endpoint",
new List<string> { "endpoint", "priority", MetricsConstants.MsTelemetry });
Expand Down
Loading

0 comments on commit 43b735c

Please sign in to comment.