Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poll and watch k8s #1989

Merged
merged 2 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 86 additions & 16 deletions src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal class KubernetesClusterMonitor : IActor
private string _address;
private string _clusterName;
private IKubernetes _kubernetes;
private DateTime _lastRestart;
private string _podName;
private bool _stopping;
private Watcher<V1Pod> _watcher;
Expand All @@ -46,7 +45,7 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
RegisterMember cmd => Register(cmd),
StartWatchingCluster cmd => StartWatchingCluster(cmd.ClusterName, context),
StartWatchingCluster _ => StartWatchingCluster(context),
DeregisterMember => StopWatchingCluster(),
Stopping => StopWatchingCluster(),
_ => Task.CompletedTask
Expand Down Expand Up @@ -80,20 +79,31 @@ private Task StopWatchingCluster()
return Task.CompletedTask;
}

private Task StartWatchingCluster(string clusterName, ISenderContext context)
private async Task StartWatchingCluster(IContext context)
{
var selector = $"{LabelCluster}={clusterName}";

Logger.Log(_config.DebugLogLevel, "[Cluster][KubernetesProvider] Starting to watch pods with {Selector}",
selector);
try
{
await Poll();
}
catch (Exception x)
{
Logger.LogError(x, "[Cluster][KubernetesProvider] Failed to poll the Kubernetes API");
}

_watcherTask = _kubernetes.ListNamespacedPodWithHttpMessagesAsync(
KubernetesExtensions.GetKubeNamespace(),
labelSelector: selector,
watch: true,
timeoutSeconds: _config.WatchTimeoutSeconds
);
if (!_config.DisableWatch)
{
await Watch();
}

await Task.Delay(1000);

context.Send(context.Self, new StartWatchingCluster(_clusterName));
}

private Task Watch()
{
var tcs = new TaskCompletionSource();
_watcherTask = GetListTask(_clusterName);
_watcher = _watcherTask.Watch<V1Pod, V1PodList>(Watch, Error, Closed);
_watching = true;

Expand Down Expand Up @@ -127,16 +137,71 @@ void Closed()

void Restart()
{
_lastRestart = DateTime.UtcNow;
_watching = false;

DisposeWatcher();
DisposeWatcherTask();

context.Send(context.Self!, new StartWatchingCluster(_clusterName));
tcs.SetResult();
}

return Task.CompletedTask;
return tcs.Task;
}

/// <summary>
/// Requests the pod list for this cluster once, reconciles the cached pod set
/// (<c>_clusterPods</c>) against the response and then recomputes the topology.
/// </summary>
/// <returns>A task that completes after the cache has been reconciled and <c>UpdateTopology</c> has run.</returns>
private async Task Poll()
{
    // NOTE(review): GetListTask issues the request with watch: true — confirm the response
    // body is still a complete pod list in that mode before relying on polling alone.
    var response = await GetListTask(_clusterName);
    var pods = response.Body.Items;

    foreach (var pod in pods)
    {
        var podLabels = pod.Metadata.Labels;

        // Guard against a pod that carries no labels at all (Labels would then be null);
        // it is treated the same as a pod missing the cluster label.
        if (podLabels is null || !podLabels.TryGetValue(LabelCluster, out var podClusterName))
        {
            Logger.LogInformation(
                "[Cluster][KubernetesProvider] The pod {PodName} is not a Proto.Cluster node",
                pod.Metadata.Name
            );

            continue;
        }

        if (_clusterName != podClusterName)
        {
            // Report the cluster the pod actually belongs to, not our own cluster name.
            Logger.LogInformation(
                "[Cluster][KubernetesProvider] The pod {PodName} is from another cluster {Cluster}",
                pod.Metadata.Name, podClusterName
            );

            continue;
        }

        _clusterPods[pod.Uid()] = pod;
    }

    // Evict cached pods that are absent from the response. The keys are materialized
    // first because the dictionary is mutated inside the loop below.
    var uids = pods.Select(p => p.Uid()).ToHashSet();
    var toRemove = _clusterPods.Keys.Where(k => !uids.Contains(k)).ToList();

    foreach (var uid in toRemove)
    {
        _clusterPods.Remove(uid);
    }

    UpdateTopology();
}

/// <summary>
/// Builds the label selector for <paramref name="clusterName"/>, logs it, and starts the
/// namespaced pod list request against the Kubernetes API.
/// </summary>
/// <param name="clusterName">Value the cluster label (<c>LabelCluster</c>) must equal.</param>
/// <returns>The task of the started request; it is returned without being awaited here.</returns>
private Task<HttpOperationResponse<V1PodList>> GetListTask(string clusterName)
{
    var labelSelector = $"{LabelCluster}={clusterName}";

    Logger.Log(
        _config.DebugLogLevel,
        "[Cluster][KubernetesProvider] Starting to watch pods with {Selector}",
        labelSelector
    );

    var kubeNamespace = KubernetesExtensions.GetKubeNamespace();
    var timeout = _config.WatchTimeoutSeconds;

    // The request is always sent with watch enabled, whichever caller asked for it.
    return _kubernetes.ListNamespacedPodWithHttpMessagesAsync(
        kubeNamespace,
        labelSelector: labelSelector,
        watch: true,
        timeoutSeconds: timeout
    );
}

private void RecreateKubernetesClient()
Expand Down Expand Up @@ -219,6 +284,11 @@ private void Watch(WatchEventType eventType, V1Pod eventPod)
_clusterPods[eventPod.Uid()] = eventPod;
}

UpdateTopology();
}

private void UpdateTopology()
{
var memberStatuses = _clusterPods.Values
.Select(x => x.GetMemberStatus())
.Where(x => x.IsRunning && (x.IsReady || x.Member.Id == _cluster.System.Id))
Expand Down
8 changes: 7 additions & 1 deletion src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ namespace Proto.Cluster.Kubernetes;
public record KubernetesProviderConfig
{
public KubernetesProviderConfig(int watchTimeoutSeconds = 30, bool developerLogging = false,
Func<IKubernetes> clientFactory = null)
Func<IKubernetes> clientFactory = null, bool disableWatch = false)
{
WatchTimeoutSeconds = watchTimeoutSeconds;
DeveloperLogging = developerLogging;
ClientFactory = clientFactory ?? DefaultFactory;
DisableWatch = disableWatch;
}

/// <summary>
/// A timeout for the watch pods operation
/// </summary>
public int WatchTimeoutSeconds { get; }

/// <summary>
/// Disables the pod watch operation and relies on plain HTTP request/response polling instead
/// </summary>
public bool DisableWatch { get; set; }

/// <summary>
/// Enables more detailed logging
/// </summary>
Expand Down