From 02fdcc5a72468a676b8579e84809a6b088c16213 Mon Sep 17 00:00:00 2001 From: maniglia Date: Wed, 8 May 2024 14:28:04 +0200 Subject: [PATCH 1/3] EventBusRabbitMq fix. --- .../Abstractions/EventBus/IEventBusBase.cs | 2 +- .../EventBusRabbitMq.cs | 35 ++++++++++++------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusBase.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusBase.cs index c8081219..57b44271 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusBase.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusBase.cs @@ -13,7 +13,7 @@ public interface IEventBusBase void Initialize(bool asyncMode = true, CancellationToken cancel = default) where TIntegrationEvent : IIntegrationEvent, new(); - ValueTask Subscribe(bool asyncMode = true) + ValueTask Subscribe(bool asyncMode = true) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventHandler : IIntegrationEventHandler; diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMq.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMq.cs index 125b32ac..117a1f67 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMq.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMq.cs @@ -258,35 +258,41 @@ protected virtual void QueueInitialize(IModel channel) #region [Subscribe] - public async ValueTask Subscribe(bool asyncMode = true) + public async ValueTask Subscribe(bool asyncMode = true) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventHandler : IIntegrationEventHandler { var eventName = this.SubsManager.GetEventKey(); - await this.DoInternalSubscription(eventName).ConfigureAwait(false); - this.SubsManager.AddSubscription(); - if (asyncMode) - { - await this.StartBasicConsumeAsync().ConfigureAwait(false); - } - else + var internalSubscriptionResult = await this.DoInternalSubscription(eventName).ConfigureAwait(false); + + if (internalSubscriptionResult) { - this.StartBasicConsume(); + this.SubsManager.AddSubscription(); + if (asyncMode) + { + return await this.StartBasicConsumeAsync().ConfigureAwait(false); + } + else + { + return this.StartBasicConsume(); + } } + + return false; } - protected async ValueTask DoInternalSubscription(string eventName) + protected async ValueTask DoInternalSubscription(string eventName) { //var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventName); if (this.SubsManager.HasSubscriptionsForEvent(eventName)) { - return; + return false; } if (this.PersistentConnection is null) { - return; + return false; } if (!this.PersistentConnection.IsConnected) @@ -296,7 +302,7 @@ protected async ValueTask DoInternalSubscription(string eventName) if (!connectionResult) { this.Logger.LogWarning("EventBusRabbitMq DoInternalSubscriptionRpc: {0}!", "no connection"); - return; + return false; } } @@ -311,9 +317,12 @@ protected async ValueTask DoInternalSubscription(string eventName) { channel.QueueBind(this.QueueName, this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, eventName); + return true; } } } + + return false; } #endregion From 65befbcc4a6c709a235db30cff21b595bb1966e3 Mon Sep 17 00:00:00 2001 From: maniglia Date: Wed, 8 May 2024 14:42:43 +0200 Subject: [PATCH 2/3] Testing --- .../Abstractions/EventBus/IEventBusQueue.cs | 2 +- .../Abstractions/EventBus/IEventBusRpc.cs | 2 +- .../EventBus/IEventBusRpcClient.cs | 2 +- .../EventBus/IEventBusRpcServer.cs | 2 +- .../Abstractions/EventBus/IEventBusTyped.cs | 2 +- .../EventBusRabbitMqDynamic.cs | 8 +++-- .../EventBusRabbitMqQueue.cs | 12 ++++--- .../EventBusRabbitMqRpc.cs | 27 ++++++++++----- .../EventBusRabbitMqRpcClient.cs | 34 ++++++++++++------- .../EventBusRabbitMqRpcServer.cs | 34 ++++++++++++------- .../EventBusRabbitMqTyped.cs | 24 ++++++++----- .../TestEventBus/TestEventBusRpc.cs | 2 +- 12 files changed, 98 insertions(+), 53 deletions(-) diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusQueue.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusQueue.cs index 95dc81bb..5c47ca37 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusQueue.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusQueue.cs @@ -11,7 +11,7 @@ IIntegrationQueueHandler GetIntegrationQueueHandler; - ValueTask SubscribeQueue(string routingKey) + ValueTask SubscribeQueue(string routingKey) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationQueueHandler : IIntegrationQueueHandler; diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpc.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpc.cs index e6649b72..d9e32c3e 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpc.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpc.cs @@ -11,7 +11,7 @@ IIntegrationRpcHandler GetIntegration where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventReply : IIntegrationEventReply, new(); - ValueTask SubscribeRpc(string routingKey) + ValueTask SubscribeRpc(string routingKey) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventReply : IIntegrationEventReply, new() where TIntegrationRpcHandler : IIntegrationRpcHandler; diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcClient.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcClient.cs index b7ab9026..8aa74596 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcClient.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcClient.cs @@ -29,7 +29,7 @@ Task CallAsync(TIntegrationEventRp // where TIntegrationEventReply : IIntegrationEventReply // where TH : IIntegrationRpcClientHandler; - ValueTask SubscribeRpcClient(string replyRoutingKey, bool asyncMode = true) + ValueTask SubscribeRpcClient(string replyRoutingKey, bool asyncMode = true) where TIntegrationEventHandler : IIntegrationRpcClientHandler; //void UnsubscribeRpcClient(string routingKey) diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcServer.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcServer.cs index a8b8d499..6e1c6c25 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcServer.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusRpcServer.cs @@ -16,7 +16,7 @@ IIntegrationRpcServerHandler GetIn where TIntegrationEventRpc : IIntegrationEventRpc, new() where TIntegrationEventReply : IIntegrationEventReply, new(); - ValueTask SubscribeRpcServer(string routingKey, bool asyncMode = true) + ValueTask SubscribeRpcServer(string routingKey, bool asyncMode = true) where TIntegrationEventRpc : IIntegrationEventRpc, new() where TIntegrationEventReply : IIntegrationEventReply, new() where TIntegrationRpcServerHandler : IIntegrationRpcServerHandler; diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusTyped.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusTyped.cs index 0ae563d5..12ee04a3 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusTyped.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusTyped.cs @@ -7,7 +7,7 @@ namespace KSociety.Base.EventBus.Abstractions.EventBus public interface IEventBusTyped : IEventBus { - ValueTask Subscribe(string routingKey, bool asyncMode = false) + ValueTask Subscribe(string routingKey, bool asyncMode = false) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventHandler : IIntegrationEventHandler; diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs index 13e7f40c..246fd1e8 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs @@ -43,8 +43,12 @@ public EventBusRabbitMqDynamic(IRabbitMqPersistentConnection persistentConnectio public async ValueTask SubscribeDynamic(string eventName) where TDynamicIntegrationEventHandler : IDynamicIntegrationEventHandler { - await this.DoInternalSubscription(eventName).ConfigureAwait(false); - this.SubsManager?.AddDynamicSubscription(eventName); + var internalSubscriptionResult = await this.DoInternalSubscription(eventName).ConfigureAwait(false); + + if (internalSubscriptionResult) + { + this.SubsManager?.AddDynamicSubscription(eventName); + } //ToDo //await this.StartBasicConsume().ConfigureAwait(false); } diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqQueue.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqQueue.cs index 65831140..cf768ce8 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqQueue.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqQueue.cs @@ -115,14 +115,18 @@ public override async ValueTask Publish(IIntegrationEvent @event) #region [Subscribe] - public async ValueTask SubscribeQueue(string routingKey) + public async ValueTask SubscribeQueue(string routingKey) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationQueueHandler : IIntegrationQueueHandler { var eventName = this.SubsManager?.GetEventKey(); - await this.DoInternalSubscription(eventName + "." + routingKey).ConfigureAwait(false); - this.SubsManager?.AddSubscriptionQueue(eventName + "." + routingKey); - await this.StartBasicConsumeAsync().ConfigureAwait(false); + var internalSubscriptionResult = await this.DoInternalSubscription(eventName + "." + routingKey).ConfigureAwait(false); + if (internalSubscriptionResult) + { + this.SubsManager?.AddSubscriptionQueue(eventName + "." + routingKey); + return await this.StartBasicConsumeAsync().ConfigureAwait(false); + } + return false; } #endregion diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs index 3c7dceb0..b89bbcab 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs @@ -209,7 +209,7 @@ protected override void QueueInitialize(IModel channel) #region [Subscribe] - public async ValueTask SubscribeRpc(string routingKey) + public async ValueTask SubscribeRpc(string routingKey) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventReply : IIntegrationEventReply, new() where TIntegrationRpcHandler : IIntegrationRpcHandler @@ -217,21 +217,28 @@ public async ValueTask SubscribeRpc(); var eventNameResult = this.SubsManager.GetEventReplyKey(); //this.Logger.LogDebug("SubscribeRpc: eventName: {0}.{1} eventNameResult: {2}.{3}", eventName, routingKey, eventNameResult, routingKey); - await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey) + var internalSubscriptionResult = await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey) .ConfigureAwait(false); - this.SubsManager.AddSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey); - await this.StartBasicConsumeServer().ConfigureAwait(false); - await this.StartBasicConsumeReplyAsync().ConfigureAwait(false); + if (internalSubscriptionResult) + { + this.SubsManager.AddSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey); + await this.StartBasicConsumeServer().ConfigureAwait(false); + await this.StartBasicConsumeReplyAsync().ConfigureAwait(false); + + return true; + } + + return false; } - private async ValueTask DoInternalSubscriptionRpc(string eventName, string eventNameResult) + private async ValueTask DoInternalSubscriptionRpc(string eventName, string eventNameResult) { try { //var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventName); if (this.SubsManager.HasSubscriptionsForEvent(eventName)) { - return; + return false; } if (!this.PersistentConnection.IsConnected) @@ -241,7 +248,7 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event if (!connectionResult) { this.Logger.LogWarning("EventBusRabbitMqRpc DoInternalSubscriptionRpc: {0}!", "no connection"); - return; + return false; } } @@ -260,6 +267,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event channel.QueueBind(this._queueNameReply, this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, eventNameResult); + + return true; } } } @@ -272,6 +281,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event { this.Logger.LogError(ex, "EventBusRabbitMqRpc DoInternalSubscriptionRpc: "); } + + return false; } #endregion diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcClient.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcClient.cs index 4fcd17d0..ed36e41c 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcClient.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcClient.cs @@ -359,31 +359,37 @@ protected override void QueueInitialize(IModel channel) // await this.StartBasicConsume().ConfigureAwait(false); //} - public async ValueTask SubscribeRpcClient(string replyRoutingKey, bool asyncMode = true) + public async ValueTask SubscribeRpcClient(string replyRoutingKey, bool asyncMode = true) where TIntegrationRpcClientHandler : IIntegrationRpcClientHandler { var eventNameResult = this.SubsManager.GetEventReplyKey(); //this.Logger.LogTrace("SubscribeRpcClient reply routing key: {0}, event name result: {1}", replyRoutingKey, eventNameResult); - await this.DoInternalSubscriptionRpc(eventNameResult + "." + replyRoutingKey).ConfigureAwait(false); - this.SubsManager.AddSubscriptionRpcClient(eventNameResult + "." + replyRoutingKey); - if (asyncMode) - { - await this.StartBasicConsumeAsync().ConfigureAwait(false); - } - else + var internalSubscriptionResult = await this.DoInternalSubscriptionRpc(eventNameResult + "." + replyRoutingKey).ConfigureAwait(false); + + if (internalSubscriptionResult) { - this.StartBasicConsume(); + this.SubsManager.AddSubscriptionRpcClient(eventNameResult + "." + replyRoutingKey); + if (asyncMode) + { + return await this.StartBasicConsumeAsync().ConfigureAwait(false); + } + else + { + return this.StartBasicConsume(); + } } + + return false; } - private async ValueTask DoInternalSubscriptionRpc(string eventNameResult) + private async ValueTask DoInternalSubscriptionRpc(string eventNameResult) { try { //var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventNameResult); if (this.SubsManager.HasSubscriptionsForEvent(eventNameResult)) { - return; + return false; } if (!this.PersistentConnection.IsConnected) @@ -393,7 +399,7 @@ private async ValueTask DoInternalSubscriptionRpc(string eventNameResult) if (!connectionResult) { this.Logger.LogWarning("EventBusRabbitMqRpcClient DoInternalSubscriptionRpc: {0}!", "no connection"); - return; + return false; } } @@ -409,6 +415,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventNameResult) channel.QueueBind(this._queueNameReply, this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, eventNameResult); //ToDo + + return true; } } } @@ -421,6 +429,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventNameResult) { this.Logger.LogError(ex, "EventBusRabbitMqRpcClient DoInternalSubscriptionRpc: "); } + + return false; } #endregion diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs index b17a6890..a73c73cb 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs @@ -219,7 +219,7 @@ protected override void QueueInitialize(IModel channel) #region [Subscribe] - public async ValueTask SubscribeRpcServer(string routingKey, bool asyncMode = true) + public async ValueTask SubscribeRpcServer(string routingKey, bool asyncMode = true) where TIntegrationEventRpc : IIntegrationEventRpc, new() where TIntegrationEventReply : IIntegrationEventReply, new() where TIntegrationRpcServerHandler : IIntegrationRpcServerHandler @@ -227,28 +227,34 @@ public async ValueTask SubscribeRpcServer(); var eventNameResult = this.SubsManager.GetEventReplyKey(); //this.Logger.LogTrace("SubscribeRpcServer routing key: {0}, event name: {1}, event name result: {2}", routingKey, eventName, eventNameResult); - await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey) + var internalSubscriptionResult = await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey) .ConfigureAwait(false); - this.SubsManager.AddSubscriptionRpcServer(eventName + "." + routingKey, eventNameResult + "." + routingKey); - if (asyncMode) - { - await this.StartBasicConsumeServerAsync().ConfigureAwait(false); - } - else + if (internalSubscriptionResult) { - this.StartBasicConsumeServer(); + this.SubsManager.AddSubscriptionRpcServer(eventName + "." + routingKey, eventNameResult + "." + routingKey); + + if (asyncMode) + { + return await this.StartBasicConsumeServerAsync().ConfigureAwait(false); + } + else + { + return this.StartBasicConsumeServer(); + } } + + return false; } - private async ValueTask DoInternalSubscriptionRpc(string eventName, string eventNameResult) + private async ValueTask DoInternalSubscriptionRpc(string eventName, string eventNameResult) { try { //var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventName); if (this.SubsManager.HasSubscriptionsForEvent(eventName)) { - return; + return false; } if (!this.PersistentConnection.IsConnected) @@ -258,7 +264,7 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event if (!connectionResult) { this.Logger.LogWarning("EventBusRabbitMqRpcServer DoInternalSubscriptionRpc: {0}!", "no connection"); - return; + return false; } } @@ -281,6 +287,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event { channel.QueueBind(this.QueueName, this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, eventName); + + return true; //channel.QueueBind(this._queueNameReply, // this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, // eventNameResult); @@ -296,6 +304,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event { this.Logger.LogError(ex, "EventBusRabbitMqRpcServer DoInternalSubscriptionRpc: "); } + + return false; } #endregion diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqTyped.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqTyped.cs index e8f167ea..251e49c3 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqTyped.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqTyped.cs @@ -53,23 +53,29 @@ public EventBusRabbitMqTyped(IRabbitMqPersistentConnection persistentConnection, #region [Subscribe] - public async ValueTask Subscribe(string routingKey, bool asyncMode = false) + public async ValueTask Subscribe(string routingKey, bool asyncMode = false) where TIntegrationEvent : IIntegrationEvent, new() where TIntegrationEventHandler : IIntegrationEventHandler { var eventName = this.SubsManager?.GetEventKey(); //this.Logger?.LogTrace("SubscribeTyped routing key: {0}, event name: {1}", routingKey, eventName); - await this.DoInternalSubscription(eventName + "." + routingKey).ConfigureAwait(false); - this.SubsManager?.AddSubscription(eventName + "." + routingKey); - if(asyncMode) + var internalSubscriptionResult = await this.DoInternalSubscription(eventName + "." + routingKey).ConfigureAwait(false); + + if (internalSubscriptionResult) { - await this.StartBasicConsumeAsync().ConfigureAwait(false); + this.SubsManager?.AddSubscription(eventName + "." + routingKey); + if (asyncMode) + { + return await this.StartBasicConsumeAsync().ConfigureAwait(false); + } + else + { + return this.StartBasicConsume(); + } } - else - { - this.StartBasicConsume(); - } + + return false; } #endregion diff --git a/src/01/KSocietyBaseEventBus/Test/KSociety.Base.EventBus.Test/TestEventBus/TestEventBusRpc.cs b/src/01/KSocietyBaseEventBus/Test/KSociety.Base.EventBus.Test/TestEventBus/TestEventBusRpc.cs index 22fc3890..9f8fb5cf 100644 --- a/src/01/KSocietyBaseEventBus/Test/KSociety.Base.EventBus.Test/TestEventBus/TestEventBusRpc.cs +++ b/src/01/KSocietyBaseEventBus/Test/KSociety.Base.EventBus.Test/TestEventBus/TestEventBusRpc.cs @@ -22,7 +22,7 @@ public class TestEventBusRpc : Test public TestEventBusRpc() { - this.Subscriber = new Subscriber(this.LoggerFactory, this.PersistentConnection, this.EventBusParameters, 10); + this.Subscriber = new Subscriber(this.LoggerFactory, this.PersistentConnection, this.EventBusParameters, 10, true); //this.Subscriber. new Thread(async () => From 3a5324204abec8e12c6717877c3730445543ccb6 Mon Sep 17 00:00:00 2001 From: maniglia Date: Wed, 8 May 2024 14:57:50 +0200 Subject: [PATCH 3/3] Testing --- .../Abstractions/EventBus/IEventBusDynamic.cs | 2 +- .../EventBusRabbitMqDynamic.cs | 6 +++++- .../EventBusRabbitMqRpc.cs | 13 +++++++++---- .../EventBusRabbitMqRpcClient.cs | 7 +++++-- .../EventBusRabbitMqRpcServer.cs | 14 ++++++++++---- 5 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusDynamic.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusDynamic.cs index 2a38da1f..bbfb79af 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusDynamic.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBus/Abstractions/EventBus/IEventBusDynamic.cs @@ -7,7 +7,7 @@ namespace KSociety.Base.EventBus.Abstractions.EventBus public interface IEventBusDynamic { - ValueTask SubscribeDynamic(string routingKey) + ValueTask SubscribeDynamic(string routingKey) where TDynamicIntegrationEventHandler : IDynamicIntegrationEventHandler; void UnsubscribeDynamic(string routingKey) diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs index 246fd1e8..c7fad06b 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqDynamic.cs @@ -40,7 +40,7 @@ public EventBusRabbitMqDynamic(IRabbitMqPersistentConnection persistentConnectio #region [Subscribe] - public async ValueTask SubscribeDynamic(string eventName) + public async ValueTask SubscribeDynamic(string eventName) where TDynamicIntegrationEventHandler : IDynamicIntegrationEventHandler { var internalSubscriptionResult = await this.DoInternalSubscription(eventName).ConfigureAwait(false); @@ -48,9 +48,13 @@ public async ValueTask SubscribeDynamic(string if (internalSubscriptionResult) { this.SubsManager?.AddDynamicSubscription(eventName); + + return true; } //ToDo //await this.StartBasicConsume().ConfigureAwait(false); + + return false; } #endregion diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs index b89bbcab..ccee6494 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpc.cs @@ -497,8 +497,12 @@ protected async ValueTask CreateConsumerChannelServerAsync CreateConsumerChannelReplyAsync CreateConsumerChannelAsync(CancellationToken c if (!this.PersistentConnection.IsConnected) { - await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false); - //this.PersistentConnection.TryConnect(); + var connectionResult = await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false); + if (!connectionResult) + { + return null; + } } var channel = this.PersistentConnection.CreateModel(); diff --git a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs index a73c73cb..4baccd2a 100644 --- a/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs +++ b/src/01/KSocietyBaseEventBus/KSociety.Base.EventBusRabbitMQ/EventBusRabbitMqRpcServer.cs @@ -578,8 +578,11 @@ protected async ValueTask CreateConsumerChannelServerAsync CreateConsumerChannelReplyAsync(CancellationToke //this.Logger.LogTrace("CreateConsumerChannelReplyAsync queue name: {0}", this._queueNameReply); if (!this.PersistentConnection.IsConnected) { - await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false); - //this.PersistentConnection.TryConnect(); + var connectionResult = await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false); + if (connectionResult) + { + return null; + } } var channel = this.PersistentConnection.CreateModel();