Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix 8419918]: fixed message counts metrics #3980

Merged
merged 14 commits into from
Nov 19, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -261,28 +261,38 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
{
foreach (KeyValuePair<string, ISequentialStore<MessageRef>> endpointSequentialStore in this.messageStore.endpointSequentialStores)
{
var messageQueueId = endpointSequentialStore.Key;
try
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return;
}

Events.CleanupTaskStarted(endpointSequentialStore.Key);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(endpointSequentialStore.Key, CancellationToken.None);
var (endpointId, priority) = MessageQueueIdHelper.ParseMessageQueueId(messageQueueId);
Events.CleanupTaskStarted(messageQueueId);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None);
ISequentialStore<MessageRef> sequentialStore = endpointSequentialStore.Value;
Events.CleanupCheckpointState(endpointSequentialStore.Key, checkpointData);
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;

// If cleanup only peeks at the head, the message count is tailOffset - headOffset + 1;
davilu marked this conversation as resolved.
Show resolved Hide resolved
// otherwise count while iterating over the queue.
var headOffset = 0L;
var tailOffset = sequentialStore.GetTailOffset(CancellationToken.None);
var messageCounts = 0L;
davilu marked this conversation as resolved.
Show resolved Hide resolved

async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var enqueuedTime = DateTime.UtcNow - messageRef.TimeStamp;
if (checkpointData.Offset < offset &&
enqueuedTime < messageRef.TimeToLive)
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
if (offset > checkpointData.Offset && expiry > DateTime.UtcNow)
davilu marked this conversation as resolved.
Show resolved Hide resolved
{
// message is not sent and not expired, increase message counts
messageCounts++;
return false;
}

headOffset = Math.Max(headOffset, offset);
bool deleteMessage = false;

// Decrement ref count.
Expand All @@ -305,10 +315,7 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)

if (deleteMessage)
{
if (checkpointData.Offset < offset && enqueuedTime >= messageRef.TimeToLive)
davilu marked this conversation as resolved.
Show resolved Hide resolved
{
this.expiredCounter.Increment(1, new[] { "ttl_expiry", message?.Message.GetSenderId(), message?.Message.GetOutput(), bool.TrueString } );
}
this.expiredCounter.Increment(1, new[] { "ttl_expiry", message?.Message.GetSenderId(), message?.Message.GetOutput(), bool.TrueString } );

await this.messageStore.messageEntityStore.Remove(messageRef.EdgeMessageId);
cleanupEntityStoreCount++;
Expand Down Expand Up @@ -340,7 +347,7 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
}
}

offset = offset + CleanupBatchSize;
offset += CleanupBatchSize;
}
while (batch.Any());
}
Expand All @@ -350,16 +357,20 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
cleanupCount++;
}

messageCounts = tailOffset - headOffset + 1;
}

// update Metrics for message counts
Checkpointer.Metrics.QueueLength.Set(messageCounts, new[] { endpointId, priority.ToString() });
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(endpointSequentialStore.Key, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
await Task.Delay(MinCleanupSleepTime, this.cancellationTokenSource.Token);
}
catch (Exception ex)
{
Events.ErrorCleaningMessagesForEndpoint(ex, endpointSequentialStore.Key);
Events.ErrorCleaningMessagesForEndpoint(ex, messageQueueId);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Routing.Core
{
    using System;
    using System.Globalization;

    /// <summary>
    /// Builds and parses the IDs used for per-endpoint, per-priority message queues.
    /// </summary>
    public static class MessageQueueIdHelper
    {
        /// <summary>
        /// Separator placed between the endpoint ID and the priority in a message queue ID.
        /// </summary>
        public const string MessageQueueIdDelimiter = "_Pri";

        // The actual ID for the underlying store is of string format: <endpointId>_Pri<priority>
        // We need to maintain backwards compatibility for existing sequential stores that don't have the "_Pri<x>" suffix.
        // We use the default priority (2,000,000,000) for this, which means the store ID is just the endpoint ID.

        /// <summary>
        /// Builds the message queue ID for an endpoint and priority.
        /// </summary>
        /// <param name="endpointId">The endpoint ID.</param>
        /// <param name="priority">The route priority.</param>
        /// <returns>
        /// <paramref name="endpointId"/> unchanged when <paramref name="priority"/> equals
        /// <c>RouteFactory.DefaultPriority</c>; otherwise <c>&lt;endpointId&gt;_Pri&lt;priority&gt;</c>.
        /// </returns>
        public static string GetMessageQueueId(string endpointId, uint priority) =>
            priority == RouteFactory.DefaultPriority
                ? endpointId
                : $"{endpointId}{MessageQueueIdDelimiter}{priority}";

        /// <summary>
        /// Splits a message queue ID into its endpoint ID and priority.
        /// </summary>
        /// <param name="messageQueueId">The message queue ID to parse.</param>
        /// <returns>
        /// The endpoint ID and priority. When the ID contains no delimiter, or the text after the
        /// last delimiter is not a valid unsigned 32-bit integer, the whole input is returned as
        /// the endpoint ID together with <c>RouteFactory.DefaultPriority</c>.
        /// </returns>
        public static (string, uint) ParseMessageQueueId(string messageQueueId)
        {
            // Ordinal search: the delimiter is an identifier fragment, not linguistic text,
            // so the match must not depend on the current culture.
            int idx = messageQueueId.LastIndexOf(MessageQueueIdDelimiter, StringComparison.Ordinal);
            if (idx < 0)
            {
                return (messageQueueId, RouteFactory.DefaultPriority);
            }

            string endpointId = messageQueueId.Substring(0, idx);
            string priorityText = messageQueueId.Substring(idx + MessageQueueIdDelimiter.Length);

            // TryParse covers both malformed and out-of-range suffixes without throwing.
            // In that case the "_Pri" is treated as part of the endpoint ID itself.
            return uint.TryParse(priorityText, NumberStyles.Integer, CultureInfo.InvariantCulture, out uint priority)
                ? (endpointId, priority)
                : (messageQueueId, RouteFactory.DefaultPriority);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ static string GetContextString(Checkpointer checkpointer)
}
}

static class Metrics
public static class Metrics
{
static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge(
public static readonly IMetricsGauge QueueLength = EdgeMetrics.Instance.CreateGauge(
"queue_length",
"Number of messages pending to be processed for the endpoint",
new List<string> { "endpoint", "priority", MetricsConstants.MsTelemetry });
Expand Down
Loading