Skip to content

Commit

Permalink
Valuetask clustercontext (#1698)
Browse files Browse the repository at this point in the history
* ValueTask cluster context
* add tps to skyrise benchmark
* short pids
  • Loading branch information
rogeralsing authored Jul 4, 2022
1 parent de6c2d8 commit 9017e6e
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 18 deletions.
28 changes: 16 additions & 12 deletions benchmarks/SkyriseMini/Client/Tests/MessagingTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace SkyriseMini.Tests;

public class MessagingTest
{
readonly Activate _activate;
readonly Ping _ping;
readonly ILogger<MessagingTest> _logger;
private readonly Activate _activate;
private readonly Ping _ping;
private readonly ILogger<MessagingTest> _logger;

public MessagingTest(Activate activate, Ping ping, ILogger<MessagingTest> logger)
{
Expand Down Expand Up @@ -60,26 +60,30 @@ async Task<object[]> ActivateActors(string[] actorIds)
var overallStopwatch = new Stopwatch();
overallStopwatch.Start();

var tasks = handles.Select(async handle =>
{
var messageStopwatch = new Stopwatch();
while (!cancel.IsCancellationRequested)

bool error = false;
var sw = Stopwatch.StartNew();
var tasks = handles.Select(async handle => {
while (!cancel.IsCancellationRequested && !error)
{
try
{
messageStopwatch.Restart();
await _ping(handle, Guid.NewGuid().ToString("N"));
Interlocked.Increment(ref totalMessages);
var res = Interlocked.Increment(ref totalMessages);
if (res % 100000 == 0)
{
var tps = (int)(totalMessages / (double) sw.ElapsedMilliseconds * 1000.0);
Console.WriteLine(tps);
}
}
catch (Exception e)
{
error = true;
_logger.LogError(e, "Error during test");
}
}
messageStopwatch.Stop();
});

await Task.WhenAll(tasks);
Expand Down
8 changes: 7 additions & 1 deletion src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ public PID SpawnNamed(Props props, string name, Action<IContext>? callback = nul

try
{
var pid = props.Spawn(System, $"{Self.Id}/{name}", Self, callback);
var id = name switch
{
"" => System.ProcessRegistry.NextId(),
_ => $"{Self.Id}/{name}",
};

var pid = props.Spawn(System, id, Self, callback);
EnsureExtras().AddChild(pid);

return pid;
Expand Down
14 changes: 12 additions & 2 deletions src/Proto.Actor/Context/ISpawnerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,18 @@ public static class SpawnerContextExtensions
/// <returns>The PID of the child actor</returns>
public static PID Spawn(this ISpawnerContext self, Props props)
{
var id = self.System.ProcessRegistry.NextId();
return self.SpawnNamed(props, id);
return self.SpawnNamed(props, "");
}

/// <summary>
/// Spawns a new child actor based on props and named with a unique ID.
/// </summary>
/// <param name="props">The Props used to spawn the actor</param>
/// <param name="callback"></param>
/// <returns>The PID of the child actor</returns>
public static PID Spawn(this ISpawnerContext self, Props props, Action<IContext> callback)
{
return self.SpawnNamed(props, "", callback);
}

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Proto.Actor/Context/RootContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public PID SpawnNamed(Props props, string name, Action<IContext>? callback=null)
{
try
{
if (string.IsNullOrEmpty(name))
{
name = System.ProcessRegistry.NextId();
}

var parent = props.GuardianStrategy is not null
? System.Guardians.GetGuardianPid(props.GuardianStrategy)
: null;
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou

private PID? GetCachedPid(ClusterIdentity clusterIdentity) => _pidCache.TryGet(clusterIdentity, out var pid) ? pid : null;

private async Task<PID?> GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct)
private async ValueTask<PID?> GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct)
{
try
{
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind
{
try
{
var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity));
var pid = context.Spawn(clusterKind.Props, ctx => ctx.Set(msg.ClusterIdentity));
_actors.Add(msg.ClusterIdentity, pid);
context.Respond(new ActivationResponse
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind
{
try
{
var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity));
var pid = context.Spawn(clusterKind.Props, ctx => ctx.Set(msg.ClusterIdentity));
_actors.Add(msg.ClusterIdentity, pid);
context.Respond(new ActivationResponse
{
Expand Down
15 changes: 15 additions & 0 deletions tests/Proto.Remote.Tests/SerializationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ public class MockSerializer2 : ISerializer
public ByteString Serialize(object obj) => ByteString.CopyFrom(new byte[0]);
}

[Fact]
public void ProtobufDefaultValuesAreSameAsEmpty()
{
var p1 = new PID();
var p2 = new PID()
{
Address = "",
Id = "",
};

var b1 = p1.ToByteArray();
var b2 = p2.ToByteArray();
b1.Length.Should().Be(b2.Length);
}

[Fact]
public void CanUtilizeMultipleSerializers()
{
Expand Down

0 comments on commit 9017e6e

Please sign in to comment.