Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WiP enable exp cluster context #1684

Merged
merged 2 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benchmarks/AutoClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ IIdentityLookup identityLookup
var helloProps = Props.FromProducer(() => new WorkerActor());
return ClusterConfig
.Setup("mycluster", clusterProvider, identityLookup)
.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster))
.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster))
.WithClusterKind("hello", helloProps)
.WithGossipFanOut(3);
}
Expand Down Expand Up @@ -119,6 +119,7 @@ public static async Task<Cluster> SpawnMember()
private static ActorSystemConfig GetMemberActorSystemConfig()
{
var config = new ActorSystemConfig()
.WithSharedFutures()
.WithDeadLetterThrottleCount(3)
.WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))
.WithDeadLetterRequestLogging(false);
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/ClusterBenchmark/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ IIdentityLookup identityLookup
var helloProps = Props.FromProducer(() => new WorkerActor());
return ClusterConfig
.Setup("mycluster", clusterProvider, identityLookup)
.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster))
.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster))
.WithClusterKind("hello", helloProps)
.WithGossipFanOut(3);
}
Expand Down Expand Up @@ -182,7 +182,7 @@ public static async Task<Cluster> SpawnMember()
private static ActorSystemConfig GetMemberActorSystemConfig()
{
var config = new ActorSystemConfig()
// .WithSharedFutures()
.WithSharedFutures()
.WithDeadLetterThrottleCount(3)
.WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1))
.WithDeadLetterRequestLogging(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private ClusterConfig ClusterConfig()

if (ExperimentalContext)
{
config = config.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster));
config = config.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster));
}

return config;
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Actor/Configuration/ActorSystemConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public record ActorSystemConfig
/// SharedFutures allows the ActorSystem to avoid registering a new temporary process for each request
/// Instead registering a SharedFuture that can handle multiple requests internally
/// </summary>
public bool SharedFutures { get; init; }
public bool SharedFutures { get; init; } = true;

/// <summary>
/// Sets the number of requests that can be handled by a SharedFuture
Expand Down
3 changes: 1 addition & 2 deletions src/Proto.Cluster/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
public TimeSpan RemotePidCacheTimeToLive { get; set; }
public TimeSpan RemotePidCacheClearInterval { get; set; }

public Func<Cluster, IClusterContext> ClusterContextProducer { get; init; } =
c => new DefaultClusterContext(c.System, c.IdentityLookup, c.PidCache, c.Config.ToClusterContextConfig(), c.System.Shutdown);
public Func<Cluster, IClusterContext> ClusterContextProducer { get; init; } = c => new DefaultClusterContext(c);
public TimeSpan ActorRequestRetryInterval { get; init; }

public ClusterConfig WithTimeout(TimeSpan timeSpan) =>
Expand Down
212 changes: 90 additions & 122 deletions src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// -----------------------------------------------------------------------
// <copyright file="DefaultClusterContext.cs" company="Asynkron AB">
// <copyright file="ExperimentalClusterContext.cs" company="Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
Expand All @@ -10,7 +10,6 @@
using Microsoft.Extensions.Logging;
using Proto.Cluster.Identity;
using Proto.Cluster.Metrics;
using Proto.Future;
using Proto.Utils;

namespace Proto.Cluster;
Expand All @@ -23,91 +22,131 @@ public class DefaultClusterContext : IClusterContext
private readonly ShouldThrottle _requestLogThrottle;
private readonly TaskClock _clock;
private readonly ActorSystem _system;

private static readonly ILogger Logger = Log.CreateLogger<DefaultClusterContext>();

public DefaultClusterContext(
ActorSystem system,
IIdentityLookup identityLookup,
PidCache pidCache,
ClusterContextConfig config,
CancellationToken killSwitch
)
public DefaultClusterContext(Cluster cluster)
{
_identityLookup = identityLookup;
_pidCache = pidCache;
_system = system;
_identityLookup = cluster.IdentityLookup;
_pidCache = cluster.PidCache;
var config = cluster.Config;
_system = cluster.System;

_requestLogThrottle = Throttle.Create(
config.MaxNumberOfEventsInRequestLogThrottlePeriod,
config.RequestLogThrottlePeriod,
i => Logger.LogInformation("Throttled {LogCount} TryRequestAsync logs", i)
);

_clock = new TaskClock(config.ActorRequestTimeout, config.ActorRequestRetryInterval, killSwitch);
_clock = new TaskClock(config.ActorRequestTimeout, TimeSpan.FromSeconds(1), cluster.System.Shutdown);
_clock.Start();
}

public async Task<T?> RequestAsync<T>(ClusterIdentity clusterIdentity, object message, ISenderContext context, CancellationToken ct)
{
var start = Stopwatch.StartNew();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} Message {Message}", clusterIdentity, message);
var i = 0;

var future = context.GetFuture();
PID? lastPid = null;

try
{
while (!ct.IsCancellationRequested)
while (!ct.IsCancellationRequested && !context.System.Shutdown.IsCancellationRequested)
{
if (context.System.Shutdown.IsCancellationRequested) return default;
i++;

var source = PidSource.Cache;
var pid = GetCachedPid(clusterIdentity);
var pid = clusterIdentity.CachedPid;

if (pid == null)
{
pid = GetCachedPid(clusterIdentity);
}

if (pid is null)
{
source = PidSource.Lookup;
pid = await GetPidFromLookup(clusterIdentity, context, ct);
}

if (context.System.Shutdown.IsCancellationRequested) return default;

if (pid is null)
{
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} - Did not get PID from IdentityLookup", clusterIdentity);
await Task.Delay(++i * 20, CancellationToken.None);
await Task.Delay(i * 20, CancellationToken.None);
continue;
}

// Ensures that a future is not re-used against another actor.
if (lastPid is not null && !pid.Equals(lastPid)) RefreshFuture();

// Logger.LogDebug("Requesting {ClusterIdentity} - Got PID {Pid} from {Source}", clusterIdentity, pid, source);
var (status, res) = await TryRequestAsync<T>(clusterIdentity, message, pid, source, context, future);
Stopwatch t = null!;

switch (status)
if (context.System.Metrics.Enabled)
{
t=Stopwatch.StartNew();
}

try
{
context.Request(pid, message, future.Pid);
var task = future.Task;

await Task.WhenAny(task, _clock.CurrentBucket);

if (task.IsCompleted)
{
var (status, result) = ToResult<T>(source, context, task.Result);

switch (status)
{
case ResponseStatus.Ok: return result;
case ResponseStatus.InvalidResponse:
RefreshFuture();
await RemoveFromSource(clusterIdentity, source, pid);
break;
case ResponseStatus.DeadLetter:
RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid);
break;
}
}
else
{
if (!context.System.Shutdown.IsCancellationRequested)
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
}
}
catch (TimeoutException)
{
lastPid = pid;
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
continue;
}
catch (Exception x)
{
case ResponseStatus.Ok:
return res;

case ResponseStatus.Exception:
RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
await Task.Delay(++i * 20, CancellationToken.None);
break;
case ResponseStatus.DeadLetter:
RefreshFuture();
await RemoveFromSource(clusterIdentity, source, pid);
break;
case ResponseStatus.TimedOut:
lastPid = pid;
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
break;
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Cache, pid);
await Task.Delay(i * 20, CancellationToken.None);
continue;
}
finally
{
if (context.System.Metrics.Enabled)
{
var elapsed = t.Elapsed;
ClusterMetrics.ClusterRequestDuration
.Record(elapsed.TotalSeconds,
new("id", _system.Id), new("address", _system.Address),
new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name),
new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup")
);
}
}

if (_system.Metrics.Enabled)
if (context.System.Metrics.Enabled)
{
ClusterMetrics.ClusterRequestRetryCount.Add(
1, new("id", _system.Id), new("address", _system.Address),
Expand All @@ -118,8 +157,7 @@ CancellationToken killSwitch

if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
{
var t = start.Elapsed;
Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}, elapsed {Time}", clusterIdentity, t);
Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}", clusterIdentity);
}

return default!;
Expand All @@ -144,6 +182,9 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
_pidCache.RemoveByVal(clusterIdentity, pid);
}


private PID? GetCachedPid(ClusterIdentity clusterIdentity) => _pidCache.TryGet(clusterIdentity, out var pid) ? pid : null;

private async Task<PID?> GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct)
{
try
Expand Down Expand Up @@ -177,81 +218,9 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
}
}

private async ValueTask<(ResponseStatus Status, T?)> TryRequestAsync<T>(
ClusterIdentity clusterIdentity,
object message,
PID pid,
PidSource source,
ISenderContext context,
IFuture future
)
{
var t = Stopwatch.StartNew();

try
{
context.Request(pid, message, future.Pid);
var task = future.Task;

await Task.WhenAny(task, _clock.CurrentBucket);

if (task.IsCompleted)
{
var res = task.Result;

return ToResult<T>(source, context, res);
}

if (!context.System.Shutdown.IsCancellationRequested)
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source);

_pidCache.RemoveByVal(clusterIdentity, pid);

return (ResponseStatus.TimedOut, default);
}
catch (TimeoutException)
{
return (ResponseStatus.TimedOut, default);
}
catch (Exception x)
{
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
return (ResponseStatus.Exception, default);
}
finally
{
if (context.System.Metrics.Enabled)
{
var elapsed = t.Elapsed;
ClusterMetrics.ClusterRequestDuration
.Record(elapsed.TotalSeconds,
new("id", _system.Id), new("address", _system.Address),
new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name),
new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup")
);
}
}
}

private PID? GetCachedPid(ClusterIdentity clusterIdentity)
{
var pid = clusterIdentity.CachedPid;

if (pid is null && _pidCache.TryGet(clusterIdentity, out pid))
{
clusterIdentity.CachedPid = pid;
}

return pid;
}

private static (ResponseStatus Status, T?) ToResult<T>(PidSource source, ISenderContext context, object result)
private static (ResponseStatus Ok, T?) ToResult<T>(PidSource source, ISenderContext context, object result)
{
var message = MessageEnvelope.UnwrapMessage(result);

switch (message)
{
case DeadLetterResponse:
Expand All @@ -266,16 +235,15 @@ private static (ResponseStatus Status, T?) ToResult<T>(PidSource source, ISender
{
return (ResponseStatus.Ok, (T) (object) MessageEnvelope.Wrap(result));
}
Logger.LogWarning("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T));
return (ResponseStatus.Exception, default);
Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T));
return (ResponseStatus.InvalidResponse, default);
}
}

private enum ResponseStatus
{
Ok,
TimedOut,
Exception,
InvalidResponse,
DeadLetter
}

Expand Down
Loading