Skip to content

Commit

Permalink
Fix flaky streaming test (#2134)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored May 25, 2023
1 parent d8e11f4 commit dcf1f81
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
4 changes: 3 additions & 1 deletion test/FunctionalTests/Client/StreamingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,9 @@ await responseStream.WriteAsync(

Logger.LogInformation("Client reading canceled message from server.");
var clientEx = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseStream.MoveNext()).DefaultTimeout();
Assert.AreEqual(StatusCode.Cancelled, clientEx.StatusCode);

// Race on the server can change which error is returned.
Assert.IsTrue(clientEx.StatusCode == StatusCode.Cancelled || clientEx.StatusCode == StatusCode.Internal);
}

[Test]
Expand Down
12 changes: 11 additions & 1 deletion test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public async Task PickAsync_ChannelStateChangesWithWaitForReady_WaitsForCorrectE
services.AddNUnitLogger();
var serviceProvider = services.BuildServiceProvider();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger(GetType());

var resolver = new TestResolver(loggerFactory);
resolver.UpdateAddresses(new List<BalancerAddress>
Expand All @@ -104,23 +105,32 @@ public async Task PickAsync_ChannelStateChangesWithWaitForReady_WaitsForCorrectE
// Assert
Assert.AreEqual(new DnsEndPoint("localhost", 80), result1.Address!.EndPoint);

logger.LogInformation("Updating resolve to have 80 and 81 addresses.");
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 80),
new BalancerAddress("localhost", 81)
});

logger.LogInformation("Wait for both subchannels to be ready.");
await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(logger, clientChannel, expectedCount: 2);

// This needs to happen after both subchannels are ready so the Transports collection has two items in it.
logger.LogInformation("Make subchannels not ready.");
for (var i = 0; i < transportFactory.Transports.Count; i++)
{
transportFactory.Transports[i].UpdateState(ConnectivityState.TransientFailure);
}

logger.LogInformation("Wait for both subchannels to not be ready.");
await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(logger, clientChannel, expectedCount: 0);

var pickTask2 = clientChannel.PickAsync(
new PickContext { Request = new HttpRequestMessage() },
waitForReady: true,
CancellationToken.None).AsTask().DefaultTimeout();

Assert.IsFalse(pickTask2.IsCompleted);
Assert.IsFalse(pickTask2.IsCompleted, "PickAsync should wait until an subchannel is ready.");

resolver.UpdateAddresses(new List<BalancerAddress>
{
Expand Down
12 changes: 9 additions & 3 deletions test/Shared/BalancerWaitHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace Grpc.Tests.Shared;

public static class BalancerWaitHelpers
internal static class BalancerWaitHelpers
{
public static Task WaitForChannelStateAsync(ILogger logger, GrpcChannel channel, ConnectivityState state, int channelId = 1)
{
Expand Down Expand Up @@ -57,7 +57,12 @@ public static async Task<Subchannel> WaitForSubchannelToBeReadyAsync(ILogger log
return subChannel;
}

public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, GrpcChannel channel, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null, Func<Subchannel, bool>? validateSubchannel = null)
public static Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, GrpcChannel channel, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null, Func<Subchannel, bool>? validateSubchannel = null)
{
return WaitForSubchannelsToBeReadyAsync(logger, channel.ConnectionManager, expectedCount, getPickerSubchannels, validateSubchannel);
}

public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, ConnectionManager connectionManager, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null, Func<Subchannel, bool>? validateSubchannel = null)
{
if (getPickerSubchannels == null)
{
Expand All @@ -68,6 +73,7 @@ public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger
RoundRobinPicker roundRobinPicker => roundRobinPicker._subchannels.ToArray(),
PickFirstPicker pickFirstPicker => new[] { pickFirstPicker.Subchannel },
EmptyPicker emptyPicker => Array.Empty<Subchannel>(),
ErrorPicker errorPicker => Array.Empty<Subchannel>(),
null => Array.Empty<Subchannel>(),
_ => throw new Exception("Unexpected picker type: " + picker.GetType().FullName)
};
Expand All @@ -79,7 +85,7 @@ public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger
Subchannel[]? subChannelsCopy = null;
await TestHelpers.AssertIsTrueRetryAsync(() =>
{
var picker = channel.ConnectionManager._picker;
var picker = connectionManager._picker;
subChannelsCopy = getPickerSubchannels(picker);
logger.LogInformation($"Current subchannel ready count: {subChannelsCopy.Length}");
for (var i = 0; i < subChannelsCopy.Length; i++)
Expand Down

0 comments on commit dcf1f81

Please sign in to comment.