
DT.AzureStorage: Background renewal for pending orchestrator messages #792

Closed
wants to merge 2 commits

Conversation

Member

@cgillum cgillum commented Sep 1, 2022

This PR resolves an issue where a backlog of pending orchestration messages can create a negative feedback loop: backlogged orchestrations get stuck in duplicate execution loops and can't make forward progress. The problem seems to occur when prefetched messages have been held longer than their visibility timeout (5 minutes by default). When that happens, we receive the message again (with a duplicate message warning), and our existing in-memory copy of the message is invalidated, resulting in MessageGone warnings when we finally process the message and try to delete it.

The fix is to create a background loop that renews prefetched orchestration messages that have been stuck in the pending buffer for a long time. This ensures that we don't lose our lock on the message and can eventually process it successfully.
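
Conceptually, the renewal loop looks something like the following self-contained sketch. This is illustrative only, not the PR's code: the class, field names, and the 30-second/1-minute thresholds are assumptions, and the queue API shown is the classic Microsoft.Azure.Storage.Queue client that matches the NextVisibleTime property used elsewhere in this PR.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage.Queue;

// Illustrative sketch of a background renewal loop for prefetched-but-unprocessed messages.
class PendingMessageRenewer
{
    readonly CloudQueue controlQueue;               // queue the buffered messages came from
    readonly List<CloudQueueMessage> pendingBuffer; // messages fetched but not yet processed
    readonly object bufferLock;                     // lock shared with whoever owns the buffer
    readonly TimeSpan visibilityTimeout = TimeSpan.FromMinutes(5);

    public PendingMessageRenewer(CloudQueue controlQueue, List<CloudQueueMessage> pendingBuffer, object bufferLock)
    {
        this.controlQueue = controlQueue;
        this.pendingBuffer = pendingBuffer;
        this.bufferLock = bufferLock;
    }

    public async Task RunAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // Poll periodically; 30 seconds is an arbitrary illustrative interval.
                await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            // Snapshot the messages that will become visible again within the next minute.
            List<CloudQueueMessage> dueForRenewal;
            lock (this.bufferLock)
            {
                dueForRenewal = this.pendingBuffer.FindAll(
                    msg => DateTime.UtcNow.AddMinutes(1) > msg.NextVisibleTime);
            }

            foreach (CloudQueueMessage msg in dueForRenewal)
            {
                // Extending the visibility timeout preserves our "lock" on the message,
                // so the queue doesn't redeliver a duplicate copy to this or another worker.
                await this.controlQueue.UpdateMessageAsync(
                    msg, this.visibilityTimeout, MessageUpdateFields.Visibility);
            }
        }
    }
}

The key point is that renewal stays off the critical path: the lock is held only long enough to snapshot which messages need renewing, and the storage calls happen outside of it.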

I tested it using a test that looks like the following (not included in the PR since it's long-running):

[TestMethod]
public async Task ManyStuckOrchestrations()
{
    using TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(
        enableExtendedSessions: false,
        modifySettingsAction: settings =>
        {
            settings.MaxConcurrentTaskOrchestrationWorkItems = 10;
            settings.ControlQueueVisibilityTimeout = TimeSpan.FromMinutes(2);
        });
    await host.StartAsync();

    // Needs to be larger than the max orchestration concurrency
    const int InstanceCount = 100;

    List<TestInstance<int>> instances = await host.StartInlineOrchestrations<string, int>(
        InstanceCount,
        i => $"SleepyOrchestration{i}",
        i => i,
        orchestrationName: "SayHelloOrchestration",
        version: string.Empty,
        implementation: (OrchestrationContext ctx, int input) =>
        {
            Thread.Sleep(TimeSpan.FromMinutes(1));
            return Task.FromResult($"Hello, {input}!");
        });

    // Wait for all orchestration instances to complete and validate their outputs
    OrchestrationState[] finalStates = await Task.WhenAll(instances.Select(
        i => i.WaitForCompletion(timeout: TimeSpan.FromMinutes(30), expectedOutputRegex: @"Hello, \w+!")));
}

There is an active ICM tracking a problem related to this, and I plan to share this fix with the customer.

@cgillum cgillum requested review from jviau and amdeel September 1, 2022 23:02
Collaborator

@jviau jviau left a comment

The only concrete change I have is the disposal pattern; the others are higher-level discussion points.

@@ -635,6 +640,54 @@ public virtual void Dispose()
{
this.fetchRuntimeStateQueue.Dispose();
this.readyForProcessingQueue.Dispose();
this.backgroundRenewalCancellationSource.Cancel();
Collaborator

Cancel() throws if the token source is already disposed, so this class will throw if Dispose is called twice (which violates the IDisposable contract: disposal should be safe to call multiple times). You could make the field non-readonly, do an Interlocked.Exchange with null, and then cancel/dispose only if you got a non-null CTS.
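
For example, a minimal sketch of that pattern (the field name and the other Dispose calls are taken from the diff above; the enclosing class and a using System.Threading directive are assumed):

// Not readonly, so it can be swapped out atomically on dispose.
CancellationTokenSource backgroundRenewalCancellationSource = new CancellationTokenSource();

public virtual void Dispose()
{
    this.fetchRuntimeStateQueue.Dispose();
    this.readyForProcessingQueue.Dispose();

    // Interlocked.Exchange guarantees only one caller observes the non-null CTS,
    // so calling Dispose more than once is a no-op.
    CancellationTokenSource cts =
        Interlocked.Exchange(ref this.backgroundRenewalCancellationSource, null);
    if (cts != null)
    {
        cts.Cancel();
        cts.Dispose();
    }
}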

.SelectMany(batch => batch.Messages)
.Where(msg => DateTime.UtcNow.AddMinutes(1) > msg.OriginalQueueMessage.NextVisibleTime)
.ToList()
.ParallelForEachAsync(this.settings.MaxStorageOperationConcurrency, async msg =>
Collaborator

I am curious what the behavior of ParallelForEachAsync is here. It makes me a bit nervous to kick it off within a lock and then await outside of it. Is this fine? Will we be causing the lock to be held longer than necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can find the source for ParallelForEachAsync here. The short explanation is that we'll kick off all the tasks with the lock held but won't wait for any of them until we've released the lock. The reason to kick things off with the lock held is simply to avoid enumerating the list of messages in an unsafe way.
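
For reference, a simplified stand-in (not the actual library source) that behaves the way described here might look like this: every item's task is started synchronously, so that part runs while the caller still holds its lock, and nothing is waited on until the returned Task.WhenAll is awaited.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class ParallelForEachSketch
{
    // Simplified stand-in for illustration only: starts every item's task synchronously
    // (throttled by a semaphore) and returns a combined task for the caller to await later.
    public static Task ParallelForEachAsync<T>(
        this IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
    {
        var throttle = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();
        foreach (T item in items)
        {
            // Enumeration and task startup happen here, synchronously in the caller.
            tasks.Add(InvokeThrottledAsync(item, throttle, action));
        }

        return Task.WhenAll(tasks);
    }

    static async Task InvokeThrottledAsync<T>(T item, SemaphoreSlim throttle, Func<T, Task> action)
    {
        await throttle.WaitAsync();
        try
        {
            await action(item);
        }
        finally
        {
            throttle.Release();
        }
    }
}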

Collaborator

@jviau jviau Sep 6, 2022

So we will be holding the lock for a bit while the tasks run synchronously; specifically, the first N tasks (up to max concurrency) will each run up to their first yield point with the lock held. This may be fine, but I wonder if the obscurity of it could lead to bugs or surprising behavior.

As for enumerating outside the lock, the ToList() call solves that: you will have an entirely new list at that point, so nothing outside of this code will be enumerating it. What do you think of moving just the ParallelForEachAsync call outside the lock?

List<MessageData> messagesToRenew;
lock (this.messageAndSessionLock)
{
    messagesToRenew = this.pendingOrchestrationMessageBatches
        .SelectMany(batch => batch.Messages)
        .Where(msg => DateTime.UtcNow.AddMinutes(1) > msg.OriginalQueueMessage.NextVisibleTime)
        .ToList();
}

await messagesToRenew.ParallelForEachAsync(...);

But also, fine to leave as is if you prefer.

Member Author

I'm fine either way. I expect the behavior will be observably the same, but it will be a bit more understandable with your suggested approach, so I'll go with it.

this.backgroundRenewalCancellationSource.Dispose();
}

async Task BackgroundMessageRenewal(CancellationToken cancellationToken)
Collaborator

As discussed previously, I am concerned about this change introducing a scenario where the worker is in a bad state and not processing messages, yet keeps renewing the locks on those messages in the background. What issues will that cause? Is there a way we can renew messages only when we know the worker is actively consuming them?

Does order matter when processing messages from the queue? If not, maybe during the LockNextTaskOrchestrationWorkItemAsync flow we can just skip over/drain messages with expired locks and yield the first non-expired one. If order does matter, can we renew the lock at that point?
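
For illustration, the skip/drain idea could look roughly like the sketch below (the types and names here are stand-ins, not the library's actual API):

using System;
using System.Collections.Generic;

// PendingMessage is a stand-in for whatever wrapper holds a buffered queue message;
// it is not a real DurableTask type.
class PendingMessage
{
    public DateTimeOffset NextVisibleTime { get; set; }
}

static class PendingBufferSketch
{
    // Drain buffered messages whose visibility timeout has already lapsed and return
    // the first one we still hold a lock on (or null if the buffer runs out).
    public static PendingMessage TakeNextLockedMessage(Queue<PendingMessage> buffered)
    {
        while (buffered.Count > 0)
        {
            PendingMessage msg = buffered.Dequeue();
            if (msg.NextVisibleTime > DateTimeOffset.UtcNow)
            {
                return msg; // lock still held; safe to hand out for processing
            }

            // Lock already expired: drop our copy and let the queue's redelivered copy
            // be processed instead, avoiding a MessageGone error on delete.
        }

        return null;
    }
}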

Member Author

Your unhealthy-worker concern is valid. The new problem we'd create is that these messages can't automatically fail over to a new worker when their partition/queue is reassigned. We're effectively trading one problem for another (though I do think the end result is a net positive). I need to think about this more.

As far as ordering is concerned, it is important, but we have checks in other places that look for out-of-order messages and take care of the reordering, so I don't think we need to be too concerned with that here.

Member Author

cgillum commented Sep 7, 2022

Closing this in favor of #794. I think we'll need to think more carefully about how to deal with the negative feedback loop problem, but that can happen as part of a separate PR.
