Skip to content

Commit

Permalink
Merge pull request #465 from K-Society/experimental
Browse files Browse the repository at this point in the history
QueuePurge
  • Loading branch information
maniglia authored Dec 12, 2024
2 parents f79420a + f70e5b6 commit 5c873cd
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/01/01/Form/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<ItemGroup>
<PackageReference Include="Nerdbank.GitVersioning" Condition="!Exists('packages.config')">
<PrivateAssets>all</PrivateAssets>
<Version>3.6.146</Version>
<Version>3.7.112</Version>
</PackageReference>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="9.0.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="Serilog" Version="4.1.0" />
<PackageReference Include="Serilog.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="Serilog" Version="4.2.0" />
<PackageReference Include="Serilog.Extensions.Logging" Version="9.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/01/02/KSociety.Base.Srv.Agent/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private Connection(IAgentConfiguration agentConfiguration)

if (this.DebugFlag)
{
this.Logger.LogTrace(@"Grpc Agent Connection for: {0}", this._agentConfiguration.ConnectionUrl);
this.Logger?.LogTrace(@"Grpc Agent Connection for: {0}", this._agentConfiguration.ConnectionUrl);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.66.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.67.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<TargetFrameworks>net8.0</TargetFrameworks>
<IsPackable>false</IsPackable>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ ValueTask<bool> Subscribe<TIntegrationEvent, TIntegrationEventHandler>()
void Unsubscribe<TIntegrationEvent, TIntegrationEventHandler>()
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventHandler : IIntegrationEventHandler<TIntegrationEvent>;

ValueTask<uint> QueuePurge(CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace KSociety.Base.EventBus.Abstractions.EventBus
{
using Handler;
using System.Threading;
using System.Threading.Tasks;

public interface IEventBusRpc : IEventBus
Expand All @@ -20,5 +21,7 @@ void UnsubscribeRpc<TIntegrationEvent, TIntegrationEventReply, TIntegrationRpcHa
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcHandler : IIntegrationRpcHandler<TIntegrationEvent, TIntegrationEventReply>;

ValueTask<uint> QueueReplyPurge(CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ void UnsubscribeRpcServer<TIntegrationEventRpc, TIntegrationEventReply, TIntegra
where TIntegrationEventRpc : IIntegrationEventRpc, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
where TIntegrationRpcServerHandler : IIntegrationRpcServerHandler<TIntegrationEventRpc, TIntegrationEventReply>;

ValueTask<uint> QueueReplyPurge(CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,33 @@ protected override void Dispose(bool disposing)

#endregion

public virtual async ValueTask<uint> QueuePurge(CancellationToken cancellationToken = default)
{
try
{
if (this.ConsumerChannel is null)
{
this.Logger.LogWarning("ConsumerChannel is null!");
return 0;
}

if (this.ConsumerChannel.Value != null && !String.IsNullOrEmpty(this.QueueName))
{
var result = await (await this.ConsumerChannel).QueuePurgeAsync(this.QueueName, cancellationToken);

return result;
}

this.Logger.LogError("QueuePurge can't call on ConsumerChannel is null or queue name is null.");
}
catch (Exception ex)
{
this.Logger.LogError(ex, "QueuePurge: ");
}

return 0;
}

protected virtual async ValueTask<bool> StartBasicConsume<TIntegrationEvent>()
where TIntegrationEvent : IIntegrationEvent, new()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,33 @@ protected override void Dispose(bool disposing)

#endregion

public async ValueTask<uint> QueueReplyPurge(CancellationToken cancellationToken = default)
{
try
{
if (this._consumerChannelReply is null)
{
this.Logger.LogWarning("ConsumerChannelReply is null!");
return 0;
}

if (this._consumerChannelReply.Value != null && !String.IsNullOrEmpty(this._queueNameReply))
{
var result = await (await this._consumerChannelReply).QueuePurgeAsync(this._queueNameReply, cancellationToken);

return result;
}

this.Logger.LogError("QueueReplyPurge can't call on ConsumerChannelReply is null or queue name is null.");
}
catch (Exception ex)
{
this.Logger.LogError(ex, "QueueReplyPurge: ");
}

return 0;
}

protected async ValueTask<bool> StartBasicConsumeServer<TIntegrationEvent, TIntegrationEventReply>()
where TIntegrationEvent : IIntegrationEvent, new()
where TIntegrationEventReply : IIntegrationEventReply, new()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,6 @@ protected override void Dispose(bool disposing)

#endregion



//protected async ValueTask<bool> StartBasicConsume()
//{
// this.Logger.LogTrace("EventBusRabbitMqRpcClient Starting RabbitMQ basic consume");
Expand Down Expand Up @@ -534,7 +532,7 @@ protected override void Dispose(bool disposing)
// }

// this.Logger.LogError("StartBasicConsume can't call on ConsumerChannel is null");

// }
// catch (Exception ex)
// {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,33 @@ protected override void Dispose(bool disposing)

#endregion

public async ValueTask<uint> QueueReplyPurge(CancellationToken cancellationToken = default)
{
try
{
if (this._consumerChannelReply is null)
{
this.Logger.LogWarning("ConsumerChannelReply is null!");
return 0;
}

if (this._consumerChannelReply.Value != null && !String.IsNullOrEmpty(this._queueNameReply))
{
var result = await (await this._consumerChannelReply).QueuePurgeAsync(this._queueNameReply, cancellationToken);

return result;
}

this.Logger.LogError("QueueReplyPurge can't call on ConsumerChannelReply is null or queue name is null.");
}
catch (Exception ex)
{
this.Logger.LogError(ex, "QueueReplyPurge: ");
}

return 0;
}

//protected async ValueTask<bool> StartBasicConsumeServer<TIntegrationEventRpc, TIntegrationEventReply>()
// where TIntegrationEventRpc : IIntegrationEventRpc, new()
// where TIntegrationEventReply : IIntegrationEventReply, new()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<TargetFrameworks>net8.0</TargetFrameworks>
<IsPackable>false</IsPackable>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<TargetFrameworks>net8.0</TargetFrameworks>
<IsPackable>false</IsPackable>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<ItemGroup>
<PackageReference Include="Nerdbank.GitVersioning" Condition="!Exists('packages.config')">
<PrivateAssets>all</PrivateAssets>
<Version>3.6.146</Version>
<Version>3.7.112</Version>
</PackageReference>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0">
<PrivateAssets>all</PrivateAssets>
Expand Down

0 comments on commit 5c873cd

Please sign in to comment.