From 5e00fa3fd04d6b26526dba6b95ecab6c09f18a79 Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Fri, 27 Jun 2025 13:56:28 +0200 Subject: [PATCH 1/7] feat: add FusionCache for resource watcher to enable L1 and L2 caching (hybrid cache) - Integrated FusionCache for robust caching in resource watchers. - Enhanced default configuration with extensible settings in `OperatorSettings`. - Improved concurrency handling using `SemaphoreSlim` for entity events. - Updated tests and dependencies to reflect caching changes. --- .../Builder/OperatorSettings.cs | 16 +++- .../KubeOps.Abstractions.csproj | 1 + .../Builder/OperatorBuilder.cs | 15 ++++ .../Constants/CacheConstants.cs | 19 +++++ src/KubeOps.Operator/KubeOps.Operator.csproj | 1 + .../LeaderAwareResourceWatcher{TEntity}.cs | 4 + .../Watcher/ResourceWatcher{TEntity}.cs | 73 +++++++++++++------ .../Watcher/ResourceWatcher{TEntity}.Test.cs | 19 ++++- 8 files changed, 124 insertions(+), 24 deletions(-) create mode 100644 src/KubeOps.Operator/Constants/CacheConstants.cs diff --git a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs index 2faed930..d7609d6a 100644 --- a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs +++ b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs @@ -1,11 +1,13 @@ using System.Text.RegularExpressions; +using ZiggyCreatures.Caching.Fusion; + namespace KubeOps.Abstractions.Builder; /// /// Operator settings. /// -public sealed class OperatorSettings +public sealed partial class OperatorSettings { private const string DefaultOperatorName = "KubernetesOperator"; private const string NonCharReplacement = "-"; @@ -15,7 +17,7 @@ public sealed class OperatorSettings /// Defaults to "kubernetesoperator" when not set. /// public string Name { get; set; } = - new Regex(@"(\W|_)", RegexOptions.CultureInvariant).Replace( + OperatorNameRegex().Replace( DefaultOperatorName, NonCharReplacement) .ToLowerInvariant(); @@ -59,4 +61,14 @@ public sealed class OperatorSettings /// The wait timeout if the lease cannot be acquired. /// public TimeSpan LeaderElectionRetryPeriod { get; set; } = TimeSpan.FromSeconds(2); + + /// + /// Allows configuration of the FusionCache settings for resource watcher entity caching. + /// This property is optional and can be used to customize caching behavior for resource watcher entities. + /// If not set, a default cache configuration is applied. + /// + public Action? ConfigureResourceWatcherEntityCache { get; set; } + + [GeneratedRegex(@"(\W|_)", RegexOptions.CultureInvariant)] + private static partial Regex OperatorNameRegex(); } diff --git a/src/KubeOps.Abstractions/KubeOps.Abstractions.csproj b/src/KubeOps.Abstractions/KubeOps.Abstractions.csproj index c88647c5..93e54942 100644 --- a/src/KubeOps.Abstractions/KubeOps.Abstractions.csproj +++ b/src/KubeOps.Abstractions/KubeOps.Abstractions.csproj @@ -16,6 +16,7 @@ + diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index f1dc4381..69349836 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -21,6 +21,8 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using ZiggyCreatures.Caching.Fusion; + namespace KubeOps.Operator.Builder; internal sealed class OperatorBuilder : IOperatorBuilder @@ -36,6 +38,13 @@ public OperatorBuilder(IServiceCollection services, OperatorSettings settings) public IServiceCollection Services { get; } + private static Action DefaultCacheConfiguration + => options => + { + options.DefaultEntryOptions + .SetDuration(Timeout.InfiniteTimeSpan); + }; + public IOperatorBuilder AddController() where TImplementation : class, IEntityController where TEntity : IKubernetesObject @@ -111,6 +120,12 @@ private void AddOperatorBase() Services.AddSingleton(_settings); Services.AddSingleton(new ActivitySource(_settings.Name)); + // add and configure resource watcher entity cache + Services + .AddFusionCache(CacheConstants.CacheNames.ResourceWatcher) + .WithOptions( + options => (_settings.ConfigureResourceWatcherEntityCache ?? DefaultCacheConfiguration).Invoke(options)); + // Add the default configuration and the client separately. This allows external users to override either // just the config (e.g. for integration tests) or to replace the whole client, e.g. with a mock. // We also add the k8s.IKubernetes as a singleton service, in order to allow to access internal services diff --git a/src/KubeOps.Operator/Constants/CacheConstants.cs b/src/KubeOps.Operator/Constants/CacheConstants.cs new file mode 100644 index 00000000..1b6faf1e --- /dev/null +++ b/src/KubeOps.Operator/Constants/CacheConstants.cs @@ -0,0 +1,19 @@ +namespace KubeOps.Operator.Constants; + +/// +/// Provides constant values used for caching purposes within the operator. +/// +public static class CacheConstants +{ + /// + /// Contains constant values representing names used within the operator's caching mechanisms. + /// + public static class CacheNames + { + /// + /// Represents a constant string used as a name for the resource watcher + /// in the operator's caching mechanisms. + /// + public const string ResourceWatcher = "ResourceWatcher"; + } +} diff --git a/src/KubeOps.Operator/KubeOps.Operator.csproj b/src/KubeOps.Operator/KubeOps.Operator.csproj index 5361c6e6..a120b9ba 100644 --- a/src/KubeOps.Operator/KubeOps.Operator.csproj +++ b/src/KubeOps.Operator/KubeOps.Operator.csproj @@ -17,6 +17,7 @@ + diff --git a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs index add231f7..8fce8c01 100644 --- a/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs @@ -12,6 +12,8 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using ZiggyCreatures.Caching.Fusion; + namespace KubeOps.Operator.Watcher; internal sealed class LeaderAwareResourceWatcher( @@ -21,6 +23,7 @@ internal sealed class LeaderAwareResourceWatcher( TimedEntityQueue queue, OperatorSettings settings, IEntityLabelSelector labelSelector, + IFusionCacheProvider cacheProvider, IKubernetesClient client, IHostApplicationLifetime hostApplicationLifetime, LeaderElector elector) @@ -31,6 +34,7 @@ internal sealed class LeaderAwareResourceWatcher( queue, settings, labelSelector, + cacheProvider, client) where TEntity : IKubernetesObject { diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index 8d74ba6b..d22c3fc5 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -12,6 +12,7 @@ using KubeOps.Abstractions.Entities; using KubeOps.Abstractions.Finalizer; using KubeOps.KubernetesClient; +using KubeOps.Operator.Constants; using KubeOps.Operator.Logging; using KubeOps.Operator.Queue; @@ -19,6 +20,8 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using ZiggyCreatures.Caching.Fusion; + namespace KubeOps.Operator.Watcher; public class ResourceWatcher( @@ -28,12 +31,14 @@ public class ResourceWatcher( TimedEntityQueue requeue, OperatorSettings settings, IEntityLabelSelector labelSelector, + IFusionCacheProvider cacheProvider, IKubernetesClient client) : IHostedService, IAsyncDisposable, IDisposable where TEntity : IKubernetesObject { - private readonly ConcurrentDictionary _entityCache = new(); + private readonly ConcurrentDictionary _entityLocks = new(); + private readonly IFusionCache _entityCache = cacheProvider.GetCache(CacheConstants.CacheNames.ResourceWatcher); private CancellationTokenSource _cancellationTokenSource = new(); private uint _watcherReconnectRetries; private Task? _eventWatcher; @@ -132,20 +137,35 @@ static async ValueTask CastAndDispose(IDisposable resource) protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, CancellationToken cancellationToken) { + SemaphoreSlim? semaphore; + switch (type) { case WatchEventType.Added: - if (_entityCache.TryAdd(entity.Uid(), entity.Generation() ?? 0)) + semaphore = _entityLocks.GetOrAdd(entity.Uid(), _ => new(1, 1)); + await semaphore.WaitAsync(cancellationToken); + + try { - // Only perform reconciliation if the entity was not already in the cache. - await ReconcileModificationAsync(entity, cancellationToken); + var cachedGeneration = await _entityCache.TryGetAsync(entity.Uid(), token: cancellationToken); + + if (!cachedGeneration.HasValue) + { + // Only perform reconciliation if the entity was not already in the cache. + await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 0, token: cancellationToken); + await ReconcileModificationAsync(entity, cancellationToken); + } + else + { + logger.LogDebug( + """Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""", + entity.Kind, + entity.Name()); + } } - else + finally { - logger.LogDebug( - """Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""", - entity.Kind, - entity.Name()); + semaphore.Release(); } break; @@ -153,21 +173,32 @@ protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, C switch (entity) { case { Metadata.DeletionTimestamp: null }: - _entityCache.TryGetValue(entity.Uid(), out var cachedGeneration); + semaphore = _entityLocks.GetOrAdd(entity.Uid(), _ => new(1, 1)); + await semaphore.WaitAsync(cancellationToken); - // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed. - if (entity.Generation() <= cachedGeneration) + try { - logger.LogDebug( - """Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""", - entity.Kind, - entity.Name()); - return; + var cachedGeneration = await _entityCache.TryGetAsync(entity.Uid(), token: cancellationToken); + + // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed. + if (cachedGeneration.HasValue && cachedGeneration >= entity.Generation()) + { + logger.LogDebug( + """Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""", + entity.Kind, + entity.Name()); + return; + } + + // update cached generation since generation now changed + await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 1, token: cancellationToken); + await ReconcileModificationAsync(entity, cancellationToken); + } + finally + { + semaphore.Release(); } - // update cached generation since generation now changed - _entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, cachedGeneration); - await ReconcileModificationAsync(entity, cancellationToken); break; case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }: await ReconcileFinalizersSequentialAsync(entity, cancellationToken); @@ -311,7 +342,7 @@ e.InnerException is EndOfStreamException && private async Task ReconcileDeletionAsync(TEntity entity, CancellationToken cancellationToken) { requeue.Remove(entity); - _entityCache.TryRemove(entity.Uid(), out _); + await _entityCache.RemoveAsync(entity.Uid(), token: cancellationToken); await using var scope = provider.CreateAsyncScope(); var controller = scope.ServiceProvider.GetRequiredService>(); diff --git a/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs b/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs index 9b23e331..10a8f65c 100644 --- a/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs +++ b/test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs @@ -7,6 +7,7 @@ using KubeOps.Abstractions.Builder; using KubeOps.Abstractions.Entities; using KubeOps.KubernetesClient; +using KubeOps.Operator.Constants; using KubeOps.Operator.Queue; using KubeOps.Operator.Watcher; @@ -14,6 +15,8 @@ using Moq; +using ZiggyCreatures.Caching.Fusion; + namespace KubeOps.Operator.Test.Watcher; public sealed class ResourceWatcherTest @@ -28,13 +31,27 @@ public async Task Restarting_Watcher_Should_Trigger_New_Watch() var timedEntityQueue = new TimedEntityQueue(); var operatorSettings = new OperatorSettings { Namespace = "unit-test" }; var kubernetesClient = Mock.Of(); + var cache = Mock.Of(); + var cacheProvider = Mock.Of(); var labelSelector = new DefaultEntityLabelSelector(); Mock.Get(kubernetesClient) .Setup(client => client.WatchAsync("unit-test", null, null, true, It.IsAny())) .Returns((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken)); - var resourceWatcher = new ResourceWatcher(activitySource, logger, serviceProvider, timedEntityQueue, operatorSettings, labelSelector, kubernetesClient); + Mock.Get(cacheProvider) + .Setup(cp => cp.GetCache(It.Is(s => s == CacheConstants.CacheNames.ResourceWatcher))) + .Returns(() => cache); + + var resourceWatcher = new ResourceWatcher( + activitySource, + logger, + serviceProvider, + timedEntityQueue, + operatorSettings, + labelSelector, + cacheProvider, + kubernetesClient); // Act. // Start and stop the watcher. From 6d8def683f5511e997f1e4bd691d57e35d6bdad7 Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Mon, 30 Jun 2025 12:33:29 +0200 Subject: [PATCH 2/7] refactor: optimize resource watcher cache handling and remove redundant entity locks - Renamed `DefaultCacheConfiguration` to `DefaultResourceWatcherCacheConfiguration` for clarity. - Introduced cache key prefix to improve cache segmentation. - Removed `ConcurrentDictionary` for entity locks to simplify concurrency management. - Refactored event handling logic for "added" and "modified" events to streamline codebase. --- .../Builder/OperatorBuilder.cs | 7 +- .../Watcher/ResourceWatcher{TEntity}.cs | 69 +++++++------------ 2 files changed, 27 insertions(+), 49 deletions(-) diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index 69349836..0c59d4e3 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -38,11 +38,12 @@ public OperatorBuilder(IServiceCollection services, OperatorSettings settings) public IServiceCollection Services { get; } - private static Action DefaultCacheConfiguration + private static Action DefaultResourceWatcherCacheConfiguration => options => { + options.CacheKeyPrefix = "rw-"; options.DefaultEntryOptions - .SetDuration(Timeout.InfiniteTimeSpan); + .SetDuration(TimeSpan.MaxValue); }; public IOperatorBuilder AddController() @@ -124,7 +125,7 @@ private void AddOperatorBase() Services .AddFusionCache(CacheConstants.CacheNames.ResourceWatcher) .WithOptions( - options => (_settings.ConfigureResourceWatcherEntityCache ?? DefaultCacheConfiguration).Invoke(options)); + options => (_settings.ConfigureResourceWatcherEntityCache ?? DefaultResourceWatcherCacheConfiguration).Invoke(options)); // Add the default configuration and the client separately. This allows external users to override either // just the config (e.g. for integration tests) or to replace the whole client, e.g. with a mock. diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index d22c3fc5..cd73ba6d 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -1,4 +1,3 @@ -using System.Collections.Concurrent; using System.Diagnostics; using System.Net; using System.Runtime.Serialization; @@ -36,8 +35,6 @@ public class ResourceWatcher( : IHostedService, IAsyncDisposable, IDisposable where TEntity : IKubernetesObject { - private readonly ConcurrentDictionary _entityLocks = new(); - private readonly IFusionCache _entityCache = cacheProvider.GetCache(CacheConstants.CacheNames.ResourceWatcher); private CancellationTokenSource _cancellationTokenSource = new(); private uint _watcherReconnectRetries; @@ -137,35 +134,25 @@ static async ValueTask CastAndDispose(IDisposable resource) protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, CancellationToken cancellationToken) { - SemaphoreSlim? semaphore; + MaybeValue cachedGeneration; switch (type) { case WatchEventType.Added: - semaphore = _entityLocks.GetOrAdd(entity.Uid(), _ => new(1, 1)); - await semaphore.WaitAsync(cancellationToken); + cachedGeneration = await _entityCache.TryGetAsync(entity.Uid(), token: cancellationToken); - try + if (!cachedGeneration.HasValue) { - var cachedGeneration = await _entityCache.TryGetAsync(entity.Uid(), token: cancellationToken); - - if (!cachedGeneration.HasValue) - { - // Only perform reconciliation if the entity was not already in the cache. - await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 0, token: cancellationToken); - await ReconcileModificationAsync(entity, cancellationToken); - } - else - { - logger.LogDebug( - """Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""", - entity.Kind, - entity.Name()); - } + // Only perform reconciliation if the entity was not already in the cache. + await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 0, token: cancellationToken); + await ReconcileModificationAsync(entity, cancellationToken); } - finally + else { - semaphore.Release(); + logger.LogDebug( + """Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""", + entity.Kind, + entity.Name()); } break; @@ -173,32 +160,22 @@ protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, C switch (entity) { case { Metadata.DeletionTimestamp: null }: - semaphore = _entityLocks.GetOrAdd(entity.Uid(), _ => new(1, 1)); - await semaphore.WaitAsync(cancellationToken); + cachedGeneration = await _entityCache.TryGetAsync(entity.Uid(), token: cancellationToken); - try + // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed. + if (cachedGeneration.HasValue && cachedGeneration >= entity.Generation()) { - var cachedGeneration = await _entityCache.TryGetAsync(entity.Uid(), token: cancellationToken); - - // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed. - if (cachedGeneration.HasValue && cachedGeneration >= entity.Generation()) - { - logger.LogDebug( - """Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""", - entity.Kind, - entity.Name()); - return; - } - - // update cached generation since generation now changed - await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 1, token: cancellationToken); - await ReconcileModificationAsync(entity, cancellationToken); - } - finally - { - semaphore.Release(); + logger.LogDebug( + """Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""", + entity.Kind, + entity.Name()); + return; } + // update cached generation since generation now changed + await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 1, token: cancellationToken); + await ReconcileModificationAsync(entity, cancellationToken); + break; case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }: await ReconcileFinalizersSequentialAsync(entity, cancellationToken); From 3d2e60781edce713657a7d1d51d5fb16b04aa019 Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Tue, 1 Jul 2025 08:18:17 +0200 Subject: [PATCH 3/7] refactor: enhance resource watcher cache configuration and logging scope - Updated `ConfigureResourceWatcherEntityCache` to use `IFusionCacheBuilder` for extensibility. - Moved resource watcher cache setup logic to `WithResourceWatcherCaching` extension. - Added detailed XML comments for `EntityLoggingScope` to improve documentation. - Removed redundant `DefaultResourceWatcherCacheConfiguration`. --- .../Builder/OperatorSettings.cs | 2 +- .../Builder/CacheExtensions.cs | 46 +++++++++++++++++++ .../Builder/OperatorBuilder.cs | 13 +----- .../Logging/EntityLoggingScope.cs | 24 ++++++++++ 4 files changed, 72 insertions(+), 13 deletions(-) create mode 100644 src/KubeOps.Operator/Builder/CacheExtensions.cs diff --git a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs index d7609d6a..fa8902aa 100644 --- a/src/KubeOps.Abstractions/Builder/OperatorSettings.cs +++ b/src/KubeOps.Abstractions/Builder/OperatorSettings.cs @@ -67,7 +67,7 @@ public sealed partial class OperatorSettings /// This property is optional and can be used to customize caching behavior for resource watcher entities. /// If not set, a default cache configuration is applied. /// - public Action? ConfigureResourceWatcherEntityCache { get; set; } + public Action? ConfigureResourceWatcherEntityCache { get; set; } [GeneratedRegex(@"(\W|_)", RegexOptions.CultureInvariant)] private static partial Regex OperatorNameRegex(); diff --git a/src/KubeOps.Operator/Builder/CacheExtensions.cs b/src/KubeOps.Operator/Builder/CacheExtensions.cs new file mode 100644 index 00000000..33e0ede9 --- /dev/null +++ b/src/KubeOps.Operator/Builder/CacheExtensions.cs @@ -0,0 +1,46 @@ +using KubeOps.Abstractions.Builder; +using KubeOps.Operator.Constants; + +using Microsoft.Extensions.DependencyInjection; + +using ZiggyCreatures.Caching.Fusion; + +namespace KubeOps.Operator.Builder; + +/// +/// Provides extension methods for configuring caching related to the operator. +/// +public static class CacheExtensions +{ + /// + /// Configures resource watcher caching for the given service collection. + /// Adds a FusionCache instance for resource watchers and applies custom or default cache configuration. + /// + /// The service collection to add the resource watcher caching to. + /// + /// The operator settings that optionally provide a custom configuration for the resource watcher entity cache. + /// + /// The modified service collection with resource watcher caching configured. + public static IServiceCollection WithResourceWatcherCaching(this IServiceCollection services, OperatorSettings settings) + { + var cacheBuilder = services + .AddFusionCache(CacheConstants.CacheNames.ResourceWatcher); + + if (settings.ConfigureResourceWatcherEntityCache != default) + { + settings.ConfigureResourceWatcherEntityCache(cacheBuilder); + } + else + { + cacheBuilder + .WithOptions(options => + { + options.CacheKeyPrefix = "rw-"; + options.DefaultEntryOptions + .SetDuration(TimeSpan.MaxValue); + }); + } + + return services; + } +} diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index 0c59d4e3..a46b46a5 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -38,14 +38,6 @@ public OperatorBuilder(IServiceCollection services, OperatorSettings settings) public IServiceCollection Services { get; } - private static Action DefaultResourceWatcherCacheConfiguration - => options => - { - options.CacheKeyPrefix = "rw-"; - options.DefaultEntryOptions - .SetDuration(TimeSpan.MaxValue); - }; - public IOperatorBuilder AddController() where TImplementation : class, IEntityController where TEntity : IKubernetesObject @@ -122,10 +114,7 @@ private void AddOperatorBase() Services.AddSingleton(new ActivitySource(_settings.Name)); // add and configure resource watcher entity cache - Services - .AddFusionCache(CacheConstants.CacheNames.ResourceWatcher) - .WithOptions( - options => (_settings.ConfigureResourceWatcherEntityCache ?? DefaultResourceWatcherCacheConfiguration).Invoke(options)); + Services.WithResourceWatcherCaching(_settings); // Add the default configuration and the client separately. This allows external users to override either // just the config (e.g. for integration tests) or to replace the whole client, e.g. with a mock. diff --git a/src/KubeOps.Operator/Logging/EntityLoggingScope.cs b/src/KubeOps.Operator/Logging/EntityLoggingScope.cs index 3a254ce1..19622cbe 100644 --- a/src/KubeOps.Operator/Logging/EntityLoggingScope.cs +++ b/src/KubeOps.Operator/Logging/EntityLoggingScope.cs @@ -6,6 +6,10 @@ namespace KubeOps.Operator.Logging; #pragma warning disable CA1710 +/// +/// A logging scope that encapsulates contextual information related to a Kubernetes entity and event type. +/// Provides a mechanism for structured logging with key-value pairs corresponding to entity metadata and event type. +/// internal sealed record EntityLoggingScope : IReadOnlyCollection> #pragma warning restore CA1710 { @@ -20,6 +24,22 @@ private EntityLoggingScope(IReadOnlyDictionary state) private IReadOnlyDictionary Values { get; } + /// + /// Creates a new instance of for the provided Kubernetes entity and event type. + /// + /// + /// The type of the Kubernetes entity. Must implement . + /// + /// + /// The type of the watch event for the entity (e.g., Added, Modified, Deleted, or Bookmark). + /// + /// + /// The Kubernetes entity associated with the logging scope. This includes metadata such as Kind, Namespace, Name, UID, and ResourceVersion. + /// + /// + /// A new instance containing contextual key-value pairs + /// related to the event type and the provided Kubernetes entity. + /// public static EntityLoggingScope CreateFor(WatchEventType eventType, TEntity entity) where TEntity : IKubernetesObject => new( @@ -29,15 +49,19 @@ public static EntityLoggingScope CreateFor(WatchEventType eventType, TE { nameof(entity.Kind), entity.Kind }, { "Namespace", entity.Namespace() }, { "Name", entity.Name() }, + { "Uid", entity.Uid() }, { "ResourceVersion", entity.ResourceVersion() }, }); + /// public IEnumerator> GetEnumerator() => Values.GetEnumerator(); + /// public override string ToString() => CachedFormattedString ??= $"{{ {string.Join(", ", Values.Select(kvp => $"{kvp.Key} = {kvp.Value}"))} }}"; + /// IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } From f1840483caeb74952f10831641b3a885c6b4a324 Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Tue, 1 Jul 2025 08:38:32 +0200 Subject: [PATCH 4/7] refactor: rename cache extension methods and adjust visibility - Renamed `WithResourceWatcherCaching` to `WithResourceWatcherEntityCaching` for clarity. - Updated `CacheExtensions` to be `internal` to limit scope. - Removed unused dependency on `ZiggyCreatures.Caching.Fusion`. --- src/KubeOps.Operator/Builder/CacheExtensions.cs | 4 ++-- src/KubeOps.Operator/Builder/OperatorBuilder.cs | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/KubeOps.Operator/Builder/CacheExtensions.cs b/src/KubeOps.Operator/Builder/CacheExtensions.cs index 33e0ede9..40367fe1 100644 --- a/src/KubeOps.Operator/Builder/CacheExtensions.cs +++ b/src/KubeOps.Operator/Builder/CacheExtensions.cs @@ -10,7 +10,7 @@ namespace KubeOps.Operator.Builder; /// /// Provides extension methods for configuring caching related to the operator. /// -public static class CacheExtensions +internal static class CacheExtensions { /// /// Configures resource watcher caching for the given service collection. @@ -21,7 +21,7 @@ public static class CacheExtensions /// The operator settings that optionally provide a custom configuration for the resource watcher entity cache. /// /// The modified service collection with resource watcher caching configured. - public static IServiceCollection WithResourceWatcherCaching(this IServiceCollection services, OperatorSettings settings) + internal static IServiceCollection WithResourceWatcherEntityCaching(this IServiceCollection services, OperatorSettings settings) { var cacheBuilder = services .AddFusionCache(CacheConstants.CacheNames.ResourceWatcher); diff --git a/src/KubeOps.Operator/Builder/OperatorBuilder.cs b/src/KubeOps.Operator/Builder/OperatorBuilder.cs index a46b46a5..7c9fc676 100644 --- a/src/KubeOps.Operator/Builder/OperatorBuilder.cs +++ b/src/KubeOps.Operator/Builder/OperatorBuilder.cs @@ -21,8 +21,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using ZiggyCreatures.Caching.Fusion; - namespace KubeOps.Operator.Builder; internal sealed class OperatorBuilder : IOperatorBuilder @@ -114,7 +112,7 @@ private void AddOperatorBase() Services.AddSingleton(new ActivitySource(_settings.Name)); // add and configure resource watcher entity cache - Services.WithResourceWatcherCaching(_settings); + Services.WithResourceWatcherEntityCaching(_settings); // Add the default configuration and the client separately. This allows external users to override either // just the config (e.g. for integration tests) or to replace the whole client, e.g. with a mock. From 5e4de87ab249f11fc4770cbe7b21775929645566 Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Mon, 7 Jul 2025 14:11:31 +0200 Subject: [PATCH 5/7] refactor: update cache key prefix in `CacheExtensions` to use `CacheConstants` for consistency --- src/KubeOps.Operator/Builder/CacheExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/KubeOps.Operator/Builder/CacheExtensions.cs b/src/KubeOps.Operator/Builder/CacheExtensions.cs index 40367fe1..a75a26b9 100644 --- a/src/KubeOps.Operator/Builder/CacheExtensions.cs +++ b/src/KubeOps.Operator/Builder/CacheExtensions.cs @@ -35,7 +35,7 @@ internal static IServiceCollection WithResourceWatcherEntityCaching(this IServic cacheBuilder .WithOptions(options => { - options.CacheKeyPrefix = "rw-"; + options.CacheKeyPrefix = $"{CacheConstants.CacheNames.ResourceWatcher}:"; options.DefaultEntryOptions .SetDuration(TimeSpan.MaxValue); }); From 8ce68a84eeba50607c54004a96ffa606115b7642 Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Mon, 7 Jul 2025 15:48:49 +0200 Subject: [PATCH 6/7] docs: add caching documentation and adjust sidebar positions - Added a new `Caching` documentation page explaining resource watcher caching with FusionCache and configuration options (in-memory and distributed). - Updated sidebar positions for `Deployment`, `Utilities`, and `Testing` to accommodate the new `Caching` page. --- docs/docs/operator/caching.mdx | 50 ++++++++++++++++++++++ docs/docs/operator/deployment.mdx | 2 +- docs/docs/operator/testing/_category_.json | 2 +- docs/docs/operator/utilities.mdx | 2 +- 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 docs/docs/operator/caching.mdx diff --git a/docs/docs/operator/caching.mdx b/docs/docs/operator/caching.mdx new file mode 100644 index 00000000..4fb5b239 --- /dev/null +++ b/docs/docs/operator/caching.mdx @@ -0,0 +1,50 @@ +--- +title: Caching +description: Caching - Memory and Distributed +sidebar_position: 7 +--- + +## ResourceWatcher + +The `ResourceWatcher` uses an instance of `IFusionCache` to store the `.metadata.generation` value of each observed resource. The key for the cache entry is the resource's unique ID (`metadata.uid`). +The primary purpose of the cache is to skip reconciliation cycles for events that do not represent an actual change to a resource's specification (`.spec`). + +1. **`MODIFIED` Event Type**: + - Kubernetes only increments the `.metadata.generation` value of a resource when its specification (`.spec`) changes. Updates to status fields (`.status`), while also triggering a `MODIFIED` event, do not increase the `generation`. + - When a `MODIFIED` event arrives, the `ResourceWatcher` compares the `generation` of the incoming resource with the value stored in `FusionCache`. + - If the new `generation` is not greater than the cached one, the reconciliation is skipped. This is a critical optimization, as status updates can occur very frequently (e.g., from other controllers) and typically do not require action from your operator. + - Only when the `generation` has increased is the resource forwarded for reconciliation, and the new `generation` value is stored in the cache. + +2. **`ADDED` Event Type**: + - On an `ADDED` event, the watcher checks if the resource is already present in the cache. + - This prevents resources that the operator already knows about (e.g., after a watcher restart) from being incorrectly treated as "new" and reconciled again. + +3. **`DELETED` Event Type**: + - When a resource is deleted, the watcher removes the corresponding entry from the cache to keep the memory clean. + +### Default Configuration: In-Memory (L1) Cache + +By default, and without any extra configuration, `KubeOps` uses a simple in-memory cache for `FusionCache`. + +- **Advantages**: + - Requires zero configuration. + - Very fast, as all data is held in the operator pod's memory. + +- **Disadvantages**: + - The cache is volatile. If the pod restarts, all stored `generation` values are lost, leading to a full reconciliation of all observed resources. + +### Advanced Configuration: Distributed (L2) Cache + +For robust use in production or HA environments, it is essential to extend `FusionCache` with a distributed L2 cache and a backplane. This ensures that all operator instances share a consistent state. +A common setup for this involves using **Redis**. + +**Steps to Configure a Distributed Cache with Redis:** + +1. **Add the necessary NuGet packages to your project:** + - `ZiggyCreatures.FusionCache.Serialization.SystemTextJson` (or another serializer) + - `ZiggyCreatures.FusionCache.Backplane.StackExchangeRedis` + - `Microsoft.Extensions.Caching.StackExchangeRedis` + +2. **Configure the services in your `Program.cs`:** +You need to configure `FusionCache` with a serializer, the distributed cache (L2), and a backplane. The backplane (e.g., via Redis Pub/Sub) ensures that cache invalidations (like a `SET` or `REMOVE`) are immediately propagated to all other operator pods, keeping their L1 caches in sync. + diff --git a/docs/docs/operator/deployment.mdx b/docs/docs/operator/deployment.mdx index 10267813..8dfd83cb 100644 --- a/docs/docs/operator/deployment.mdx +++ b/docs/docs/operator/deployment.mdx @@ -1,7 +1,7 @@ --- title: Deployment description: Deploying your KubeOps Operator -sidebar_position: 7 +sidebar_position: 8 --- # Deployment diff --git a/docs/docs/operator/testing/_category_.json b/docs/docs/operator/testing/_category_.json index 56f722ea..3ea28e1c 100644 --- a/docs/docs/operator/testing/_category_.json +++ b/docs/docs/operator/testing/_category_.json @@ -1,5 +1,5 @@ { - "position": 8, + "position": 9, "label": "Testing", "collapsible": true, "collapsed": true diff --git a/docs/docs/operator/utilities.mdx b/docs/docs/operator/utilities.mdx index ef2656db..3cdfa14e 100644 --- a/docs/docs/operator/utilities.mdx +++ b/docs/docs/operator/utilities.mdx @@ -1,7 +1,7 @@ --- title: Utilities description: Utilities for your Operator and Development -sidebar_position: 9 +sidebar_position: 10 --- # Development and Operator Utilities From 2a6f4fc3ea7039d828d034dcbcef1788377ec53e Mon Sep 17 00:00:00 2001 From: Marcus Kimpenhaus Date: Mon, 7 Jul 2025 16:42:46 +0200 Subject: [PATCH 7/7] docs: enhance caching documentation with configuration examples and FusionCache details - Improved explanations for in-memory and distributed caching setups. - Added example code for customizing resource watcher cache with FusionCache. - Included references to FusionCache and Redis documentation for further guidance. --- docs/docs/operator/caching.mdx | 51 ++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/docs/docs/operator/caching.mdx b/docs/docs/operator/caching.mdx index 4fb5b239..ce90dec5 100644 --- a/docs/docs/operator/caching.mdx +++ b/docs/docs/operator/caching.mdx @@ -6,13 +6,17 @@ sidebar_position: 7 ## ResourceWatcher -The `ResourceWatcher` uses an instance of `IFusionCache` to store the `.metadata.generation` value of each observed resource. The key for the cache entry is the resource's unique ID (`metadata.uid`). +The `ResourceWatcher` uses a cache instance to store the `.metadata.generation` value of each observed resource. +The key for the cache entry is the resource's unique ID (`metadata.uid`). + The primary purpose of the cache is to skip reconciliation cycles for events that do not represent an actual change to a resource's specification (`.spec`). 1. **`MODIFIED` Event Type**: - - Kubernetes only increments the `.metadata.generation` value of a resource when its specification (`.spec`) changes. Updates to status fields (`.status`), while also triggering a `MODIFIED` event, do not increase the `generation`. - - When a `MODIFIED` event arrives, the `ResourceWatcher` compares the `generation` of the incoming resource with the value stored in `FusionCache`. - - If the new `generation` is not greater than the cached one, the reconciliation is skipped. This is a critical optimization, as status updates can occur very frequently (e.g., from other controllers) and typically do not require action from your operator. + - Kubernetes only increments the `.metadata.generation` value of a resource when its specification (`.spec`) changes. + Updates to status fields (`.status`), while also triggering a `MODIFIED` event, do not increase the `generation`. + - When a `MODIFIED` event arrives, the `ResourceWatcher` compares the `generation` of the incoming resource with the value stored in the cache. + - If the new `generation` is not greater than the cached one, the reconciliation is skipped. + This is a critical optimization, as status updates can occur very frequently (e.g., from other controllers) and typically do not require action from your operator. - Only when the `generation` has increased is the resource forwarded for reconciliation, and the new `generation` value is stored in the cache. 2. **`ADDED` Event Type**: @@ -24,7 +28,7 @@ The primary purpose of the cache is to skip reconciliation cycles for events tha ### Default Configuration: In-Memory (L1) Cache -By default, and without any extra configuration, `KubeOps` uses a simple in-memory cache for `FusionCache`. +By default, and without any extra configuration, `KubeOps` uses a simple in-memory cache. - **Advantages**: - Requires zero configuration. @@ -35,16 +39,35 @@ By default, and without any extra configuration, `KubeOps` uses a simple in-memo ### Advanced Configuration: Distributed (L2) Cache -For robust use in production or HA environments, it is essential to extend `FusionCache` with a distributed L2 cache and a backplane. This ensures that all operator instances share a consistent state. -A common setup for this involves using **Redis**. +For robust use in production or HA environments, it could be essential to extend cache with a distributed L2 cache and a backplane. +This ensures that all operator instances share a consistent state. +A common setup for this involves using [**Redis**](https://github.com/redis/redis). + +### FusionCache + +KubeOps utilizes [`FusionCache`](https://github.com/ZiggyCreatures/FusionCache/blob/main/docs/AGentleIntroduction.md) for seamless support of an L1/L2 cache. +Via `OperatorSettings.ConfigureResourceWatcherEntityCache`, an `Action` is provided that allows extending the standard configuration or +overwriting it with a customized version. -**Steps to Configure a Distributed Cache with Redis:** +Here is an example of what a customized configuration with an L2 cache could look like: -1. **Add the necessary NuGet packages to your project:** - - `ZiggyCreatures.FusionCache.Serialization.SystemTextJson` (or another serializer) - - `ZiggyCreatures.FusionCache.Backplane.StackExchangeRedis` - - `Microsoft.Extensions.Caching.StackExchangeRedis` +```csharp +builder + .Services + .AddKubernetesOperator(settings => + { + settings.Name = OperatorName; + settings.ConfigureResourceWatcherEntityCache = + cacheBuilder => + cacheBuilder + .WithCacheKeyPrefix($"{CacheConstants.CacheNames.ResourceWatcher}:") + .WithSerializer(_ => new FusionCacheSystemTextJsonSerializer()) + .WithRegisteredDistributedCache() + .WithDefaultEntryOptions(options => + options.Duration = TimeSpan.MaxValue); + }) +``` -2. **Configure the services in your `Program.cs`:** -You need to configure `FusionCache` with a serializer, the distributed cache (L2), and a backplane. The backplane (e.g., via Redis Pub/Sub) ensures that cache invalidations (like a `SET` or `REMOVE`) are immediately propagated to all other operator pods, keeping their L1 caches in sync. +For an overview of all of FusionCache's features, we refer you to the corresponding documentation: +https://github.com/ZiggyCreatures/FusionCache/blob/main/docs/CacheLevels.md \ No newline at end of file