Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the rawProducer and rawSuperStreamProducer status #337

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public abstract class AbstractEntity : IClosable
protected abstract string GetStream();
protected abstract string DumpEntityConfiguration();

protected void ThrowIfClosed()
{
if (!IsOpen())
{
throw new AlreadyClosedException($"{DumpEntityConfiguration()} is closed.");
}
}

// here the _cancelTokenSource is disposed and the token is cancelled
// in producer is used to cancel the send task
// in consumer is used to cancel the receive task
Expand Down
39 changes: 39 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,42 @@
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Linq;
using System.Net.Sockets;

namespace RabbitMQ.Stream.Client
{
internal static class ClientExceptions
{

// <summary>
/// IsAKnownException returns true if the exception is a known exception
/// We need it to reconnect when the producer/consumer.
/// - LeaderNotFoundException is a temporary exception
/// It means that the leader is not available and the client can't reconnect.
/// Especially the Producer that needs to know the leader.
/// - SocketException
/// Client is trying to connect in a not ready endpoint.
/// It is usually a temporary situation.
/// - TimeoutException
/// Network call timed out. It is often a temporary situation and we should retry.
/// In this case we can try to reconnect.
///
/// For the other kind of exception, we just throw back the exception.
//</summary>
internal static bool IsAKnownException(Exception exception)
{
if (exception is AggregateException aggregateException)
{
var x = aggregateException.InnerExceptions.Select(x =>
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
x.GetType() == typeof(LeaderNotFoundException));
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException);
}

public static void MaybeThrowException(ResponseCode responseCode, string message)
{
if (responseCode is ResponseCode.Ok)
Expand All @@ -27,6 +58,14 @@ public static void MaybeThrowException(ResponseCode responseCode, string message
}
}

public class AlreadyClosedException : Exception
{
public AlreadyClosedException(string s)
: base(s)
{
}
}

public class ProtocolException : Exception
{
protected ProtocolException(string s)
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
RabbitMQ.Stream.Client.AbstractEntity.Logger.get -> Microsoft.Extensions.Logging.ILogger
RabbitMQ.Stream.Client.AbstractEntity.Logger.init -> void
RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
RabbitMQ.Stream.Client.AbstractEntity.ThrowIfClosed() -> void
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
RabbitMQ.Stream.Client.AlreadyClosedException
RabbitMQ.Stream.Client.AlreadyClosedException.AlreadyClosedException(string s) -> void
RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
/// <param name="compressionType">No Compression, Gzip Compression. Other types are not provided by default</param>
public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType)
{
ThrowIfClosed();
if (subEntryMessages.Count != 0)
{
await SemaphoreAwaitAsync().ConfigureAwait(false);
Expand All @@ -239,6 +240,7 @@ private async Task SemaphoreAwaitAsync()
/// <param name="messages"></param>
public async ValueTask Send(List<(ulong, Message)> messages)
{
ThrowIfClosed();
PreValidateBatch(messages);
await InternalBatchSend(messages).ConfigureAwait(false);
}
Expand Down Expand Up @@ -275,6 +277,7 @@ internal void PreValidateBatch(List<(ulong, Message)> messages)

private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessagesList = true)
{
ThrowIfClosed();
if (IsFilteringEnabled)
{
await _client.Publish(new PublishFilter(EntityId, messages, _config.Filter.FilterValue,
Expand Down Expand Up @@ -322,6 +325,7 @@ public async Task<ulong> GetLastPublishingId()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask Send(ulong publishingId, Message message)
{
ThrowIfClosed();
if (message.Size > _client.MaxFrameSize)
{
throw new InvalidOperationException($"Message size is to big. " +
Expand Down
11 changes: 11 additions & 0 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ private async Task<IProducer> InitProducer(string stream)
return p;
}

protected void ThrowIfClosed()
{
if (!IsOpen())
{
throw new AlreadyClosedException($"Super stream {_config.SuperStream} is closed.");
}
}

private async Task<IProducer> GetProducer(string stream)
{
if (!_producers.ContainsKey(stream))
Expand Down Expand Up @@ -170,12 +178,14 @@ private async Task<IProducer> GetProducerForMessage(Message message)

public async ValueTask Send(ulong publishingId, Message message)
{
ThrowIfClosed();
var producer = await GetProducerForMessage(message).ConfigureAwait(false);
await producer.Send(publishingId, message).ConfigureAwait(false);
}

public async ValueTask Send(List<(ulong, Message)> messages)
{
ThrowIfClosed();
var aggregate = new List<(IProducer, List<(ulong, Message)>)>();

// this part is not super-optimized
Expand Down Expand Up @@ -203,6 +213,7 @@ public async ValueTask Send(List<(ulong, Message)> messages)

public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType)
{
ThrowIfClosed();
var aggregate = new List<(IProducer, List<Message>)>();

// this part is not super-optimized
Expand Down
10 changes: 10 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ internal void Start()

internal void Stop()
{
FlushPendingMessages();
_invalidateTimer.Enabled = false;
_waitForConfirmationActionBlock.Complete();
}
Expand All @@ -129,6 +130,15 @@ await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value
}
}

private async void FlushPendingMessages()
{
foreach (var pair in _waitForConfirmation)
{
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null)
.ConfigureAwait(false);
}
}
Gsantomaggio marked this conversation as resolved.
Show resolved Hide resolved

internal void AddUnConfirmedMessage(ulong publishingId, Message message)
{
AddUnConfirmedMessage(publishingId, new List<Message> { message });
Expand Down
40 changes: 36 additions & 4 deletions RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
_logger = logger ?? NullLogger<Producer>.Instance;
}

private void ThrowIfClosed()
{
if (!_isOpen)
{
throw new AlreadyClosedException("Producer is closed");
}
}

/// <summary>
/// Create a new Producer.
/// <param name="producerConfig">Producer Configuration. Where StreamSystem and Stream are mandatory.</param>
Expand Down Expand Up @@ -241,6 +249,7 @@ internal async Task<ulong> GetLastPublishingId()

internal async ValueTask SendInternal(ulong publishingId, Message message)
{
ThrowIfClosed();
_confirmationPipe.AddUnConfirmedMessage(publishingId, message);
try
{
Expand All @@ -249,9 +258,16 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
// In this case it skips the publish until
// the producer is connected. Messages are safe since are stored
// on the _waitForConfirmation list. The user will get Timeout Error
if (!(_inReconnection))
if (!_inReconnection)
{
await _producer.Send(publishingId, message).ConfigureAwait(false);
if (_producer.IsOpen())
{
await _producer.Send(publishingId, message).ConfigureAwait(false);
}
else
{
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
}
}
}

Expand Down Expand Up @@ -284,14 +300,22 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
/// In case of error the messages are considered as timed out, you will receive a confirmation with the status TimedOut.
public async ValueTask Send(List<Message> messages, CompressionType compressionType)
{
ThrowIfClosed();
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
Interlocked.Increment(ref _publishingId);
_confirmationPipe.AddUnConfirmedMessage(_publishingId, messages);
try
{
if (!_inReconnection)
{
await _producer.Send(_publishingId, messages, compressionType).ConfigureAwait(false);
if (_producer.IsOpen())
{
await _producer.Send(_publishingId, messages, compressionType).ConfigureAwait(false);
}
else
{
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
}
}
}

Expand Down Expand Up @@ -330,6 +354,7 @@ public override string ToString()
/// In case of error the messages are considered as timed out, you will receive a confirmation with the status TimedOut.
public async ValueTask Send(List<Message> messages)
{
ThrowIfClosed();
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
var messagesToSend = new List<(ulong, Message)>();
foreach (var message in messages)
Expand All @@ -352,7 +377,14 @@ public async ValueTask Send(List<Message> messages)
// on the _waitForConfirmation list. The user will get Timeout Error
if (!(_inReconnection))
{
await _producer.Send(messagesToSend).ConfigureAwait(false);
if (_producer.IsOpen())
{
await _producer.Send(messagesToSend).ConfigureAwait(false);
}
else
{
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
}
}
}

Expand Down
29 changes: 5 additions & 24 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Net.Sockets;

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -67,7 +67,7 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)

catch (Exception e)
{
reconnect = IsAKnownException(e);
reconnect = ClientExceptions.IsAKnownException(e);
LogException(e);
if (!reconnect)
{
Expand Down Expand Up @@ -143,7 +143,8 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
await Task.Delay(500).ConfigureAwait(false);
if (await system.StreamExists(stream).ConfigureAwait(false))
{
BaseLogger.LogInformation("Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
BaseLogger.LogInformation(
"Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
stream,
ToString()
);
Expand All @@ -164,31 +165,11 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
}
}

// <summary>
/// IsAKnownException returns true if the exception is a known exception
/// We need it to reconnect when the producer/consumer.
/// - LeaderNotFoundException is a temporary exception
/// It means that the leader is not available and the client can't reconnect.
/// Especially the Producer that needs to know the leader.
/// - SocketException
/// Client is trying to connect in a not ready endpoint.
/// It is usually a temporary situation.
/// - TimeoutException
/// Some call went in timeout. Maybe a temporary DNS problem.
/// In this case we can try to reconnect.
///
/// For the other kind of exception, we just throw back the exception.
//</summary>
internal static bool IsAKnownException(Exception exception)
{
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
}

private void LogException(Exception exception)
{
const string KnownExceptionTemplate = "{Identity} trying to reconnect due to exception";
const string UnknownExceptionTemplate = "{Identity} received an exception during initialization";
if (IsAKnownException(exception))
if (ClientExceptions.IsAKnownException(exception))
{
BaseLogger.LogError(exception, KnownExceptionTemplate, ToString());
}
Expand Down
Loading
Loading