Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster request timeout now throws TimeoutException #1727

Merged
merged 7 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions benchmarks/AutoClusterBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ public static class Program
private static object Request = new HelloRequest();

public static async Task Main(string[] args)
{
{
ThreadPool.SetMinThreads(0, 0);
foreach (var batchSize in new[] { 100, 150, 200, 250, 300 })

foreach (var batchSize in new[] {100, 150, 200, 250, 300})
{
Configuration.ResetAgent();
ResetCounters();

var cluster = await Configuration.SpawnClient();

var elapsed = await RunWorkers(() => new RunMemberInProcGraceful(), () => RunBatchClient(batchSize, cluster));
var tps = requestCount / elapsed.TotalMilliseconds * 1000;
Console.WriteLine();
Expand All @@ -47,7 +48,7 @@ public static async Task Main(string[] args)
Console.WriteLine($"Failures:\t{failureCount:N0}");
Console.WriteLine($"Throughput:\t{tps:N0} requests/sec -> {(tps * 2):N0} msg/sec");
await cluster.ShutdownAsync();

await Task.Delay(5000);
}
}
Expand All @@ -70,11 +71,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance

try
{

var x = await cluster.RequestAsync<object>(id, Request, context, cancellationToken);

if (x != null)
try
{
await cluster.RequestAsync<object>(id, Request, context, cancellationToken);

var res = Interlocked.Increment(ref successCount);

if (res % 10000 == 0)
Expand All @@ -86,6 +86,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance

return;
}
catch (TimeoutException)
{
// ignored
}

OnError();
}
Expand All @@ -109,16 +113,16 @@ void OnError()
private static void RunBatchClient(int batchSize, Cluster cluster)
{
var identities = new ClusterIdentity[actorCount];

for (var i = 0; i < actorCount; i++)
{
var id = "myactor" + i;
identities[i] = ClusterIdentity.Create(id,"hello");
identities[i] = ClusterIdentity.Create(id, "hello");
}

var logger = Log.CreateLogger(nameof(Program));

_ = SafeTask.Run(() => {

var rnd = new Random();
var semaphore = new AsyncSemaphore(5);

Expand All @@ -139,7 +143,8 @@ async Task RunBatch(Random? rnd, Cluster cluster)
{
var ct = CancellationTokens.FromSeconds(20);

var ctx = cluster.System.Root.CreateBatchContext(batchSize,ct);
var ctx = cluster.System.Root.CreateBatchContext(batchSize, ct);

for (var i = 0; i < batchSize; i++)
{
var id = identities[rnd!.Next(0, actorCount)];
Expand Down
41 changes: 26 additions & 15 deletions benchmarks/ClusterBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public static async Task Main(string[] args)
if (args.Length > 0)
{
// InteractiveOutput = args[0] == "1";

var l = typeof(Program).Assembly.Location;
Console.WriteLine($"Worker running {l}");
var worker = await Configuration.SpawnMember();
AppDomain.CurrentDomain.ProcessExit += (sender, args) => { worker.ShutdownAsync().Wait(); };
Thread.Sleep(Timeout.Infinite);

return;
}

Expand Down Expand Up @@ -154,7 +154,6 @@ public static async Task Main(string[] args)

private static void RunNoopClient()
{

}

private static void RunFireForgetClient()
Expand All @@ -166,6 +165,7 @@ private static void RunFireForgetClient()
var cluster = await Configuration.SpawnClient();
// var rnd = new Random();
var i = 0;

while (true)
{
var id = "myactor" + (i++ % actorCount);
Expand All @@ -186,10 +186,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance

try
{
var x = await cluster.RequestAsync<object>(id, Request, context, cancellationToken);

if (x != null)
try
{
await cluster.RequestAsync<object>(id, Request, context, cancellationToken);

var res = Interlocked.Increment(ref successCount);

if (res % 10000 == 0)
Expand All @@ -201,6 +201,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance

return;
}
catch (TimeoutException)
{
// ignored
}

OnError();
}
Expand All @@ -218,7 +222,7 @@ void OnError()
Console.ResetColor();
}
}

private static async Task<bool> SendRequest(Cluster cluster, string id, CancellationToken cancellationToken, ISenderContext? context = null)
{
Interlocked.Increment(ref requestCount);
Expand All @@ -230,10 +234,10 @@ private static async Task<bool> SendRequest(Cluster cluster, string id, Cancella

try
{
var x = await cluster.RequestAsync<object>(id, "hello", Request, context, cancellationToken);

if (x != null)
try
{
await cluster.RequestAsync<object>(id, "hello", Request, context, cancellationToken);

var res = Interlocked.Increment(ref successCount);

if (res % 10000 == 0)
Expand All @@ -245,6 +249,10 @@ private static async Task<bool> SendRequest(Cluster cluster, string id, Cancella

return true;
}
catch (TimeoutException)
{
// ignored
}

OnError();
}
Expand All @@ -268,19 +276,21 @@ void OnError()
private static void RunBatchClient(int batchSize)
{
var identities = new ClusterIdentity[actorCount];

for (var i = 0; i < actorCount; i++)
{
var id = "myactor" + i;
identities[i] = ClusterIdentity.Create(id,"hello");
identities[i] = ClusterIdentity.Create(id, "hello");
}

var logger = Log.CreateLogger(nameof(Program));

_ = SafeTask.Run(async () => {
var cluster = await Configuration.SpawnClient();
// var rnd = new Random();
var semaphore = new AsyncSemaphore(5);
var i = 0;

while (true)
{
var b = i;
Expand All @@ -298,7 +308,8 @@ async Task RunBatch(int startIndex, Cluster cluster)
{
var ct = CancellationTokens.FromSeconds(20);

var ctx = cluster.System.Root.CreateBatchContext(batchSize,ct);
var ctx = cluster.System.Root.CreateBatchContext(batchSize, ct);

for (var i = 0; i < batchSize; i++)
{
var id = identities[(startIndex + i) % identities.Length];
Expand All @@ -315,7 +326,7 @@ async Task RunBatch(int startIndex, Cluster cluster)
}
}
}

private static void RunDebugClient()
{
var logger = Log.CreateLogger(nameof(Program));
Expand All @@ -332,7 +343,7 @@ private static void RunDebugClient()

if (!res)
{
var pid = await cluster.GetAsync(ClusterIdentity.Create(id,"hello"),CancellationTokens.FromSeconds(10));
var pid = await cluster.GetAsync(ClusterIdentity.Create(id, "hello"), CancellationTokens.FromSeconds(10));

if (pid != null)
{
Expand Down
14 changes: 13 additions & 1 deletion benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,19 @@ public void RemoveActivations()
tasks[i] = _cluster.RequestAsync<Terminated>(id, PoisonPill.Instance, cts.Token);
}

Task.WhenAll(tasks).GetAwaiter().GetResult();
try
{
Task.WhenAll(tasks).GetAwaiter().GetResult();
marcinbudny marked this conversation as resolved.
Show resolved Hide resolved
}
catch (TimeoutException)
{
// ignore
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}

public enum IdentityLookup
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Cluster.CodeGen/Template.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class {{Name}}Client
GrainResponseMessage grainResponse => {{#if UseReturn}}({{OutputName}}?)grainResponse.ResponseMessage{{else}}Nothing.Instance{{/if}},
// error response
GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err),
//timeout
// timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), otherwise TimeoutException is thrown
null => null,
// unsupported response
_ => throw new NotSupportedException($""Unknown response type {res.GetType().FullName}"")
Expand All @@ -116,7 +116,7 @@ public class {{Name}}Client
GrainResponseMessage grainResponse => {{#if UseReturn}}({{OutputName}}?)grainResponse.ResponseMessage{{else}}Nothing.Instance{{/if}},
// error response
GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err),
//timeout
// timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), otherwise TimeoutException is thrown
null => null,
// unsupported response
_ => throw new NotSupportedException($""Unknown response type {res.GetType().FullName}"")
Expand Down
13 changes: 13 additions & 0 deletions src/Proto.Cluster/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
/// </summary>
public PubSubConfig PubSubConfig { get; init; } = PubSubConfig.Setup();

/// <summary>
/// Backwards compatibility. Set to true to have timed out requests return default(TResponse) instead of throwing <see cref="TimeoutException"/>.
/// Default is false.
/// </summary>
public bool LegacyRequestTimeoutBehavior { get; init; }

/// <summary>
/// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s.
/// </summary>
Expand Down Expand Up @@ -326,6 +332,13 @@ public ClusterConfig WithHeartbeatExpiration(TimeSpan expiration) =>
public ClusterConfig WithPubSubConfig(PubSubConfig config) =>
this with {PubSubConfig = config};

/// <summary>
/// Backwards compatibility. Set to true to have timed out requests return default(TResponse) instead of throwing <see cref="TimeoutException"/>.
/// Default is false.
/// </summary>
public ClusterConfig WithLegacyRequestTimeoutBehavior(bool enabled = true) =>
this with {LegacyRequestTimeoutBehavior = enabled};

/// <summary>
/// Creates a new <see cref="ClusterConfig"/>
/// </summary>
Expand Down
24 changes: 18 additions & 6 deletions src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class DefaultClusterContext : IClusterContext
private readonly ActorSystem _system;
private static readonly ILogger Logger = Log.CreateLogger<DefaultClusterContext>();
private readonly int _requestTimeoutSeconds;
private readonly bool _legacyTimeouts;

public DefaultClusterContext(Cluster cluster)
{
Expand All @@ -41,6 +42,7 @@ public DefaultClusterContext(Cluster cluster)
i => Logger.LogInformation("Throttled {LogCount} TryRequestAsync logs", i)
);
_requestTimeoutSeconds = (int) config.ActorRequestTimeout.TotalSeconds;
_legacyTimeouts = config.LegacyRequestTimeoutBehavior;
#if !NET6_0_OR_GREATER
var updateInterval = TimeSpan.FromMilliseconds(Math.Min(config.ActorRequestTimeout.TotalMilliseconds / 2, 1000));
_clock = new TaskClock(config.ActorRequestTimeout, updateInterval, cluster.System.Shutdown);
Expand Down Expand Up @@ -110,15 +112,14 @@ public DefaultClusterContext(Cluster cluster)
return t1;
}

if (typeof(T) == typeof(MessageEnvelope))
if (untypedResult == null) // timeout, actual valid response cannot be null
{
return (T) (object) MessageEnvelope.Wrap(task.Result);
return TimeoutOrThrow();
}

if (untypedResult == null)
if (typeof(T) == typeof(MessageEnvelope))
{
//null = timeout
return default;
return (T) (object) MessageEnvelope.Wrap(task.Result);
}

if (untypedResult is DeadLetterResponse)
Expand Down Expand Up @@ -199,7 +200,7 @@ public DefaultClusterContext(Cluster cluster)
Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}", clusterIdentity);
}

return default!;
return TimeoutOrThrow();
}
finally
{
Expand All @@ -212,6 +213,17 @@ void RefreshFuture()
future = context.GetFuture();
lastPid = null;
}

T? TimeoutOrThrow()
{
if (_legacyTimeouts)
{
//null = timeout
return default;
}

throw new TimeoutException("Request timed out");
}
}

private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSource source, PID pid)
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Cluster/IClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public interface IClusterProvider
Task StartMemberAsync(Cluster cluster);

/// <summary>
/// Starts the cluster provider in client mode. The client member does not support any kinds.
/// Starts the cluster provider in client mode. The client member does not host any virtual actors and it is not registered in the membership provider.
/// It only monitors other members' presence and allows sending messages to virtual actors hosted by other members.
/// </summary>
/// <param name="cluster"></param>
/// <returns></returns>
Expand Down
Loading