From 814e15126ad2fe55dc07d80063dc64703c11cca3 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Mon, 15 May 2023 13:56:17 +0200 Subject: [PATCH] Kubediag fixes (#2007) * improve kubediag * harden k8s monitor * ignore blocked gossips --- benchmarks/KubernetesDiagnostics/Program.cs | 204 +++++++++--------- benchmarks/KubernetesDiagnostics/build2.sh | 6 + .../Context/ActorLoggingContext.cs | 12 +- .../KubernetesClusterMonitor.cs | 1 + .../KubernetesExtensions.cs | 10 +- src/Proto.Cluster/ClusterConfig.cs | 6 + src/Proto.Cluster/Gossip/Gossip.cs | 49 +++-- src/Proto.Cluster/Gossip/GossipActor.cs | 46 ++-- src/Proto.Remote/RemoteConfigBase.cs | 2 +- 9 files changed, 189 insertions(+), 147 deletions(-) create mode 100755 benchmarks/KubernetesDiagnostics/build2.sh diff --git a/benchmarks/KubernetesDiagnostics/Program.cs b/benchmarks/KubernetesDiagnostics/Program.cs index 3cb163b141..46b2181e1a 100644 --- a/benchmarks/KubernetesDiagnostics/Program.cs +++ b/benchmarks/KubernetesDiagnostics/Program.cs @@ -1,126 +1,106 @@ using System; -using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Proto; using Proto.Cluster; -using Proto.Cluster.Gossip; using Proto.Cluster.Kubernetes; -using Proto.Cluster.Partition; +using Proto.Cluster.PartitionActivator; using Proto.Remote; -using Proto.Remote.GrpcNet; -namespace KubernetesDiagnostics; +var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC"); -public static class Program +var builder = WebApplication.CreateBuilder(args); +builder.Services.AddLogging(x => x.AddSimpleConsole(c => { - public static async Task Main() - { - ThreadPool.SetMinThreads(100, 100); - Console.WriteLine("Starting..."); - - /* - * docker build . -t rogeralsing/kubdiagg - * kubectl apply --filename service.yaml - * kubectl get pods -l app=kubdiag - * kubectl logs -l app=kubdiag --all-containers - * - */ - - var l = LoggerFactory.Create(c => c.AddConsole().SetMinimumLevel(LogLevel.Information)); - Log.SetLoggerFactory(l); - var log = Log.CreateLogger("main"); - - var identity = new PartitionIdentityLookup(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2) - ); - - /* - - name: "REDIS" - value: "redis" - - name: PROTOPORT - value: "8080" - - name: PROTOHOST - value: "0.0.0.0" - - name: "PROTOHOSTPUBLIC" - */ - - var port = int.Parse(Environment.GetEnvironmentVariable("PROTOPORT") ?? "0"); - var host = Environment.GetEnvironmentVariable("PROTOHOST") ?? "127.0.0.1"; - var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC"); - - log.LogInformation("Host {host}", host); - log.LogInformation("Port {port}", port); - log.LogInformation("Advertised Host {advertisedHost}", advertisedHost); - - var clusterProvider = GetProvider(); - - var noOpsProps = Props.FromFunc(ctx => Task.CompletedTask); - var echoKind = new ClusterKind("echo", noOpsProps); - var system = new ActorSystem(new ActorSystemConfig()) - .WithRemote(GrpcNetRemoteConfig - .BindTo(host, port) - .WithAdvertisedHost(advertisedHost) - .WithEndpointWriterMaxRetries(2) - ) - .WithCluster(ClusterConfig - .Setup("mycluster", clusterProvider, identity) - .WithClusterKind("empty", Props.Empty) - .WithClusterKind(echoKind) - ); - - // system.EventStream.Subscribe(e => { Console.WriteLine($"{DateTime.Now:O} Gossip update Member {e.MemberId} Key {e.Key}"); }); - - system.EventStream.Subscribe(e => { + c.SingleLine = true; +})); - var hash = e.TopologyHash; - Console.WriteLine($"{DateTime.Now:O} My members {hash}"); - } - ); +builder.Services.AddProtoCluster((_, x) => +{ + x.Port = 0; + x.ConfigureRemote = r => + r.WithAdvertisedHost(advertisedHost); - var cts = new CancellationTokenSource(); + x.ConfigureCluster = c => c + .WithClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)) + .WithClusterKind("empty", Props.FromFunc(ctx => Task.CompletedTask)) + .WithExitOnShutdown() + .WithHeartbeatExpirationDisabled(); - Console.CancelKeyPress += (_, _) => { cts.Cancel(); }; + x.ClusterProvider = new KubernetesProvider(); + x.IdentityLookup = new PartitionActivatorLookup(); + +}); - await system - .Cluster() - .StartMemberAsync(); - - system.Shutdown.Register(() => - { - Console.WriteLine("Shutting down..."); - Environment.Exit(0); - }); +builder.Services.AddHealthChecks().AddCheck("proto", null, new[] { "ready", "live" }); +builder.Services.AddHostedService(); + +var app = builder.Build(); + +app.MapGet("/", async (Cluster cluster) => +{ + +}); + +app.MapHealthChecks("/health"); + +app.Run(); + +public class DummyHostedService : IHostedService +{ + private readonly ActorSystem _system; + private readonly ILogger _logger; + private bool _running; + + public DummyHostedService(ActorSystem system, ILogger logger) + { + _system = system; + _logger = logger; + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("Starting DummyHostedService"); + _running = true; + + _system.EventStream.Subscribe(e => { + + var hash = e.TopologyHash; + _logger.LogInformation($"{DateTime.Now:O} My members {hash}"); + } + ); var props = Props.FromFunc(ctx => Task.CompletedTask); - system.Root.SpawnNamed(props, "dummy"); + _system.Root.SpawnNamed(props, "dummy"); - var clusterIdentity = ClusterIdentity.Create("some-id", echoKind.Name); + _ = SafeTask.Run(RunLoop); + _ = SafeTask.Run(PrintMembersLoop); + } - while (!cts.IsCancellationRequested) - { - var m = system.Cluster().MemberList.GetAllMembers(); - var hash = Member.TopologyHash(m); + private async Task RunLoop() + { + var clusterIdentity = + ClusterIdentity.Create("some-id", new ClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)).Name); - Console.WriteLine($"{DateTime.Now:O} Hash {hash} Count {m.Length}"); + while (_running) + { + var m = _system.Cluster().MemberList.GetAllMembers(); try { - var t = await system.Cluster().RequestAsync(clusterIdentity, new Touch(), CancellationTokens.FromSeconds(1)); + var t = await _system.Cluster() + .RequestAsync(clusterIdentity, new Touch(), CancellationTokens.FromSeconds(1)); - if (t != null) - { - Console.WriteLine($"called cluster actor {t.Who}"); - } - else - { - Console.WriteLine($"call to cluster actor returned null"); - } + _logger.LogInformation($"called cluster actor {t.Who}"); } catch (Exception e) { - Console.WriteLine($"Could not call cluster actor: {e}"); + _logger.LogError(e, "Could not call cluster actor"); } foreach (var member in m) @@ -129,30 +109,44 @@ await system try { - var t = await system.Root.RequestAsync(pid, new Touch(), CancellationTokens.FromSeconds(1)); + var t = await _system.Root.RequestAsync(pid, new Touch(), CancellationTokens.FromSeconds(1)); if (t != null) { - Console.WriteLine($"called dummy actor {pid}"); + _logger.LogInformation("called dummy actor {PID}", pid); } else { - Console.WriteLine($"call to dummy actor timed out {pid}"); + _logger.LogInformation("call to dummy actor timed out {PID}", pid); } } catch { - Console.WriteLine($"Could not call dummy actor {pid}"); + _logger.LogInformation("Could not call dummy actor {PID}", pid); } } - await Task.Delay(3000); + await Task.Delay(5000); } - - await system - .Cluster() - .ShutdownAsync(); } + + private async Task PrintMembersLoop() + { + + while (_running) + { + var m = _system.Cluster().MemberList.GetAllMembers(); + var hash = Member.TopologyHash(m); + + _logger.LogInformation($"{DateTime.Now:O} Hash {hash} Count {m.Length}"); - private static IClusterProvider GetProvider() => new KubernetesProvider(); + await Task.Delay(2000); + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _running = false; + return Task.CompletedTask; + } } \ No newline at end of file diff --git a/benchmarks/KubernetesDiagnostics/build2.sh b/benchmarks/KubernetesDiagnostics/build2.sh new file mode 100755 index 0000000000..3c01b466aa --- /dev/null +++ b/benchmarks/KubernetesDiagnostics/build2.sh @@ -0,0 +1,6 @@ +docker login +kubectl delete --filename service.yaml +dotnet publish --os linux -c Release --arch amd64 -p:PublishProfile=DefaultContainer +docker tag kubernetesdiagnostics:1.0.0 rogeralsing/kubediag +docker push rogeralsing/kubediag +kubectl apply --filename service.yaml \ No newline at end of file diff --git a/src/Proto.Actor/Context/ActorLoggingContext.cs b/src/Proto.Actor/Context/ActorLoggingContext.cs index 00cac80574..caab117f2d 100644 --- a/src/Proto.Actor/Context/ActorLoggingContext.cs +++ b/src/Proto.Actor/Context/ActorLoggingContext.cs @@ -135,12 +135,12 @@ public override async Task RequestAsync(PID target, object message, Cancel { if (_exceptionLogLevel != LogLevel.None && _logger.IsEnabled(_exceptionLogLevel)) { - _logger.Log(_exceptionLogLevel, x, - "Actor {Self} {ActorType} Got exception waiting for RequestAsync response of {MessageType}:{MessagePayload} from {Target}", - Self, - ActorType, - message.GetMessageTypeName(), message, target - ); + // _logger.Log(_exceptionLogLevel, x, + // "Actor {Self} {ActorType} Got exception waiting for RequestAsync response of {MessageType}:{MessagePayload} from {Target}", + // Self, + // ActorType, + // message.GetMessageTypeName(), message, target + // ); } throw; diff --git a/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs b/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs index 8ca2bd9423..c3b11ef09e 100644 --- a/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs +++ b/src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs @@ -303,6 +303,7 @@ private void UpdateTopology() var memberStatuses = _clusterPods.Values .Select(x => x.GetMemberStatus()) + .Where(x => x is not null) .Where(x => x.IsRunning && (x.IsReady || x.Member.Id == _cluster.System.Id)) .Select(x => x.Member) .ToList(); diff --git a/src/Proto.Cluster.Kubernetes/KubernetesExtensions.cs b/src/Proto.Cluster.Kubernetes/KubernetesExtensions.cs index 7e10c78d09..4180b84030 100644 --- a/src/Proto.Cluster.Kubernetes/KubernetesExtensions.cs +++ b/src/Proto.Cluster.Kubernetes/KubernetesExtensions.cs @@ -10,6 +10,7 @@ using System.Linq; using System.Text.Json; using System.Threading.Tasks; +using JetBrains.Annotations; using Json.Patch; using k8s; using k8s.Models; @@ -57,9 +58,16 @@ IDictionary annotations /// /// Kubernetes Pod object /// + [CanBeNull] internal static MemberStatus GetMemberStatus(this V1Pod pod) { - var isRunning = pod.Status.Phase == "Running" && pod.Status.PodIP is not null; + var isRunning = pod.Status is { Phase: "Running", PodIP: not null }; + + if (pod.Status?.ContainerStatuses is null) + return null; + + if (pod.Metadata?.Labels is null) + return null; var kinds = pod .Metadata diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs index 50b8087db0..84a2ac3303 100644 --- a/src/Proto.Cluster/ClusterConfig.cs +++ b/src/Proto.Cluster/ClusterConfig.cs @@ -338,6 +338,12 @@ public ClusterConfig WithRemotePidCacheTimeToLive(TimeSpan timeout) => /// public ClusterConfig WithHeartbeatExpiration(TimeSpan expiration) => this with { HeartbeatExpiration = expiration }; + /// + /// Disables gossip heartbeat expiration. + /// + /// + public ClusterConfig WithHeartbeatExpirationDisabled() => this with { HeartbeatExpiration = TimeSpan.Zero }; + /// /// Configuration for the PubSub extension. /// diff --git a/src/Proto.Cluster/Gossip/Gossip.cs b/src/Proto.Cluster/Gossip/Gossip.cs index 924fd4bb55..cc95475a6a 100644 --- a/src/Proto.Cluster/Gossip/Gossip.cs +++ b/src/Proto.Cluster/Gossip/Gossip.cs @@ -114,38 +114,45 @@ public void SetState(string key, IMessage message) //TODO: this does not need to use a callback, it can return a list of MemberStates public void SendState(SendStateAction sendStateToMember) { - var logger = _logger?.BeginMethodScope(); - - foreach (var member in _otherMembers) + try { - GossipStateManagement.EnsureMemberStateExists(_state, member.Id); - } + var logger = _logger?.BeginMethodScope(); - var randomMembers = _otherMembers.OrderByRandom(_rnd); + foreach (var member in _otherMembers) + { + GossipStateManagement.EnsureMemberStateExists(_state, member.Id); + } - var fanoutCount = 0; + var randomMembers = _otherMembers.OrderByRandom(_rnd); - foreach (var member in randomMembers) - { - //TODO: we can chunk up sends here - //instead of sending less state, we can send all of it, but in chunks - var memberState = GetMemberStateDelta(member.Id); + var fanoutCount = 0; - if (!memberState.HasState) + foreach (var member in randomMembers) { - continue; - } + //TODO: we can chunk up sends here + //instead of sending less state, we can send all of it, but in chunks + var memberState = GetMemberStateDelta(member.Id); - //fire and forget, we handle results in ReenterAfter - sendStateToMember(memberState, member, logger); + if (!memberState.HasState) + { + continue; + } - fanoutCount++; + //fire and forget, we handle results in ReenterAfter + sendStateToMember(memberState, member, logger); - if (fanoutCount == _gossipFanout) - { - break; + fanoutCount++; + + if (fanoutCount == _gossipFanout) + { + break; + } } } + catch (Exception x) + { + Logger.LogError(x, "SendState failed"); + } } public MemberStateDelta GetMemberStateDelta(string targetMemberId) diff --git a/src/Proto.Cluster/Gossip/GossipActor.cs b/src/Proto.Cluster/Gossip/GossipActor.cs index a6709c4cb5..5a2bb75a3f 100644 --- a/src/Proto.Cluster/Gossip/GossipActor.cs +++ b/src/Proto.Cluster/Gossip/GossipActor.cs @@ -7,7 +7,9 @@ using System; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Proto.Extensions; using Proto.Logging; +using Proto.Remote; namespace Proto.Cluster.Gossip; @@ -32,19 +34,31 @@ int gossipMaxSend () => system.Cluster().MemberList.GetMembers()); } - public Task ReceiveAsync(IContext context) => - context.Message switch + public async Task ReceiveAsync(IContext context) + { + try { - SetGossipStateKey setState => OnSetGossipStateKey(context, setState), - GetGossipStateRequest getState => OnGetGossipStateKey(context, getState), - GetGossipStateEntryRequest getState => OnGetGossipStateEntryKey(context, getState), - GetGossipStateSnapshot => OnGetGossipStateSnapshot(context), - GossipRequest gossipRequest => OnGossipRequest(context, gossipRequest), - SendGossipStateRequest => OnSendGossipState(context), - AddConsensusCheck request => OnAddConsensusCheck(context, request), - ClusterTopology clusterTopology => OnClusterTopology(clusterTopology), - _ => Task.CompletedTask - }; + // Logger.LogInformation("GossipActor Received {MessageType}", context.Message.GetMessageTypeName()); + var t = context.Message switch + { + SetGossipStateKey setState => OnSetGossipStateKey(context, setState), + GetGossipStateRequest getState => OnGetGossipStateKey(context, getState), + GetGossipStateEntryRequest getState => OnGetGossipStateEntryKey(context, getState), + GetGossipStateSnapshot => OnGetGossipStateSnapshot(context), + GossipRequest gossipRequest => OnGossipRequest(context, gossipRequest), + SendGossipStateRequest => OnSendGossipState(context), + AddConsensusCheck request => OnAddConsensusCheck(context, request), + ClusterTopology clusterTopology => OnClusterTopology(clusterTopology), + _ => Task.CompletedTask + }; + await t; + // Logger.LogInformation("GossipActor Done {MessageType}", context.Message.GetMessageTypeName()); + } + catch (Exception x) + { + Logger.LogError(x, "GossipActor Failed {MessageType}", context.Message.GetMessageTypeName()); + } + } private Task OnGetGossipStateEntryKey(IContext context, GetGossipStateEntryRequest getState) { @@ -87,12 +101,18 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest) { var logger = context.Logger()?.BeginScope(); logger?.LogDebug("Gossip Request {Sender}", context.Sender!); + + if (context.Remote().BlockList.BlockedMembers.Contains(gossipRequest.MemberId)) + { + Logger.LogInformation("Blocked gossip request from {MemberId}", gossipRequest.MemberId); + return Task.CompletedTask; + } if (Logger.IsEnabled(LogLevel.Debug)) { Logger.LogDebug("Gossip Request {Sender}", context.Sender!); } - + ReceiveState(context, gossipRequest.State); context.Respond(new GossipResponse()); diff --git a/src/Proto.Remote/RemoteConfigBase.cs b/src/Proto.Remote/RemoteConfigBase.cs index 46e8dc7e07..cb7669a09c 100644 --- a/src/Proto.Remote/RemoteConfigBase.cs +++ b/src/Proto.Remote/RemoteConfigBase.cs @@ -83,7 +83,7 @@ protected RemoteConfigBase(string host, int port) public Serialization Serialization { get; init; } = new(); /// - /// After the remote connection is terminated, this is the time period the enpoint manager will monitor messages + /// After the remote connection is terminated, this is the time period the endpoint manager will monitor messages /// arriving to this connection /// and generate deadletter events for them. Default value is 3 seconds. ///