Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2110 Review load balancing and independent fetching the list of services in Kube provider #2111

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
360bd61
Move the creation of the services list from the class field to the me…
antikorol Jun 26, 2024
d19322d
Early return after data checking
raman-m Jun 27, 2024
0e9a616
Add unit test for concurrent get list of services
antikorol Jul 1, 2024
bbed7d1
Add logging for invalid service configuration error in RoundRobin loa…
antikorol Jul 9, 2024
fbd0d8d
Code review by @raman-m
raman-m Jul 11, 2024
3fc03dd
Merge branch 'develop' into bugfix/race-condition-on-fetching-service…
raman-m Jul 16, 2024
634e639
Workaround for mistakes made during acceptance testing of load balanc…
raman-m Jul 17, 2024
65a9b62
Let's DRY StickySessionsTests
raman-m Jul 17, 2024
9bb301a
Add acceptance tests, but...
raman-m Jul 17, 2024
1538451
Merge branch 'develop' into bugfix/race-condition-on-fetching-service…
raman-m Jul 18, 2024
65105a8
Independent static indexing iterators per route via service names
raman-m Jul 24, 2024
1036939
Stabilize `CookieStickySessions` load balancer.
raman-m Jul 25, 2024
e33bbc5
Refactor Lease operation for load balancing.
raman-m Jul 29, 2024
2b958bb
Leasing mechanism in Round Robin load balancer
raman-m Aug 4, 2024
a7b5641
Acceptance tests, final version
raman-m Aug 4, 2024
312bc6e
Apply Retry pattern for K8s endpoint integration
raman-m Aug 5, 2024
6367cff
Fix IDE warnings and messages
raman-m Aug 5, 2024
c4c7ae0
Follow suggestions and fix issues from code review by @ggnaegi
raman-m Aug 6, 2024
c42302c
Bump KubeClient from 2.4.10 to 2.5.8
raman-m Aug 6, 2024
e1fe81f
Fix warnings
raman-m Aug 6, 2024
0305278
Final version of `Retry` pattern
raman-m Aug 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions src/Ocelot.Provider.Kubernetes/Kube.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
using KubeClient.Models;
using Ocelot.Infrastructure.DesignPatterns;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using Ocelot.Values;

namespace Ocelot.Provider.Kubernetes;

/// <summary>
/// Default Kubernetes service discovery provider.
/// </summary>
/// <summary>Default Kubernetes service discovery provider.</summary>
/// <remarks>
/// <list type="bullet">
/// <item>NuGet: <see href="https://www.nuget.org/packages/KubeClient">KubeClient</see></item>
/// <item>GitHub: <see href="https://github.com/tintoy/dotnet-kube-client">dotnet-kube-client</see></item>
/// </list>
/// </remarks>
public class Kube : IServiceDiscoveryProvider
{
private readonly KubeRegistryConfiguration _configuration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;
private readonly IKubeServiceBuilder _serviceBuilder;
private readonly List<Service> _services;

public Kube(
KubeRegistryConfiguration configuration,
Expand All @@ -26,28 +30,32 @@ public Kube(
_logger = factory.CreateLogger<Kube>();
_kubeApi = kubeApi;
_serviceBuilder = serviceBuilder;
_services = new();
}

public virtual async Task<List<Service>> GetAsync()
{
var endpoint = await _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);
var endpoint = await Retry.OperationAsync(GetEndpoint, CheckErroneousState, logger: _logger);

_services.Clear();
if (endpoint?.Subsets.Count != 0)
if (CheckErroneousState(endpoint))
{
_services.AddRange(BuildServices(_configuration, endpoint));
}
else
{
_logger.LogWarning(() => $"K8s Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; Unable to use: it is invalid. Address must contain host only e.g. localhost and port must be greater than 0!");
_logger.LogWarning(() => GetMessage($"Unable to use bad result returned by {nameof(Kube)} integration endpoint because the final result is invalid/unknown after multiple retries!"));
return new(0);
raman-m marked this conversation as resolved.
Show resolved Hide resolved
raman-m marked this conversation as resolved.
Show resolved Hide resolved
}

return _services;
return BuildServices(_configuration, endpoint)
.ToList();
antikorol marked this conversation as resolved.
Show resolved Hide resolved
}

private Task<EndpointsV1> GetEndpoint() => _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);

private bool CheckErroneousState(EndpointsV1 endpoint)
=> (endpoint?.Subsets?.Count ?? 0) == 0; // null or count is zero

private string GetMessage(string message)
=> $"{nameof(Kube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";

protected virtual IEnumerable<Service> BuildServices(KubeRegistryConfiguration configuration, EndpointsV1 endpoint)
=> _serviceBuilder.BuildServices(configuration, endpoint);
}
8 changes: 4 additions & 4 deletions src/Ocelot.Provider.Kubernetes/KubeServiceCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ namespace Ocelot.Provider.Kubernetes;

public class KubeServiceCreator : IKubeServiceCreator
{
private readonly IOcelotLogger _logger;

public KubeServiceCreator(IOcelotLoggerFactory factory)
{
ArgumentNullException.ThrowIfNull(factory);
_logger = factory.CreateLogger<KubeServiceCreator>();
Logger = factory.CreateLogger<KubeServiceCreator>();
}

public virtual IEnumerable<Service> Create(KubeRegistryConfiguration configuration, EndpointsV1 endpoint, EndpointSubsetV1 subset)
Expand All @@ -34,6 +32,8 @@ public virtual IEnumerable<Service> CreateInstance(KubeRegistryConfiguration con
return new Service[] { instance };
}

protected IOcelotLogger Logger { get; }

protected virtual string GetServiceName(KubeRegistryConfiguration configuration, EndpointsV1 endpoint, EndpointSubsetV1 subset, EndpointAddressV1 address)
=> endpoint.Metadata?.Name;

Expand All @@ -46,7 +46,7 @@ protected virtual ServiceHostAndPort GetServiceHostAndPort(KubeRegistryConfigura
: ports.FirstOrDefault(portNameToScheme);
portV1 ??= new();
portV1.Name ??= configuration.Scheme ?? string.Empty;
_logger.LogDebug(() => $"K8s service with key '{configuration.KeyOfServiceInK8s}' and address {address.Ip}; Detected port is {portV1.Name}:{portV1.Port}. Total {ports.Count} ports of [{string.Join(',', ports.Select(p => p.Name))}].");
Logger.LogDebug(() => $"K8s service with key '{configuration.KeyOfServiceInK8s}' and address {address.Ip}; Detected port is {portV1.Name}:{portV1.Port}. Total {ports.Count} ports of [{string.Join(',', ports.Select(p => p.Name))}].");
return new ServiceHostAndPort(address.Ip, portV1.Port, portV1.Name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
<Compile Remove="KubeApiClientFactory.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="KubeClient" Version="2.4.10" />
<PackageReference Include="KubeClient.Extensions.DependencyInjection" Version="2.4.10" />
<PackageReference Include="KubeClient" Version="2.5.8" />
<PackageReference Include="KubeClient.Extensions.DependencyInjection" Version="2.5.8" />
raman-m marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.507">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
115 changes: 115 additions & 0 deletions src/Ocelot/Infrastructure/DesignPatterns/Retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using Ocelot.Logging;

namespace Ocelot.Infrastructure.DesignPatterns;

/// <summary>
/// Basic <seealso href="https://www.bing.com/search?q=Retry+pattern">Retry pattern</seealso> for stabilizing integrated services.
/// </summary>
/// <remarks>Docs:
/// <list type="bullet">
/// <item><see href="https://learn.microsoft.com/en-us/azure/architecture/patterns/retry">Microsoft Learn | Retry pattern</see></item>
/// </list>
/// </remarks>
public static class Retry
{
public const int DefaultRetryTimes = 3;
public const int DefaultWaitTimeMilliseconds = 25;

private static string GetMessage<T>(T operation, int retryNo, string message)
where T : Delegate
=> $"Ocelot {nameof(Retry)} strategy for the operation of '{operation.GetType()}' type -> {nameof(Retry)} No {retryNo}: {message}";

/// <summary>
/// Retry a synchronous operation when an exception occurs or predicate is true, then delay and retry again.
/// </summary>
/// <typeparam name="TResult">Type of the result of the sync operation.</typeparam>
/// <param name="operation">Required Func-delegate of the operation.</param>
/// <param name="predicate">Predicate to check, optionally.</param>
/// <param name="retryTimes">Number of retries.</param>
/// <param name="waitTime">Waiting time in milliseconds.</param>
/// <param name="logger">Concrete logger from upper context.</param>
/// <returns>A <typeparamref name="TResult"/> value as the result of the sync operation.</returns>
public static TResult Operation<TResult>(
Func<TResult> operation,
Predicate<TResult> predicate = null,
int retryTimes = DefaultRetryTimes, int waitTime = DefaultWaitTimeMilliseconds,
IOcelotLogger logger = null)
{
for (int n = 1; n < retryTimes; n++)
{
TResult result;
try
{
result = operation.Invoke();
}
catch (Exception e)
{
logger?.LogError(() => GetMessage(operation, n, $"Caught exception of the {e.GetType()} type -> Message: {e.Message}."), e);
Thread.Sleep(waitTime);
continue; // the result is unknown, so continue to retry
}

// Apply predicate for known result
if (predicate?.Invoke(result) == true)
{
logger?.LogWarning(() => GetMessage(operation, n, $"The predicate has identified erroneous state in the returned result. For further details, implement logging of the result's value or properties within the predicate method."));
Thread.Sleep(waitTime);
continue; // on erroneous state
}

// Happy path
return result;
}

// Last retry should generate native exception or other erroneous state(s)
logger?.LogDebug(() => GetMessage(operation, retryTimes, $"Retrying lastly..."));
return operation.Invoke(); // also final result must be analyzed in the upper context
}

/// <summary>
/// Retry an asynchronous operation when an exception occurs or predicate is true, then delay and retry again.
/// </summary>
/// <typeparam name="TResult">Type of the result of the async operation.</typeparam>
/// <param name="operation">Required Func-delegate of the operation.</param>
/// <param name="predicate">Predicate to check, optionally.</param>
/// <param name="retryTimes">Number of retries.</param>
/// <param name="waitTime">Waiting time in milliseconds.</param>
/// <param name="logger">Concrete logger from upper context.</param>
/// <returns>A <typeparamref name="TResult"/> value as the result of the async operation.</returns>
public static async Task<TResult> OperationAsync<TResult>(
Func<Task<TResult>> operation, // required operation delegate
Predicate<TResult> predicate = null, // optional retry predicate for the result
int retryTimes = DefaultRetryTimes, int waitTime = DefaultWaitTimeMilliseconds, // retrying options
IOcelotLogger logger = null) // static injections
{
for (int n = 1; n < retryTimes; n++)
{
TResult result;
try
{
result = await operation?.Invoke();
}
catch (Exception e)
{
logger?.LogError(() => GetMessage(operation, n, $"Caught exception of the {e.GetType()} type -> Message: {e.Message}."), e);
await Task.Delay(waitTime);
continue; // the result is unknown, so continue to retry
}

// Apply predicate for known result
if (predicate?.Invoke(result) == true)
{
logger?.LogWarning(() => GetMessage(operation, n, $"The predicate has identified erroneous state in the returned result. For further details, implement logging of the result's value or properties within the predicate method."));
await Task.Delay(waitTime);
continue; // on erroneous state
}

// Happy path
return result;
}

// Last retry should generate native exception or other erroneous state(s)
logger?.LogDebug(() => GetMessage(operation, retryTimes, $"Retrying lastly..."));
return await operation?.Invoke(); // also final result must be analyzed in the upper context
}
}
116 changes: 57 additions & 59 deletions src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessions.cs
Original file line number Diff line number Diff line change
@@ -1,85 +1,83 @@
using Microsoft.AspNetCore.Http;
using Ocelot.Infrastructure;
using Ocelot.Middleware;
using Ocelot.Responses;
using Ocelot.Values;

namespace Ocelot.LoadBalancer.LoadBalancers
namespace Ocelot.LoadBalancer.LoadBalancers;

public class CookieStickySessions : ILoadBalancer
{
public class CookieStickySessions : ILoadBalancer
private readonly int _keyExpiryInMs;
private readonly string _cookieName;
private readonly ILoadBalancer _loadBalancer;
private readonly IBus<StickySession> _bus;

private static readonly object Locker = new();
private static readonly Dictionary<string, StickySession> Stored = new(); // TODO Inject instead of static sharing

public CookieStickySessions(ILoadBalancer loadBalancer, string cookieName, int keyExpiryInMs, IBus<StickySession> bus)
{
private readonly int _keyExpiryInMs;
private readonly string _key;
private readonly ILoadBalancer _loadBalancer;
private readonly ConcurrentDictionary<string, StickySession> _stored;
private readonly IBus<StickySession> _bus;
private readonly object _lock = new();
_bus = bus;
_cookieName = cookieName;
_keyExpiryInMs = keyExpiryInMs;
_loadBalancer = loadBalancer;
_bus.Subscribe(CheckExpiry);
}

public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus<StickySession> bus)
private void CheckExpiry(StickySession sticky)
raman-m marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO Get test coverage for this
lock (Locker)
{
_bus = bus;
_key = key;
_keyExpiryInMs = keyExpiryInMs;
_loadBalancer = loadBalancer;
_stored = new ConcurrentDictionary<string, StickySession>();
_bus.Subscribe(ss =>
if (!Stored.TryGetValue(sticky.Key, out var session) || session.Expiry >= DateTime.UtcNow)
{
//todo - get test coverage for this.
if (_stored.TryGetValue(ss.Key, out var stickySession))
{
lock (_lock)
{
if (stickySession.Expiry < DateTime.UtcNow)
{
_stored.TryRemove(stickySession.Key, out _);
_loadBalancer.Release(stickySession.HostAndPort);
}
}
}
});
return;
}

Stored.Remove(session.Key);
_loadBalancer.Release(session.HostAndPort);
}
}

public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
public Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
{
var route = httpContext.Items.DownstreamRoute();
var serviceName = route.LoadBalancerKey;
var cookie = httpContext.Request.Cookies[_cookieName];
var key = $"{serviceName}:{cookie}"; // strong key name because of static store
lock (Locker)
{
var key = httpContext.Request.Cookies[_key];

lock (_lock)
if (!string.IsNullOrEmpty(key) && Stored.TryGetValue(key, out StickySession cached))
{
if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
{
var cached = _stored[key];

var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);

_stored[key] = updated;

_bus.Publish(updated, _keyExpiryInMs);

return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
}
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
Update(key, updated);
return Task.FromResult<Response<ServiceHostAndPort>>(new OkResponse<ServiceHostAndPort>(updated.HostAndPort));
}

var next = await _loadBalancer.Lease(httpContext);

// There is no value in the store, so lease it now!
var next = _loadBalancer.Lease(httpContext).GetAwaiter().GetResult(); // unfortunately the operation must be synchronous
if (next.IsError)
{
return new ErrorResponse<ServiceHostAndPort>(next.Errors);
}

lock (_lock)
{
if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
{
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
_stored[key] = ss;
_bus.Publish(ss, _keyExpiryInMs);
}
return Task.FromResult<Response<ServiceHostAndPort>>(new ErrorResponse<ServiceHostAndPort>(next.Errors));
}

return new OkResponse<ServiceHostAndPort>(next.Data);
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
Update(key, ss);
return Task.FromResult<Response<ServiceHostAndPort>>(new OkResponse<ServiceHostAndPort>(next.Data));
}
}

public void Release(ServiceHostAndPort hostAndPort)
protected void Update(string key, StickySession value)
raman-m marked this conversation as resolved.
Show resolved Hide resolved
{
lock (Locker)
{
Stored[key] = value;
_bus.Publish(value, _keyExpiryInMs);
}
}

public void Release(ServiceHostAndPort hostAndPort)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ public class CookieStickySessionsCreator : ILoadBalancerCreator
{
public Response<ILoadBalancer> Create(DownstreamRoute route, IServiceDiscoveryProvider serviceProvider)
{
var loadBalancer = new RoundRobin(async () => await serviceProvider.GetAsync());
var options = route.LoadBalancerOptions;
var loadBalancer = new RoundRobin(serviceProvider.GetAsync, route.LoadBalancerKey);
var bus = new InMemoryBus<StickySession>();
return new OkResponse<ILoadBalancer>(new CookieStickySessions(loadBalancer, route.LoadBalancerOptions.Key,
route.LoadBalancerOptions.ExpiryInMs, bus));
return new OkResponse<ILoadBalancer>(
new CookieStickySessions(loadBalancer, options.Key, options.ExpiryInMs, bus));
}

public string Type => nameof(CookieStickySessions);
Expand Down
Loading