Skip to content

Commit

Permalink
Public API cleanup (#1751)
Browse files Browse the repository at this point in the history
* actor system, context and pid

* converted channel publisher/subscriber to an example

* re-used remote channels example instead of the local one, further public api changes

* block list

* timeout names cleanup in cluster config

* cluster changes

* cluster changes

* removed usage of ClusterInit, the msg itself stays for now

* removed unused Tick message

* test CI

* restore CI config, clean up xml docs related warnings

* adjustments after merge
  • Loading branch information
marcinbudny committed Sep 13, 2022
1 parent 4a443c6 commit d56ddd1
Show file tree
Hide file tree
Showing 49 changed files with 150 additions and 146 deletions.
2 changes: 1 addition & 1 deletion ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "examples\remotech
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "examples\remotechannels\Client\Client.csproj", "{5EACFB26-C757-452C-A536-E633BC352A69}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messages", "examples\remotechannels\Messages\Messages.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Common", "examples\remotechannels\Common\Common.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.AmazonECS", "src\Proto.Cluster.AmazonECS\Proto.Cluster.AmazonECS.csproj", "{F5EB4AD3-5BCB-48E9-B974-A93EB19CE43C}"
EndProject
Expand Down
2 changes: 1 addition & 1 deletion examples/remotechannels/Client/Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
<ProjectReference Include="..\Common\Common.csproj" />
</ItemGroup>

</Project>
3 changes: 1 addition & 2 deletions examples/remotechannels/Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using Messages;
using Common;
using Proto;
using Proto.Channels;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static System.Threading.Channels.Channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto;

namespace Proto.Channels;
namespace Common;

[PublicAPI]
public static class ChannelPublisher
{
/// <summary>
Expand Down Expand Up @@ -40,7 +39,6 @@ public static PID StartNew<T>(IRootContext context, Channel<T> channel, string n
}
}

[PublicAPI]
public class ChannelPublisherActor<T> : IActor
{
private readonly HashSet<PID> _subscribers = new();
Expand All @@ -56,13 +54,17 @@ public Task ReceiveAsync(IContext context)
}

break;

case PID subscriber:
_subscribers.Add(subscriber);
context.Watch(subscriber);
context.Respond(new Subscribed());
break;

case Terminated terminated:
_subscribers.Remove(terminated.Who);
break;

case T typed:
foreach (var sub in _subscribers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
// -----------------------------------------------------------------------
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto;

namespace Proto.Channels;
namespace Common;

[PublicAPI]
public static class ChannelSubscriber
{
/// <summary>
Expand All @@ -21,24 +20,29 @@ public static class ChannelSubscriber
/// <param name="channel">The channel to write messages to</param>
/// <typeparam name="T">The Type of channel elements</typeparam>
/// <returns></returns>
public static PID StartNew<T>(IRootContext context, PID publisher, Channel<T> channel)
public static async Task<PID> StartNew<T>(IRootContext context, PID publisher, Channel<T> channel)
{
var props = Props.FromProducer(() => new ChannelSubscriberActor<T>(publisher, channel));
var tcs = new TaskCompletionSource();
var props = Props.FromProducer(() => new ChannelSubscriberActor<T>(publisher, channel, tcs));
var pid = context.Spawn(props);

await tcs.Task;
return pid;
}
}

[PublicAPI]
public class ChannelSubscriberActor<T> : IActor
{
private readonly Channel<T> _channel;
private readonly TaskCompletionSource _subscribed;
private readonly PID _publisher;

public ChannelSubscriberActor(PID publisher, Channel<T> channel)
public ChannelSubscriberActor(PID publisher, Channel<T> channel, TaskCompletionSource subscribed)

{
_publisher = publisher;
_channel = channel;
_subscribed = subscribed;
}

public async Task ReceiveAsync(IContext context)
Expand All @@ -49,15 +53,24 @@ public async Task ReceiveAsync(IContext context)
context.Watch(_publisher);
context.Request(_publisher, context.Self);
break;

case Subscribed:
_subscribed.SetResult();
break;

case Stopping:
_channel.Writer.Complete();
break;

case Terminated t when t.Who.Equals(_publisher):
_channel.Writer.Complete();
break;

case T typed:
await _channel.Writer.WriteAsync(typed);
break;
}
}
}
}

public record Subscribed;
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@
<LangVersion>10</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Actor\Proto.Actor.csproj" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions examples/remotechannels/Common/Messages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Common;

public record MyMessage(int Value){}
6 changes: 0 additions & 6 deletions examples/remotechannels/Messages/Messages.cs

This file was deleted.

3 changes: 1 addition & 2 deletions examples/remotechannels/Server/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System;
using System.Threading.Tasks;
using Messages;
using Common;
using Proto;
using Proto.Channels;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static Proto.Remote.GrpcNet.GrpcNetRemoteConfig;
Expand Down
2 changes: 1 addition & 1 deletion examples/remotechannels/Server/Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
<ProjectReference Include="..\Common\Common.csproj" />
</ItemGroup>

</Project>
19 changes: 12 additions & 7 deletions src/Proto.Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ public ActorSystem(ActorSystemConfig config)
/// Allows to access the stop cancellation token and stop reason.
/// Use <see cref="ShutdownAsync"/> to stop the actor system.
/// </summary>
public Stopper Stopper { get; }
internal Stopper Stopper { get; }

/// <summary>
/// For stopped <see cref="ActorSystem"/>, returns the reason for the shutdown.
/// </summary>
public string StoppedReason => Stopper.StoppedReason;

/// <summary>
/// Manages all the guardians in the actor system.
Expand Down Expand Up @@ -117,13 +122,13 @@ public ActorSystem(ActorSystemConfig config)

private void RunThreadPoolStats()
{
var metricTags = new KeyValuePair<string, object?>[]{ new("id", Id), new("address", Address)};
var metricTags = new KeyValuePair<string, object?>[] {new("id", Id), new("address", Address)};

var logger = Log.CreateLogger(nameof(ThreadPoolStats));
_ = ThreadPoolStats.Run(TimeSpan.FromSeconds(5),
t => {
//collect the latency metrics
if(Metrics.Enabled)
if (Metrics.Enabled)
ActorMetrics.ThreadPoolLatency.Record(t.TotalSeconds, metricTags);
//does it take longer than 1 sec for a task to start executing?
Expand All @@ -144,7 +149,7 @@ private void RunThreadPoolStats()
/// </summary>
/// <param name="reason">Shutdown reason</param>
/// <returns></returns>
public Task ShutdownAsync(string reason="")
public Task ShutdownAsync(string reason = "")
{
try
{
Expand Down Expand Up @@ -195,15 +200,15 @@ public RootContext NewRoot(MessageHeader? headers = null, params Func<Sender, Se
/// </summary>
/// <param name="props"></param>
/// <returns></returns>
public Props ConfigureProps(Props props) => Config.ConfigureProps(props);
internal Props ConfigureProps(Props props) => Config.ConfigureProps(props);

/// <summary>
/// Applies props configuration delegate for system actors.
/// </summary>
/// <param name="name"></param>
/// <param name="props"></param>
/// <returns></returns>
public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);
internal Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);

/// <summary>
/// Stops the actor system with reason = "Disposed"
Expand Down
18 changes: 0 additions & 18 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMai

public TimeSpan ReceiveTimeout { get; private set; }

public void Stash()
{
if (_messageOrEnvelope is not null) EnsureExtras().Stash.Push(_messageOrEnvelope);
}

public void Respond(object message)
{
if (Sender is not null)
Expand Down Expand Up @@ -714,19 +709,6 @@ private async ValueTask RestartAsync()
Self.SendSystemMessage(System, ResumeMailbox.Instance);

await InvokeUserMessageAsync(Started.Instance);

if (_extras?.Stash is not null)
{
var currentStash = new Stack<object>(_extras.Stash);
_extras.Stash.Clear();

//TODO: what happens if we hit a failure here?
while (currentStash.Any())
{
var msg = currentStash.Pop();
await InvokeUserMessageAsync(msg);
}
}
}

private ValueTask DisposeActorIfDisposable()
Expand Down
3 changes: 0 additions & 3 deletions src/Proto.Actor/Context/ActorContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ public virtual Task Receive(MessageEnvelope envelope) =>
public virtual void Respond(object message) =>
_context.Respond(message);

public virtual void Stash() =>
_context.Stash();

public virtual PID SpawnNamed(Props props, string name, Action<IContext>? callback=null) =>
_context.SpawnNamed(props, name, callback);

Expand Down
4 changes: 1 addition & 3 deletions src/Proto.Actor/Context/ActorContextExtras.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using Proto.Utils;
Expand All @@ -25,12 +24,11 @@ public sealed class ActorContextExtras: IDisposable
public ImmutableHashSet<PID> Children { get; private set; } = ImmutableHashSet<PID>.Empty;
public Timer? ReceiveTimeoutTimer { get; private set; }
public RestartStatistics RestartStatistics { get; } = new(0, null);
public Stack<object> Stash { get; } = new();
public ImmutableHashSet<PID> Watchers { get; private set; } = ImmutableHashSet<PID>.Empty;
public IContext Context { get; }
public CancellationTokenSource CancellationTokenSource { get; } = new();

public TypeDictionary<object, ActorContextExtras> Store { get; } = new(5, 1);
internal TypeDictionary<object, ActorContextExtras> Store { get; } = new(5, 1);

public void InitReceiveTimeoutTimer(Timer timer) => ReceiveTimeoutTimer = timer;

Expand Down
5 changes: 0 additions & 5 deletions src/Proto.Actor/Context/IContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I
/// <param name="header"></param>
void Respond(object message, MessageHeader header) => Respond(new MessageEnvelope(message, null, header));

/// <summary>
/// Stashes the current message on a stack for re-processing when the actor restarts.
/// </summary>
void Stash();

/// <summary>
/// Registers the actor as a watcher for the specified PID. When the PID terminates the watcher is notified with <see cref="Terminated"/> message.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Actor/Context/ISpawnerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static class SpawnerContextExtensions
/// <summary>
/// Spawns a new child actor based on props and named with a unique ID.
/// </summary>
/// <param name="self"></param>
/// <param name="props">The Props used to spawn the actor</param>
/// <returns>The PID of the child actor</returns>
public static PID Spawn(this ISpawnerContext self, Props props)
Expand All @@ -36,6 +37,7 @@ public static PID Spawn(this ISpawnerContext self, Props props)
/// <summary>
/// Spawns a new child actor based on props and named with a unique ID.
/// </summary>
/// <param name="self"></param>
/// <param name="props">The Props used to spawn the actor</param>
/// <param name="callback"></param>
/// <returns>The PID of the child actor</returns>
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Actor/Deduplication/DeduplicationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace Proto.Deduplication;
/// Extracts the deduplication key from the message.
/// </summary>
/// <typeparam name="T">Type of the key</typeparam>
/// <param name="envelope">Message to extract from</param>
/// <param name="key">The key should be returned in this variable</param>
/// <returns>Returns true if the key was successfully extracted, false otherwise</returns>
public delegate bool TryGetDeduplicationKey<T>(MessageEnvelope envelope, out T? key);
Expand Down
10 changes: 0 additions & 10 deletions src/Proto.Actor/Messages/MessageBatch.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/Proto.Actor/Messages/MessageEnvelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public MessageEnvelope WithHeader(string key, string value)
/// <summary>
/// Extends the message envelope with additional headers.
/// </summary>
/// <param name="header"></param>
/// <param name="items"></param>
/// <returns>New envelope</returns>
public MessageEnvelope WithHeaders(IEnumerable<KeyValuePair<string, string>> items)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Actor/PID.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal void SendUserMessage(ActorSystem system, object message)
reff.SendUserMessage(this, message);
}

public void SendSystemMessage(ActorSystem system, SystemMessage sys)
internal void SendSystemMessage(ActorSystem system, SystemMessage sys)
{
var reff = Ref(system) ?? system.ProcessRegistry.Get(this);
reff.SendSystemMessage(this, sys);
Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Actor/Props/Props.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ public sealed record Props
public ImmutableList<Func<Sender, Sender>> SenderMiddleware { get; init; } =
ImmutableList<Func<Sender, Sender>>.Empty;

public Receiver? ReceiverMiddlewareChain { get; init; }
public Sender? SenderMiddlewareChain { get; init; }
internal Receiver? ReceiverMiddlewareChain { get; init; }
internal Sender? SenderMiddlewareChain { get; init; }

/// <summary>
/// List of decorators for the actor context
/// </summary>
public ImmutableList<Func<IContext, IContext>> ContextDecorator { get; init; } =
ImmutableList<Func<IContext, IContext>>.Empty;

public Func<IContext, IContext>? ContextDecoratorChain { get; init; }
internal Func<IContext, IContext>? ContextDecoratorChain { get; init; }

/// <summary>
/// Delegate that creates the actor and wires it with context and mailbox.
Expand Down
Loading

0 comments on commit d56ddd1

Please sign in to comment.