Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions src/K8sOperator.NET/EventWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,33 @@ public async Task Start(CancellationToken cancellationToken)
_cancellationToken = cancellationToken;
_isRunning = true;

var response = Client.ListAsync<T>(LabelSelector, cancellationToken);
Logger.BeginWatch(Crd.PluralName, LabelSelector);

await foreach (var (type, item) in response.WatchAsync<T, object>(OnError, cancellationToken))
while (_isRunning && !_cancellationToken.IsCancellationRequested)
{
OnEvent(type, item);
try
{
var response = Client.ListAsync<T>(LabelSelector, cancellationToken);

await foreach (var (type, item) in response.WatchAsync<T, object>(OnError, cancellationToken))
{
OnEvent(type, item);
}
}
catch (TaskCanceledException)
{
Logger.WatcherError("Task was canceled.");
}
catch (OperationCanceledException)
{
Logger.WatcherError("Operation was canceled restarting...");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
catch (HttpOperationException ex)
{
Logger.WatcherError($"Http Error: {ex.Response.Content}, restarting...");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
}

Logger.EndWatch(Crd.PluralName, LabelSelector);
Expand All @@ -76,8 +98,7 @@ private void OnEvent(WatchEventType eventType, T customResource)
var exception = t.Exception.Flatten().InnerException;
Logger.ProcessEventError(exception, eventType, customResource);
}
})
;
});
}

private async Task ProccessEventAsync(WatchEventType eventType, T resource)
Expand Down Expand Up @@ -233,7 +254,7 @@ private void OnError(Exception exception)
{
if (_isRunning)
{
Logger.LogError(exception, "Watcher error");
Logger.WatcherError(exception.Message);
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions src/K8sOperator.NET/Extensions/LoggingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ internal static partial class LoggingExtensions
[LoggerMessage(
EventId = 4,
Level = LogLevel.Information,
Message = "Begin watch {ns}/{plural} {labelselector}"
Message = "Begin watch {plural} {labelselector}"
)]
public static partial void BeginWatch(this ILogger logger, string ns, string plural, string labelselector);
public static partial void BeginWatch(this ILogger logger, string plural, string labelselector);

[LoggerMessage(
EventId = 5,
Expand Down Expand Up @@ -181,4 +181,17 @@ internal static partial class LoggingExtensions
Message = "End Error {resource}")]
public static partial void EndError(this ILogger logger, CustomResource resource);

[LoggerMessage(
EventId = 28,
Level = LogLevel.Information,
Message = "Watcher Error {message}")]
public static partial void WatcherError(this ILogger logger, string message);

[LoggerMessage(
EventId = 29,
Level = LogLevel.Information,
Message = "ListAsync {ns}/{plural} {labelselector}"
)]
public static partial void ListAsync(this ILogger logger, string ns, string plural, string labelselector);

}
4 changes: 2 additions & 2 deletions src/K8sOperator.NET/KubernetesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Task<HttpOperationResponse<object>> ListAsync<T>(string labelSelector, Ca
{
var info = typeof(T).GetCustomAttribute<KubernetesEntityAttribute>()!;

Logger.BeginWatch(Namespace, info.PluralName, labelSelector);
Logger.ListAsync(Namespace, info.PluralName, labelSelector);

var response = Client.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync(
info.Group,
Expand Down Expand Up @@ -98,7 +98,7 @@ public Task<HttpOperationResponse<object>> ListAsync<T>(string labelSelector, Ca
{
var info = typeof(T).GetCustomAttribute<KubernetesEntityAttribute>()!;

Logger.BeginWatch("cluster-wide", info.PluralName, labelSelector);
Logger.ListAsync("cluster-wide", info.PluralName, labelSelector);

var response = Client.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync(
info.Group,
Expand Down
12 changes: 7 additions & 5 deletions test/K8sOperator.NET.Tests/EventWatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ private static Watcher<T>.WatchEvent CreateEvent<T>(WatchEventType type, T item)

private readonly ITestOutputHelper _testOutput;
private readonly Controller<TestResource> _controller = Substitute.For<Controller<TestResource>>();
private readonly CancellationTokenSource _tokenSource;
private readonly ILoggerFactory _loggerFactory = Substitute.For<ILoggerFactory>();
private readonly ILogger _logger = Substitute.For<ILogger>();
private readonly List<object> _metadata;

public EventWatcherTests(ITestOutputHelper testOutput)
{
_tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2));
_testOutput = testOutput;
_loggerFactory.CreateLogger(Arg.Any<string>()).Returns(_logger);
_metadata = [
Expand All @@ -71,8 +73,8 @@ public EventWatcherTests(ITestOutputHelper testOutput)
[Fact]
public async Task Start_Should_StartWatchAndLogStart()
{
var cancellationToken = new CancellationTokenSource().Token;
var cancellationToken = _tokenSource.Token;

using ( var server = new MockKubeApiServer(_testOutput, endpoints =>
{
endpoints.MapListNamespacedCustomObjectWithHttpMessagesAsync<TestResource>();
Expand All @@ -90,7 +92,7 @@ public async Task Start_Should_StartWatchAndLogStart()
[Fact]
public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync()
{
var cancellationToken = new CancellationTokenSource().Token;
var cancellationToken = _tokenSource.Token;

using (var server = new MockKubeApiServer(_testOutput, endpoints =>
{
Expand All @@ -112,7 +114,7 @@ public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync()
[Fact]
public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync()
{
var cancellationToken = new CancellationTokenSource().Token;
var cancellationToken = _tokenSource.Token;

using (var server = new MockKubeApiServer(_testOutput, endpoints =>
{
Expand All @@ -134,7 +136,7 @@ public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync()
[Fact]
public async Task HandleFinalizeAsync_Should_CallFinalizeAndRemoveFinalizer()
{
var cancellationToken = new CancellationTokenSource().Token;
var cancellationToken = _tokenSource.Token;

using (var server = new MockKubeApiServer(_testOutput, endpoints =>
{
Expand Down
Loading