Skip to content

Commit

Permalink
Add reentrant push consumer message receiving support for c# sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jul 9, 2024
1 parent 3925413 commit e655b6b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 19 deletions.
8 changes: 5 additions & 3 deletions csharp/rocketmq-client-csharp/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, M
};
}

internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
FilterExpression filterExpression, TimeSpan awaitDuration)
protected internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
FilterExpression filterExpression, TimeSpan awaitDuration, string attemptId)
{
attemptId ??= Guid.NewGuid().ToString();
var group = new Proto.Resource
{
ResourceNamespace = ClientConfig.Namespace,
Expand All @@ -120,7 +121,8 @@ internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, Me
FilterExpression = WrapFilterExpression(filterExpression),
LongPollingTimeout = Duration.FromTimeSpan(awaitDuration),
BatchSize = batchSize,
AutoRenew = true
AutoRenew = true,
AttemptId = attemptId

Check failure on line 125 in csharp/rocketmq-client-csharp/Consumer.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId'

Check failure on line 125 in csharp/rocketmq-client-csharp/Consumer.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId'

Check failure on line 125 in csharp/rocketmq-client-csharp/Consumer.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId'

Check failure on line 125 in csharp/rocketmq-client-csharp/Consumer.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId'

Check failure on line 125 in csharp/rocketmq-client-csharp/Consumer.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId'

Check failure on line 125 in csharp/rocketmq-client-csharp/Consumer.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId'
};
}
}
Expand Down
57 changes: 41 additions & 16 deletions csharp/rocketmq-client-csharp/ProcessQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Rocketmq.V2;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq.Error;

Expand Down Expand Up @@ -166,20 +167,20 @@ public async Task FetchMessageImmediately()
/// <remarks>
/// Make sure that no exception will be thrown.
/// </remarks>
public async Task OnReceiveMessageException(Exception t)
public async Task OnReceiveMessageException(Exception t, string attemptId)
{
var delay = t is TooManyRequestsException ? ReceivingFlowControlBackoffDelay : ReceivingFailureBackoffDelay;
await ReceiveMessageLater(delay);
await ReceiveMessageLater(delay, attemptId);
}

private async Task ReceiveMessageLater(TimeSpan delay)
private async Task ReceiveMessageLater(TimeSpan delay, string attemptId)
{
var clientId = _consumer.GetClientId();
try
{
Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}");
await Task.Delay(delay, _receiveMsgCts.Token);
await ReceiveMessage();
await ReceiveMessage(attemptId);
}
catch (Exception ex)
{
Expand All @@ -188,11 +189,21 @@ private async Task ReceiveMessageLater(TimeSpan delay)
return;
}
Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}");
await OnReceiveMessageException(ex);
await OnReceiveMessageException(ex, attemptId);
}
}

private string GenerateAttemptId()
{
return Guid.NewGuid().ToString();
}

public async Task ReceiveMessage()
{
await ReceiveMessage(GenerateAttemptId());
}

public async Task ReceiveMessage(string attemptId)
{
var clientId = _consumer.GetClientId();
if (_dropped)
Expand All @@ -203,36 +214,50 @@ public async Task ReceiveMessage()
if (IsCacheFull())
{
Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}");
await ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull);
await ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull, attemptId);
return;
}
await ReceiveMessageImmediately();
await ReceiveMessageImmediately(attemptId);
}

private async Task ReceiveMessageImmediately()
{
await ReceiveMessageImmediately(GenerateAttemptId());
}

private async Task ReceiveMessageImmediately(string attemptId)
{
var clientId = _consumer.GetClientId();
if (_consumer.State != State.Running)
{
Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}");
return;
}

var endpoints = _mq.Broker.Endpoints;
var batchSize = GetReceptionBatchSize();
var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout, attemptId);

Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks);

try
{
var endpoints = _mq.Broker.Endpoints;
var batchSize = GetReceptionBatchSize();
var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout);

Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks);

var result = await _consumer.ReceiveMessage(request, _mq, longPollingTimeout);
await OnReceiveMessageResult(result);
}
catch (Exception ex)
{
Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}");
await OnReceiveMessageException(ex);
string nextAttemptId = null;
if (ex is RpcException rpcException &&
StatusCode.DeadlineExceeded.Equals(rpcException.Status.StatusCode))
{
nextAttemptId = request.AttemptId;

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 255 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)
}
Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}," +
$" attemptId={request.AttemptId}, nextAttemptId={nextAttemptId}," +

Check failure on line 258 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 258 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 258 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / ubuntu-20.04

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 258 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 258 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 258 in csharp/rocketmq-client-csharp/ProcessQueue.cs

View workflow job for this annotation

GitHub Actions / csharp-build / windows-2022

'ReceiveMessageRequest' does not contain a definition for 'AttemptId' and no accessible extension method 'AttemptId' accepting a first argument of type 'ReceiveMessageRequest' could be found (are you missing a using directive or an assembly reference?)
$" clientId={clientId}");
await OnReceiveMessageException(ex, attemptId);
}
}

Expand Down

0 comments on commit e655b6b

Please sign in to comment.