Skip to content

Commit

Permalink
Fix masstansit consume and produce bug, add multiple queue in one pro…
Browse files Browse the repository at this point in the history
…ject

Signed-off-by: virtual <1185513330@qq.com>
  • Loading branch information
cocosip committed Jul 14, 2023
1 parent 68c4d54 commit 41b2971
Show file tree
Hide file tree
Showing 54 changed files with 603 additions and 1,489 deletions.
6 changes: 3 additions & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>

<!--Volo ABP packages-->
<VoloAbpPackageVersion>7.2.3</VoloAbpPackageVersion>
<VoloAbpPackageVersion>7.3.0</VoloAbpPackageVersion>

<!-- All Microsoft EntityFrameworkCore packages -->
<MicrosoftEntityFrameworkCorePackageVersion>7.0.1</MicrosoftEntityFrameworkCorePackageVersion>
Expand Down Expand Up @@ -32,13 +32,13 @@
<MoqPackageVersion>4.18.4</MoqPackageVersion>

<!-- xunit https://www.nuget.org/packages/xUnit -->
<xUnitPackageVersion>2.4.2</xUnitPackageVersion>
<xUnitPackageVersion>2.5.0</xUnitPackageVersion>

<!-- xunit.extensibility.execution https://www.nuget.org/packages/xunit.extensibility.execution -->
<xUnitExtensibilityExecutionPackageVersion>2.4.1</xUnitExtensibilityExecutionPackageVersion>

<!-- xunit.runner.visualstudio https://www.nuget.org/packages/xunit.runner.visualstudio -->
<xUnitRunnerVisualstudioPackageVersion>2.4.5</xUnitRunnerVisualstudioPackageVersion>
<xUnitRunnerVisualstudioPackageVersion>2.5.0</xUnitRunnerVisualstudioPackageVersion>

<!--coverlet.collector-->
<CoverletCollectorPackageVersion>6.0.0</CoverletCollectorPackageVersion>
Expand Down
2 changes: 1 addition & 1 deletion common.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<LangVersion>latest</LangVersion>
<Version>2.8.1</Version>
<Version>2.8.2</Version>
<NoWarn>$(NoWarn);CS1591;CS0436</NoWarn>
<Authors>virtual</Authors>
<Product>sharp-abp</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.S3" Version="3.7.107.1" />
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.103.17" />
<PackageReference Include="AWSSDK.S3" Version="3.7.108" />
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.103.22" />
<PackageReference Include="Volo.Abp.Caching" Version="$(VoloAbpPackageVersion)" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.17.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.S3" Version="3.7.107.1" />
<PackageReference Include="AWSSDK.S3" Version="3.7.108" />
<PackageReference Include="Volo.Abp.Timing" Version="$(VoloAbpPackageVersion)" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FreeSql" Version="3.2.697" />
<PackageReference Include="FreeSql" Version="3.2.698" />
<PackageReference Include="Volo.Abp.EntityFrameworkCore" Version="$(VoloAbpPackageVersion)" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;
Expand All @@ -20,36 +21,42 @@ public override void PreConfigureServices(ServiceConfigurationContext context)
public override Task PreConfigureServicesAsync(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
PreConfigure<AbpMassTransitActiveMqOptions>(options => options.PreConfigure(configuration));

var activeMqOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitActiveMqOptions>();

PreConfigure<AbpMassTransitActiveMqOptions>(options =>
var abpMassTransitOptions = configuration
.GetSection("MassTransitOptions")
.Get<AbpMassTransitOptions>();
if (abpMassTransitOptions.Provider.Equals(MassTransitActiveMqConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
{
options.DefaultQueueNameFormatFunc = ActiveMqUtil.QueueNameFormat;
PreConfigure<AbpMassTransitActiveMqOptions>(options => options.PreConfigure(configuration));

options.DefaultPublishTopologyConfigure = new Action<IActiveMqMessagePublishTopologyConfigurator>(c =>
{
c.AutoDelete = activeMqOptions.DefaultAutoDelete;
c.Durable = activeMqOptions.DefaultDurable;
c.Exclude = activeMqOptions.DefaultExclude;
});
var activeMqOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitActiveMqOptions>();

options.DefaultReceiveEndpointConfigure = new Action<string, IActiveMqReceiveEndpointConfigurator>((queueName, c) =>
PreConfigure<AbpMassTransitActiveMqOptions>(options =>
{
//c.Bind(exchangeName);
c.ConcurrentMessageLimit = activeMqOptions.DefaultConcurrentMessageLimit;
c.PrefetchCount = activeMqOptions.DefaultPrefetchCount;
c.AutoDelete = activeMqOptions.DefaultAutoDelete;
c.Durable = activeMqOptions.DefaultDurable;
//c.ExchangeType = rabbitMqOptions.DefaultExchangeType;
});
options.DefaultQueueNameFormatFunc = ActiveMqUtil.QueueNameFormat;
options.ActiveMqPostConfigures.Add(new Action<IBusRegistrationContext, IActiveMqBusFactoryConfigurator>((ctx, cfg) =>
{
cfg.ConfigureEndpoints(ctx);
}));
});
options.DefaultPublishTopologyConfigure = new Action<IActiveMqMessagePublishTopologyConfigurator>(c =>
{
c.AutoDelete = activeMqOptions.DefaultAutoDelete;
c.Durable = activeMqOptions.DefaultDurable;
c.Exclude = activeMqOptions.DefaultExclude;
});
options.DefaultReceiveEndpointConfigure = new Action<string, IActiveMqReceiveEndpointConfigurator>((queueName, c) =>
{
//c.Bind(exchangeName);
c.ConcurrentMessageLimit = activeMqOptions.DefaultConcurrentMessageLimit;
c.PrefetchCount = activeMqOptions.DefaultPrefetchCount;
c.AutoDelete = activeMqOptions.DefaultAutoDelete;
c.Durable = activeMqOptions.DefaultDurable;
//c.ExchangeType = rabbitMqOptions.DefaultExchangeType;
});
options.ActiveMqPostConfigures.Add(new Action<IBusRegistrationContext, IActiveMqBusFactoryConfigurator>((ctx, cfg) =>
{
cfg.ConfigureEndpoints(ctx);
}));
});
}
return Task.CompletedTask;
}

Expand All @@ -61,10 +68,13 @@ public override void ConfigureServices(ServiceConfigurationContext context)

public override Task ConfigureServicesAsync(ServiceConfigurationContext context)
{
var massTransitOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitOptions>();

if (massTransitOptions.Provider.Equals(MassTransitActiveMqConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
var configuration = context.Services.GetConfiguration();
var abpMassTransitOptions = configuration
.GetSection("MassTransitOptions")
.Get<AbpMassTransitOptions>();
if (abpMassTransitOptions.Provider.Equals(MassTransitActiveMqConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
{
var massTransitOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitOptions>();
var activeMqOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitActiveMqOptions>();

context.Services.AddMassTransit(x =>
Expand Down Expand Up @@ -151,6 +161,7 @@ public override Task ConfigureServicesAsync(ServiceConfigurationContext context)
});
});
}

return Task.CompletedTask;
}

Expand All @@ -162,14 +173,22 @@ public override void PostConfigureServices(ServiceConfigurationContext context)

public override Task PostConfigureServicesAsync(ServiceConfigurationContext context)
{
Configure<AbpMassTransitActiveMqOptions>(options =>
var configuration = context.Services.GetConfiguration();
var abpMassTransitOptions = configuration
.GetSection("MassTransitOptions")
.Get<AbpMassTransitOptions>();
if (abpMassTransitOptions.Provider.Equals(MassTransitActiveMqConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
{
var actions = context.Services.GetPreConfigureActions<AbpMassTransitActiveMqOptions>();
foreach (var action in actions)
Configure<AbpMassTransitActiveMqOptions>(options =>
{
action(options);
}
});
var actions = context.Services.GetPreConfigureActions<AbpMassTransitActiveMqOptions>();
foreach (var action in actions)
{
action(options);
}
});
}

return Task.CompletedTask;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;
Expand All @@ -19,42 +20,49 @@ public override void PreConfigureServices(ServiceConfigurationContext context)

public override Task PreConfigureServicesAsync(ServiceConfigurationContext context)
{
PreConfigure<AbpMassTransitOptions>(options =>
var configuration = context.Services.GetConfiguration();
var abpMassTransitOptions = configuration
.GetSection("MassTransitOptions")
.Get<AbpMassTransitOptions>();
if (abpMassTransitOptions.Provider.Equals(MassTransitKafkaConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
{
options.PreConfigures.Add(new Action<IBusRegistrationConfigurator>(c =>
PreConfigure<AbpMassTransitOptions>(options =>
{
c.UsingInMemory();
}));
});

var configuration = context.Services.GetConfiguration();
PreConfigure<AbpMassTransitKafkaOptions>(options => options.PreConfigure(configuration));
options.PreConfigures.Add(new Action<IBusRegistrationConfigurator>(c =>
{
c.UsingInMemory();
}));
});

var kafkaOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitKafkaOptions>();
PreConfigure<AbpMassTransitKafkaOptions>(options => options.PreConfigure(configuration));

PreConfigure<AbpMassTransitKafkaOptions>(options =>
{
options.DefaultTopicFormatFunc = KafkaUtil.TopicFormat;
var kafkaOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitKafkaOptions>();

options.DefaultReceiveEndpointConfigure = new Action<IKafkaTopicReceiveEndpointConfigurator>(c =>
PreConfigure<AbpMassTransitKafkaOptions>(options =>
{
c.ConcurrentMessageLimit = kafkaOptions.DefaultConcurrentMessageLimit;
c.MaxPollInterval = TimeSpan.FromMilliseconds(kafkaOptions.DefaultMaxPollInterval);
c.SessionTimeout = TimeSpan.FromSeconds(kafkaOptions.DefaultSessionTimeout);
c.EnableAutoOffsetStore = kafkaOptions.DefaultEnableAutoOffsetStore;
c.AutoOffsetReset = kafkaOptions.DefaultAutoOffsetReset;
});
options.DefaultTopicFormatFunc = KafkaUtil.TopicFormat;
//Kafka keep alive
options.KafkaConfigures.Add(new Action<IRiderRegistrationContext, IKafkaFactoryConfigurator>((ctx, k) =>
{
k.ConfigureSocket(s =>
options.DefaultReceiveEndpointConfigure = new Action<IKafkaTopicReceiveEndpointConfigurator>(c =>
{
s.KeepaliveEnable = true;
c.ConcurrentMessageLimit = kafkaOptions.DefaultConcurrentMessageLimit;
c.MaxPollInterval = TimeSpan.FromMilliseconds(kafkaOptions.DefaultMaxPollInterval);
c.SessionTimeout = TimeSpan.FromSeconds(kafkaOptions.DefaultSessionTimeout);
c.EnableAutoOffsetStore = kafkaOptions.DefaultEnableAutoOffsetStore;
c.AutoOffsetReset = kafkaOptions.DefaultAutoOffsetReset;
});
}));
});
//Kafka keep alive
options.KafkaConfigures.Add(new Action<IRiderRegistrationContext, IKafkaFactoryConfigurator>((ctx, k) =>
{
k.ConfigureSocket(s =>
{
s.KeepaliveEnable = true;
});
}));
});
}

return Task.CompletedTask;
}

Expand All @@ -66,11 +74,14 @@ public override void ConfigureServices(ServiceConfigurationContext context)

public override Task ConfigureServicesAsync(ServiceConfigurationContext context)
{
var massTransitOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitOptions>();
if (massTransitOptions.Provider.Equals(MassTransitKafkaConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
var configuration = context.Services.GetConfiguration();
var abpMassTransitOptions = configuration
.GetSection("MassTransitOptions")
.Get<AbpMassTransitOptions>();
if (abpMassTransitOptions.Provider.Equals(MassTransitKafkaConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
{
var massTransitOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitOptions>();
var kafkaOptions = context.Services.ExecutePreConfiguredActions<AbpMassTransitKafkaOptions>();

context.Services.AddMassTransit(x =>
{
//PreConfigure
Expand Down Expand Up @@ -164,25 +175,33 @@ public override Task ConfigureServicesAsync(ServiceConfigurationContext context)
}
});
}

return Task.CompletedTask;
}


public override void PostConfigureServices(ServiceConfigurationContext context)
{
AsyncHelper.RunSync(() => PostConfigureServicesAsync(context));
}

public override Task PostConfigureServicesAsync(ServiceConfigurationContext context)
{
Configure<AbpMassTransitKafkaOptions>(options =>
var configuration = context.Services.GetConfiguration();
var abpMassTransitOptions = configuration
.GetSection("MassTransitOptions")
.Get<AbpMassTransitOptions>();
if (abpMassTransitOptions.Provider.Equals(MassTransitKafkaConsts.ProviderName, StringComparison.OrdinalIgnoreCase))
{
var actions = context.Services.GetPreConfigureActions<AbpMassTransitKafkaOptions>();
foreach (var action in actions)
Configure<AbpMassTransitKafkaOptions>(options =>
{
action(options);
}
});
var actions = context.Services.GetPreConfigureActions<AbpMassTransitKafkaOptions>();
foreach (var action in actions)
{
action(options);
}
});
}

return Task.CompletedTask;
}

Expand Down
Loading

0 comments on commit 41b2971

Please sign in to comment.