Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental #433

Merged
merged 2 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.61.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.62.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,34 @@

<!-- .NET 6.0 -->
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="7.0.17" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="7.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.11" />
</ItemGroup>

<!-- .NET 7.0 -->
<ItemGroup Condition="'$(TargetFramework)' == 'net7.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="7.0.16" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="7.0.17" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="7.0.17" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="7.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.11" />
</ItemGroup>

<!-- .NET 8.0 -->
<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="8.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="8.0.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="8.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="8.0.3" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.2" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.2" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="coverlet.collector" Version="6.0.1">
<PackageReference Include="coverlet.collector" Version="6.0.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CsvHelper" Version="31.0.2" />
<PackageReference Include="CsvHelper" Version="31.0.3" />
<PackageReference Include="MediatR.Contracts" Version="2.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageReference Include="Microsoft.NETCore.Platforms" Version="7.0.4" />
<PackageReference Include="Quartz" Version="3.8.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IEventBusBase
void Initialize<TIntegrationEvent>(CancellationToken cancel = default)
where TIntegrationEvent : IIntegrationEvent, new();

ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>()
ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(bool asyncMode = true)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Task<TIntegrationEventReply> CallAsync<TIntegrationEventRpc>(TIntegrationEventRp
// where TIntegrationEventReply : IIntegrationEventReply
// where TH : IIntegrationRpcClientHandler<TIntegrationEventReply>;

ValueTask SubscribeRpcClient<TIntegrationEventHandler>(string replyRoutingKey)
ValueTask SubscribeRpcClient<TIntegrationEventHandler>(string replyRoutingKey, bool asyncMode = true)
where TIntegrationEventHandler : IIntegrationRpcClientHandler<TIntegrationEventReply>;

//void UnsubscribeRpcClient<TIntegrationEventReply, TH>(string routingKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ IIntegrationRpcServerHandler<TIntegrationEventRpc, TIntegrationEventReply> GetIn
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new();

ValueTask SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(string routingKey)
ValueTask SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(string routingKey, bool asyncMode = true)
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcServerHandler : IIntegrationRpcServerHandler<TIntegrationEventRpc, TIntegrationEventReply>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KSociety.Base.EventBus.Abstractions.EventBus

public interface IEventBusTyped : IEventBus
{
ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(string routingKey)
ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(string routingKey, bool asyncMode = false)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageReference Include="Autofac" Version="8.0.0" />
<PackageReference Include="protobuf-net" Version="3.2.30" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,23 @@ protected virtual void QueueInitialize(IModel channel)

#region [Subscribe]

public async ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>()
public async ValueTask Subscribe<TIntegrationEvent, TIntegrationEventHandler>(bool asyncMode = true)
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>
{

var eventName = this.SubsManager.GetEventKey<TIntegrationEvent>();
await this.DoInternalSubscription(eventName).ConfigureAwait(false);
this.SubsManager.AddSubscription<TIntegrationEvent, TIntegrationEventHandler>();
await this.StartBasicConsume<TIntegrationEvent>().ConfigureAwait(false);
if (asyncMode)
{
await this.StartBasicConsumeAsync<TIntegrationEvent>().ConfigureAwait(false);
}
else
{
await this.StartBasicConsume<TIntegrationEvent>().ConfigureAwait(false);
}

}

protected async ValueTask DoInternalSubscription(string eventName)
Expand Down Expand Up @@ -329,9 +337,9 @@ protected virtual async ValueTask<bool> StartBasicConsume<TIntegrationEvent>()

if (this.ConsumerChannel.Value != null)
{
var consumer = new AsyncEventingBasicConsumer(await this.ConsumerChannel);
var consumer = new EventingBasicConsumer(await this.ConsumerChannel);

consumer.Received += this.ConsumerReceivedAsync<TIntegrationEvent>;
consumer.Received += this.ConsumerReceived<TIntegrationEvent>;

(await this.ConsumerChannel).BasicConsume(
queue: this.QueueName,
Expand All @@ -354,6 +362,45 @@ protected virtual async ValueTask<bool> StartBasicConsume<TIntegrationEvent>()
return false;
}

protected virtual async ValueTask<bool> StartBasicConsumeAsync<TIntegrationEvent>()
where TIntegrationEvent : IIntegrationEvent, new()
{
//this.Logger.LogTrace("EventBusRabbitMq Starting RabbitMQ basic consume.");
try
{
if (this.ConsumerChannel is null)
{
this.Logger.LogWarning("ConsumerChannel is null!");
return false;
}

if (this.ConsumerChannel.Value != null)
{
var asyncConsumer = new AsyncEventingBasicConsumer(await this.ConsumerChannel);

asyncConsumer.Received += this.ConsumerReceivedAsync<TIntegrationEvent>;

(await this.ConsumerChannel).BasicConsume(
queue: this.QueueName,
autoAck: false,
consumer: asyncConsumer);

//this.Logger.LogInformation("EventBusRabbitMq StartBasicConsume done. Queue name: {0}, autoAck: {1}", this.QueueName, true);

return true;
}

this.Logger.LogError("StartBasicConsume can't call on ConsumerChannel is null");

}
catch (Exception ex)
{
this.Logger.LogError(ex, "StartBasicConsume: ");
}

return false;
}

//protected virtual async ValueTask<bool> StartBasicConsume<TR>()
// where TR : IIntegrationEventReply
//{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,22 @@ protected override void QueueInitialize(IModel channel)
// await this.StartBasicConsume<TIntegrationEventReply>().ConfigureAwait(false);
//}

public async ValueTask SubscribeRpcClient<TIntegrationRpcClientHandler>(string replyRoutingKey)
public async ValueTask SubscribeRpcClient<TIntegrationRpcClientHandler>(string replyRoutingKey, bool asyncMode = true)
where TIntegrationRpcClientHandler : IIntegrationRpcClientHandler<TIntegrationEventReply>
{
var eventNameResult = this.SubsManager.GetEventReplyKey<TIntegrationEventReply>();
//this.Logger.LogTrace("SubscribeRpcClient reply routing key: {0}, event name result: {1}", replyRoutingKey, eventNameResult);
await this.DoInternalSubscriptionRpc(eventNameResult + "." + replyRoutingKey).ConfigureAwait(false);
this.SubsManager.AddSubscriptionRpcClient<TIntegrationEventReply, TIntegrationRpcClientHandler>(eventNameResult + "." + replyRoutingKey);
await this.StartBasicConsume().ConfigureAwait(false);
if (asyncMode)
{
await this.StartBasicConsumeAsync().ConfigureAwait(false);
}
else
{
await this.StartBasicConsume().ConfigureAwait(false);
}

}

private async ValueTask DoInternalSubscriptionRpc(string eventNameResult)
Expand Down Expand Up @@ -445,9 +453,13 @@ protected async ValueTask<bool> StartBasicConsume()

if (this.ConsumerChannel.Value != null)
{
var consumer = new AsyncEventingBasicConsumer(await this.ConsumerChannel);
var consumer = new EventingBasicConsumer(await this.ConsumerChannel);

consumer.Received += this.ConsumerReceived;

consumer.Received += this.ConsumerReceivedAsync;
//var consumer = new EventingBasicConsumer(await this.ConsumerChannel);

//consumer.Received += this.ConsumerReceived;


// autoAck specifies that as soon as the consumer gets the message,
Expand All @@ -474,6 +486,53 @@ protected async ValueTask<bool> StartBasicConsume()
return false;
}

protected async ValueTask<bool> StartBasicConsumeAsync()
{
this.Logger.LogTrace("EventBusRabbitMqRpcClient Starting RabbitMQ basic consume");

try
{
if (this.ConsumerChannel is null)
{
this.Logger.LogWarning("EventBusRabbitMqRpcClient ConsumerChannel is null!");
return false;
}

if (this.ConsumerChannel.Value != null)
{
var asyncConsumer = new AsyncEventingBasicConsumer(await this.ConsumerChannel);

asyncConsumer.Received += this.ConsumerReceivedAsync;

//var consumer = new EventingBasicConsumer(await this.ConsumerChannel);

//consumer.Received += this.ConsumerReceived;


// autoAck specifies that as soon as the consumer gets the message,
// it will ack, even if it dies mid-way through the callback

(await this.ConsumerChannel).BasicConsume(
queue: this._queueNameReply, //ToDo
autoAck: true, //ToDo
consumer: asyncConsumer);

//this.Logger.LogInformation("EventBusRabbitMqRpcClient StartBasicConsume done. Queue name: {0}, autoAck: {1}", this._queueNameReply, true);

return true;
}

this.Logger.LogError("StartBasicConsume can't call on ConsumerChannel is null");

}
catch (Exception ex)
{
this.Logger.LogError(ex, "StartBasicConsume: ");
}

return false;
}

protected void ConsumerReceived(object sender, BasicDeliverEventArgs eventArgs)
{
var result = eventArgs.RoutingKey.Split('.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected override void QueueInitialize(IModel channel)

#region [Subscribe]

public async ValueTask SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(string routingKey)
public async ValueTask SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(string routingKey, bool asyncMode = true)
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcServerHandler : IIntegrationRpcServerHandler<TIntegrationEventRpc, TIntegrationEventReply>
Expand All @@ -213,7 +213,15 @@ public async ValueTask SubscribeRpcServer<TIntegrationEventRpc, TIntegrationEven
await this.DoInternalSubscriptionRpc(eventName + "." + routingKey, eventNameResult + "." + routingKey)
.ConfigureAwait(false);
this.SubsManager.AddSubscriptionRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegrationRpcServerHandler>(eventName + "." + routingKey, eventNameResult + "." + routingKey);
await this.StartBasicConsumeServer<TIntegrationEventRpc, TIntegrationEventReply>().ConfigureAwait(false);

if (asyncMode)
{
await this.StartBasicConsumeServerAsync<TIntegrationEventRpc, TIntegrationEventReply>().ConfigureAwait(false);
}
else
{
await this.StartBasicConsumeServer<TIntegrationEventRpc, TIntegrationEventReply>().ConfigureAwait(false);
}
}

private async ValueTask DoInternalSubscriptionRpc(string eventName, string eventNameResult)
Expand Down Expand Up @@ -306,6 +314,45 @@ protected async ValueTask<bool> StartBasicConsumeServer<TIntegrationEventRpc, TI
{
//this.Logger.LogTrace("EventBusRabbitMqRpcServer Starting RabbitMQ basic consume.");

try
{
if (this.ConsumerChannel is null)
{
this.Logger.LogWarning("EventBusRabbitMqRpcServer ConsumerChannel is null!");
return false;
}

if (this.ConsumerChannel.Value != null)
{
var consumer = new EventingBasicConsumer(await this.ConsumerChannel);

consumer.Received += this.ConsumerReceivedServer<TIntegrationEventRpc, TIntegrationEventReply>;

(await this.ConsumerChannel).BasicConsume(
queue: this.QueueName,
autoAck: false,
consumer: consumer);
//this.Logger.LogInformation("EventBusRabbitMqRpcServer StartBasicConsume done. Queue name: {0}, autoAck: {1}", this.QueueName, false);

return true;
}

this.Logger.LogError("StartBasicConsume can't call on ConsumerChannel is null");
}
catch (Exception ex)
{
this.Logger.LogError(ex, "StartBasicConsume: ");
}

return false;
}

protected async ValueTask<bool> StartBasicConsumeServerAsync<TIntegrationEventRpc, TIntegrationEventReply>()
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
{
//this.Logger.LogTrace("EventBusRabbitMqRpcServer Starting RabbitMQ basic consume.");

try
{
if (this.ConsumerChannel is null)
Expand Down Expand Up @@ -361,7 +408,7 @@ protected void ConsumerReceivedServer<TIntegrationEventRpc, TIntegrationEventRep
var ms = new MemoryStream();
Serializer.Serialize(ms, response);
var body = ms.ToArray();
if (!String.IsNullOrEmpty(this.EventBusParameters.ExchangeDeclareParameters.ExchangeName))
if (this._consumerChannelReply != null && !String.IsNullOrEmpty(this.EventBusParameters.ExchangeDeclareParameters.ExchangeName))
{
this._consumerChannelReply.Value.Result.BasicPublish(
this.EventBusParameters.ExchangeDeclareParameters.ExchangeName,
Expand Down
Loading