All Locked ServiceBus Messages Return to Queue in Reboot Scenario #1697

Closed
Fraegle opened this issue May 14, 2018 · 3 comments

Fraegle commented May 14, 2018

It appears that the WebJobs SDK uses QueueClient.ReceiveBatchAsync to receive ServiceBus messages. A message apparently is only removed from the queue if it is completed AND its lock is abandoned, and the QueueClient apparently only abandons the lock when the next batch of messages is received.

If the process using the WebJobs SDK is ended (e.g. by a new deployment to the Web App or a restart), all locked messages return to the queue in the Active state after the lock period expires. It seems to be irrelevant whether BrokeredMessage.CompleteAsync() was called or not.

When completing a message, please also call BrokeredMessage.AbandonAsync() to abandon the lock and remove the message from the queue.

Repro steps

Start the following program, wait until it reaches a steady state of consuming and producing messages, stop it, and wait for the lock timeout to expire. You will see messages with a delivery count of 2. Some of them have the same message IDs as other messages with a delivery count of 1. These have been completed and should have been removed from the queue.

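using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

public class Program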
{
    public static void Main()
    {
        var config = new JobHostConfiguration();
        config.UseServiceBus();

        if (config.IsDevelopment)
        {
            config.UseDevelopmentSettings();
        }

        FillQueueAsync().GetAwaiter().GetResult();

        var host = new JobHost(config);
        host.RunAndBlock();
    }

    private static async Task FillQueueAsync()
    {
        var numberOfMessages = 24;
        var connectionString = ConfigurationManager.ConnectionStrings["AzureWebJobsServiceBus"].ConnectionString;
        var queueName = ConfigurationManager.AppSettings["QueueName"];
        var messages = new List<BrokeredMessage>(numberOfMessages);

        for (int i = 0; i < numberOfMessages; i++)
        {
            messages.Add(new BrokeredMessage { MessageId = Guid.NewGuid().ToString(), Label = "Test" });
        }

        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        if (!namespaceManager.QueueExists(queueName))
        {
            try
            {
                namespaceManager.CreateQueue(queueName);
            }
            catch (MessagingEntityAlreadyExistsException)
            {
                // Lost a race: another process created the queue first.
            }
        }

        var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.ReceiveAndDelete);
        IEnumerable<BrokeredMessage> existingMessages;
        do
        {
            existingMessages = await queueClient.ReceiveBatchAsync(numberOfMessages, TimeSpan.Zero).ConfigureAwait(false);
        } while (existingMessages.Any());

        await queueClient.SendBatchAsync(messages).ConfigureAwait(false);
    }
}

public class Functions
{
    private static readonly TimeSpan PrerequisitesLookupDelay = TimeSpan.FromSeconds(10);
    private static readonly TimeSpan ExecutionDelay = TimeSpan.FromSeconds(15);


    public static async Task ProcessQueueMessageAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
    {
        // In the actual code, a couple of DB queries or other I/O operations are made to determine whether the operation
        // the message is supposed to trigger can actually be executed.
        var arePrerequisitesFulfilled = await Task.Delay(PrerequisitesLookupDelay).ContinueWith((result) => false).ConfigureAwait(false);

        // In the actual code, this is returned by library code that should not be coupled with ICollector.
        IEnumerable<BrokeredMessage> outputMessages;

        if (arePrerequisitesFulfilled)
        {
            await logger.WriteLineAsync($"We are able to do the work for the message with the ID {message.MessageId}");
            await logger.WriteLineAsync("In this repro this code path should never be accessed.");
            outputMessages = Enumerable.Empty<BrokeredMessage>();
        }
        else
        {
            await logger.WriteLineAsync("The prerequisites aren't fulfilled, schedule a message for trying again in the future.");

            outputMessages = new[]
            {
                new BrokeredMessage()
                {
                    MessageId = message.MessageId,
                    Label = message.Label,
                    ScheduledEnqueueTimeUtc = DateTime.UtcNow + ExecutionDelay
                }
            };
        }

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }
    }
}

Expected behavior

After the web job has exited and the lock timeout has expired, all messages in the ServiceBus queue have a unique BrokeredMessage.MessageId.

Actual behavior

A message ID appears twice for every message whose handler executed but whose lock hasn't been abandoned.

Known workarounds

Before returning from the handler, call both BrokeredMessage.CompleteAsync() AND BrokeredMessage.AbandonAsync().

(This only works if the handler receives a BrokeredMessage. If you use a deserializer and only get the message body as a parameter, this workaround won't work.)

Related information

Versions tried: v2.1.0, v2.2.0

mathewc (Member) commented May 14, 2018

We receive by registering a handler via MessageReceiver.OnMessageAsync. Our completion logic for function invocations is here. If an invocation succeeded we call Message.CompleteAsync, and if the invocation failed we rethrow the exception and let the ServiceBus MessageReceiver handle the message state correctly.
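A minimal sketch of the completion rule described above, assuming a simplified message abstraction. `IReceivedMessage`, `FakeMessage`, and `CompletionSketch.InvokeAndCompleteAsync` are hypothetical names, not SDK types; the real SDK works on `BrokeredMessage` inside its `OnMessageAsync` callback. The sketch only shows the ordering: complete after a successful invocation, let the exception propagate on failure.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a locked message such as BrokeredMessage.
public interface IReceivedMessage
{
    string MessageId { get; }
    Task CompleteAsync();
}

// Test double that records whether CompleteAsync was called.
public sealed class FakeMessage : IReceivedMessage
{
    public FakeMessage(string messageId) => MessageId = messageId;

    public string MessageId { get; }
    public bool Completed { get; private set; }

    public Task CompleteAsync()
    {
        Completed = true;
        return Task.CompletedTask;
    }
}

public static class CompletionSketch
{
    // Invokes the function and completes the message only if the function succeeded.
    // If the function throws, CompleteAsync is skipped and the exception propagates,
    // leaving the receiver to decide what happens to the message (e.g. abandon it).
    public static async Task InvokeAndCompleteAsync(
        IReceivedMessage message, Func<IReceivedMessage, Task> function)
    {
        await function(message);
        await message.CompleteAsync();
    }
}
```

In this model a failed invocation never reaches `CompleteAsync`, so the message stays locked until the receiver settles it or the lock expires.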

Your repro code is outputting new messages into the queue using the same message ID as messages being processed. Isn't that why you're seeing duplicate IDs? E.g. in a particular invocation for ID 1, you output a new message with ID 1 (queue.Add). If the process is killed at this point before the initial message is completed, it will still be in the queue when its lock expires (because the process was killed and in progress messages were abandoned).
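The sequence described in the comment above can be modeled with a toy peek-lock queue. `ModelQueue` and `RaceSketch` are hypothetical and ignore the timing, scheduling, and delivery details of the real broker; the point is only that if the process dies after `queue.Add` but before completion, the re-sent message and the original (once its lock expires) share one ID.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical peek-lock queue model used only to illustrate ordering; not the Service Bus API.
public sealed class ModelQueue
{
    public sealed class Entry
    {
        public string MessageId = "";
        public int DeliveryCount;
        public bool Locked;
    }

    public List<Entry> Entries { get; } = new List<Entry>();

    public void Send(string messageId) => Entries.Add(new Entry { MessageId = messageId });

    // Peek-lock receive: lock the first unlocked entry and count the delivery.
    public Entry Receive()
    {
        var entry = Entries.First(e => !e.Locked);
        entry.Locked = true;
        entry.DeliveryCount++;
        return entry;
    }

    // Completing removes the entry for good (never reached in the killed scenario below).
    public void Complete(Entry entry) => Entries.Remove(entry);

    // When the lock holder dies, its locks expire and the entries become available again.
    public void ExpireAllLocks()
    {
        foreach (var entry in Entries)
        {
            entry.Locked = false;
        }
    }
}

public static class RaceSketch
{
    // Returns the queue contents after the process "dies" between output and completion.
    public static List<ModelQueue.Entry> KilledBeforeComplete()
    {
        var queue = new ModelQueue();
        queue.Send("1");
        var received = queue.Receive();    // invocation for ID 1 starts
        queue.Send(received.MessageId);    // handler outputs a new message with the same ID
        // The process is killed here, so queue.Complete(received) never runs.
        queue.ExpireAllLocks();
        return queue.Entries;
    }
}
```

After `KilledBeforeComplete()` returns, the model holds two entries with ID `1`: the original, delivered once, and the new one, not yet delivered.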

Fraegle (Author) commented May 15, 2018

As you can see in my repro, I am not using batching APIs myself. I am relying on the WebJobs SDK receiving messages and handling the state of them. I assumed the batching APIs are used by the WebJobs SDK, because in my repro about 16 messages get locked at the same time when running one instance of the web job.

My repro is indeed putting messages with the same message ID into the queue, but the handler then exits successfully, so the current message should be removed from the queue. When running the repro, you will see that the current message is always completed before the message scheduled for the future is delivered. In particular, you won't see any message with a delivery count higher than 1 in the queue while the repro is running. While it runs, everything works as expected and documented.

The issue happens after the repro is stopped and both the scheduling delay and the lock timeout (which is longer, usually 1 min) have expired. Then you see messages with a delivery count of 1 (the ones added to the queue by the handler) as well as messages with a delivery count of 2 (the ones that should have been removed from the queue because the handler completed successfully), sharing the same message IDs.

I am not 100% sure what the underlying issue is, but the behavior is definitely wrong, because these messages should not re-appear in the queue after the handler completed successfully.

When the suggested workaround is applied (calling both BrokeredMessage.CompleteAsync() and BrokeredMessage.AbandonAsync() on the message in the handler) and the repro is run again, you will see (after it is stopped and the lock timeout expired) that none of the message IDs is duplicated.

paulbatum added this to the Active Questions milestone May 16, 2018
mathewc (Member) commented May 16, 2018

I believe the issue is that you're losing locks due to your invocations exceeding the default ServiceBusConfiguration.AutoRenewTimeout value of 30 seconds. Set this to a value (e.g. 5 minutes) that is greater than the longest execution duration you expect. Also capture some duration metrics on your functions to see how long they are running, so you can set this value correctly.
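A configuration sketch for the suggestion above, assuming WebJobs SDK 2.x (the `Microsoft.Azure.WebJobs.ServiceBus` package), where the renewal window is exposed through `ServiceBusConfiguration.MessageOptions`, an `OnMessageOptions` instance with an `AutoRenewTimeout` property. Five minutes is the example value from the comment; the right value is one above your longest measured invocation.

```csharp
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;

public class Program
{
    public static void Main()
    {
        var config = new JobHostConfiguration();

        // Assumption: SDK 2.x, where MessageOptions is an OnMessageOptions instance.
        // Keep renewing message locks for up to 5 minutes per invocation.
        var serviceBusConfig = new ServiceBusConfiguration();
        serviceBusConfig.MessageOptions.AutoRenewTimeout = TimeSpan.FromMinutes(5);

        config.UseServiceBus(serviceBusConfig);

        var host = new JobHost(config);
        host.RunAndBlock();
    }
}
```

This replaces the parameterless `config.UseServiceBus()` call from the repro; the rest of the host setup is unchanged.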

Under load it's likely the case that you have invocations taking longer than the renewal timeout, causing you to lose locks.
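One way to capture the duration metrics mentioned above is to time each invocation and compare it with the configured renewal window. `DurationCheck.MeasureAsync` is a hypothetical helper, not part of the SDK, and the comparison is a rough signal rather than an exact model of when the broker drops a lock.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class DurationCheck
{
    // Hypothetical helper: times one invocation and reports whether it ran longer than
    // the configured auto-renew window. If the invocation throws, the exception
    // propagates and no measurement is returned.
    public static async Task<(TimeSpan Elapsed, bool ExceededAutoRenew)> MeasureAsync(
        Func<Task> invocation, TimeSpan autoRenewTimeout)
    {
        var stopwatch = Stopwatch.StartNew();
        await invocation();
        stopwatch.Stop();
        return (stopwatch.Elapsed, stopwatch.Elapsed > autoRenewTimeout);
    }
}
```

In a function, you could wrap the existing body with it, write `Elapsed` to the `TextWriter` logger, and raise `AutoRenewTimeout` if `ExceededAutoRenew` shows up under load.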

I'm going to close this because, in every instance we've seen of users reporting duplicate SB messages, the cause has turned out to be what I stated above: a configuration issue, not a framework issue. I don't see any bug in our message processing.

If you have a repro that demonstrates that we do have a bug, please reopen.
