Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Public API cleanup #1751

Merged
merged 13 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "examples\remotech
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "examples\remotechannels\Client\Client.csproj", "{5EACFB26-C757-452C-A536-E633BC352A69}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messages", "examples\remotechannels\Messages\Messages.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Common", "examples\remotechannels\Common\Common.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.AmazonECS", "src\Proto.Cluster.AmazonECS\Proto.Cluster.AmazonECS.csproj", "{F5EB4AD3-5BCB-48E9-B974-A93EB19CE43C}"
EndProject
Expand Down
2 changes: 1 addition & 1 deletion examples/remotechannels/Client/Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
<ProjectReference Include="..\Common\Common.csproj" />
</ItemGroup>

</Project>
3 changes: 1 addition & 2 deletions examples/remotechannels/Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using Messages;
using Common;
using Proto;
using Proto.Channels;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static System.Threading.Channels.Channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto;

namespace Proto.Channels;
namespace Common;

[PublicAPI]
public static class ChannelPublisher
{
/// <summary>
Expand Down Expand Up @@ -40,7 +39,6 @@ public static PID StartNew<T>(IRootContext context, Channel<T> channel, string n
}
}

[PublicAPI]
public class ChannelPublisherActor<T> : IActor
{
private readonly HashSet<PID> _subscribers = new();
Expand All @@ -56,13 +54,17 @@ public Task ReceiveAsync(IContext context)
}

break;

case PID subscriber:
_subscribers.Add(subscriber);
context.Watch(subscriber);
context.Respond(new Subscribed());
break;

case Terminated terminated:
_subscribers.Remove(terminated.Who);
break;

case T typed:
foreach (var sub in _subscribers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
// -----------------------------------------------------------------------
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto;

namespace Proto.Channels;
namespace Common;

[PublicAPI]
public static class ChannelSubscriber
{
/// <summary>
Expand All @@ -21,24 +20,29 @@ public static class ChannelSubscriber
/// <param name="channel">The channel to write messages to</param>
/// <typeparam name="T">The Type of channel elements</typeparam>
/// <returns></returns>
public static PID StartNew<T>(IRootContext context, PID publisher, Channel<T> channel)
public static async Task<PID> StartNew<T>(IRootContext context, PID publisher, Channel<T> channel)
{
var props = Props.FromProducer(() => new ChannelSubscriberActor<T>(publisher, channel));
var tcs = new TaskCompletionSource();
var props = Props.FromProducer(() => new ChannelSubscriberActor<T>(publisher, channel, tcs));
var pid = context.Spawn(props);

await tcs.Task;
return pid;
}
}

[PublicAPI]
public class ChannelSubscriberActor<T> : IActor
{
private readonly Channel<T> _channel;
private readonly TaskCompletionSource _subscribed;
private readonly PID _publisher;

public ChannelSubscriberActor(PID publisher, Channel<T> channel)
public ChannelSubscriberActor(PID publisher, Channel<T> channel, TaskCompletionSource subscribed)

{
_publisher = publisher;
_channel = channel;
_subscribed = subscribed;
}

public async Task ReceiveAsync(IContext context)
Expand All @@ -49,15 +53,24 @@ public async Task ReceiveAsync(IContext context)
context.Watch(_publisher);
context.Request(_publisher, context.Self);
break;

case Subscribed:
_subscribed.SetResult();
break;

case Stopping:
_channel.Writer.Complete();
break;

case Terminated t when t.Who.Equals(_publisher):
_channel.Writer.Complete();
break;

case T typed:
await _channel.Writer.WriteAsync(typed);
break;
}
}
}
}

public record Subscribed;
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@
<LangVersion>10</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Actor\Proto.Actor.csproj" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions examples/remotechannels/Common/Messages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Common;

public record MyMessage(int Value){}
6 changes: 0 additions & 6 deletions examples/remotechannels/Messages/Messages.cs

This file was deleted.

3 changes: 1 addition & 2 deletions examples/remotechannels/Server/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System;
using System.Threading.Tasks;
using Messages;
using Common;
using Proto;
using Proto.Channels;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static Proto.Remote.GrpcNet.GrpcNetRemoteConfig;
Expand Down
2 changes: 1 addition & 1 deletion examples/remotechannels/Server/Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
<ProjectReference Include="..\Common\Common.csproj" />
</ItemGroup>

</Project>
19 changes: 12 additions & 7 deletions src/Proto.Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ public ActorSystem(ActorSystemConfig config)
/// Allows to access the stop cancellation token and stop reason.
/// Use <see cref="ShutdownAsync"/> to stop the actor system.
/// </summary>
public Stopper Stopper { get; }
internal Stopper Stopper { get; }

/// <summary>
/// For stopped <see cref="ActorSystem"/>, returns the reason for the shutdown.
/// </summary>
public string StoppedReason => Stopper.StoppedReason;

/// <summary>
/// Manages all the guardians in the actor system.
Expand Down Expand Up @@ -117,13 +122,13 @@ public ActorSystem(ActorSystemConfig config)

private void RunThreadPoolStats()
{
var metricTags = new KeyValuePair<string, object?>[]{ new("id", Id), new("address", Address)};
var metricTags = new KeyValuePair<string, object?>[] {new("id", Id), new("address", Address)};

var logger = Log.CreateLogger(nameof(ThreadPoolStats));
_ = ThreadPoolStats.Run(TimeSpan.FromSeconds(5),
t => {
//collect the latency metrics
if(Metrics.Enabled)
if (Metrics.Enabled)
ActorMetrics.ThreadPoolLatency.Record(t.TotalSeconds, metricTags);

//does it take longer than 1 sec for a task to start executing?
Expand All @@ -144,7 +149,7 @@ private void RunThreadPoolStats()
/// </summary>
/// <param name="reason">Shutdown reason</param>
/// <returns></returns>
public Task ShutdownAsync(string reason="")
public Task ShutdownAsync(string reason = "")
{
try
{
Expand Down Expand Up @@ -195,15 +200,15 @@ public RootContext NewRoot(MessageHeader? headers = null, params Func<Sender, Se
/// </summary>
/// <param name="props"></param>
/// <returns></returns>
public Props ConfigureProps(Props props) => Config.ConfigureProps(props);
internal Props ConfigureProps(Props props) => Config.ConfigureProps(props);

/// <summary>
/// Applies props configuration delegate for system actors.
/// </summary>
/// <param name="name"></param>
/// <param name="props"></param>
/// <returns></returns>
public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);
internal Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);

/// <summary>
/// Stops the actor system with reason = "Disposed"
Expand Down
18 changes: 0 additions & 18 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMai

public TimeSpan ReceiveTimeout { get; private set; }

public void Stash()
{
if (_messageOrEnvelope is not null) EnsureExtras().Stash.Push(_messageOrEnvelope);
}

public void Respond(object message)
{
if (Sender is not null)
Expand Down Expand Up @@ -714,19 +709,6 @@ private async ValueTask RestartAsync()
Self.SendSystemMessage(System, ResumeMailbox.Instance);

await InvokeUserMessageAsync(Started.Instance);

if (_extras?.Stash is not null)
{
var currentStash = new Stack<object>(_extras.Stash);
_extras.Stash.Clear();

//TODO: what happens if we hit a failure here?
while (currentStash.Any())
{
var msg = currentStash.Pop();
await InvokeUserMessageAsync(msg);
}
}
}

private ValueTask DisposeActorIfDisposable()
Expand Down
3 changes: 0 additions & 3 deletions src/Proto.Actor/Context/ActorContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ public virtual Task Receive(MessageEnvelope envelope) =>
public virtual void Respond(object message) =>
_context.Respond(message);

public virtual void Stash() =>
_context.Stash();

public virtual PID SpawnNamed(Props props, string name, Action<IContext>? callback=null) =>
_context.SpawnNamed(props, name, callback);

Expand Down
4 changes: 1 addition & 3 deletions src/Proto.Actor/Context/ActorContextExtras.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using Proto.Utils;
Expand All @@ -25,12 +24,11 @@ public sealed class ActorContextExtras: IDisposable
public ImmutableHashSet<PID> Children { get; private set; } = ImmutableHashSet<PID>.Empty;
public Timer? ReceiveTimeoutTimer { get; private set; }
public RestartStatistics RestartStatistics { get; } = new(0, null);
public Stack<object> Stash { get; } = new();
public ImmutableHashSet<PID> Watchers { get; private set; } = ImmutableHashSet<PID>.Empty;
public IContext Context { get; }
public CancellationTokenSource CancellationTokenSource { get; } = new();

public TypeDictionary<object, ActorContextExtras> Store { get; } = new(5, 1);
internal TypeDictionary<object, ActorContextExtras> Store { get; } = new(5, 1);

public void InitReceiveTimeoutTimer(Timer timer) => ReceiveTimeoutTimer = timer;

Expand Down
5 changes: 0 additions & 5 deletions src/Proto.Actor/Context/IContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I
/// <param name="header"></param>
void Respond(object message, MessageHeader header) => Respond(new MessageEnvelope(message, null, header));

/// <summary>
/// Stashes the current message on a stack for re-processing when the actor restarts.
/// </summary>
void Stash();

/// <summary>
/// Registers the actor as a watcher for the specified PID. When the PID terminates the watcher is notified with <see cref="Terminated"/> message.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Actor/Context/ISpawnerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static class SpawnerContextExtensions
/// <summary>
/// Spawns a new child actor based on props and named with a unique ID.
/// </summary>
/// <param name="self"></param>
/// <param name="props">The Props used to spawn the actor</param>
/// <returns>The PID of the child actor</returns>
public static PID Spawn(this ISpawnerContext self, Props props)
Expand All @@ -36,6 +37,7 @@ public static PID Spawn(this ISpawnerContext self, Props props)
/// <summary>
/// Spawns a new child actor based on props and named with a unique ID.
/// </summary>
/// <param name="self"></param>
/// <param name="props">The Props used to spawn the actor</param>
/// <param name="callback"></param>
/// <returns>The PID of the child actor</returns>
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Actor/Deduplication/DeduplicationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace Proto.Deduplication;
/// Extracts the deduplication key from the message.
/// </summary>
/// <typeparam name="T">Type of the key</typeparam>
/// <param name="envelope">Message to extract from</param>
/// <param name="key">The key should be returned in this variable</param>
/// <returns>Returns true if the key was successfully extracted, false otherwise</returns>
public delegate bool TryGetDeduplicationKey<T>(MessageEnvelope envelope, out T? key);
Expand Down
10 changes: 0 additions & 10 deletions src/Proto.Actor/Messages/MessageBatch.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/Proto.Actor/Messages/MessageEnvelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public MessageEnvelope WithHeader(string key, string value)
/// <summary>
/// Extends the message envelope with additional headers.
/// </summary>
/// <param name="header"></param>
/// <param name="items"></param>
/// <returns>New envelope</returns>
public MessageEnvelope WithHeaders(IEnumerable<KeyValuePair<string, string>> items)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Actor/PID.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal void SendUserMessage(ActorSystem system, object message)
reff.SendUserMessage(this, message);
}

public void SendSystemMessage(ActorSystem system, SystemMessage sys)
internal void SendSystemMessage(ActorSystem system, SystemMessage sys)
{
var reff = Ref(system) ?? system.ProcessRegistry.Get(this);
reff.SendSystemMessage(this, sys);
Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Actor/Props/Props.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ public sealed record Props
public ImmutableList<Func<Sender, Sender>> SenderMiddleware { get; init; } =
ImmutableList<Func<Sender, Sender>>.Empty;

public Receiver? ReceiverMiddlewareChain { get; init; }
public Sender? SenderMiddlewareChain { get; init; }
internal Receiver? ReceiverMiddlewareChain { get; init; }
internal Sender? SenderMiddlewareChain { get; init; }

/// <summary>
/// List of decorators for the actor context
/// </summary>
public ImmutableList<Func<IContext, IContext>> ContextDecorator { get; init; } =
ImmutableList<Func<IContext, IContext>>.Empty;

public Func<IContext, IContext>? ContextDecoratorChain { get; init; }
internal Func<IContext, IContext>? ContextDecoratorChain { get; init; }

/// <summary>
/// Delegate that creates the actor and wires it with context and mailbox.
Expand Down
Loading