Skip to content

Commit

Permalink
Merge pull request #428 from K-Society/experimental
Browse files Browse the repository at this point in the history
Fix in CallAsync
  • Loading branch information
maniglia authored Mar 4, 2024
2 parents fce04b3 + 0448ab8 commit ab55a43
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="CsvHelper" Version="31.0.0" />
<PackageReference Include="CsvHelper" Version="31.0.2" />
<PackageReference Include="MediatR.Contracts" Version="2.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.NETCore.Platforms" Version="7.0.4" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="Autofac" Version="8.0.0" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" Condition="'$(TargetFramework)' == 'netstandard2.0'" />
<PackageReference Include="protobuf-net" Version="3.2.30" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ public override async ValueTask Publish(IIntegrationEvent @event)
}
}

//public async Task<TIntegrationEventReply> CallAsync<TIntegrationEventReply>(T @event,
// CancellationToken cancellationToken = default)
// where TIntegrationEventReply : IIntegrationEventReply
public async Task<TIntegrationEventReply> CallAsync<TIntegrationEventRpc>(TIntegrationEventRpc @event,
CancellationToken cancellationToken = default)
where TIntegrationEventRpc : IIntegrationEventRpc, new()
Expand All @@ -194,63 +191,76 @@ public async Task<TIntegrationEventReply> CallAsync<TIntegrationEventRpc>(TInteg
return default;
}

if (!this.PersistentConnection.IsConnected)
bool isConnected;
if (this.PersistentConnection.IsConnected)
{
await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);
//this.PersistentConnection.TryConnect();
isConnected = true;
}
else
{
isConnected = await this.PersistentConnection.TryConnectAsync().ConfigureAwait(false);
}

var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.Or<Exception>()
.WaitAndRetryForever(retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
this.Logger.LogWarning(ex, "CallAsync: ");
});
if (isConnected)
{
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.Or<Exception>()
.WaitAndRetryForever(retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(ex, time) =>
{
this.Logger.LogWarning(ex, "CallAsync: ");
});

var correlationId = Guid.NewGuid().ToString();
var correlationId = Guid.NewGuid().ToString();

var tcs = new TaskCompletionSource<TIntegrationEventReply>(TaskCreationOptions.RunContinuationsAsynchronously);
var addResult = this._callbackMapper.TryAdd(correlationId, tcs);
var tcs = new TaskCompletionSource<TIntegrationEventReply>(TaskCreationOptions
.RunContinuationsAsynchronously);
var addResult = this._callbackMapper.TryAdd(correlationId, tcs);

using (var channel = this.PersistentConnection.CreateModel())
{
if (channel != null)
using (var channel = this.PersistentConnection.CreateModel())
{
var routingKey = @event.RoutingKey;
if (this.EventBusParameters.ExchangeDeclareParameters != null)
if (channel != null)
{
channel.ExchangeDeclare(this.EventBusParameters.ExchangeDeclareParameters.ExchangeName,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeType,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeDurable,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeAutoDelete);

using (var ms = new MemoryStream())
var routingKey = @event.RoutingKey;
if (this.EventBusParameters.ExchangeDeclareParameters != null)
{
Serializer.Serialize(ms, @event);
var body = ms.ToArray();
channel.ExchangeDeclare(this.EventBusParameters.ExchangeDeclareParameters.ExchangeName,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeType,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeDurable,
this.EventBusParameters.ExchangeDeclareParameters.ExchangeAutoDelete);

policy.Execute(() =>
using (var ms = new MemoryStream())
{
var properties = channel.CreateBasicProperties();
Serializer.Serialize(ms, @event);
var body = ms.ToArray();

policy.Execute(() =>
{
var properties = channel.CreateBasicProperties();

properties.DeliveryMode = 1; //2 = persistent, write on disk
properties.CorrelationId = correlationId;
properties.ReplyTo = this._queueNameReply; //ToDo
properties.DeliveryMode = 1; //2 = persistent, write on disk
properties.CorrelationId = correlationId;
properties.ReplyTo = this._queueNameReply; //ToDo

channel.BasicPublish(this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, routingKey,true, properties, body);
});
channel.BasicPublish(
this.EventBusParameters.ExchangeDeclareParameters.ExchangeName, routingKey,
true, properties, body);
});
}
}
}
}
}

//cancellationToken.Register(() => this._callbackMapper.TryRemove(correlationId, out var tmp));
cancellationToken.Register(() => this.HandleResponse(correlationId, cancellationToken));
//cancellationToken.Register(() => this._callbackMapper.TryRemove(correlationId, out var tmp));
cancellationToken.Register(() => this.HandleResponse(correlationId, cancellationToken));

var result = await tcs.Task.ConfigureAwait(false);
var result = await tcs.Task.ConfigureAwait(false);

return result;
}

return result;
this.Logger.LogWarning("CallAsync: {0}", "Not connected to bus!");
}
catch (TaskCanceledException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="CsvHelper" Version="31.0.0" />
<PackageReference Include="CsvHelper" Version="31.0.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="protobuf-net" Version="3.2.30" />
<PackageReference Include="xunit" Version="2.7.0" />
Expand Down

0 comments on commit ab55a43

Please sign in to comment.