Skip to content

Commit

Permalink
add ConfigureAwait to most of core Akka lib
Browse files Browse the repository at this point in the history
  • Loading branch information
0x53A committed Mar 9, 2017
1 parent b300fcc commit 6678f41
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,10 @@ protected override async Task<AssociationHandle> AssociateInternal(Address remot
var clientBootstrap = ClientFactory(remoteAddress);
var socketAddress = AddressToSocketAddress(remoteAddress);

var associate = await clientBootstrap.ConnectAsync(socketAddress);
var associate = await clientBootstrap.ConnectAsync(socketAddress).ConfigureAwait(false);

var handler = (TcpClientHandler)associate.Pipeline.Last();
return await handler.StatusFuture;
return await handler.StatusFuture.ConfigureAwait(false);
}
catch (AggregateException e) when (e.InnerException is ConnectException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public override async Task<Tuple<Address, TaskCompletionSource<IAssociationEvent

try
{
var newServerChannel = await NewServer(listenAddress);
var newServerChannel = await NewServer(listenAddress).ConfigureAwait(false);


// Block reads until a handler actor is registered
Expand Down Expand Up @@ -369,7 +369,7 @@ public override async Task<Tuple<Address, TaskCompletionSource<IAssociationEvent
Log.Error(ex, "Failed to bind to {0}; shutting down Helios transport.", listenAddress);
try
{
await Shutdown();
await Shutdown().ConfigureAwait(false);
}
catch
{
Expand All @@ -384,12 +384,12 @@ public override async Task<Tuple<Address, TaskCompletionSource<IAssociationEvent
/// </summary>
/// <param name="remoteAddress">TBD</param>
/// <returns>TBD</returns>
public override async Task<AssociationHandle> Associate(Address remoteAddress)
public override Task<AssociationHandle> Associate(Address remoteAddress)
{
if (!ServerChannel.IsOpen)
throw new HeliosConnectionException(ExceptionType.NotOpen, "Transport is not open");

return await AssociateInternal(remoteAddress);
return AssociateInternal(remoteAddress);
}

/// <summary>
Expand All @@ -406,10 +406,10 @@ public override async Task<bool> Shutdown()
tasks.Add(channel.CloseAsync());
}
var all = Task.WhenAll(tasks);
await all;
await all.ConfigureAwait(false);

var server = ServerChannel?.CloseAsync() ?? TaskEx.Completed;
await server;
await server.ConfigureAwait(false);

return all.IsCompleted && server.IsCompleted;
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3796,8 +3796,8 @@ namespace Akka.Pattern
public Akka.Pattern.CircuitBreaker OnClose(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public async System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public async System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public void WithSyncCircuitBreaker(System.Action body) { }
public T WithSyncCircuitBreaker<T>(System.Func<T> body) { }
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ private async Task<object> AssociateAsync()
{
try
{
return new Handle(await Transport.Associate(RemoteAddress, _refuseUid));
return new Handle(await Transport.Associate(RemoteAddress, _refuseUid).ConfigureAwait(false));
}
catch (Exception e)
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public async Task<AkkaProtocolHandle> Associate(Address remoteAddress, int? refu

manager.Tell(new AssociateUnderlyingRefuseUid(SchemeAugmenter.RemoveScheme(remoteAddress), statusPromise, refuseUid));

return (AkkaProtocolHandle)await statusPromise.Task;
return (AkkaProtocolHandle)await statusPromise.Task.ConfigureAwait(false);
}

#region Static properties
Expand Down
20 changes: 10 additions & 10 deletions src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected async Task<IChannel> NewServer(EndPoint listenAddress)
var dns = listenAddress as DnsEndPoint;
if (dns != null)
{
listenAddress = await DnsToIPEndpoint(dns);
listenAddress = await DnsToIPEndpoint(dns).ConfigureAwait(false);
}

return await ServerFactory().BindAsync(listenAddress).ConfigureAwait(false);
Expand All @@ -171,7 +171,7 @@ public override async Task<Tuple<Address, TaskCompletionSource<IAssociationEvent

try
{
var newServerChannel = await NewServer(listenAddress);
var newServerChannel = await NewServer(listenAddress).ConfigureAwait(false);

// Block reads until a handler actor is registered
// no incoming connections will be accepted until this value is reset
Expand Down Expand Up @@ -203,7 +203,7 @@ public override async Task<Tuple<Address, TaskCompletionSource<IAssociationEvent
Log.Error(ex, "Failed to bind to {0}; shutting down DotNetty transport.", listenAddress);
try
{
await Shutdown();
await Shutdown().ConfigureAwait(false);
}
catch
{
Expand All @@ -218,7 +218,7 @@ public override async Task<AssociationHandle> Associate(Address remoteAddress)
if (!ServerChannel.Open)
throw new ChannelException("Transport is not open");

return await AssociateInternal(remoteAddress);
return await AssociateInternal(remoteAddress).ConfigureAwait(false);
}

protected abstract Task<AssociationHandle> AssociateInternal(Address remoteAddress);
Expand All @@ -233,10 +233,10 @@ public override async Task<bool> Shutdown()
tasks.Add(channel.CloseAsync());
}
var all = Task.WhenAll(tasks);
await all;
await all.ConfigureAwait(false);

var server = ServerChannel?.CloseAsync() ?? TaskEx.Completed;
await server;
await server.ConfigureAwait(false);

return all.IsCompleted && server.IsCompleted;
}
Expand Down Expand Up @@ -283,12 +283,12 @@ protected async Task<IPEndPoint> DnsToIPEndpoint(DnsEndPoint dns)
IPEndPoint endpoint;
//if (!Settings.EnforceIpFamily)
//{
// endpoint = await ResolveNameAsync(dns);
// endpoint = await ResolveNameAsync(dns).ConfigureAwait(false);
//}
//else
//{
var addressFamily = Settings.DnsUseIpv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork;
endpoint = await ResolveNameAsync(dns, addressFamily);
endpoint = await ResolveNameAsync(dns, addressFamily).ConfigureAwait(false);
//}
return endpoint;
}
Expand Down Expand Up @@ -388,15 +388,15 @@ private ServerBootstrap ServerFactory()

private async Task<IPEndPoint> ResolveNameAsync(DnsEndPoint address)
{
var resolved = await Dns.GetHostEntryAsync(address.Host);
var resolved = await Dns.GetHostEntryAsync(address.Host).ConfigureAwait(false);
//NOTE: for some reason while Helios takes first element from resolved address list
// on the DotNetty side we need to take the last one in order to be compatible
return new IPEndPoint(resolved.AddressList[resolved.AddressList.Length - 1], address.Port);
}

private async Task<IPEndPoint> ResolveNameAsync(DnsEndPoint address, AddressFamily addressFamily)
{
var resolved = await Dns.GetHostEntryAsync(address.Host);
var resolved = await Dns.GetHostEntryAsync(address.Host).ConfigureAwait(false);
var found = resolved.AddressList.LastOrDefault(a => a.AddressFamily == addressFamily);
if (found == null)
{
Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ protected override async Task<AssociationHandle> AssociateInternal(Address remot
{
var clientBootstrap = ClientFactory(remoteAddress);
var socketAddress = AddressToSocketAddress(remoteAddress);
socketAddress = await MapEndpointAsync(socketAddress);
var associate = await clientBootstrap.ConnectAsync(socketAddress);
socketAddress = await MapEndpointAsync(socketAddress).ConfigureAwait(false);
var associate = await clientBootstrap.ConnectAsync(socketAddress).ConfigureAwait(false);
var handler = (TcpClientHandler)associate.Pipeline.Last();
return await handler.StatusFuture;
return await handler.StatusFuture.ConfigureAwait(false);
}
catch (AggregateException e) when (e.InnerException is ConnectException)
{
Expand Down Expand Up @@ -217,7 +217,7 @@ private async Task<IPEndPoint> MapEndpointAsync(EndPoint socketAddress)

var dns = socketAddress as DnsEndPoint;
if (dns != null)
ipEndPoint = await DnsToIPEndpoint(dns);
ipEndPoint = await DnsToIPEndpoint(dns).ConfigureAwait(false);
else
ipEndPoint = (IPEndPoint) socketAddress;

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/Transport/TransportAdapters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public override Task<Tuple<Address, TaskCompletionSource<IAssociationEventListen
{
var listenAddress = listenerTask.Result.Item1;
var listenerPromise = listenerTask.Result.Item2;
listenerPromise.TrySetResult(await InterceptListen(listenAddress, upstreamListenerPromise.Task));
listenerPromise.TrySetResult(await InterceptListen(listenAddress, upstreamListenerPromise.Task).ConfigureAwait(false));
return
new Tuple<Address, TaskCompletionSource<IAssociationEventListener>>(
SchemeAugmenter.AugmentScheme(listenAddress), upstreamListenerPromise);
Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka/Pattern/CircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,19 @@ public long CurrentFailureCount
/// <typeparam name="T">TBD</typeparam>
/// <param name="body">Call needing protected</param>
/// <returns><see cref="Task"/> containing the call result</returns>
public async Task<T> WithCircuitBreaker<T>(Func<Task<T>> body)
public Task<T> WithCircuitBreaker<T>(Func<Task<T>> body)
{
return await CurrentState.Invoke<T>(body);
return CurrentState.Invoke<T>(body);
}

/// <summary>
/// Wraps invocation of asynchronous calls that need to be protected
/// </summary>
/// <param name="body">Call needing protected</param>
/// <returns><see cref="Task"/></returns>
public async Task WithCircuitBreaker(Func<Task> body)
public Task WithCircuitBreaker(Func<Task> body)
{
await CurrentState.Invoke(body);
return CurrentState.Invoke(body);
}

/// <summary>
Expand Down
16 changes: 8 additions & 8 deletions src/core/Akka/Pattern/CircuitBreakerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public HalfOpen(CircuitBreaker breaker)
/// <param name="body">Implementation of the call that needs protected</param>
/// <exception cref="OpenCircuitException">TBD</exception>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task<T> Invoke<T>(Func<Task<T>> body)
public override Task<T> Invoke<T>(Func<Task<T>> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException();
}
return await CallThrough(body);
return CallThrough(body);
}

/// <summary>
Expand All @@ -122,13 +122,13 @@ public override async Task<T> Invoke<T>(Func<Task<T>> body)
/// <param name="body">Implementation of the call that needs protected</param>
/// <exception cref="OpenCircuitException">TBD</exception>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task Invoke(Func<Task> body)
public override Task Invoke(Func<Task> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException();
}
await CallThrough(body);
return CallThrough(body);
}

/// <summary>
Expand Down Expand Up @@ -188,19 +188,19 @@ public Closed(CircuitBreaker breaker)
/// <typeparam name="T">TBD</typeparam>
/// <param name="body">Implementation of the call that needs protected</param>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task<T> Invoke<T>(Func<Task<T>> body)
public override Task<T> Invoke<T>(Func<Task<T>> body)
{
return await CallThrough(body);
return CallThrough(body);
}

/// <summary>
/// Implementation of invoke, which simply attempts the call
/// </summary>
/// <param name="body">Implementation of the call that needs protected</param>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task Invoke(Func<Task> body)
public override Task Invoke(Func<Task> body)
{
await CallThrough(body);
return CallThrough(body);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Routing/TailChopping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public override void Send(object message, IActorRef sender)
try
{

completion.TrySetResult(await ((Task<object>)_routees[currentIndex].Ask(message, _within)));
completion.TrySetResult(await ((Task<object>)_routees[currentIndex].Ask(message, _within)).ConfigureAwait(false));
}
catch (TaskCanceledException)
{
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka/Util/Internal/AtomicState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ await Task
listener.Invoke();
}
}
);
).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -89,7 +89,7 @@ public async Task<T> CallThrough<T>(Func<Task<T>> task)
T result = default(T);
try
{
result = await task();
result = await task().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -128,7 +128,7 @@ public async Task CallThrough(Func<Task> task)

try
{
await task();
await task().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down

0 comments on commit 6678f41

Please sign in to comment.