Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use events for lock lost notification #37643

Merged
merged 17 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ public CreateMessageBatchOptions() { }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
public partial class MessageLockLostEventArgs
{
public MessageLockLostEventArgs(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Exception exception) { }
public System.Exception Exception { get { throw null; } }
public Azure.Messaging.ServiceBus.ServiceBusReceivedMessage Message { get { throw null; } }
}
public sealed partial class ProcessErrorEventArgs : System.EventArgs
{
public ProcessErrorEventArgs(System.Exception exception, Azure.Messaging.ServiceBus.ServiceBusErrorSource errorSource, string fullyQualifiedNamespace, string entityPath, string identifier, System.Threading.CancellationToken cancellationToken) { }
Expand All @@ -33,14 +39,15 @@ public ProcessMessageEventArgs(Azure.Messaging.ServiceBus.ServiceBusReceivedMess
public string FullyQualifiedNamespace { get { throw null; } }
public string Identifier { get { throw null; } }
public Azure.Messaging.ServiceBus.ServiceBusReceivedMessage Message { get { throw null; } }
public System.Threading.CancellationToken MessageLockCancellationToken { get { throw null; } }
public event System.Func<Azure.Messaging.ServiceBus.MessageLockLostEventArgs, System.Threading.Tasks.Task> MessageLockLostAsync { add { } remove { } }
public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Messaging.ServiceBus.ProcessorReceiveActions GetReceiveActions() { throw null; }
protected internal virtual System.Threading.Tasks.Task OnMessageLockLostAsync(Azure.Messaging.ServiceBus.MessageLockLostEventArgs args) { throw null; }
public virtual System.Threading.Tasks.Task RenewMessageLockAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class ProcessorReceiveActions
Expand Down Expand Up @@ -77,8 +84,8 @@ public ProcessSessionMessageEventArgs(Azure.Messaging.ServiceBus.ServiceBusRecei
public string Identifier { get { throw null; } }
public Azure.Messaging.ServiceBus.ServiceBusReceivedMessage Message { get { throw null; } }
public string SessionId { get { throw null; } }
public System.Threading.CancellationToken SessionLockCancellationToken { get { throw null; } }
public System.DateTimeOffset SessionLockedUntil { get { throw null; } }
public event System.Func<Azure.Messaging.ServiceBus.SessionLockLostEventArgs, System.Threading.Tasks.Task> SessionLockLostAsync { add { } remove { } }
public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.Dictionary<string, object> propertiesToModify, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand All @@ -87,6 +94,7 @@ public ProcessSessionMessageEventArgs(Azure.Messaging.ServiceBus.ServiceBusRecei
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Messaging.ServiceBus.ProcessorReceiveActions GetReceiveActions() { throw null; }
public virtual System.Threading.Tasks.Task<System.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected internal virtual System.Threading.Tasks.Task OnSessionLockLostAsync(Azure.Messaging.ServiceBus.SessionLockLostEventArgs args) { throw null; }
public virtual void ReleaseSession() { }
public virtual System.Threading.Tasks.Task RenewSessionLockAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(System.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -554,6 +562,13 @@ public enum ServiceBusTransportType
AmqpTcp = 0,
AmqpWebSockets = 1,
}
public partial class SessionLockLostEventArgs : System.EventArgs
{
public SessionLockLostEventArgs(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.DateTimeOffset sessionLockedUntil, System.Exception exception) { }
public System.Exception Exception { get { throw null; } }
public Azure.Messaging.ServiceBus.ServiceBusReceivedMessage Message { get { throw null; } }
public System.DateTimeOffset SessionLockedUntil { get { throw null; } }
}
public enum SubQueue
{
None = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace Azure.Messaging.ServiceBus
{
internal static class CancellationTokenSourceExtensions
{
public static void CancelAfterLockExpired(this CancellationTokenSource? cancellationTokenSource,
public static void CancelAfterLockExpired(
this CancellationTokenSource? cancellationTokenSource,
ServiceBusReceivedMessage receivedMessage)
{
if (cancellationTokenSource is null || receivedMessage.LockedUntil == default)
Expand All @@ -28,7 +29,8 @@ public static void CancelAfterLockExpired(this CancellationTokenSource? cancella
}
}

public static void CancelAfterLockExpired(this CancellationTokenSource? cancellationTokenSource,
public static void CancelAfterLockExpired(
this CancellationTokenSource? cancellationTokenSource,
ServiceBusSessionReceiver? sessionReceiver)
{
if (cancellationTokenSource is null || sessionReceiver is null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;

namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// This type represents the event args relating to the message lock lost event.
/// </summary>
public class MessageLockLostEventArgs
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Constructs a new <see cref="MessageLockLostEventArgs"/> instance.
/// </summary>
/// <param name="message">The message that the lock was lost for.</param>
/// <param name="exception">The exception, if any, which led to the event being raised.</param>
public MessageLockLostEventArgs(ServiceBusReceivedMessage message, Exception exception)
{
Message = message;
Exception = exception;
}

/// <summary>
/// The message that the lock was lost for.
/// </summary>
public ServiceBusReceivedMessage Message { get; }

/// <summary>
/// Gets the exception, if any, which led to the event being raised. If the exception is null,
/// then the event was raised due to the message lock expiring based on the
/// <see cref="ServiceBusReceivedMessage.LockedUntil"/> property.
/// </summary>
public Exception Exception { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,31 @@ public class ProcessMessageEventArgs : EventArgs
/// </summary>
public CancellationToken CancellationToken { get; }

/// <summary>
/// An event that is raised when the message lock is lost. This event is only raised for the scope of the Process Message handler.
/// Once the handler returns, the event will not be raised. There are two cases in which this event can be raised:
/// <list type="numbered">
/// <item>
/// <description>When the message lock has expired based on the <see cref="ServiceBusReceivedMessage.LockedUntil"/> property</description>
/// </item>
/// <item>
/// <description>When a non-transient exception occurs while attempting to renew the session lock.</description>
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
/// </item>
/// </list>
/// </summary>
public event Func<MessageLockLostEventArgs, Task> MessageLockLostAsync;
jsquire marked this conversation as resolved.
Show resolved Hide resolved
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved

internal CancellationTokenSource MessageLockLostCancellationSource { get; }

/// <summary>
/// The <see cref="System.Threading.CancellationToken"/> instance is cancelled when the lock renewal failed to
/// renew the lock or the <see cref="ServiceBusProcessorOptions.MaxAutoLockRenewalDuration"/> has elapsed.
/// </summary>
/// <remarks>The cancellation token is triggered by comparing <see cref="ServiceBusReceivedMessage.LockedUntil"/>
/// against <see cref="DateTimeOffset.UtcNow"/> and might be subjected to clock drift.</remarks>
public CancellationToken MessageLockCancellationToken { get; }
internal CancellationToken MessageLockCancellationToken { get; }

internal Exception LockLostException { get; set; }

/// <summary>
/// The path of the Service Bus entity that the message was received from.
Expand All @@ -54,7 +72,6 @@ public class ProcessMessageEventArgs : EventArgs

private readonly ServiceBusReceiver _receiver;
private readonly ProcessorReceiveActions _receiveActions;
private readonly CancellationTokenSource messageLockLostCancellationSource;

/// <summary>
/// Initializes a new instance of the <see cref="ProcessMessageEventArgs"/> class.
Expand Down Expand Up @@ -97,7 +114,6 @@ public ProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusRece
/// <param name="cancellationToken">The processor's <see cref="System.Threading.CancellationToken"/> instance which will be cancelled
/// in the event that <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </param>
[EditorBrowsable(EditorBrowsableState.Never)]
internal ProcessMessageEventArgs(
ServiceBusReceivedMessage message,
ReceiverManager manager,
Expand All @@ -109,12 +125,13 @@ internal ProcessMessageEventArgs(
_receiver = manager?.Receiver;
CancellationToken = cancellationToken;

messageLockLostCancellationSource = new CancellationTokenSource();
MessageLockCancellationToken = messageLockLostCancellationSource.Token;
messageLockLostCancellationSource.CancelAfterLockExpired(Message);
MessageLockLostCancellationSource = new CancellationTokenSource();
MessageLockCancellationToken = MessageLockLostCancellationSource.Token;

MessageLockLostCancellationSource.CancelAfterLockExpired(Message);

bool autoRenew = manager?.ShouldAutoRenewMessageLock() == true;
_receiveActions = new ProcessorReceiveActions(message, messageLockLostCancellationSource, manager, autoRenew);
_receiveActions = new ProcessorReceiveActions(this, manager, autoRenew);
}

/// <summary>
Expand All @@ -136,6 +153,13 @@ internal ProcessMessageEventArgs(
Identifier = identifier;
}

/// <summary>
/// Invokes the message lock lost event handler after a message lock is lost.
/// This method can be overridden to raise an event manually for testing purposes.
/// </summary>
/// <param name="args">The event args containing information related to the lock lost event.</param>
protected internal virtual Task OnMessageLockLostAsync(MessageLockLostEventArgs args) => MessageLockLostAsync?.Invoke(args) ?? Task.CompletedTask;

///<inheritdoc cref="ServiceBusReceiver.AbandonMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
public virtual async Task AbandonMessageAsync(
ServiceBusReceivedMessage message,
Expand Down Expand Up @@ -231,7 +255,7 @@ public virtual async Task RenewMessageLockAsync(
// Currently only the trigger message supports cancellation token for LockedUntil.
if (message == Message)
{
messageLockLostCancellationSource.CancelAfterLockExpired(Message);
MessageLockLostCancellationSource.CancelAfterLockExpired(Message);
}
}

Expand All @@ -250,8 +274,14 @@ internal async Task CancelMessageLockRenewalAsync()
}
finally
{
messageLockLostCancellationSource.Dispose();
MessageLockLostCancellationSource.Dispose();
}
}

internal CancellationTokenRegistration RegisterMessageLockLostHandler() =>
MessageLockCancellationToken.Register(
() => OnMessageLockLostAsync(new MessageLockLostEventArgs(
Message,
LockLostException)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public ProcessSessionEventArgs(
Identifier = identifier;
}

[EditorBrowsable(EditorBrowsableState.Never)]
internal ProcessSessionEventArgs(
SessionReceiverManager manager,
CancellationToken cancellationToken)
Expand Down
Loading