diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs index b9449c6c..2ccb8ef6 100644 --- a/csharp/rocketmq-client-csharp/Consumer.cs +++ b/csharp/rocketmq-client-csharp/Consumer.cs @@ -105,9 +105,10 @@ protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, M }; } - internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, - FilterExpression filterExpression, TimeSpan awaitDuration) + protected internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, + FilterExpression filterExpression, TimeSpan awaitDuration, string attemptId) { + attemptId ??= Guid.NewGuid().ToString(); var group = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, @@ -120,7 +121,8 @@ internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, Me FilterExpression = WrapFilterExpression(filterExpression), LongPollingTimeout = Duration.FromTimeSpan(awaitDuration), BatchSize = batchSize, - AutoRenew = true + AutoRenew = true, + AttemptId = attemptId }; } } diff --git a/csharp/rocketmq-client-csharp/ProcessQueue.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs index be9c0be6..7a5ab18e 100644 --- a/csharp/rocketmq-client-csharp/ProcessQueue.cs +++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs @@ -20,6 +20,7 @@ using System.Threading; using System.Threading.Tasks; using Apache.Rocketmq.V2; +using Grpc.Core; using Microsoft.Extensions.Logging; using Org.Apache.Rocketmq.Error; @@ -166,20 +167,20 @@ public async Task FetchMessageImmediately() /// /// Make sure that no exception will be thrown. /// - public async Task OnReceiveMessageException(Exception t) + public async Task OnReceiveMessageException(Exception t, string attemptId) { var delay = t is TooManyRequestsException ? ReceivingFlowControlBackoffDelay : ReceivingFailureBackoffDelay; - await ReceiveMessageLater(delay); + await ReceiveMessageLater(delay, attemptId); } - private async Task ReceiveMessageLater(TimeSpan delay) + private async Task ReceiveMessageLater(TimeSpan delay, string attemptId) { var clientId = _consumer.GetClientId(); try { Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}"); await Task.Delay(delay, _receiveMsgCts.Token); - await ReceiveMessage(); + await ReceiveMessage(attemptId); } catch (Exception ex) { @@ -188,11 +189,21 @@ private async Task ReceiveMessageLater(TimeSpan delay) return; } Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}"); - await OnReceiveMessageException(ex); + await OnReceiveMessageException(ex, attemptId); } } + private string GenerateAttemptId() + { + return Guid.NewGuid().ToString(); + } + public async Task ReceiveMessage() + { + await ReceiveMessage(GenerateAttemptId()); + } + + public async Task ReceiveMessage(string attemptId) { var clientId = _consumer.GetClientId(); if (_dropped) @@ -203,13 +214,18 @@ public async Task ReceiveMessage() if (IsCacheFull()) { Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}"); - await ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull); + await ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull, attemptId); return; } - await ReceiveMessageImmediately(); + await ReceiveMessageImmediately(attemptId); } private async Task ReceiveMessageImmediately() + { + await ReceiveMessageImmediately(GenerateAttemptId()); + } + + private async Task ReceiveMessageImmediately(string attemptId) { var clientId = _consumer.GetClientId(); if (_consumer.State != State.Running) @@ -217,22 +233,31 @@ private async Task ReceiveMessageImmediately() Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}"); return; } + + var endpoints = _mq.Broker.Endpoints; + var batchSize = GetReceptionBatchSize(); + var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout(); + var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout, attemptId); + + Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks); + try { - var endpoints = _mq.Broker.Endpoints; - var batchSize = GetReceptionBatchSize(); - var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout(); - var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout); - - Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks); - var result = await _consumer.ReceiveMessage(request, _mq, longPollingTimeout); await OnReceiveMessageResult(result); } catch (Exception ex) { - Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}"); - await OnReceiveMessageException(ex); + string nextAttemptId = null; + if (ex is RpcException rpcException && + StatusCode.DeadlineExceeded.Equals(rpcException.Status.StatusCode)) + { + nextAttemptId = request.AttemptId; + } + Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}," + + $" attemptId={request.AttemptId}, nextAttemptId={nextAttemptId}," + + $" clientId={clientId}"); + await OnReceiveMessageException(ex, attemptId); } }