Skip to content

Commit

Permalink
Merge pull request #445 from K-Society/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
maniglia authored May 8, 2024
2 parents 311a93e + d8e1d34 commit b2251a1
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IEventBusBase
void Initialize<TIntegrationEvent>(bool asyncMode = true, CancellationToken cancel = default)
where TIntegrationEvent : IIntegrationEvent, new();

ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(bool asyncMode = true)
ValueTask<bool> Subscribe<TIntegrationEvent, TIntegrationEventHandler>(bool asyncMode = true)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KSociety.Base.EventBus.Abstractions.EventBus

public interface IEventBusDynamic
{
ValueTask SubscribeDynamic<TDynamicIntegrationEventHandler>(string routingKey)
ValueTask<bool> SubscribeDynamic<TDynamicIntegrationEventHandler>(string routingKey)
where TDynamicIntegrationEventHandler : IDynamicIntegrationEventHandler;

void UnsubscribeDynamic<TDynamicIntegrationEventHandler>(string routingKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ IIntegrationQueueHandler<TIntegrationEvent> GetIntegrationQueueHandler<TIntegrat
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationQueueHandler : IIntegrationQueueHandler<TIntegrationEvent>;

ValueTask SubscribeQueue<TIntegrationEvent, TIntegrationQueueHandler>(string routingKey)
ValueTask<bool> SubscribeQueue<TIntegrationEvent, TIntegrationQueueHandler>(string routingKey)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationQueueHandler : IIntegrationQueueHandler<TIntegrationEvent>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ IIntegrationRpcHandler<TIntegrationEvent, TIntegrationEventReply> GetIntegration
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventReply : IIntegrationEventReply, new();

ValueTask SubscribeRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHandler>(string routingKey)
ValueTask<bool> SubscribeRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHandler>(string routingKey)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcHandler : IIntegrationRpcHandler<TIntegrationEvent, TIntegrationEventReply>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Task<TIntegrationEventReply> CallAsync<TIntegrationEventRpc>(TIntegrationEventRp
// where TIntegrationEventReply : IIntegrationEventReply
// where TH : IIntegrationRpcClientHandler<TIntegrationEventReply>;

ValueTask SubscribeRpcClient<TIntegrationEventHandler>(string replyRoutingKey, bool asyncMode = true)
ValueTask<bool> SubscribeRpcClient<TIntegrationEventHandler>(string replyRoutingKey, bool asyncMode = true)
where TIntegrationEventHandler : IIntegrationRpcClientHandler<TIntegrationEventReply>;

//void UnsubscribeRpcClient<TIntegrationEventReply, TH>(string routingKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ IIntegrationRpcServerHandler<TIntegrationEventRpc, TIntegrationEventReply> GetIn
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new();

ValueTask SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(string routingKey, bool asyncMode = true)
ValueTask<bool> SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(string routingKey, bool asyncMode = true)
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcServerHandler : IIntegrationRpcServerHandler<TIntegrationEventRpc, TIntegrationEventReply>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KSociety.Base.EventBus.Abstractions.EventBus

public interface IEventBusTyped : IEventBus
{
ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(string routingKey, bool asyncMode = false)
ValueTask<bool> Subscribe<TIntegrationEvent, TIntegrationEventHandler>(string routingKey, bool asyncMode = false)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,35 +258,41 @@ protected virtual void QueueInitialize(IModel channel)

#region [Subscribe]

public async ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(bool asyncMode = true)
public async ValueTask<bool> Subscribe<TIntegrationEvent, TIntegrationEventHandler>(bool asyncMode = true)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>
{

var eventName = this.SubsManager.GetEventKey<TIntegrationEvent>();
await this.DoInternalSubscription(eventName).ConfigureAwait(false);
this.SubsManager.AddSubscription<TIntegrationEvent, TIntegrationEventHandler>();
if (asyncMode)
{
await this.StartBasicConsumeAsync<TIntegrationEvent>().ConfigureAwait(false);
}
else
var internalSubscriptionResult = await this.DoInternalSubscription(eventName).ConfigureAwait(false);

if (internalSubscriptionResult)
{
this.StartBasicConsume<TIntegrationEvent>();
this.SubsManager.AddSubscription<TIntegrationEvent, TIntegrationEventHandler>();
if (asyncMode)
{
return await this.StartBasicConsumeAsync<TIntegrationEvent>().ConfigureAwait(false);
}
else
{
return this.StartBasicConsume<TIntegrationEvent>();
}
}

return false;
}

protected async ValueTask DoInternalSubscription(string eventName)
protected async ValueTask<bool> DoInternalSubscription(string eventName)
{
//var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventName);
if (this.SubsManager.HasSubscriptionsForEvent(eventName))
{
return;
return false;
}

if (this.PersistentConnection is null)
{
return;
return false;
}

if (!this.PersistentConnection.IsConnected)
Expand All @@ -296,7 +302,7 @@ protected async ValueTask DoInternalSubscription(string eventName)
if (!connectionResult)
{
this.Logger.LogWarning("EventBusRabbitMq DoInternalSubscriptionRpc: {0}!", "no connection");
return;
return false;
}
}

Expand All @@ -311,9 +317,12 @@ protected async ValueTask DoInternalSubscription(string eventName)
{
channel.QueueBind(this.QueueName,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, eventName);
return true;
}
}
}

return false;
}

#endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ public EventBusRabbitMqDynamic(IRabbitMqPersistentConnection persistentConnectio

#region [Subscribe]

public async ValueTask SubscribeDynamic<TDynamicIntegrationEventHandler>(string eventName)
public async ValueTask<bool> SubscribeDynamic<TDynamicIntegrationEventHandler>(string eventName)
where TDynamicIntegrationEventHandler : IDynamicIntegrationEventHandler
{
await this.DoInternalSubscription(eventName).ConfigureAwait(false);
this.SubsManager?.AddDynamicSubscription<TDynamicIntegrationEventHandler>(eventName);
var internalSubscriptionResult = await this.DoInternalSubscription(eventName).ConfigureAwait(false);

if (internalSubscriptionResult)
{
this.SubsManager?.AddDynamicSubscription<TDynamicIntegrationEventHandler>(eventName);

return true;
}
//ToDo
//await this.StartBasicConsume().ConfigureAwait(false);

return false;
}

#endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,18 @@ public override async ValueTask Publish(IIntegrationEvent @event)

#region [Subscribe]

public async ValueTask SubscribeQueue<TIntegrationEvent, TIntegrationQueueHandler>(string routingKey)
public async ValueTask<bool> SubscribeQueue<TIntegrationEvent, TIntegrationQueueHandler>(string routingKey)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationQueueHandler : IIntegrationQueueHandler<TIntegrationEvent>
{
var eventName = this.SubsManager?.GetEventKey<TIntegrationEvent>();
await this.DoInternalSubscription(eventName + "." + routingKey).ConfigureAwait(false);
this.SubsManager?.AddSubscriptionQueue<TIntegrationEvent, TIntegrationQueueHandler>(eventName + "." + routingKey);
await this.StartBasicConsumeAsync<TIntegrationEvent>().ConfigureAwait(false);
var internalSubscriptionResult = await this.DoInternalSubscription(eventName + "." + routingKey).ConfigureAwait(false);
if (internalSubscriptionResult)
{
this.SubsManager?.AddSubscriptionQueue<TIntegrationEvent, TIntegrationQueueHandler>(eventName + "." + routingKey);
return await this.StartBasicConsumeAsync<TIntegrationEvent>().ConfigureAwait(false);
}
return false;
}

#endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,29 +209,36 @@ protected override void QueueInitialize(IModel channel)

#region [Subscribe]

public async ValueTask SubscribeRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHandler>(string routingKey)
public async ValueTask<bool> SubscribeRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHandler>(string routingKey)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcHandler : IIntegrationRpcHandler<TIntegrationEvent, TIntegrationEventReply>
{
var eventName = this.SubsManager.GetEventKey<TIntegrationEvent>();
var eventNameResult = this.SubsManager.GetEventReplyKey<TIntegrationEventReply>();
//this.Logger.LogDebug("SubscribeRpc: eventName: {0}.{1} eventNameResult: {2}.{3}", eventName, routingKey, eventNameResult, routingKey);
await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey)
var internalSubscriptionResult = await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey)
.ConfigureAwait(false);
this.SubsManager.AddSubscriptionRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHandler>(eventName + "." + routingKey, eventNameResult + "." + routingKey);
await this.StartBasicConsumeServer<TIntegrationEvent, TIntegrationEventReply>().ConfigureAwait(false);
await this.StartBasicConsumeReplyAsync<TIntegrationEventReply>().ConfigureAwait(false);
if (internalSubscriptionResult)
{
this.SubsManager.AddSubscriptionRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHandler>(eventName + "." + routingKey, eventNameResult + "." + routingKey);
await this.StartBasicConsumeServer<TIntegrationEvent, TIntegrationEventReply>().ConfigureAwait(false);
await this.StartBasicConsumeReplyAsync<TIntegrationEventReply>().ConfigureAwait(false);

return true;
}

return false;
}

private async ValueTask DoInternalSubscriptionRpc(string eventName, string eventNameResult)
private async ValueTask<bool> DoInternalSubscriptionRpc(string eventName, string eventNameResult)
{
try
{
//var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventName);
if (this.SubsManager.HasSubscriptionsForEvent(eventName))
{
return;
return false;
}

if (!this.PersistentConnection.IsConnected)
Expand All @@ -241,7 +248,7 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event
if (!connectionResult)
{
this.Logger.LogWarning("EventBusRabbitMqRpc DoInternalSubscriptionRpc: {0}!", "no connection");
return;
return false;
}
}

Expand All @@ -260,6 +267,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event
channel.QueueBind(this._queueNameReply,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeName,
eventNameResult);

return true;
}
}
}
Expand All @@ -272,6 +281,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventName, string event
{
this.Logger.LogError(ex, "EventBusRabbitMqRpc DoInternalSubscriptionRpc: ");
}

return false;
}

#endregion
Expand Down Expand Up @@ -486,8 +497,12 @@ protected async ValueTask<IModel> CreateConsumerChannelServerAsync<TIntegrationE
{
if (!this.PersistentConnection.IsConnected)
{
await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);
//this.PersistentConnection.TryConnect();
var connectionResult = await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);

if (!connectionResult)
{
return null;
}
}

var channel = this.PersistentConnection.CreateModel();
Expand Down Expand Up @@ -516,8 +531,9 @@ private async ValueTask<IModel> CreateConsumerChannelReplyAsync<TIntegrationEven
{
if (!this.PersistentConnection.IsConnected)
{
await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);
//this.PersistentConnection.TryConnect();
var connectionResult = await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);

if(!connectionResult) { return null; }
}

var channel = this.PersistentConnection.CreateModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,31 +359,37 @@ protected override void QueueInitialize(IModel channel)
// await this.StartBasicConsume<TIntegrationEventReply>().ConfigureAwait(false);
//}

public async ValueTask SubscribeRpcClient<TIntegrationRpcClientHandler>(string replyRoutingKey, bool asyncMode = true)
public async ValueTask<bool> SubscribeRpcClient<TIntegrationRpcClientHandler>(string replyRoutingKey, bool asyncMode = true)
where TIntegrationRpcClientHandler : IIntegrationRpcClientHandler<TIntegrationEventReply>
{
var eventNameResult = this.SubsManager.GetEventReplyKey<TIntegrationEventReply>();
//this.Logger.LogTrace("SubscribeRpcClient reply routing key: {0}, event name result: {1}", replyRoutingKey, eventNameResult);
await this.DoInternalSubscriptionRpc(eventNameResult + "." + replyRoutingKey).ConfigureAwait(false);
this.SubsManager.AddSubscriptionRpcClient<TIntegrationEventReply, TIntegrationRpcClientHandler>(eventNameResult + "." + replyRoutingKey);
if (asyncMode)
{
await this.StartBasicConsumeAsync().ConfigureAwait(false);
}
else
var internalSubscriptionResult = await this.DoInternalSubscriptionRpc(eventNameResult + "." + replyRoutingKey).ConfigureAwait(false);

if (internalSubscriptionResult)
{
this.StartBasicConsume();
this.SubsManager.AddSubscriptionRpcClient<TIntegrationEventReply, TIntegrationRpcClientHandler>(eventNameResult + "." + replyRoutingKey);
if (asyncMode)
{
return await this.StartBasicConsumeAsync().ConfigureAwait(false);
}
else
{
return this.StartBasicConsume();
}
}

return false;
}

private async ValueTask DoInternalSubscriptionRpc(string eventNameResult)
private async ValueTask<bool> DoInternalSubscriptionRpc(string eventNameResult)
{
try
{
//var containsKey = this.SubsManager.HasSubscriptionsForEvent(eventNameResult);
if (this.SubsManager.HasSubscriptionsForEvent(eventNameResult))
{
return;
return false;
}

if (!this.PersistentConnection.IsConnected)
Expand All @@ -393,7 +399,7 @@ private async ValueTask DoInternalSubscriptionRpc(string eventNameResult)
if (!connectionResult)
{
this.Logger.LogWarning("EventBusRabbitMqRpcClient DoInternalSubscriptionRpc: {0}!", "no connection");
return;
return false;
}
}

Expand All @@ -409,6 +415,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventNameResult)
channel.QueueBind(this._queueNameReply,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeName,
eventNameResult); //ToDo

return true;
}
}
}
Expand All @@ -421,6 +429,8 @@ private async ValueTask DoInternalSubscriptionRpc(string eventNameResult)
{
this.Logger.LogError(ex, "EventBusRabbitMqRpcClient DoInternalSubscriptionRpc: ");
}

return false;
}

#endregion
Expand Down Expand Up @@ -667,8 +677,11 @@ protected async ValueTask<IModel> CreateConsumerChannelAsync(CancellationToken c

if (!this.PersistentConnection.IsConnected)
{
await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);
//this.PersistentConnection.TryConnect();
var connectionResult = await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);
if (!connectionResult)
{
return null;
}
}

var channel = this.PersistentConnection.CreateModel();
Expand Down
Loading

0 comments on commit b2251a1

Please sign in to comment.