diff --git a/src/K8sOperator.NET/EventWatcher.cs b/src/K8sOperator.NET/EventWatcher.cs index 8c00120..409f39b 100644 --- a/src/K8sOperator.NET/EventWatcher.cs +++ b/src/K8sOperator.NET/EventWatcher.cs @@ -53,11 +53,33 @@ public async Task Start(CancellationToken cancellationToken) _cancellationToken = cancellationToken; _isRunning = true; - var response = Client.ListAsync(LabelSelector, cancellationToken); + Logger.BeginWatch(Crd.PluralName, LabelSelector); - await foreach (var (type, item) in response.WatchAsync(OnError, cancellationToken)) + while (_isRunning && !_cancellationToken.IsCancellationRequested) { - OnEvent(type, item); + try + { + var response = Client.ListAsync(LabelSelector, cancellationToken); + + await foreach (var (type, item) in response.WatchAsync(OnError, cancellationToken)) + { + OnEvent(type, item); + } + } + catch (TaskCanceledException) + { + Logger.WatcherError("Task was canceled."); + } + catch (OperationCanceledException) + { + Logger.WatcherError("Operation was canceled restarting..."); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + catch (HttpOperationException ex) + { + Logger.WatcherError($"Http Error: {ex.Response.Content}, restarting..."); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } } Logger.EndWatch(Crd.PluralName, LabelSelector); @@ -76,8 +98,7 @@ private void OnEvent(WatchEventType eventType, T customResource) var exception = t.Exception.Flatten().InnerException; Logger.ProcessEventError(exception, eventType, customResource); } - }) - ; + }); } private async Task ProccessEventAsync(WatchEventType eventType, T resource) @@ -233,7 +254,7 @@ private void OnError(Exception exception) { if (_isRunning) { - Logger.LogError(exception, "Watcher error"); + Logger.WatcherError(exception.Message); } } } diff --git a/src/K8sOperator.NET/Extensions/LoggingExtensions.cs b/src/K8sOperator.NET/Extensions/LoggingExtensions.cs index 5a5a529..dba0b27 100644 --- a/src/K8sOperator.NET/Extensions/LoggingExtensions.cs +++ b/src/K8sOperator.NET/Extensions/LoggingExtensions.cs @@ -36,9 +36,9 @@ internal static partial class LoggingExtensions [LoggerMessage( EventId = 4, Level = LogLevel.Information, - Message = "Begin watch {ns}/{plural} {labelselector}" + Message = "Begin watch {plural} {labelselector}" )] - public static partial void BeginWatch(this ILogger logger, string ns, string plural, string labelselector); + public static partial void BeginWatch(this ILogger logger, string plural, string labelselector); [LoggerMessage( EventId = 5, @@ -181,4 +181,17 @@ internal static partial class LoggingExtensions Message = "End Error {resource}")] public static partial void EndError(this ILogger logger, CustomResource resource); + [LoggerMessage( + EventId = 28, + Level = LogLevel.Information, + Message = "Watcher Error {message}")] + public static partial void WatcherError(this ILogger logger, string message); + + [LoggerMessage( + EventId = 29, + Level = LogLevel.Information, + Message = "ListAsync {ns}/{plural} {labelselector}" + )] + public static partial void ListAsync(this ILogger logger, string ns, string plural, string labelselector); + } diff --git a/src/K8sOperator.NET/KubernetesClient.cs b/src/K8sOperator.NET/KubernetesClient.cs index 2fd294a..3db5848 100644 --- a/src/K8sOperator.NET/KubernetesClient.cs +++ b/src/K8sOperator.NET/KubernetesClient.cs @@ -28,7 +28,7 @@ public Task> ListAsync(string labelSelector, Ca { var info = typeof(T).GetCustomAttribute()!; - Logger.BeginWatch(Namespace, info.PluralName, labelSelector); + Logger.ListAsync(Namespace, info.PluralName, labelSelector); var response = Client.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync( info.Group, @@ -98,7 +98,7 @@ public Task> ListAsync(string labelSelector, Ca { var info = typeof(T).GetCustomAttribute()!; - Logger.BeginWatch("cluster-wide", info.PluralName, labelSelector); + Logger.ListAsync("cluster-wide", info.PluralName, labelSelector); var response = Client.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync( info.Group, diff --git a/test/K8sOperator.NET.Tests/EventWatcherTests.cs b/test/K8sOperator.NET.Tests/EventWatcherTests.cs index bb1944f..4739fe4 100644 --- a/test/K8sOperator.NET.Tests/EventWatcherTests.cs +++ b/test/K8sOperator.NET.Tests/EventWatcherTests.cs @@ -52,12 +52,14 @@ private static Watcher.WatchEvent CreateEvent(WatchEventType type, T item) private readonly ITestOutputHelper _testOutput; private readonly Controller _controller = Substitute.For>(); + private readonly CancellationTokenSource _tokenSource; private readonly ILoggerFactory _loggerFactory = Substitute.For(); private readonly ILogger _logger = Substitute.For(); private readonly List _metadata; public EventWatcherTests(ITestOutputHelper testOutput) { + _tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2)); _testOutput = testOutput; _loggerFactory.CreateLogger(Arg.Any()).Returns(_logger); _metadata = [ @@ -71,8 +73,8 @@ public EventWatcherTests(ITestOutputHelper testOutput) [Fact] public async Task Start_Should_StartWatchAndLogStart() { - var cancellationToken = new CancellationTokenSource().Token; - + var cancellationToken = _tokenSource.Token; + using ( var server = new MockKubeApiServer(_testOutput, endpoints => { endpoints.MapListNamespacedCustomObjectWithHttpMessagesAsync(); @@ -90,7 +92,7 @@ public async Task Start_Should_StartWatchAndLogStart() [Fact] public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync() { - var cancellationToken = new CancellationTokenSource().Token; + var cancellationToken = _tokenSource.Token; using (var server = new MockKubeApiServer(_testOutput, endpoints => { @@ -112,7 +114,7 @@ public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync() [Fact] public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync() { - var cancellationToken = new CancellationTokenSource().Token; + var cancellationToken = _tokenSource.Token; using (var server = new MockKubeApiServer(_testOutput, endpoints => { @@ -134,7 +136,7 @@ public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync() [Fact] public async Task HandleFinalizeAsync_Should_CallFinalizeAndRemoveFinalizer() { - var cancellationToken = new CancellationTokenSource().Token; + var cancellationToken = _tokenSource.Token; using (var server = new MockKubeApiServer(_testOutput, endpoints => {