Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2168 WatchKube discovery provider #2174

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion docs/features/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ The example here shows a typical configuration:
}
}

Service deployment in **Namespace** ``Dev``, **ServiceDiscoveryProvider** type is ``Kube``, you also can set :ref:`k8s-pollkube-provider` type.
Service deployment in **Namespace** ``Dev``, **ServiceDiscoveryProvider** type is ``Kube``, you also can set :ref:`k8s-pollkube-provider` or :ref:`k8s-watchkube-provider` type.

**Note 1**: ``Host``, ``Port`` and ``Token`` are no longer in use.

Expand Down Expand Up @@ -109,6 +109,22 @@ This really depends on how volatile your services are.
We doubt it will matter for most people and polling may give a tiny performance improvement over calling Kubernetes per request.
There is no way for Ocelot to work these out for you.

.. _k8s-watchkube-provider:

WatchKube provider
^^^^^^^^^^^^^^^^^^

This option utilizes Kubernetes API `watch requests <https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes>`_ for fetching service configuration.
Essentially it means that there will be one streamed http connection with kube-api per downstream service.
Changes streamed by this connection will be used for updating available endpoints list.

.. code-block:: json

"ServiceDiscoveryProvider": {
"Namespace": "dev",
"Type": "WatchKube"
}

Global vs Route Levels
----------------------

Expand Down
29 changes: 25 additions & 4 deletions src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ namespace Ocelot.Provider.Kubernetes
{
public class EndPointClientV1 : KubeResourceClient, IEndPointClient
{
private readonly HttpRequest _collection;
private readonly HttpRequest _byName;
private readonly HttpRequest _watchByName;

public EndPointClientV1(IKubeApiClient client) : base(client)
{
_collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
_byName = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
_watchByName = KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}");
}

public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

var request = _collection
var request = _byName
.WithTemplateParameters(new
{
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
Expand All @@ -34,5 +37,23 @@ public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace
? await response.ReadContentAsAsync<EndpointsV1>()
: null;
}

public IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

return ObserveEvents<EndpointsV1>(
_watchByName.WithTemplateParameters(new
{
ServiceName = serviceName,
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
}),
"watch v1/Endpoints '" + serviceName + "' in namespace " +
(kubeNamespace ?? KubeClient.DefaultNamespace));
}
}
}
2 changes: 2 additions & 0 deletions src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces;
public interface IEndPointClient : IKubeResourceClient
{
Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);

IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/Ocelot.Provider.Kubernetes/Kube.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public virtual async Task<List<Service>> GetAsync()
}

private Task<EndpointsV1> GetEndpoint() => _kubeApi
.ResourceClient<IEndPointClient>(client => new EndPointClientV1(client))
.EndpointsV1()
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);

private bool CheckErroneousState(EndpointsV1 endpoint)
Expand Down
9 changes: 9 additions & 0 deletions src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Ocelot.Provider.Kubernetes.Interfaces;

namespace Ocelot.Provider.Kubernetes;

public static class KubeApiClientExtensions
{
public static IEndPointClient EndpointsV1(this IKubeApiClient client)
=> client.ResourceClient<IEndPointClient>(x => new EndPointClientV1(x));
}
30 changes: 19 additions & 11 deletions src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Ocelot.Configuration;
using Ocelot.Logging;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using System.Reactive.Concurrency;

namespace Ocelot.Provider.Kubernetes
{
public static class KubernetesProviderFactory // TODO : IServiceDiscoveryProviderFactory
{
/// <summary>
/// String constant used for provider type definition.
{
/// <summary>
/// String constant used for provider type definition.
/// </summary>
public const string PollKube = nameof(Kubernetes.PollKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

public const string WatchKube = nameof(Kubernetes.WatchKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route)
{
Expand All @@ -27,11 +30,16 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide
Scheme = route.DownstreamScheme,
};

if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase))
{
return new WatchKube(configuration, factory, kubeClient, serviceBuilder, Scheduler.Default);
}

var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);

return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
: defaultK8sProvider;
return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
: defaultK8sProvider;
}
}
}
27 changes: 27 additions & 0 deletions src/Ocelot.Provider.Kubernetes/ObservableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace Ocelot.Provider.Kubernetes
{
public static class ObservableExtensions
{
public static IObservable<TSource> RetryAfter<TSource>(this IObservable<TSource> source,
TimeSpan dueTime,
IScheduler scheduler)
{
return RepeatInfinite(source, dueTime, scheduler).Catch();
}

private static IEnumerable<IObservable<TSource>> RepeatInfinite<TSource>(IObservable<TSource> source,
TimeSpan dueTime,
IScheduler scheduler)
{
yield return source;

while (true)
{
yield return source.DelaySubscription(dueTime, scheduler);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@
<ItemGroup>
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="Ocelot.UnitTests" />
</ItemGroup>
</Project>
88 changes: 88 additions & 0 deletions src/Ocelot.Provider.Kubernetes/WatchKube.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using KubeClient.Models;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using Ocelot.Values;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace Ocelot.Provider.Kubernetes;

public class WatchKube : IServiceDiscoveryProvider, IDisposable
{
internal const int FailedSubscriptionRetrySeconds = 5;
internal const int FirstResultsFetchingTimeoutSeconds = 3;

private readonly KubeRegistryConfiguration _configuration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;
private readonly IKubeServiceBuilder _serviceBuilder;
private readonly IScheduler _scheduler;

private readonly IDisposable _subscription;
private readonly TaskCompletionSource _firstResultsCompletionSource;

private List<Service> _services = new();

public WatchKube(
KubeRegistryConfiguration configuration,
IOcelotLoggerFactory factory,
IKubeApiClient kubeApi,
IKubeServiceBuilder serviceBuilder,
IScheduler scheduler)
{
_configuration = configuration;
_logger = factory.CreateLogger<WatchKube>();
_kubeApi = kubeApi;
_serviceBuilder = serviceBuilder;
_scheduler = scheduler;

_firstResultsCompletionSource = new TaskCompletionSource();
SetFirstResultsCompletedAfterDelay();
_subscription = CreateSubscription();
}

public virtual async Task<List<Service>> GetAsync()
{
// wait for first results fetching
await _firstResultsCompletionSource.Task;

if (_services is not { Count: > 0 })
{
_logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!"));
}

return _services;
}

private void SetFirstResultsCompletedAfterDelay() => Observable
.Timer(TimeSpan.FromSeconds(FirstResultsFetchingTimeoutSeconds), _scheduler)
.Subscribe(_ => _firstResultsCompletionSource.TrySetResult());

private IDisposable CreateSubscription() =>
_kubeApi
.EndpointsV1()
.Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace)
.Do(_ => { }, ex => _logger.LogError(() => GetMessage("Endpoints subscription error occured."), ex))
.RetryAfter(TimeSpan.FromSeconds(FailedSubscriptionRetrySeconds), _scheduler)
.Subscribe(
onNext: endpointEvent =>
{
_services = endpointEvent.EventType switch
{
ResourceEventType.Deleted or ResourceEventType.Error => new(),
_ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(),
_ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(),
};
_firstResultsCompletionSource.TrySetResult();
},
onCompleted: () =>
{
// called only when subscription canceled in Dispose
_logger.LogInformation(() => GetMessage("Subscription to service endpoints completed"));
});

private string GetMessage(string message)
=> $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";

public void Dispose() => _subscription.Dispose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ public override void Dispose()
base.Dispose();
}

[Fact]
public void ShouldReturnServicesFromK8s()
[Theory]
[InlineData(nameof(Kube))]
[InlineData(nameof(WatchKube))]
public void ShouldReturnServicesFromK8s(string discoveryType)
{
const string namespaces = nameof(KubernetesServiceDiscoveryTests);
const string serviceName = nameof(ShouldReturnServicesFromK8s);
Expand All @@ -55,7 +57,7 @@ public void ShouldReturnServicesFromK8s()
var subsetV1 = GivenSubsetAddress(downstream);
var endpoints = GivenEndpoints(subsetV1);
var route = GivenRouteWithServiceName(namespaces);
var configuration = GivenKubeConfiguration(namespaces, route);
var configuration = GivenKubeConfiguration(namespaces, discoveryType, route);
var downstreamResponse = serviceName;
this.Given(x => GivenServiceInstanceIsRunning(downstreamUrl, downstreamResponse))
.And(x => x.GivenThereIsAFakeKubernetesProvider(endpoints, serviceName, namespaces))
Expand Down Expand Up @@ -95,7 +97,7 @@ public void ShouldReturnServicesByPortNameAsDownstreamScheme(string downstreamSc
route.DownstreamScheme = downstreamScheme; // !!! Warning !!! Select port by name as scheme
route.UpstreamPathTemplate = "/api/example/{url}";
route.ServiceName = serviceName; // "example-web"
var configuration = GivenKubeConfiguration(namespaces, route);
var configuration = GivenKubeConfiguration(namespaces, nameof(Kube), route);

this.Given(x => GivenServiceInstanceIsRunning(downstreamUrl, nameof(ShouldReturnServicesByPortNameAsDownstreamScheme)))
.And(x => x.GivenThereIsAFakeKubernetesProvider(endpoints, serviceName, namespaces))
Expand Down Expand Up @@ -173,7 +175,7 @@ public void ShouldHighlyLoadOnUnstableKubeProvider_WithRoundRobinLoadBalancing(i
downstreams.ForEach(ds => GivenSubsetAddress(ds, subset));
var endpoints = GivenEndpoints(subset, serviceName); // totalServices service instances with different ports
var route = GivenRouteWithServiceName(namespaces, serviceName, nameof(RoundRobinAnalyzer)); // !!!
var configuration = GivenKubeConfiguration(namespaces, route);
var configuration = GivenKubeConfiguration(namespaces, nameof(Kube), route);
GivenMultipleServiceInstancesAreRunning(downstreamUrls, downstreamResponses);
GivenThereIsAConfiguration(configuration);
GivenOcelotIsRunningWithServices(WithKubernetesAndRoundRobin);
Expand Down Expand Up @@ -245,7 +247,7 @@ private FileRoute GivenRouteWithServiceName(string serviceNamespace,
LoadBalancerOptions = new() { Type = loadBalancerType },
};

private FileConfiguration GivenKubeConfiguration(string serviceNamespace, params FileRoute[] routes)
private FileConfiguration GivenKubeConfiguration(string serviceNamespace, string type, params FileRoute[] routes)
{
var u = new Uri(_kubernetesUrl);
var configuration = GivenConfiguration(routes);
Expand All @@ -254,7 +256,7 @@ private FileConfiguration GivenKubeConfiguration(string serviceNamespace, params
Scheme = u.Scheme,
Host = u.Host,
Port = u.Port,
Type = nameof(Kube),
Type = type,
PollingInterval = 0,
Namespace = serviceNamespace,
};
Expand Down Expand Up @@ -305,15 +307,34 @@ private void GivenThereIsAFakeKubernetesProvider(EndpointsV1 endpoints, bool isS
context.Response.Headers.Append("Content-Type", "application/json");
await context.Response.WriteAsync(json);
}

if (context.Request.Path.Value == $"/api/v1/watch/namespaces/{namespaces}/endpoints/{serviceName}")
{
var json = JsonConvert.SerializeObject(new ResourceEventV1<EndpointsV1>()
{
EventType = ResourceEventType.Added,
Resource = endpoints,
});

if (context.Request.Headers.TryGetValue("Authorization", out var values))
{
_receivedToken = values.First();
}

context.Response.StatusCode = 200;
context.Response.Headers.Append("Content-Type", "application/json");

await using var sw = new StreamWriter(context.Response.Body);
await sw.WriteLineAsync(json);
await sw.FlushAsync();

// keeping open connection like kube api will slow down tests
}
});
}

private static ServiceDescriptor GetValidateScopesDescriptor()
=> ServiceDescriptor.Singleton<IServiceProviderFactory<IServiceCollection>>(
new DefaultServiceProviderFactory(new() { ValidateScopes = true }));
private IOcelotBuilder AddKubernetes(IServiceCollection services) => services
.Configure(_kubeClientOptionsConfigure)
.Replace(GetValidateScopesDescriptor())
.AddOcelot().AddKubernetes(false);

private void WithKubernetes(IServiceCollection services) => AddKubernetes(services);
Expand Down
Loading