Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2119 Review load balancing (2nd round) and redesign DefaultConsulServiceBuilder with ConsulProviderFactory refactoring to make it thread safe and friendly #2151

Merged
merged 20 commits into from
Oct 3, 2024
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reuse service counters of ConcurrentSteps
raman-m committed Sep 4, 2024
commit 48e46637c106a8010dc05edc6510ad029104dd79
41 changes: 38 additions & 3 deletions test/Ocelot.AcceptanceTests/ConcurrentSteps.cs
Original file line number Diff line number Diff line change
@@ -151,9 +151,6 @@ protected Task[] RunParallelRequests(int times, Func<int, string> urlFunc)
return _tasks;
}

public void ThenAllStatusCodesShouldBe(HttpStatusCode expected)
=> _responses.ShouldAllBe(response => response.Value.StatusCode == expected);

private async Task GetParallelResponse(string url, int threadIndex)
{
var response = await _ocelotClient.GetAsync(url);
@@ -165,4 +162,42 @@ private async Task GetParallelResponse(string url, int threadIndex)
count.ShouldBeGreaterThan(0);
_responses[threadIndex] = response;
}

public void ThenAllStatusCodesShouldBe(HttpStatusCode expected)
=> _responses.ShouldAllBe(response => response.Value.StatusCode == expected);

private string CalledTimesMessage()
{
var sortedByIndex = _counters.OrderBy(_ => _.Key).Select(_ => _.Value).ToArray();
return $"All values are [{string.Join(',', sortedByIndex)}]";
}

public void ThenAllServicesShouldHaveBeenCalledTimes(int expected)
=> _counters.Sum(_ => _.Value).ShouldBe(expected, CalledTimesMessage());

public void ThenServiceShouldHaveBeenCalledTimes(int index, int expected)
=> _counters[index].ShouldBe(expected, CalledTimesMessage());

public void ThenServicesShouldHaveBeenCalledTimes(params int[] expected)
{
for (int i = 0; i < expected.Length; i++)
{
_counters[i].ShouldBe(expected[i], CalledTimesMessage());
}
}

public void ThenAllServicesCalledRealisticAmountOfTimes(int bottom, int top)
{
var sortedByIndex = _counters.OrderBy(_ => _.Key).Select(_ => _.Value).ToArray();
var customMessage = $"{nameof(bottom)}: {bottom}\n {nameof(top)}: {top}\n All values are [{string.Join(',', sortedByIndex)}]";
int sum = 0, totalSum = _counters.Sum(_ => _.Value);

// Last offline services cannot be called at all, thus don't assert zero counters
for (int i = 0; i < _counters.Count && sum < totalSum; i++)
{
int actual = _counters[i];
actual.ShouldBeInRange(bottom, top, customMessage);
sum += actual;
}
}
}
24 changes: 6 additions & 18 deletions test/Ocelot.AcceptanceTests/LoadBalancerTests.cs
Original file line number Diff line number Diff line change
@@ -31,10 +31,10 @@ public void ShouldLoadBalanceRequest_WithLeastConnection()
this.Given(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunning())
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 50))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(50))

// Quite risky assertion because the actual values based on health checks and threading
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(1, 49)) // (24, 26)
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(1, 49)) // (24, 26)
.BDDfy();
}

@@ -52,10 +52,10 @@ public void ShouldLoadBalanceRequest_WithRoundRobin()
.And(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunning())
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 50))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(50))

// Quite risky assertion because the actual values based on health checks and threading
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(1, 49)) // (24, 26)
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(1, 49)) // (24, 26)
.BDDfy();
}

@@ -73,10 +73,10 @@ public void ShouldLoadBalanceRequest_WithCustomLoadBalancer()
this.Given(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunningWithCustomLoadBalancer(loadBalancerFactoryFunc))
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 50))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(50))

// Quite risky assertion because the actual values based on health checks and threading
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(1, 49)) // (24, 26)
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(1, 49)) // (24, 26)
.BDDfy();
}

@@ -111,18 +111,6 @@ public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
public void Release(ServiceHostAndPort hostAndPort) { }
}

private void ThenBothServicesCalledRealisticAmountOfTimes(int bottom, int top)
{
_counters[0].ShouldBeInRange(bottom, top);
_counters[1].ShouldBeInRange(bottom, top);
}

private void ThenTheTwoServicesShouldHaveBeenCalledTimes(int expected)
{
var total = _counters[0] + _counters[1];
total.ShouldBe(expected);
}

private FileRoute GivenRoute(string loadBalancer, params int[] ports) => new()
{
DownstreamPathTemplate = "/",
Original file line number Diff line number Diff line change
@@ -45,21 +45,19 @@ public void ShouldDiscoverServicesInConsul_LoadBalanceByLeastConnection_InRoutes
{
const string serviceName = "product";
var consulPort = PortFinder.GetRandomPort();
var port1 = PortFinder.GetRandomPort();
var port2 = PortFinder.GetRandomPort();
var serviceEntryOne = GivenServiceEntry(port1, serviceName: serviceName);
var serviceEntryTwo = GivenServiceEntry(port2, serviceName: serviceName);
var ports = PortFinder.GetPorts(2);
var serviceEntries = ports.Select(port => GivenServiceEntry(port, serviceName: serviceName)).ToArray();
var route = GivenRoute(serviceName: serviceName, loadBalancerType: nameof(LeastConnection));
var configuration = GivenServiceDiscovery(consulPort, route);
var urls = new string[] { DownstreamUrl(port1) , DownstreamUrl(port2) };
var urls = ports.Select(DownstreamUrl).ToArray();
this.Given(x => GivenMultipleServiceInstancesAreRunning(urls, serviceName))
.And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(DownstreamUrl(consulPort)))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntryOne, serviceEntryTwo))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntries))
.And(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunningWithServices(WithConsul))
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 50))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(1, 49)) // LeastConnection is unpredictable
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(50))
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(1, 49)) // LeastConnection is unpredictable
.BDDfy();
}

@@ -124,24 +122,22 @@ public void ShouldUseConsulServiceDiscovery_LoadBalanceRequest_DynamicRoutingWit
{
const string serviceName = "product";
var consulPort = PortFinder.GetRandomPort();
var port1 = PortFinder.GetRandomPort();
var port2 = PortFinder.GetRandomPort();
var serviceEntry1 = GivenServiceEntry(port1, serviceName: serviceName);
var serviceEntry2 = GivenServiceEntry(port2, serviceName: serviceName);
var ports = PortFinder.GetPorts(2);
var serviceEntries = ports.Select(port => GivenServiceEntry(port, serviceName: serviceName)).ToArray();

var configuration = GivenServiceDiscovery(consulPort);
configuration.GlobalConfiguration.LoadBalancerOptions = new() { Type = nameof(LeastConnection) };
configuration.GlobalConfiguration.DownstreamScheme = "http";

var urls = new string[] { DownstreamUrl(port1), DownstreamUrl(port2) };
var urls = ports.Select(DownstreamUrl).ToArray();
this.Given(x => GivenMultipleServiceInstancesAreRunning(urls, serviceName))
.And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(DownstreamUrl(consulPort)))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntry1, serviceEntry2))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntries))
.And(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunningWithServices(WithConsul))
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently($"/{serviceName}/", 50))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(1, 49)) // LeastConnection is unpredictable
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(50))
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(1, 49)) // LeastConnection is unpredictable
.BDDfy();
}

@@ -177,30 +173,28 @@ public void ShouldSendRequestToService_AfterItBecomesAvailableInConsul()
{
const string serviceName = "product";
var consulPort = PortFinder.GetRandomPort();
var port1 = PortFinder.GetRandomPort();
var port2 = PortFinder.GetRandomPort();
var serviceEntry1 = GivenServiceEntry(port1, serviceName: serviceName);
var serviceEntry2 = GivenServiceEntry(port2, serviceName: serviceName);
var ports = PortFinder.GetPorts(2);
var serviceEntries = ports.Select(port => GivenServiceEntry(port, serviceName: serviceName)).ToArray();
var route = GivenRoute(serviceName: serviceName);
var configuration = GivenServiceDiscovery(consulPort, route);
var urls = new string[] { DownstreamUrl(port1), DownstreamUrl(port2) };
var urls = ports.Select(DownstreamUrl).ToArray();
this.Given(_ => GivenMultipleServiceInstancesAreRunning(urls, serviceName))
.And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(DownstreamUrl(consulPort)))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntry1, serviceEntry2))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(serviceEntries))
.And(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunningWithServices(WithConsul))
.And(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 10))
.And(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(10))
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(1, 9)) //(4, 6))
.And(x => x.WhenIRemoveAService(serviceEntry2))
.And(x => ThenAllServicesShouldHaveBeenCalledTimes(10))
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(1, 9)) //(4, 6))
.And(x => x.WhenIRemoveAService(serviceEntries[1])) // 2nd entry
.And(x => x.GivenIResetCounters())
.And(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 10))
.And(x => x.ThenOnlyOneServiceHasBeenCalled())
.And(x => x.WhenIAddAServiceBackIn(serviceEntry2))
.And(x => ThenServicesShouldHaveBeenCalledTimes(10, 0)) // 2nd is offline
.And(x => x.WhenIAddAServiceBackIn(serviceEntries[1])) // 2nd entry
.And(x => x.GivenIResetCounters())
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently("/", 10))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(10))
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(4, 6))
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(10))
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(4, 6))
.BDDfy();
}

@@ -335,25 +329,25 @@ public void ShouldReturnServiceAddressByOverriddenServiceBuilder_WhenThereIsANod
[Trait("Bug", "2119")]
public void ShouldReturnDifferentServices_ConcurrentRequestsWithDifferentServices()
{
var names = new string[] { "ProjectsService", "CustomersService" };
var consulPort = PortFinder.GetRandomPort();
var port1 = PortFinder.GetRandomPort();
var port2 = PortFinder.GetRandomPort();
var service1 = GivenServiceEntry(port1, serviceName: "ProjectsService");
var service2 = GivenServiceEntry(port2, serviceName: "CustomersService");
var route1 = GivenRoute("/{all}", "/projects/{all}", serviceName: "ProjectsService", loadBalancerType: nameof(LeastConnection));
var route2 = GivenRoute("/{all}", "/customers/{all}", serviceName: "CustomersService", loadBalancerType: nameof(LeastConnection));
var ports = PortFinder.GetPorts(2);
var service1 = GivenServiceEntry(ports[0], serviceName: names[0]);
var service2 = GivenServiceEntry(ports[1], serviceName: names[1]);
var route1 = GivenRoute("/{all}", "/projects/{all}", serviceName: names[0], loadBalancerType: nameof(LeastConnection));
var route2 = GivenRoute("/{all}", "/customers/{all}", serviceName: names[1], loadBalancerType: nameof(LeastConnection));
route1.UpstreamHttpMethod = route2.UpstreamHttpMethod = new() { HttpMethods.Get, HttpMethods.Post, HttpMethods.Put, HttpMethods.Delete };
var configuration = GivenServiceDiscovery(consulPort, route1, route2);
var urls = new string[] { DownstreamUrl(port1), DownstreamUrl(port2) };
var responses = new string[] { "ProjectsService", "CustomersService" };
this.Given(x => GivenMultipleServiceInstancesAreRunning(urls, responses))
var urls = ports.Select(DownstreamUrl).ToArray();
this.Given(x => GivenMultipleServiceInstancesAreRunning(urls, names)) // service names as responses
.And(x => x.GivenThereIsAFakeConsulServiceDiscoveryProvider(DownstreamUrl(consulPort)))
.And(x => x.GivenTheServicesAreRegisteredWithConsul(service1, service2))
.And(x => GivenThereIsAConfiguration(configuration))
.And(x => GivenOcelotIsRunningWithServices(WithConsul))
.When(x => WhenIGetUrlOnTheApiGatewayConcurrently(50, "/customers/api/customers", "/projects/api/projects"))
.Then(x => x.ThenTheTwoServicesShouldHaveBeenCalledTimes(50))
.And(x => x.ThenBothServicesCalledRealisticAmountOfTimes(24, 26)) // LeastConnection is unpredictable
.Then(x => ThenAllServicesShouldHaveBeenCalledTimes(50))
.And(x => ThenAllServicesCalledRealisticAmountOfTimes(24, 26)) // LeastConnection is unpredictable
.And(x => ThenServicesShouldHaveBeenCalledTimes(25, 25))
.BDDfy();
}

@@ -430,36 +424,17 @@ private void WhenIAddAServiceBackIn(ServiceEntry serviceEntry)
_consulServices.Add(serviceEntry);
}

private void ThenOnlyOneServiceHasBeenCalled()
{
_counters[0].ShouldBe(10);
_counters[1].ShouldBe(0);
}

private void WhenIRemoveAService(ServiceEntry serviceEntry)
{
_consulServices.Remove(serviceEntry);
}

private void GivenIResetCounters()
{
_counters[0] = 0;
_counters[1] = 0;
_counters[0] = _counters[1] = 0;
_counterConsul = 0;
}

private void ThenBothServicesCalledRealisticAmountOfTimes(int bottom, int top)
{
_counters[0].ShouldBeInRange(bottom, top);
_counters[1].ShouldBeInRange(bottom, top);
}

private void ThenTheTwoServicesShouldHaveBeenCalledTimes(int expected)
{
var total = _counters[0] + _counters[1];
total.ShouldBe(expected);
}

private void GivenTheServicesAreRegisteredWithConsul(params ServiceEntry[] serviceEntries) => _consulServices.AddRange(serviceEntries);
private void GivenTheServiceNodesAreRegisteredWithConsul(params Node[] nodes) => _consulNodes.AddRange(nodes);

Original file line number Diff line number Diff line change
@@ -66,6 +66,7 @@ public void ShouldReturnServicesFromK8s()
.When(_ => WhenIGetUrlOnTheApiGateway("/"))
.Then(_ => ThenTheStatusCodeShouldBe(HttpStatusCode.OK))
.And(_ => ThenTheResponseBodyShouldBe($"1:{downstreamResponse}"))
.And(x => ThenAllServicesShouldHaveBeenCalledTimes(1))
.And(x => x.ThenTheTokenIs("Bearer txpc696iUhbVoudg164r93CxDTrKRVWG"))
.BDDfy();
}
@@ -107,6 +108,7 @@ public void ShouldReturnServicesByPortNameAsDownstreamScheme(string downstreamSc
.And(_ => ThenTheResponseBodyShouldBe(downstreamScheme == "http"
? "1:" + nameof(ShouldReturnServicesByPortNameAsDownstreamScheme)
: string.Empty))
.And(x => ThenAllServicesShouldHaveBeenCalledTimes(downstreamScheme == "http" ? 1 : 0))
.And(x => x.ThenTheTokenIs("Bearer txpc696iUhbVoudg164r93CxDTrKRVWG"))
.BDDfy();
}
@@ -162,9 +164,7 @@ public void ShouldHighlyLoadOnUnstableKubeProvider_WithRoundRobinLoadBalancing(i
[CallerMemberName] string serviceName = nameof(ArrangeHighLoadOnKubeProviderAndRoundRobinBalancer))
{
const string namespaces = nameof(KubernetesServiceDiscoveryTests);
var servicePorts = Enumerable.Repeat(0, totalServices)
.Select(_ => PortFinder.GetRandomPort())
.ToArray();
var servicePorts = PortFinder.GetPorts(totalServices);
var downstreamUrls = servicePorts
.Select(port => LoopbackLocalhostUrl(port, Array.IndexOf(servicePorts, port)))
.ToArray(); // based on localhost aka loopback network interface
@@ -195,6 +195,7 @@ private void HighlyLoadOnKubeProviderAndRoundRobinBalancer(int totalRequests, in
ThenAllStatusCodesShouldBe(HttpStatusCode.OK);
ThenAllServicesShouldHaveBeenCalledTimes(totalRequests);
_roundRobinAnalyzer.ShouldNotBeNull().Analyze();
_roundRobinAnalyzer.Events.Count.ShouldBe(totalRequests);
_roundRobinAnalyzer.HasManyServiceGenerations(k8sGenerationNo).ShouldBeTrue();
}

@@ -333,29 +334,6 @@ private RoundRobinAnalyzer GetRoundRobinAnalyzer(DownstreamRoute route, IService
private static readonly object K8sCounterLocker = new();
private int _k8sCounter, _k8sServiceGeneration;

private void ThenAllServicesShouldHaveBeenCalledTimes(int expected)
{
var sortedByIndex = _counters.OrderBy(_ => _.Key).Select(_ => _.Value).ToArray();
var customMessage = $"All values are [{string.Join(',', sortedByIndex)}]";
_counters.Sum(_ => _.Value).ShouldBe(expected, customMessage);
_roundRobinAnalyzer.Events.Count.ShouldBe(expected);
}

private void ThenAllServicesCalledRealisticAmountOfTimes(int bottom, int top)
{
var sortedByIndex = _counters.OrderBy(_ => _.Key).Select(_ => _.Value).ToArray();
var customMessage = $"{nameof(bottom)}: {bottom}\n {nameof(top)}: {top}\n All values are [{string.Join(',', sortedByIndex)}]";
int sum = 0, totalSum = _counters.Sum(_ => _.Value);

// Last services cannot be called at all, zero counters
for (int i = 0; i < _counters.Count && sum < totalSum; i++)
{
int actual = _counters[i];
actual.ShouldBeInRange(bottom, top, customMessage);
sum += actual;
}
}

private void ThenServiceCountersShouldMatchLeasingCounters(int[] ports)
{
var leasingCounters = _roundRobinAnalyzer.GetHostCounters();
7 changes: 4 additions & 3 deletions test/Ocelot.Testing/PortFinder.cs
Original file line number Diff line number Diff line change
@@ -36,10 +36,10 @@ public static int[] GetPorts(int count)
var ports = new int[count];
lock (LockObj)
{
for (int i = 0; i < count; i++)
for (int i = 0; i < count; i++, CurrentPort++)
{
ExceedingPortRangeException.ThrowIf(CurrentPort > EndPortRange);
ports[i] = UsePort(CurrentPort++);
ports[i] = UsePort(CurrentPort);
}
}
return ports;
@@ -63,5 +63,6 @@ public class ExceedingPortRangeException : Exception
public ExceedingPortRangeException()
: base("Cannot find available port to bind to!") { }

public static void ThrowIf(bool condition) => _ = condition ? throw new ExceedingPortRangeException() : 0;
public static void ThrowIf(bool condition)
=> _ = condition ? throw new ExceedingPortRangeException() : 0;
}