Skip to content

Commit

Permalink
WiP enable exp cluster context (#1684)
Browse files Browse the repository at this point in the history
Enable experimental cluster context
  • Loading branch information
rogeralsing committed Jun 28, 2022
1 parent 133aded commit fcadfd5
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 213 deletions.
3 changes: 2 additions & 1 deletion benchmarks/AutoClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ IIdentityLookup identityLookup
var helloProps = Props.FromProducer(() => new WorkerActor());
return ClusterConfig
.Setup("mycluster", clusterProvider, identityLookup)
.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster))
.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster))
.WithClusterKind("hello", helloProps)
.WithGossipFanOut(3);
}
Expand Down Expand Up @@ -119,6 +119,7 @@ public static async Task<Cluster> SpawnMember()
private static ActorSystemConfig GetMemberActorSystemConfig()
{
var config = new ActorSystemConfig()
.WithSharedFutures()
.WithDeadLetterThrottleCount(3)
.WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))
.WithDeadLetterRequestLogging(false);
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/ClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ IIdentityLookup identityLookup
var helloProps = Props.FromProducer(() => new WorkerActor());
return ClusterConfig
.Setup("mycluster", clusterProvider, identityLookup)
.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster))
.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster))
.WithClusterKind("hello", helloProps)
.WithGossipFanOut(3);
}
Expand Down Expand Up @@ -182,7 +182,7 @@ public static async Task<Cluster> SpawnMember()
private static ActorSystemConfig GetMemberActorSystemConfig()
{
var config = new ActorSystemConfig()
// .WithSharedFutures()
.WithSharedFutures()
.WithDeadLetterThrottleCount(3)
.WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))
.WithDeadLetterRequestLogging(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private ClusterConfig ClusterConfig()

if (ExperimentalContext)
{
config = config.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster));
config = config.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster));
}

return config;
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Actor/Configuration/ActorSystemConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public record ActorSystemConfig
/// SharedFutures allows the ActorSystem to avoid registering a new temporary process for each request
/// Instead registering a SharedFuture that can handle multiple requests internally
/// </summary>
public bool SharedFutures { get; init; }
public bool SharedFutures { get; init; } = true;

/// <summary>
/// Sets the number of requests that can be handled by a SharedFuture
Expand Down
3 changes: 1 addition & 2 deletions src/Proto.Cluster/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
public TimeSpan RemotePidCacheTimeToLive { get; set; }
public TimeSpan RemotePidCacheClearInterval { get; set; }

public Func<Cluster, IClusterContext> ClusterContextProducer { get; init; } =
c => new DefaultClusterContext(c.System, c.IdentityLookup, c.PidCache, c.Config.ToClusterContextConfig(), c.System.Shutdown);
public Func<Cluster, IClusterContext> ClusterContextProducer { get; init; } = c => new DefaultClusterContext(c);
public TimeSpan ActorRequestRetryInterval { get; init; }

public ClusterConfig WithTimeout(TimeSpan timeSpan) =>
Expand Down
212 changes: 90 additions & 122 deletions src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// -----------------------------------------------------------------------
// <copyright file="DefaultClusterContext.cs" company="Asynkron AB">
// <copyright file="ExperimentalClusterContext.cs" company="Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
Expand All @@ -10,7 +10,6 @@
using Microsoft.Extensions.Logging;
using Proto.Cluster.Identity;
using Proto.Cluster.Metrics;
using Proto.Future;
using Proto.Utils;

namespace Proto.Cluster;
Expand All @@ -23,91 +22,131 @@ public class DefaultClusterContext : IClusterContext
private readonly ShouldThrottle _requestLogThrottle;
private readonly TaskClock _clock;
private readonly ActorSystem _system;

private static readonly ILogger Logger = Log.CreateLogger<DefaultClusterContext>();

public DefaultClusterContext(
ActorSystem system,
IIdentityLookup identityLookup,
PidCache pidCache,
ClusterContextConfig config,
CancellationToken killSwitch
)
public DefaultClusterContext(Cluster cluster)
{
_identityLookup = identityLookup;
_pidCache = pidCache;
_system = system;
_identityLookup = cluster.IdentityLookup;
_pidCache = cluster.PidCache;
var config = cluster.Config;
_system = cluster.System;

_requestLogThrottle = Throttle.Create(
config.MaxNumberOfEventsInRequestLogThrottlePeriod,
config.RequestLogThrottlePeriod,
i => Logger.LogInformation("Throttled {LogCount} TryRequestAsync logs", i)
);

_clock = new TaskClock(config.ActorRequestTimeout, config.ActorRequestRetryInterval, killSwitch);
_clock = new TaskClock(config.ActorRequestTimeout, TimeSpan.FromSeconds(1), cluster.System.Shutdown);
_clock.Start();
}

public async Task<T?> RequestAsync<T>(ClusterIdentity clusterIdentity, object message, ISenderContext context, CancellationToken ct)
{
var start = Stopwatch.StartNew();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} Message {Message}", clusterIdentity, message);
var i = 0;

var future = context.GetFuture();
PID? lastPid = null;

try
{
while (!ct.IsCancellationRequested)
while (!ct.IsCancellationRequested && !context.System.Shutdown.IsCancellationRequested)
{
if (context.System.Shutdown.IsCancellationRequested) return default;
i++;

var source = PidSource.Cache;
var pid = GetCachedPid(clusterIdentity);
var pid = clusterIdentity.CachedPid;

if (pid == null)
{
pid = GetCachedPid(clusterIdentity);
}

if (pid is null)
{
source = PidSource.Lookup;
pid = await GetPidFromLookup(clusterIdentity, context, ct);
}

if (context.System.Shutdown.IsCancellationRequested) return default;

if (pid is null)
{
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} - Did not get PID from IdentityLookup", clusterIdentity);
await Task.Delay(++i * 20, CancellationToken.None);
await Task.Delay(i * 20, CancellationToken.None);
continue;
}

// Ensures that a future is not re-used against another actor.
if (lastPid is not null && !pid.Equals(lastPid)) RefreshFuture();

// Logger.LogDebug("Requesting {ClusterIdentity} - Got PID {Pid} from {Source}", clusterIdentity, pid, source);
var (status, res) = await TryRequestAsync<T>(clusterIdentity, message, pid, source, context, future);
Stopwatch t = null!;

switch (status)
if (context.System.Metrics.Enabled)
{
t=Stopwatch.StartNew();
}

try
{
context.Request(pid, message, future.Pid);
var task = future.Task;

await Task.WhenAny(task, _clock.CurrentBucket);

if (task.IsCompleted)
{
var (status, result) = ToResult<T>(source, context, task.Result);

switch (status)
{
case ResponseStatus.Ok: return result;
case ResponseStatus.InvalidResponse:
RefreshFuture();
await RemoveFromSource(clusterIdentity, source, pid);
break;
case ResponseStatus.DeadLetter:
RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid);
break;
}
}
else
{
if (!context.System.Shutdown.IsCancellationRequested)
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
}
}
catch (TimeoutException)
{
lastPid = pid;
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
continue;
}
catch (Exception x)
{
case ResponseStatus.Ok:
return res;

case ResponseStatus.Exception:
RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
await Task.Delay(++i * 20, CancellationToken.None);
break;
case ResponseStatus.DeadLetter:
RefreshFuture();
await RemoveFromSource(clusterIdentity, source, pid);
break;
case ResponseStatus.TimedOut:
lastPid = pid;
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
break;
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
await Task.Delay(i * 20, CancellationToken.None);
continue;
}
finally
{
if (context.System.Metrics.Enabled)
{
var elapsed = t.Elapsed;
ClusterMetrics.ClusterRequestDuration
.Record(elapsed.TotalSeconds,
new("id", _system.Id), new("address", _system.Address),
new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name),
new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup")
);
}
}

if (_system.Metrics.Enabled)
if (context.System.Metrics.Enabled)
{
ClusterMetrics.ClusterRequestRetryCount.Add(
1, new("id", _system.Id), new("address", _system.Address),
Expand All @@ -118,8 +157,7 @@ CancellationToken killSwitch

if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
{
var t = start.Elapsed;
Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}, elapsed {Time}", clusterIdentity, t);
Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}", clusterIdentity);
}

return default!;
Expand All @@ -144,6 +182,9 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
_pidCache.RemoveByVal(clusterIdentity, pid);
}


private PID? GetCachedPid(ClusterIdentity clusterIdentity) => _pidCache.TryGet(clusterIdentity, out var pid) ? pid : null;

private async Task<PID?> GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct)
{
try
Expand Down Expand Up @@ -177,81 +218,9 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
}
}

private async ValueTask<(ResponseStatus Status, T?)> TryRequestAsync<T>(
ClusterIdentity clusterIdentity,
object message,
PID pid,
PidSource source,
ISenderContext context,
IFuture future
)
{
var t = Stopwatch.StartNew();

try
{
context.Request(pid, message, future.Pid);
var task = future.Task;

await Task.WhenAny(task, _clock.CurrentBucket);

if (task.IsCompleted)
{
var res = task.Result;

return ToResult<T>(source, context, res);
}

if (!context.System.Shutdown.IsCancellationRequested)
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source);

_pidCache.RemoveByVal(clusterIdentity, pid);

return (ResponseStatus.TimedOut, default);
}
catch (TimeoutException)
{
return (ResponseStatus.TimedOut, default);
}
catch (Exception x)
{
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
return (ResponseStatus.Exception, default);
}
finally
{
if (context.System.Metrics.Enabled)
{
var elapsed = t.Elapsed;
ClusterMetrics.ClusterRequestDuration
.Record(elapsed.TotalSeconds,
new("id", _system.Id), new("address", _system.Address),
new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name),
new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup")
);
}
}
}

private PID? GetCachedPid(ClusterIdentity clusterIdentity)
{
var pid = clusterIdentity.CachedPid;

if (pid is null && _pidCache.TryGet(clusterIdentity, out pid))
{
clusterIdentity.CachedPid = pid;
}

return pid;
}

private static (ResponseStatus Status, T?) ToResult<T>(PidSource source, ISenderContext context, object result)
private static (ResponseStatus Ok, T?) ToResult<T>(PidSource source, ISenderContext context, object result)
{
var message = MessageEnvelope.UnwrapMessage(result);

switch (message)
{
case DeadLetterResponse:
Expand All @@ -266,16 +235,15 @@ private static (ResponseStatus Status, T?) ToResult<T>(PidSource source, ISender
{
return (ResponseStatus.Ok, (T) (object) MessageEnvelope.Wrap(result));
}
Logger.LogWarning("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T));
return (ResponseStatus.Exception, default);
Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T));
return (ResponseStatus.InvalidResponse, default);
}
}

private enum ResponseStatus
{
Ok,
TimedOut,
Exception,
InvalidResponse,
DeadLetter
}

Expand Down
Loading

0 comments on commit fcadfd5

Please sign in to comment.