73 changes: 73 additions & 0 deletions docs/docs/operator/caching.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
title: Caching
description: Caching - Memory and Distributed
sidebar_position: 7
---

## ResourceWatcher

The `ResourceWatcher` uses a cache instance to store the `.metadata.generation` value of each observed resource.
The key for the cache entry is the resource's unique ID (`metadata.uid`).

The primary purpose of the cache is to skip reconciliation cycles for events that do not represent an actual change to a resource's specification (`.spec`).

1. **`MODIFIED` Event Type**:
- Kubernetes only increments the `.metadata.generation` value of a resource when its specification (`.spec`) changes.
Updates to status fields (`.status`), while also triggering a `MODIFIED` event, do not increase the `generation`.
- When a `MODIFIED` event arrives, the `ResourceWatcher` compares the `generation` of the incoming resource with the value stored in the cache.
- If the new `generation` is not greater than the cached one, the reconciliation is skipped.
This is a critical optimization, as status updates can occur very frequently (e.g., from other controllers) and typically do not require action from your operator.
- Only when the `generation` has increased is the resource forwarded for reconciliation, and the new `generation` value is stored in the cache.

2. **`ADDED` Event Type**:
- On an `ADDED` event, the watcher checks whether the resource's UID is already present in the cache. If it is not, the `generation` is stored and the resource is reconciled; if it is, the event is skipped.
- This prevents resources that the operator already knows about (e.g., after a watcher restart) from being treated as new and reconciled again.

3. **`DELETED` Event Type**:
- When a resource is deleted, the watcher removes the corresponding entry from the cache so that stale entries do not accumulate.
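
The three cases above can be sketched as a small decision function. This is a simplified model, not the KubeOps implementation: a plain `Dictionary` stands in for the cache, and `ShouldReconcile` and the event-type strings are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the watcher's cache: UID -> last reconciled generation.
var cache = new Dictionary<string, long>();

// Returns true when the event should be passed on by the watcher.
bool ShouldReconcile(string eventType, string uid, long generation)
{
    switch (eventType)
    {
        case "ADDED":
            // A known UID means the resource was already seen, so skip it.
            if (cache.ContainsKey(uid))
            {
                return false;
            }

            cache[uid] = generation;
            return true;

        case "MODIFIED":
            // Skip unless the generation increased; status-only updates leave it unchanged.
            if (cache.TryGetValue(uid, out var cached) && cached >= generation)
            {
                return false;
            }

            cache[uid] = generation;
            return true;

        case "DELETED":
            // Remove the entry; the deletion itself is always passed on.
            cache.Remove(uid);
            return true;

        default:
            return false;
    }
}

Console.WriteLine(ShouldReconcile("ADDED", "uid-1", 1));    // True: first sighting
Console.WriteLine(ShouldReconcile("MODIFIED", "uid-1", 1)); // False: generation unchanged
Console.WriteLine(ShouldReconcile("MODIFIED", "uid-1", 2)); // True: spec changed
Console.WriteLine(ShouldReconcile("ADDED", "uid-1", 2));    // False: already cached
```

With a shared L2 cache, the "already cached" case is what lets a restarted or second operator instance skip resources that were reconciled earlier.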

### Default Configuration: In-Memory (L1) Cache

By default, and without any extra configuration, `KubeOps` uses a simple in-memory cache.

- **Advantages**:
- Requires zero configuration.
- Very fast, as all data is held in the operator pod's memory.

- **Disadvantages**:
- The cache is volatile. If the pod restarts, all stored `generation` values are lost, leading to a full reconciliation of all observed resources.
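
For reference, the default that is applied when `ConfigureResourceWatcherEntityCache` is not set corresponds roughly to the registration below. It mirrors `CacheExtensions.WithResourceWatcherEntityCaching` from this change; the standalone `ServiceCollection` and the literal `"ResourceWatcher"` strings (the value of `CacheConstants.CacheNames.ResourceWatcher`) are only there to keep the fragment self-contained.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using ZiggyCreatures.Caching.Fusion;

var services = new ServiceCollection();

// Named FusionCache instance with the memory level (L1) only.
services
    .AddFusionCache("ResourceWatcher")
    .WithOptions(options =>
    {
        options.CacheKeyPrefix = "ResourceWatcher:";

        // Entries are given the maximum duration; the watcher removes them on DELETED events.
        options.DefaultEntryOptions.SetDuration(TimeSpan.MaxValue);
    });
```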

### Advanced Configuration: Distributed (L2) Cache

For production or high-availability (HA) deployments, it can be essential to extend the cache with a distributed L2 cache and a backplane.
This ensures that all operator instances share a consistent state.
A common setup for this uses [**Redis**](https://github.com/redis/redis).

### FusionCache

KubeOps uses [`FusionCache`](https://github.com/ZiggyCreatures/FusionCache/blob/main/docs/AGentleIntroduction.md) for L1/L2 caching.
`OperatorSettings.ConfigureResourceWatcherEntityCache` accepts an `Action<IFusionCacheBuilder>` for customizing the cache.
When this action is set, it is applied instead of the default configuration (cache key prefix and maximum entry duration), so a custom configuration should set those values itself.

Here is an example of what a customized configuration with an L2 cache could look like:

```csharp
builder
    .Services
    .AddKubernetesOperator(settings =>
    {
        settings.Name = OperatorName;

        // Applied instead of the default resource watcher cache configuration.
        settings.ConfigureResourceWatcherEntityCache =
            cacheBuilder =>
                cacheBuilder
                    .WithCacheKeyPrefix($"{CacheConstants.CacheNames.ResourceWatcher}:")
                    .WithSerializer(_ => new FusionCacheSystemTextJsonSerializer())
                    // Uses the IDistributedCache registered in the service collection as L2.
                    .WithRegisteredDistributedCache()
                    .WithDefaultEntryOptions(options =>
                        options.Duration = TimeSpan.MaxValue);
    });
```
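
`WithRegisteredDistributedCache()` expects an `IDistributedCache` to be registered in the service collection. The following is a minimal sketch of how Redis could fill that role and also serve as the backplane. It is an assumption-laden illustration rather than part of KubeOps: the connection string is a placeholder, `configureCache` is a hypothetical variable name, and the packages `Microsoft.Extensions.Caching.StackExchangeRedis`, `ZiggyCreatures.FusionCache.Backplane.StackExchangeRedis`, and `ZiggyCreatures.FusionCache.Serialization.SystemTextJson` are assumed to be referenced.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using ZiggyCreatures.Caching.Fusion;
using ZiggyCreatures.Caching.Fusion.Backplane.StackExchangeRedis;
using ZiggyCreatures.Caching.Fusion.Serialization.SystemTextJson;

// Assumption: a Redis instance reachable under this placeholder address.
const string redisConnection = "localhost:6379";

var services = new ServiceCollection();

// Registers a Redis-backed IDistributedCache that FusionCache can pick up as L2.
services.AddStackExchangeRedisCache(options => options.Configuration = redisConnection);

// Hypothetical action that could be assigned to
// OperatorSettings.ConfigureResourceWatcherEntityCache.
Action<IFusionCacheBuilder> configureCache = cacheBuilder => cacheBuilder
    .WithCacheKeyPrefix("ResourceWatcher:")
    .WithSerializer(new FusionCacheSystemTextJsonSerializer())
    .WithRegisteredDistributedCache()
    // The backplane notifies other instances so their L1 caches stay in sync.
    .WithBackplane(new RedisBackplane(new RedisBackplaneOptions { Configuration = redisConnection }))
    .WithDefaultEntryOptions(options => options.Duration = TimeSpan.MaxValue);
```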

For an overview of FusionCache's cache levels and related features, see the corresponding documentation:

https://github.com/ZiggyCreatures/FusionCache/blob/main/docs/CacheLevels.md
2 changes: 1 addition & 1 deletion docs/docs/operator/deployment.mdx
@@ -1,7 +1,7 @@
---
title: Deployment
description: Deploying your KubeOps Operator
-sidebar_position: 7
+sidebar_position: 8
---

# Deployment
2 changes: 1 addition & 1 deletion docs/docs/operator/testing/_category_.json
@@ -1,5 +1,5 @@
{
-"position": 8,
+"position": 9,
"label": "Testing",
"collapsible": true,
"collapsed": true
2 changes: 1 addition & 1 deletion docs/docs/operator/utilities.mdx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: Utilities
description: Utilities for your Operator and Development
-sidebar_position: 9
+sidebar_position: 10
---

# Development and Operator Utilities
16 changes: 14 additions & 2 deletions src/KubeOps.Abstractions/Builder/OperatorSettings.cs
@@ -1,11 +1,13 @@
using System.Text.RegularExpressions;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Abstractions.Builder;

/// <summary>
/// Operator settings.
/// </summary>
-public sealed class OperatorSettings
+public sealed partial class OperatorSettings
{
private const string DefaultOperatorName = "KubernetesOperator";
private const string NonCharReplacement = "-";
@@ -15,7 +17,7 @@ public sealed class OperatorSettings
/// Defaults to "kubernetesoperator" when not set.
/// </summary>
public string Name { get; set; } =
-new Regex(@"(\W|_)", RegexOptions.CultureInvariant).Replace(
+OperatorNameRegex().Replace(
DefaultOperatorName,
NonCharReplacement)
.ToLowerInvariant();
@@ -59,4 +61,14 @@ public sealed class OperatorSettings
/// The wait timeout if the lease cannot be acquired.
/// </summary>
public TimeSpan LeaderElectionRetryPeriod { get; set; } = TimeSpan.FromSeconds(2);

/// <summary>
/// Allows configuration of the FusionCache settings for resource watcher entity caching.
/// This property is optional and can be used to customize caching behavior for resource watcher entities.
/// If not set, a default cache configuration is applied.
/// </summary>
public Action<IFusionCacheBuilder>? ConfigureResourceWatcherEntityCache { get; set; }

[GeneratedRegex(@"(\W|_)", RegexOptions.CultureInvariant)]
private static partial Regex OperatorNameRegex();
}
1 change: 1 addition & 0 deletions src/KubeOps.Abstractions/KubeOps.Abstractions.csproj
@@ -16,6 +16,7 @@
<ItemGroup>
<PackageReference Include="KubernetesClient" Version="16.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.6"/>
<PackageReference Include="ZiggyCreatures.FusionCache" Version="2.3.0" />
</ItemGroup>

</Project>
46 changes: 46 additions & 0 deletions src/KubeOps.Operator/Builder/CacheExtensions.cs
@@ -0,0 +1,46 @@
using KubeOps.Abstractions.Builder;
using KubeOps.Operator.Constants;

using Microsoft.Extensions.DependencyInjection;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Builder;

/// <summary>
/// Provides extension methods for configuring caching related to the operator.
/// </summary>
internal static class CacheExtensions
{
/// <summary>
/// Configures resource watcher caching for the given service collection.
/// Adds a FusionCache instance for resource watchers and applies custom or default cache configuration.
/// </summary>
/// <param name="services">The service collection to add the resource watcher caching to.</param>
/// <param name="settings">
/// The operator settings that optionally provide a custom configuration for the resource watcher entity cache.
/// </param>
/// <returns>The modified service collection with resource watcher caching configured.</returns>
internal static IServiceCollection WithResourceWatcherEntityCaching(this IServiceCollection services, OperatorSettings settings)
{
var cacheBuilder = services
.AddFusionCache(CacheConstants.CacheNames.ResourceWatcher);

if (settings.ConfigureResourceWatcherEntityCache != default)
{
settings.ConfigureResourceWatcherEntityCache(cacheBuilder);
}
else
{
cacheBuilder
.WithOptions(options =>
{
options.CacheKeyPrefix = $"{CacheConstants.CacheNames.ResourceWatcher}:";
options.DefaultEntryOptions
.SetDuration(TimeSpan.MaxValue);
});
}

return services;
}
}
3 changes: 3 additions & 0 deletions src/KubeOps.Operator/Builder/OperatorBuilder.cs
@@ -111,6 +111,9 @@ private void AddOperatorBase()
Services.AddSingleton(_settings);
Services.AddSingleton(new ActivitySource(_settings.Name));

// add and configure resource watcher entity cache
Services.WithResourceWatcherEntityCaching(_settings);

// Add the default configuration and the client separately. This allows external users to override either
// just the config (e.g. for integration tests) or to replace the whole client, e.g. with a mock.
// We also add the k8s.IKubernetes as a singleton service, in order to allow to access internal services
19 changes: 19 additions & 0 deletions src/KubeOps.Operator/Constants/CacheConstants.cs
@@ -0,0 +1,19 @@
namespace KubeOps.Operator.Constants;

/// <summary>
/// Provides constant values used for caching purposes within the operator.
/// </summary>
public static class CacheConstants
{
/// <summary>
/// Contains constant values representing names used within the operator's caching mechanisms.
/// </summary>
public static class CacheNames
{
/// <summary>
/// Represents a constant string used as a name for the resource watcher
/// in the operator's caching mechanisms.
/// </summary>
public const string ResourceWatcher = "ResourceWatcher";
}
}
1 change: 1 addition & 0 deletions src/KubeOps.Operator/KubeOps.Operator.csproj
@@ -17,6 +17,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.6"/>
<PackageReference Include="ZiggyCreatures.FusionCache" Version="2.3.0" />
</ItemGroup>

<ItemGroup>
24 changes: 24 additions & 0 deletions src/KubeOps.Operator/Logging/EntityLoggingScope.cs
@@ -6,6 +6,10 @@
namespace KubeOps.Operator.Logging;

#pragma warning disable CA1710
/// <summary>
/// A logging scope that encapsulates contextual information related to a Kubernetes entity and event type.
/// Provides a mechanism for structured logging with key-value pairs corresponding to entity metadata and event type.
/// </summary>
internal sealed record EntityLoggingScope : IReadOnlyCollection<KeyValuePair<string, object>>
#pragma warning restore CA1710
{
@@ -20,6 +24,22 @@ private EntityLoggingScope(IReadOnlyDictionary<string, object> state)

private IReadOnlyDictionary<string, object> Values { get; }

/// <summary>
/// Creates a new instance of <see cref="EntityLoggingScope"/> for the provided Kubernetes entity and event type.
/// </summary>
/// <typeparam name="TEntity">
/// The type of the Kubernetes entity. Must implement <see cref="IKubernetesObject{V1ObjectMeta}"/>.
/// </typeparam>
/// <param name="eventType">
/// The type of the watch event for the entity (e.g., Added, Modified, Deleted, or Bookmark).
/// </param>
/// <param name="entity">
/// The Kubernetes entity associated with the logging scope. This includes metadata such as Kind, Namespace, Name, UID, and ResourceVersion.
/// </param>
/// <returns>
/// A new <see cref="EntityLoggingScope"/> instance containing contextual key-value pairs
/// related to the event type and the provided Kubernetes entity.
/// </returns>
public static EntityLoggingScope CreateFor<TEntity>(WatchEventType eventType, TEntity entity)
where TEntity : IKubernetesObject<V1ObjectMeta>
=> new(
@@ -29,15 +49,19 @@ public static EntityLoggingScope CreateFor<TEntity>(WatchEventType eventType, TE
{ nameof(entity.Kind), entity.Kind },
{ "Namespace", entity.Namespace() },
{ "Name", entity.Name() },
{ "Uid", entity.Uid() },
{ "ResourceVersion", entity.ResourceVersion() },
});

/// <inheritdoc />
public IEnumerator<KeyValuePair<string, object>> GetEnumerator()
=> Values.GetEnumerator();

/// <inheritdoc />
public override string ToString()
=> CachedFormattedString ??= $"{{ {string.Join(", ", Values.Select(kvp => $"{kvp.Key} = {kvp.Value}"))} }}";

/// <inheritdoc />
IEnumerator IEnumerable.GetEnumerator()
=> GetEnumerator();
}
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Watcher;

internal sealed class LeaderAwareResourceWatcher<TEntity>(
@@ -21,6 +23,7 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
TimedEntityQueue<TEntity> queue,
OperatorSettings settings,
IEntityLabelSelector<TEntity> labelSelector,
IFusionCacheProvider cacheProvider,
IKubernetesClient client,
IHostApplicationLifetime hostApplicationLifetime,
LeaderElector elector)
@@ -31,6 +34,7 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
queue,
settings,
labelSelector,
cacheProvider,
client)
where TEntity : IKubernetesObject<V1ObjectMeta>
{
24 changes: 16 additions & 8 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
@@ -1,4 +1,3 @@
-using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Runtime.Serialization;
@@ -12,13 +11,16 @@
using KubeOps.Abstractions.Entities;
using KubeOps.Abstractions.Finalizer;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Constants;
using KubeOps.Operator.Logging;
using KubeOps.Operator.Queue;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Watcher;

public class ResourceWatcher<TEntity>(
@@ -28,12 +30,12 @@ public class ResourceWatcher<TEntity>(
TimedEntityQueue<TEntity> requeue,
OperatorSettings settings,
IEntityLabelSelector<TEntity> labelSelector,
IFusionCacheProvider cacheProvider,
IKubernetesClient client)
: IHostedService, IAsyncDisposable, IDisposable
where TEntity : IKubernetesObject<V1ObjectMeta>
{
-private readonly ConcurrentDictionary<string, long> _entityCache = new();
-
+private readonly IFusionCache _entityCache = cacheProvider.GetCache(CacheConstants.CacheNames.ResourceWatcher);
private CancellationTokenSource _cancellationTokenSource = new();
private uint _watcherReconnectRetries;
private Task? _eventWatcher;
@@ -132,12 +134,17 @@ static async ValueTask CastAndDispose(IDisposable resource)

protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, CancellationToken cancellationToken)
{
MaybeValue<long?> cachedGeneration;

switch (type)
{
case WatchEventType.Added:
-if (_entityCache.TryAdd(entity.Uid(), entity.Generation() ?? 0))
+cachedGeneration = await _entityCache.TryGetAsync<long?>(entity.Uid(), token: cancellationToken);
+
+if (!cachedGeneration.HasValue)
{
// Only perform reconciliation if the entity was not already in the cache.
+await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 0, token: cancellationToken);
await ReconcileModificationAsync(entity, cancellationToken);
}
else
@@ -153,10 +160,10 @@ protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, C
switch (entity)
{
case { Metadata.DeletionTimestamp: null }:
-_entityCache.TryGetValue(entity.Uid(), out var cachedGeneration);
+cachedGeneration = await _entityCache.TryGetAsync<long?>(entity.Uid(), token: cancellationToken);

// Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
-if (entity.Generation() <= cachedGeneration)
+if (cachedGeneration.HasValue && cachedGeneration >= entity.Generation())
{
logger.LogDebug(
"""Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""",
@@ -166,8 +173,9 @@ protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, C
}

// update cached generation since generation now changed
-_entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, cachedGeneration);
+await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 1, token: cancellationToken);
await ReconcileModificationAsync(entity, cancellationToken);

break;
case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }:
await ReconcileFinalizersSequentialAsync(entity, cancellationToken);
@@ -311,7 +319,7 @@ e.InnerException is EndOfStreamException &&
private async Task ReconcileDeletionAsync(TEntity entity, CancellationToken cancellationToken)
{
requeue.Remove(entity);
-_entityCache.TryRemove(entity.Uid(), out _);
+await _entityCache.RemoveAsync(entity.Uid(), token: cancellationToken);

await using var scope = provider.CreateAsyncScope();
var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>();