From 551857f11e418e69fffd1ff2819b6309df04aec3 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sat, 4 Dec 2021 15:57:14 +0100 Subject: [PATCH 01/22] clean and seal props --- .../Actor/LocalActorRefProviderSpec.cs | 2 +- src/core/Akka/Actor/ActorCell.cs | 8 +- src/core/Akka/Actor/Props.cs | 162 ++++++------------ 3 files changed, 54 insertions(+), 118 deletions(-) diff --git a/src/core/Akka.Tests/Actor/LocalActorRefProviderSpec.cs b/src/core/Akka.Tests/Actor/LocalActorRefProviderSpec.cs index d294f25a30d..6e3a0b88458 100644 --- a/src/core/Akka.Tests/Actor/LocalActorRefProviderSpec.cs +++ b/src/core/Akka.Tests/Actor/LocalActorRefProviderSpec.cs @@ -33,7 +33,7 @@ public void A_LocalActorRefs_ActorCell_must_not_retain_its_original_Props_when_T { var childPropsAfterTermination = ((LocalActorRef)child).Underlying.Props; Assert.NotEqual(childPropsBeforeTermination, childPropsAfterTermination); - Assert.Equal(ActorCell.TerminatedProps, childPropsAfterTermination); + Assert.Equal(Props.Terminated, childPropsAfterTermination); }); } diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index d8eabb6c149..d84d551e9bc 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -154,11 +154,7 @@ internal static ActorCell Current /// TBD /// internal bool ActorHasBeenCleared { get { return _actorHasBeenCleared; } } - /// - /// TBD - /// - internal static Props TerminatedProps { get; } = new TerminatedProps(); - + /// /// TBD /// @@ -426,7 +422,7 @@ public virtual void SendMessage(IActorRef sender, object message) protected void ClearActorCell() { UnstashAll(); - _props = TerminatedProps; + _props = Props.Terminated; } /// diff --git a/src/core/Akka/Actor/Props.cs b/src/core/Akka/Actor/Props.cs index d01ddbe2b07..30ace4a12b1 100644 --- a/src/core/Akka/Actor/Props.cs +++ b/src/core/Akka/Actor/Props.cs @@ -9,7 +9,6 @@ using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; -using Akka.Configuration; using Akka.Dispatch; using Akka.Routing; using Akka.Util; @@ -32,12 +31,12 @@ namespace Akka.Actor /// /// /// - public class Props : IEquatable, ISurrogated + public sealed class Props : IEquatable, ISurrogated { private const string NullActorTypeExceptionText = "Props must be instantiated with an actor type."; private static readonly Deploy DefaultDeploy = new Deploy(); - private static readonly object[] NoArgs = { }; + private static readonly object[] NoArgs = Array.Empty(); /// /// A pre-configured that doesn't create actors. @@ -45,7 +44,7 @@ public class Props : IEquatable, ISurrogated /// The value of this field is null. /// /// - public static readonly Props None = null; + public static readonly Props None = default; private Type _inputType; private Type _outputType; private readonly IIndirectActorProducer _producer; @@ -162,7 +161,7 @@ public Props(Deploy deploy, Type type, IEnumerable args) /// The type of the actor to create. /// The arguments needed to create the actor. /// This exception is thrown if is an unknown actor producer. - public Props(Deploy deploy, Type type, params object[] args) + public Props(Deploy deploy, Type type, params object[] args) : this(CreateProducer(type, args), deploy, args) // have to preserve the "CreateProducer" call here to preserve backwards compat with Akka.DI.Core { @@ -176,13 +175,12 @@ public Props(Deploy deploy, Type type, params object[] args) /// /// The type of that will be used to instantiate /// The configuration used to deploy the actor. - /// The supervisor strategy to use. /// The arguments needed to create the actor. internal Props(IIndirectActorProducer producer, Deploy deploy, params object[] args) { Deploy = deploy; _inputType = producer.ActorType; - Arguments = args ?? NoArgs; + Arguments = args?.Length > 0 ? args : NoArgs; _producer = producer; } @@ -238,18 +236,23 @@ public string TypeName /// /// The configuration used to deploy the actor. /// - public Deploy Deploy { get; protected set; } + public Deploy Deploy { get; private set; } /// /// The supervisor strategy used to manage the actor. /// - public SupervisorStrategy SupervisorStrategy { get; protected set; } + public SupervisorStrategy SupervisorStrategy { get; private set; } /// /// A pre-configured that creates an actor that doesn't respond to messages. /// public static Props Empty { get; } = Create(); + /// + /// A pre-configured used when the actor has been terminated. + /// + public static Props Terminated { get; } = new Props(new TerminatedProducer(), DefaultDeploy, NoArgs); + /// /// The arguments needed to create the actor. /// @@ -300,13 +303,13 @@ private bool CompareSupervisorStrategy(Props other) private bool CompareArguments(Props other) { - if (other == null) + if (other is null) return false; - if (Arguments == null && other.Arguments == null) + if (Arguments is null && other.Arguments is null) return true; - if (Arguments == null) + if (Arguments is null) return false; if (Arguments.Length != other.Arguments.Length) @@ -353,15 +356,15 @@ public static Props Create(Expression> factory, SupervisorStrategy supervisorStrategy = null) where TActor : ActorBase { if (factory.Body is UnaryExpression) - return new DynamicProps(factory.Compile()); + return new Props(new FactoryConsumer(factory.Compile()), DefaultDeploy, NoArgs); - var newExpression = factory.Body.AsInstanceOf(); - if (newExpression == null) - throw new ArgumentException("The create function must be a 'new T (args)' expression"); + var newExpression = factory.Body.AsInstanceOf() + ?? throw new ArgumentException("The create function must be a 'new T (args)' expression"); var args = newExpression.GetArguments(); + args = args.Length > 0 ? args : NoArgs; - return new Props(new ActivatorProducer(typeof(TActor), args), DefaultDeploy, args){ SupervisorStrategy = supervisorStrategy }; + return new Props(new ActivatorProducer(typeof(TActor), args), DefaultDeploy, args) { SupervisorStrategy = supervisorStrategy }; } /// @@ -406,7 +409,7 @@ public static Props CreateBy(IIndirectActorProducer producer, params object[] ar /// The newly created . public static Props Create(SupervisorStrategy supervisorStrategy) where TActor : ActorBase, new() { - return new Props(new ActivatorProducer(typeof(TActor), NoArgs), DefaultDeploy, NoArgs){ SupervisorStrategy = supervisorStrategy }; + return new Props(new ActivatorProducer(typeof(TActor), NoArgs), DefaultDeploy, NoArgs) { SupervisorStrategy = supervisorStrategy }; } /// @@ -480,7 +483,7 @@ public Props WithRouter(RouterConfig routerConfig) public Props WithDeploy(Deploy deploy) { var copy = Copy(); - var original = copy.Deploy; + //var original = copy.Deploy; // TODO: this is a hack designed to preserve explicit router deployments https://github.com/akkadotnet/akka.net/issues/546 // in reality, we should be able to do copy.Deploy = deploy.WithFallback(copy.Deploy); but that blows up at the moment @@ -534,10 +537,8 @@ public Props WithSupervisorStrategy(SupervisorStrategy supervisorStrategy) /// with the arguments from . /// /// The newly created actor - public virtual ActorBase NewActor() + public ActorBase NewActor() { - var type = Type; - var arguments = Arguments; try { return _producer.Produce(); @@ -545,7 +546,7 @@ public virtual ActorBase NewActor() catch (Exception e) { throw new TypeLoadException( - $"Error while creating actor instance of type {type} with {arguments.Length} args: ({StringFormat.SafeJoin(",", arguments)})", + $"Error while creating actor instance of type {Type} with {Arguments.Length} args: ({StringFormat.SafeJoin(",", Arguments)})", e); } } @@ -554,7 +555,7 @@ public virtual ActorBase NewActor() /// Creates a copy of the current instance. /// /// The newly created - protected virtual Props Copy() + private Props Copy() { return new Props(_producer, Deploy, Arguments) { SupervisorStrategy = SupervisorStrategy }; } @@ -562,7 +563,7 @@ protected virtual Props Copy() [Obsolete("we should not be calling this method. Pass in an explicit IIndirectActorProducer reference instead.")] private static IIndirectActorProducer CreateProducer(Type type, object[] args) { - if (type == null) return DefaultProducer.Instance; + if (type is null) return DefaultProducer.Instance; if (typeof(IIndirectActorProducer).IsAssignableFrom(type)) return Activator.CreateInstance(type, args).AsInstanceOf(); @@ -578,21 +579,14 @@ private static IIndirectActorProducer CreateProducer(Type type, object[] args) /// The actor to release internal void Release(ActorBase actor) { - try - { - _producer?.Release(actor); - } - finally - { - actor = null; - } + _producer?.Release(actor); } /// /// This class represents a surrogate of a configuration object. /// Its main use is to help during the serialization process. /// - public class PropsSurrogate : ISurrogate + public sealed class PropsSurrogate : ISurrogate { /// /// The type of actor to create @@ -625,7 +619,7 @@ public ISurrogated FromSurrogate(ActorSystem system) /// /// This class represents a specialized that doesn't respond to messages. /// - internal class EmptyActor : UntypedActor + internal sealed class EmptyActor : UntypedActor { /// /// Handles messages received by the actor. @@ -636,9 +630,9 @@ protected override void OnReceive(object message) } } - private class DefaultProducer : IIndirectActorProducer + private sealed class DefaultProducer : IIndirectActorProducer { - private DefaultProducer(){} + private DefaultProducer() { } public static readonly DefaultProducer Instance = new DefaultProducer(); @@ -652,11 +646,26 @@ public ActorBase Produce() public void Release(ActorBase actor) { - actor = null; + //noop } } - private class ActivatorProducer : IIndirectActorProducer + private sealed class TerminatedProducer : IIndirectActorProducer + { + public ActorBase Produce() + { + throw new InvalidOperationException("This actor has been terminated"); + } + + public Type ActorType => typeof(ActorBase); + + + public void Release(ActorBase actor) + { + } + } + + private sealed class ActivatorProducer : IIndirectActorProducer { private readonly object[] _args; @@ -676,11 +685,11 @@ public ActorBase Produce() public void Release(ActorBase actor) { - actor = null; + //noop } } - private class FactoryConsumer : IIndirectActorProducer where TActor : ActorBase + private sealed class FactoryConsumer : IIndirectActorProducer where TActor : ActorBase { private readonly Func _factory; @@ -699,82 +708,13 @@ public ActorBase Produce() public void Release(ActorBase actor) { - actor = null; + //noop } } #endregion } - /// - /// This class represents a specialized used when the actor has been terminated. - /// - public class TerminatedProps : Props - { - /// - /// N/A - /// - /// This exception is thrown automatically since the actor has been terminated. - /// N/A - public override ActorBase NewActor() - { - throw new InvalidOperationException("This actor has been terminated"); - } - } - - /// - /// This class represents a specialized that uses dynamic invocation - /// to create new actor instances, rather than a traditional . - /// - /// This is intended to be used in conjunction with Dependency Injection. - /// - /// - /// The type of the actor to create. - internal class DynamicProps : Props where TActor : ActorBase - { - private readonly Func invoker; - - /// - /// Initializes a new instance of the class. - /// - /// The factory method used to create an actor. - public DynamicProps(Func invoker) - : base(typeof(TActor)) - { - this.invoker = invoker; - } - - /// - /// Creates a new actor using the configured factory method. - /// - /// The actor created using the factory method. - public override ActorBase NewActor() - { - return invoker.Invoke(); - } - - #region Copy methods - - private DynamicProps(Props copy, Func invoker) - : base(copy) - { - this.invoker = invoker; - } - - /// - /// Creates a copy of the current instance. - /// - /// The newly created - protected override Props Copy() - { - var initialCopy = base.Copy(); - var invokerCopy = (Func)invoker.Clone(); - return new DynamicProps(initialCopy, invokerCopy); - } - - #endregion - } - /// /// This interface defines a class of actor creation strategies deviating from /// the usual default of just reflectively instantiating the Actor From 1cd6c393cb156e2e7508de7374b37e65d1919e3a Mon Sep 17 00:00:00 2001 From: zetanova Date: Sat, 4 Dec 2021 17:27:57 +0100 Subject: [PATCH 02/22] make producer singleton --- .../Akka.DI.Core/DIActorProducer.cs | 33 ++-- .../dependencyinjection/Akka.DI.Core/DIExt.cs | 2 +- .../ServiceProvider.cs | 32 +--- .../ServiceProviderDependencyResolver.cs | 7 +- src/core/Akka.Remote.Tests/RemotingSpec.cs | 9 +- src/core/Akka.Tests/Actor/PropsSpec.cs | 4 +- src/core/Akka/Actor/Props.cs | 150 ++++++++---------- src/core/Akka/Util/Resolver.cs | 4 +- 8 files changed, 97 insertions(+), 144 deletions(-) diff --git a/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs b/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs index 08bdb1dc0c2..b113f88827a 100644 --- a/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs +++ b/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs @@ -13,12 +13,11 @@ namespace Akka.DI.Core /// /// This class represents an actor creation strategy that uses dependency injection (DI) to resolve and instantiate actors based on their type. /// - public class DIActorProducer : IIndirectActorProducer + public sealed class DIActorProducer : IIndirectActorProducer { - private IDependencyResolver dependencyResolver; - private Type actorType; - - readonly Func actorFactory; + private readonly IDependencyResolver _dependencyResolver; + private readonly Type _actorType; + private readonly Func _actorFactory; /// /// Initializes a new instance of the class. @@ -33,26 +32,20 @@ public DIActorProducer(IDependencyResolver dependencyResolver, Type actorType) if (dependencyResolver == null) throw new ArgumentNullException(nameof(dependencyResolver), $"DIActorProducer requires {nameof(dependencyResolver)} to be provided"); if (actorType == null) throw new ArgumentNullException(nameof(actorType), $"DIActorProducer requires {nameof(actorType)} to be provided"); - this.dependencyResolver = dependencyResolver; - this.actorType = actorType; - this.actorFactory = dependencyResolver.CreateActorFactory(actorType); + _dependencyResolver = dependencyResolver; + _actorType = actorType; + _actorFactory = dependencyResolver.CreateActorFactory(actorType); } - - /// - /// Retrieves the type of the actor to produce. - /// - public Type ActorType - { - get { return this.actorType; } - } - + /// /// Creates an actor based on the container's implementation specific actor factory. /// /// An actor created by the container. - public ActorBase Produce() + public ActorBase Produce(Props props) { - return actorFactory(); + if (props.Type != _actorType) + throw new InvalidOperationException($"invalid actor type {props.Type}"); + return _actorFactory(); } /// @@ -61,7 +54,7 @@ public ActorBase Produce() /// The actor to remove from the container. public void Release(ActorBase actor) { - dependencyResolver.Release(actor); + _dependencyResolver.Release(actor); } } } diff --git a/src/contrib/dependencyinjection/Akka.DI.Core/DIExt.cs b/src/contrib/dependencyinjection/Akka.DI.Core/DIExt.cs index 6a8b32a5827..aca2e3c3721 100644 --- a/src/contrib/dependencyinjection/Akka.DI.Core/DIExt.cs +++ b/src/contrib/dependencyinjection/Akka.DI.Core/DIExt.cs @@ -38,7 +38,7 @@ public void Initialize(IDependencyResolver dependencyResolver) /// A configuration object for the given actor type. public Props Props(Type actorType) { - return new Props(typeof(DIActorProducer), new object[] { dependencyResolver, actorType }); + return Akka.Actor.Props.CreateBy(new DIActorProducer(dependencyResolver, actorType), actorType); } } } diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs index e990debf9a9..c15d2f45362 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProvider.cs @@ -63,7 +63,7 @@ public static ServiceProvider For(ActorSystem actorSystem) /// A new instance which uses DI internally. public Props Props(params object[] args) where T : ActorBase { - return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(Provider, args)); + return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(Provider), typeof(T), args); } } @@ -93,43 +93,23 @@ public override ServiceProvider CreateExtension(ExtendedActorSystem system) /// /// Used to create actors via the . /// - internal class ServiceProviderActorProducer : IIndirectActorProducer + internal sealed class ServiceProviderActorProducer : IIndirectActorProducer { private readonly IServiceProvider _provider; - private readonly object[] _args; - - public ServiceProviderActorProducer(IServiceProvider provider, Type actorType, object[] args) + + public ServiceProviderActorProducer(IServiceProvider provider) { _provider = provider; - _args = args; - ActorType = actorType; } - public ActorBase Produce() + public ActorBase Produce(Props props) { - return (ActorBase)ActivatorUtilities.CreateInstance(_provider, ActorType, _args); + return (ActorBase)ActivatorUtilities.CreateInstance(_provider, props.Type, props.Arguments); } - public Type ActorType { get; } - public void Release(ActorBase actor) { // no-op } } - - /// - /// INTERNAL API - /// - /// Used to create actors via the . - /// - /// the actor type - internal class ServiceProviderActorProducer : ServiceProviderActorProducer where TActor:ActorBase - { - - public ServiceProviderActorProducer(IServiceProvider provider, object[] args) - : base(provider, typeof(TActor), args) - { - } - } } diff --git a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs index 68f10aff02b..94743f05edc 100644 --- a/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs +++ b/src/contrib/dependencyinjection/Akka.DependencyInjection/ServiceProviderDependencyResolver.cs @@ -18,11 +18,14 @@ namespace Akka.DependencyInjection /// public class ServiceProviderDependencyResolver : IDependencyResolver { + private readonly ServiceProviderActorProducer _producer; + public IServiceProvider ServiceProvider { get; } public ServiceProviderDependencyResolver(IServiceProvider serviceProvider) { ServiceProvider = serviceProvider; + _producer = new ServiceProviderActorProducer(ServiceProvider); } public IResolverScope CreateScope() @@ -43,7 +46,7 @@ public object GetService(Type type) public Props Props(Type type, params object[] args) { if(typeof(ActorBase).IsAssignableFrom(type)) - return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, type, args)); + return Akka.Actor.Props.CreateBy(_producer, type, args); throw new ArgumentException(nameof(type), $"[{type}] does not implement Akka.Actor.ActorBase."); } @@ -54,7 +57,7 @@ public Props Props(Type type) public Props Props(params object[] args) where T : ActorBase { - return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, args)); + return Akka.Actor.Props.CreateBy(_producer, typeof(T), args); } } diff --git a/src/core/Akka.Remote.Tests/RemotingSpec.cs b/src/core/Akka.Remote.Tests/RemotingSpec.cs index 5c36790f4fa..952e495619a 100644 --- a/src/core/Akka.Remote.Tests/RemotingSpec.cs +++ b/src/core/Akka.Remote.Tests/RemotingSpec.cs @@ -412,7 +412,7 @@ public void Remoting_must_create_by_IndirectActorProducer() { try { - var r = Sys.ActorOf(Props.CreateBy(new TestResolver()), "echo"); + var r = Sys.ActorOf(Props.CreateBy(new TestResolver(), typeof(Echo2)), "echo"); Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/echo", r.Path.ToString()); } finally @@ -426,7 +426,7 @@ public void Remoting_must_create_by_IndirectActorProducer_and_ping() { try { - var r = Sys.ActorOf(Props.CreateBy(new TestResolver()), "echo"); + var r = Sys.ActorOf(Props.CreateBy(new TestResolver(), typeof(Echo2)), "echo"); Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/echo", r.Path.ToString()); r.Tell("ping", TestActor); ExpectMsg(("pong", TestActor), TimeSpan.FromSeconds(1.5)); @@ -911,8 +911,11 @@ public TestResolver(params object[] args) _args = args; } - public ActorBase Produce() + public ActorBase Produce(Props props) { + if (props.Type != ActorType) + throw new InvalidOperationException("invalid actor type"); + return (ActorBase)Activator.CreateInstance(ActorType, _args); } diff --git a/src/core/Akka.Tests/Actor/PropsSpec.cs b/src/core/Akka.Tests/Actor/PropsSpec.cs index 0fea33c30d1..f7a69c1e989 100644 --- a/src/core/Akka.Tests/Actor/PropsSpec.cs +++ b/src/core/Akka.Tests/Actor/PropsSpec.cs @@ -36,7 +36,7 @@ public void Props_must_create_actor_by_producer() { TestLatch latchProducer = new TestLatch(); TestLatch latchActor = new TestLatch(); - var props = Props.CreateBy(new TestProducer(latchProducer, latchActor)); + var props = Props.CreateBy(new TestProducer(latchProducer, latchActor), typeof(PropsTestActor)); IActorRef actor = Sys.ActorOf(props); latchActor.Ready(TimeSpan.FromSeconds(1)); } @@ -87,7 +87,7 @@ public TestProducer(TestLatch lp, TestLatch la) lp.CountDown(); } - public ActorBase Produce() + public ActorBase Produce(Props props) { latchActor.CountDown(); return new PropsTestActor(); diff --git a/src/core/Akka/Actor/Props.cs b/src/core/Akka/Actor/Props.cs index 30ace4a12b1..3fe501c18aa 100644 --- a/src/core/Akka/Actor/Props.cs +++ b/src/core/Akka/Actor/Props.cs @@ -44,28 +44,9 @@ public sealed class Props : IEquatable, ISurrogated /// The value of this field is null. /// /// - public static readonly Props None = default; - private Type _inputType; - private Type _outputType; - private readonly IIndirectActorProducer _producer; - - /// - /// Initializes a new instance of the class. - /// - protected Props() - : this(DefaultDeploy, null, NoArgs) - { - } + public static readonly Props None = null; - /// - /// Initializes a new instance of the class. - /// - /// The object that is being cloned. - protected Props(Props copy) - : this(copy._producer, copy.Deploy, copy.Arguments) - { - SupervisorStrategy = copy.SupervisorStrategy; - } + private readonly IIndirectActorProducer _producer; /// /// Initializes a new instance of the class. @@ -162,9 +143,12 @@ public Props(Deploy deploy, Type type, IEnumerable args) /// The arguments needed to create the actor. /// This exception is thrown if is an unknown actor producer. public Props(Deploy deploy, Type type, params object[] args) - : this(CreateProducer(type, args), deploy, args) // have to preserve the "CreateProducer" call here to preserve backwards compat with Akka.DI.Core { - + Deploy = deploy; + Type = type; + Arguments = args?.Length > 0 ? args : NoArgs; + // have to preserve the "CreateProducer" call here to preserve backwards compat with Akka.DI.Core + _producer = CreateProducer(type, args); } /// @@ -175,11 +159,12 @@ public Props(Deploy deploy, Type type, params object[] args) /// /// The type of that will be used to instantiate /// The configuration used to deploy the actor. + /// The type of the actor to create. /// The arguments needed to create the actor. - internal Props(IIndirectActorProducer producer, Deploy deploy, params object[] args) + internal Props(IIndirectActorProducer producer, Deploy deploy, Type type, params object[] args) { Deploy = deploy; - _inputType = producer.ActorType; + Type = type; Arguments = args?.Length > 0 ? args : NoArgs; _producer = producer; } @@ -188,15 +173,7 @@ internal Props(IIndirectActorProducer producer, Deploy deploy, params object[] a /// The type of the actor that is created. /// [JsonIgnore] - public Type Type - { - get - { - if (_outputType == null) _outputType = _producer.ActorType; - - return _outputType; - } - } + public Type Type { get; private set; } /// /// The dispatcher used in the deployment of the actor. @@ -222,9 +199,9 @@ public string Dispatcher /// public string TypeName { - get => _inputType.AssemblyQualifiedName; + get => Type.AssemblyQualifiedName; //for serialization - private set => _inputType = Type.GetType(value); + private set => Type = Type.GetType(value); } /// @@ -251,7 +228,7 @@ public string TypeName /// /// A pre-configured used when the actor has been terminated. /// - public static Props Terminated { get; } = new Props(new TerminatedProducer(), DefaultDeploy, NoArgs); + public static Props Terminated { get; } = new Props(InvalidProducer.Terminated, DefaultDeploy, typeof(EmptyActor), NoArgs); /// /// The arguments needed to create the actor. @@ -285,7 +262,7 @@ public ISurrogate ToSurrogate(ActorSystem system) private bool CompareInputType(Props other) { - return _inputType == other._inputType; + return Type == other.Type; } private bool CompareDeploy(Props other) @@ -339,7 +316,7 @@ public override int GetHashCode() var hashCode = Deploy != null ? Deploy.GetHashCode() : 0; // hashCode = (hashCode*397) ^ (SupervisorStrategy != null ? SupervisorStrategy.GetHashCode() : 0); // hashCode = (hashCode*397) ^ (Arguments != null ? Arguments.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ (_inputType != null ? _inputType.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ (Type?.GetHashCode() ?? 0); return hashCode; } } @@ -356,15 +333,15 @@ public static Props Create(Expression> factory, SupervisorStrategy supervisorStrategy = null) where TActor : ActorBase { if (factory.Body is UnaryExpression) - return new Props(new FactoryConsumer(factory.Compile()), DefaultDeploy, NoArgs); + return new Props(new FactoryConsumer(factory.Compile()), DefaultDeploy, typeof(TActor), NoArgs); - var newExpression = factory.Body.AsInstanceOf() + var newExpression = factory.Body as NewExpression ?? throw new ArgumentException("The create function must be a 'new T (args)' expression"); var args = newExpression.GetArguments(); args = args.Length > 0 ? args : NoArgs; - return new Props(new ActivatorProducer(typeof(TActor), args), DefaultDeploy, args) { SupervisorStrategy = supervisorStrategy }; + return new Props(ActivatorProducer.Instance, DefaultDeploy, typeof(TActor), args) { SupervisorStrategy = supervisorStrategy }; } /// @@ -375,30 +352,38 @@ public static Props Create(Expression> factory, /// The newly created . public static Props Create(params object[] args) where TActor : ActorBase { - return new Props(new ActivatorProducer(typeof(TActor), args), DefaultDeploy, args); + return new Props(ActivatorProducer.Instance, DefaultDeploy, typeof(TActor), args); } /// /// Creates an actor using a specified actor producer. /// /// The type of producer used to create the actor. + /// The type of the actor to create. /// The arguments needed to create the actor. /// The newly created . [Obsolete("Do not use this method. Call CreateBy(IIndirectActorProducer, params object[] args) instead")] - public static Props CreateBy(params object[] args) where TProducer : class, IIndirectActorProducer + public static Props CreateBy(Type type, params object[] args) where TProducer : class, IIndirectActorProducer { - return new Props(typeof(TProducer), args); + IIndirectActorProducer producer; + if (typeof(TProducer) == typeof(ActivatorProducer)) + producer = ActivatorProducer.Instance; + else + producer = Activator.CreateInstance(); + + return new Props(producer, DefaultDeploy, type, args); } /// /// Creates an actor using a specified actor producer. /// /// The actor producer that will be used to create the underlying actor.. + /// The type of the actor to create. /// The arguments needed to create the actor. /// The newly created . - public static Props CreateBy(IIndirectActorProducer producer, params object[] args) + public static Props CreateBy(IIndirectActorProducer producer, Type type, params object[] args) { - return new Props(producer, DefaultDeploy, args); + return new Props(producer, DefaultDeploy, type, args); } /// @@ -409,7 +394,7 @@ public static Props CreateBy(IIndirectActorProducer producer, params object[] ar /// The newly created . public static Props Create(SupervisorStrategy supervisorStrategy) where TActor : ActorBase, new() { - return new Props(new ActivatorProducer(typeof(TActor), NoArgs), DefaultDeploy, NoArgs) { SupervisorStrategy = supervisorStrategy }; + return new Props(ActivatorProducer.Instance, DefaultDeploy, typeof(TActor), NoArgs) { SupervisorStrategy = supervisorStrategy }; } /// @@ -541,7 +526,7 @@ public ActorBase NewActor() { try { - return _producer.Produce(); + return _producer.Produce(this); } catch (Exception e) { @@ -557,18 +542,20 @@ public ActorBase NewActor() /// The newly created private Props Copy() { - return new Props(_producer, Deploy, Arguments) { SupervisorStrategy = SupervisorStrategy }; + return new Props(_producer, Deploy, Type, Arguments) { SupervisorStrategy = SupervisorStrategy }; } [Obsolete("we should not be calling this method. Pass in an explicit IIndirectActorProducer reference instead.")] private static IIndirectActorProducer CreateProducer(Type type, object[] args) { - if (type is null) return DefaultProducer.Instance; + if (type is null) + return DefaultProducer.Instance; if (typeof(IIndirectActorProducer).IsAssignableFrom(type)) return Activator.CreateInstance(type, args).AsInstanceOf(); - if (typeof(ActorBase).IsAssignableFrom(type)) return new ActivatorProducer(type, args); + if (typeof(ActorBase).IsAssignableFrom(type)) + return ActivatorProducer.Instance; throw new ArgumentException($"Unknown actor producer [{type.FullName}]", nameof(type)); } @@ -636,56 +623,52 @@ private DefaultProducer() { } public static readonly DefaultProducer Instance = new DefaultProducer(); - public ActorBase Produce() + public ActorBase Produce(Props props) { - throw new InvalidOperationException("No actor producer specified!"); + throw new InvalidOperationException(); } - public Type ActorType => typeof(ActorBase); - - public void Release(ActorBase actor) { //noop } } - private sealed class TerminatedProducer : IIndirectActorProducer + private sealed class InvalidProducer : IIndirectActorProducer { - public ActorBase Produce() + public readonly static InvalidProducer Default = new InvalidProducer("No actor producer specified!"); + public readonly static InvalidProducer Terminated = new InvalidProducer("This actor has been terminated"); + + private readonly string _message; + + public InvalidProducer(string message) { - throw new InvalidOperationException("This actor has been terminated"); + _message = message; } - public Type ActorType => typeof(ActorBase); - + public ActorBase Produce(Props props) + { + throw new InvalidOperationException(_message); + } public void Release(ActorBase actor) { + //no-op } } private sealed class ActivatorProducer : IIndirectActorProducer { - private readonly object[] _args; - - public ActivatorProducer(Type actorType, object[] args) - { - ActorType = actorType; - _args = args; - } + public readonly static ActivatorProducer Instance = new ActivatorProducer(); - public ActorBase Produce() + public ActorBase Produce(Props props) { - return Activator.CreateInstance(ActorType, _args).AsInstanceOf(); + return (ActorBase)Activator.CreateInstance(props.Type, props.Arguments); } - public Type ActorType { get; } - - public void Release(ActorBase actor) { - //noop + //no-op } } @@ -698,17 +681,14 @@ public FactoryConsumer(Func factory) _factory = factory; } - public ActorBase Produce() + public ActorBase Produce(Props props) { return _factory.Invoke(); } - public Type ActorType => typeof(TActor); - - public void Release(ActorBase actor) { - //noop + //no-op } } @@ -723,20 +703,14 @@ public void Release(ActorBase actor) /// public interface IIndirectActorProducer { - /// - /// This method is used by to determine the type of actor to create. - /// The returned type is not used to produce the actor. - /// - /// The type of the actor created. - Type ActorType { get; } - /// /// This factory method must produce a fresh actor instance upon each /// invocation. It is not permitted to return the same instance more than /// once. /// + /// The actor props /// A fresh actor instance. - ActorBase Produce(); + ActorBase Produce(Props props); /// /// This method is used by to signal the producer that it can diff --git a/src/core/Akka/Util/Resolver.cs b/src/core/Akka/Util/Resolver.cs index 12655a36de1..06ccfc59f71 100644 --- a/src/core/Akka/Util/Resolver.cs +++ b/src/core/Akka/Util/Resolver.cs @@ -33,7 +33,7 @@ public abstract class Resolve : IIndirectActorProducer /// TBD /// /// TBD - public abstract ActorBase Produce(); + public abstract ActorBase Produce(Props props); /// /// TBD /// @@ -85,7 +85,7 @@ public Resolve(params object[] args) /// This exception is thrown if the current is undefined. /// /// TBD - public override ActorBase Produce() + public override ActorBase Produce(Props props) { if (Resolver == null) { From f36a4e7395500ba2376e0d111148dcb6b26f3c03 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sat, 4 Dec 2021 18:02:37 +0100 Subject: [PATCH 03/22] add legacy support --- .../Akka.DI.Core/DIActorProducer.cs | 4 +- src/core/Akka/Actor/Props.cs | 47 +++++++++---------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs b/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs index b113f88827a..24fc1350d43 100644 --- a/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs +++ b/src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs @@ -13,12 +13,14 @@ namespace Akka.DI.Core /// /// This class represents an actor creation strategy that uses dependency injection (DI) to resolve and instantiate actors based on their type. /// - public sealed class DIActorProducer : IIndirectActorProducer + public sealed class DIActorProducer : IIndirectActorProducerWithActorType { private readonly IDependencyResolver _dependencyResolver; private readonly Type _actorType; private readonly Func _actorFactory; + public Type ActorType => _actorType; + /// /// Initializes a new instance of the class. /// diff --git a/src/core/Akka/Actor/Props.cs b/src/core/Akka/Actor/Props.cs index 3fe501c18aa..2f2b044a2da 100644 --- a/src/core/Akka/Actor/Props.cs +++ b/src/core/Akka/Actor/Props.cs @@ -12,7 +12,6 @@ using Akka.Dispatch; using Akka.Routing; using Akka.Util; -using Akka.Util.Internal; using Akka.Util.Reflection; using Newtonsoft.Json; @@ -145,10 +144,9 @@ public Props(Deploy deploy, Type type, IEnumerable args) public Props(Deploy deploy, Type type, params object[] args) { Deploy = deploy; - Type = type; - Arguments = args?.Length > 0 ? args : NoArgs; // have to preserve the "CreateProducer" call here to preserve backwards compat with Akka.DI.Core - _producer = CreateProducer(type, args); + (_producer, Type, Arguments) = CreateProducer(type, args); + Arguments = Arguments?.Length > 0 ? Arguments : NoArgs; } /// @@ -546,16 +544,23 @@ private Props Copy() } [Obsolete("we should not be calling this method. Pass in an explicit IIndirectActorProducer reference instead.")] - private static IIndirectActorProducer CreateProducer(Type type, object[] args) + private static (IIndirectActorProducer, Type, object[] args) CreateProducer(Type type, object[] args) { if (type is null) - return DefaultProducer.Instance; + return (InvalidProducer.Default, typeof(EmptyActor), NoArgs); if (typeof(IIndirectActorProducer).IsAssignableFrom(type)) - return Activator.CreateInstance(type, args).AsInstanceOf(); + { + var producer = (IIndirectActorProducer)Activator.CreateInstance(type, args); + if (producer is IIndirectActorProducerWithActorType p) + return (producer, p.ActorType, NoArgs); + + //maybe throw new ArgumentException($"Unsupported legacy actor producer [{type.FullName}]", nameof(type)); + return (producer, typeof(ActorBase), NoArgs); + } if (typeof(ActorBase).IsAssignableFrom(type)) - return ActivatorProducer.Instance; + return (ActivatorProducer.Instance, type, args); throw new ArgumentException($"Unknown actor producer [{type.FullName}]", nameof(type)); } @@ -617,23 +622,6 @@ protected override void OnReceive(object message) } } - private sealed class DefaultProducer : IIndirectActorProducer - { - private DefaultProducer() { } - - public static readonly DefaultProducer Instance = new DefaultProducer(); - - public ActorBase Produce(Props props) - { - throw new InvalidOperationException(); - } - - public void Release(ActorBase actor) - { - //noop - } - } - private sealed class InvalidProducer : IIndirectActorProducer { public readonly static InvalidProducer Default = new InvalidProducer("No actor producer specified!"); @@ -723,4 +711,13 @@ public interface IIndirectActorProducer /// The actor to release void Release(ActorBase actor); } + + /// + /// Interface for legacy Akka.DI.Core support + /// + [Obsolete("Do not use this interface")] + public interface IIndirectActorProducerWithActorType : IIndirectActorProducer + { + Type ActorType { get; } + } } From d2b2508a602e6d3ec6642bb0b2accb4e15968321 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sat, 4 Dec 2021 18:04:16 +0100 Subject: [PATCH 04/22] update api --- .../CoreAPISpec.ApproveCore.approved.txt | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 1f49a93fc58..7848ff9a9f1 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -1072,10 +1072,14 @@ namespace Akka.Actor } public interface IIndirectActorProducer { - System.Type ActorType { get; } - Akka.Actor.ActorBase Produce(); + Akka.Actor.ActorBase Produce(Akka.Actor.Props props); void Release(Akka.Actor.ActorBase actor); } + [System.ObsoleteAttribute("Do not use this interface")] + public interface IIndirectActorProducerWithActorType : Akka.Actor.IIndirectActorProducer + { + System.Type ActorType { get; } + } public interface IInternalActor { Akka.Actor.IActorContext ActorContext { get; } @@ -1409,11 +1413,9 @@ namespace Akka.Actor public System.Exception RestartException { get; } public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } - public class Props : Akka.Util.ISurrogated, System.IEquatable + public sealed class Props : Akka.Util.ISurrogated, System.IEquatable { public static readonly Akka.Actor.Props None; - protected Props() { } - protected Props(Akka.Actor.Props copy) { } public Props(System.Type type, object[] args) { } public Props(System.Type type) { } public Props(System.Type type, Akka.Actor.SupervisorStrategy supervisorStrategy, System.Collections.Generic.IEnumerable args) { } @@ -1421,7 +1423,7 @@ namespace Akka.Actor public Props(Akka.Actor.Deploy deploy, System.Type type, System.Collections.Generic.IEnumerable args) { } public Props(Akka.Actor.Deploy deploy, System.Type type, params object[] args) { } public object[] Arguments { get; } - public Akka.Actor.Deploy Deploy { get; set; } + public Akka.Actor.Deploy Deploy { get; } [Newtonsoft.Json.JsonIgnoreAttribute()] public string Dispatcher { get; } public static Akka.Actor.Props Empty { get; } @@ -1429,11 +1431,11 @@ namespace Akka.Actor public string Mailbox { get; } [Newtonsoft.Json.JsonIgnoreAttribute()] public Akka.Routing.RouterConfig RouterConfig { get; } - public Akka.Actor.SupervisorStrategy SupervisorStrategy { get; set; } + public Akka.Actor.SupervisorStrategy SupervisorStrategy { get; } + public static Akka.Actor.Props Terminated { get; } [Newtonsoft.Json.JsonIgnoreAttribute()] public System.Type Type { get; } public string TypeName { get; } - protected virtual Akka.Actor.Props Copy() { } public static Akka.Actor.Props Create(System.Linq.Expressions.Expression> factory, Akka.Actor.SupervisorStrategy supervisorStrategy = null) where TActor : Akka.Actor.ActorBase { } public static Akka.Actor.Props Create(params object[] args) @@ -1443,20 +1445,20 @@ namespace Akka.Actor public static Akka.Actor.Props Create(System.Type type, params object[] args) { } [System.ObsoleteAttribute("Do not use this method. Call CreateBy(IIndirectActorProducer, params object[] arg" + "s) instead")] - public static Akka.Actor.Props CreateBy(params object[] args) + public static Akka.Actor.Props CreateBy(System.Type type, params object[] args) where TProducer : class, Akka.Actor.IIndirectActorProducer { } - public static Akka.Actor.Props CreateBy(Akka.Actor.IIndirectActorProducer producer, params object[] args) { } + public static Akka.Actor.Props CreateBy(Akka.Actor.IIndirectActorProducer producer, System.Type type, params object[] args) { } public bool Equals(Akka.Actor.Props other) { } public override bool Equals(object obj) { } public override int GetHashCode() { } - public virtual Akka.Actor.ActorBase NewActor() { } + public Akka.Actor.ActorBase NewActor() { } public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { } public Akka.Actor.Props WithDeploy(Akka.Actor.Deploy deploy) { } public Akka.Actor.Props WithDispatcher(string dispatcher) { } public Akka.Actor.Props WithMailbox(string mailbox) { } public Akka.Actor.Props WithRouter(Akka.Routing.RouterConfig routerConfig) { } public Akka.Actor.Props WithSupervisorStrategy(Akka.Actor.SupervisorStrategy supervisorStrategy) { } - public class PropsSurrogate : Akka.Util.ISurrogate + public sealed class PropsSurrogate : Akka.Util.ISurrogate { public PropsSurrogate() { } public object[] Arguments { get; set; } @@ -1783,11 +1785,6 @@ namespace Akka.Actor public override int GetHashCode() { } public override string ToString() { } } - public class TerminatedProps : Akka.Actor.Props - { - public TerminatedProps() { } - public override Akka.Actor.ActorBase NewActor() { } - } [System.ObsoleteAttribute("TypedActor in its current shape will be removed in v1.5")] public abstract class TypedActor : Akka.Actor.ActorBase { @@ -5033,7 +5030,7 @@ namespace Akka.Util protected Resolve() { } public abstract System.Type ActorType { get; } protected static Akka.Util.IResolver Resolver { get; } - public abstract Akka.Actor.ActorBase Produce(); + public abstract Akka.Actor.ActorBase Produce(Akka.Actor.Props props); public void Release(Akka.Actor.ActorBase actor) { } public static void SetResolver(Akka.Util.IResolver resolver) { } } @@ -5043,7 +5040,7 @@ namespace Akka.Util public Resolve(params object[] args) { } public override System.Type ActorType { get; } public object[] Arguments { get; } - public override Akka.Actor.ActorBase Produce() { } + public override Akka.Actor.ActorBase Produce(Akka.Actor.Props props) { } } public class static Result { From f22a29c27ba5c53635b6f56e03031dc0d190b083 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 02:04:43 +0100 Subject: [PATCH 05/22] add producer type compare --- src/core/Akka/Actor/Props.cs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/core/Akka/Actor/Props.cs b/src/core/Akka/Actor/Props.cs index 2f2b044a2da..3e2735abd9f 100644 --- a/src/core/Akka/Actor/Props.cs +++ b/src/core/Akka/Actor/Props.cs @@ -244,8 +244,11 @@ public bool Equals(Props other) { if (ReferenceEquals(null, other)) return false; if (ReferenceEquals(this, other)) return true; - return CompareDeploy(other) && CompareSupervisorStrategy(other) && CompareArguments(other) && - CompareInputType(other); + return Type == other.Type + && _producer.GetType() == other._producer.GetType() + && Deploy.Equals(other.Deploy) + && CompareSupervisorStrategy(other) + && CompareArguments(other); } /// @@ -258,16 +261,6 @@ public ISurrogate ToSurrogate(ActorSystem system) return new PropsSurrogate { Arguments = Arguments, Type = Type, Deploy = Deploy }; } - private bool CompareInputType(Props other) - { - return Type == other.Type; - } - - private bool CompareDeploy(Props other) - { - return Deploy.Equals(other.Deploy); - } - #pragma warning disable CS0162 // Disabled because it's marked as a TODO private bool CompareSupervisorStrategy(Props other) { @@ -302,7 +295,6 @@ public override bool Equals(object obj) { if (ReferenceEquals(null, obj)) return false; if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != GetType()) return false; return Equals((Props)obj); } From 3d3452e19ed0d155af1c936c2dd1384b50a3b97c Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 08:14:24 +0100 Subject: [PATCH 06/22] fix props equals --- src/core/Akka/Actor/Props.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka/Actor/Props.cs b/src/core/Akka/Actor/Props.cs index 3e2735abd9f..393b75bd6bf 100644 --- a/src/core/Akka/Actor/Props.cs +++ b/src/core/Akka/Actor/Props.cs @@ -295,7 +295,7 @@ public override bool Equals(object obj) { if (ReferenceEquals(null, obj)) return false; if (ReferenceEquals(this, obj)) return true; - return Equals((Props)obj); + return Equals(obj as Props); } /// From b9d6b0787a01646bb1bbf4cd1b1e2fe5050509be Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 08:27:12 +0100 Subject: [PATCH 07/22] fix test for invalid props expression --- src/core/Akka.FSharp.Tests/ApiTests.fs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.FSharp.Tests/ApiTests.fs b/src/core/Akka.FSharp.Tests/ApiTests.fs index 548a4db67dd..8f3a16d51e6 100644 --- a/src/core/Akka.FSharp.Tests/ApiTests.fs +++ b/src/core/Akka.FSharp.Tests/ApiTests.fs @@ -120,7 +120,7 @@ let ``cannot spawn actor with simple expr args from expression`` () = let system = Configuration.load() |> System.create "test" // this formulation is supported in FsApi's expression evaluator, however the checks in Props.Create // do not support this, so we test that we can evaluate this but not actually run it, as a proof of concept - Assert.Throws(fun () -> + Assert.Throws(fun () -> let actor = spawnObj system "test-actor" <@ fun () -> let arg1 = 1 let arg2 = true From 8bb422b9ca6b470fb68c59ab81a7a211a4aa0cd9 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 11:00:48 +0100 Subject: [PATCH 08/22] fix racy test and testkit --- .../Dsl/FutureFlattenSourceSpec.cs | 2 +- .../ExceptionEventFilterTests.cs | 6 +++- .../Internal/EventFilterApplier.cs | 31 ++++++++++--------- src/core/Akka/Event/Logging.cs | 5 ++- 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs index c00c35994c5..609c5ddd14f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs @@ -249,7 +249,7 @@ public void TaskSource_must_fail_when_the_task_source_materialization_fails() var (innerSourceMat, outerSinkMat) = Source.FromTaskSource(inner).ToMaterialized(Sink.Seq(), Keep.Both).Run(_materializer); // wait until the underlying tasks are completed - Thread.Sleep(100); + AwaitCondition(() => outerSinkMat.IsFaulted && innerSourceMat.IsFaulted, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(100)); outerSinkMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED")); innerSourceMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED")); diff --git a/src/core/Akka.TestKit.Tests/TestEventListenerTests/ExceptionEventFilterTests.cs b/src/core/Akka.TestKit.Tests/TestEventListenerTests/ExceptionEventFilterTests.cs index 13544350e5a..32c17c0d7a3 100644 --- a/src/core/Akka.TestKit.Tests/TestEventListenerTests/ExceptionEventFilterTests.cs +++ b/src/core/Akka.TestKit.Tests/TestEventListenerTests/ExceptionEventFilterTests.cs @@ -120,13 +120,17 @@ public void SpecifiedNumbersOfExceptionsCanBeIntercepted() [Fact] public void ShouldFailIfMoreExceptionsThenSpecifiedAreLogged() { + //todo fix logging race var exception = XAssert.Throws(() => + { EventFilter.Exception().Expect(2, () => { Log.Error(new SomeException(), "whatever"); Log.Error(new SomeException(), "whatever"); Log.Error(new SomeException(), "whatever"); - })); + }); + }); + Assert.Contains("1 message too many", exception.Message, StringComparison.OrdinalIgnoreCase); } diff --git a/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs b/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs index d375a7cda3f..75a2ab15a99 100644 --- a/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs +++ b/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs @@ -395,22 +395,25 @@ protected async Task InterceptAsync(Func> func, ActorSystem system /// TBD protected bool AwaitDone(TimeSpan timeout, int? expectedOccurrences, MatchedEventHandler matchedEventHandler) { - if (expectedOccurrences.HasValue) + if (!expectedOccurrences.HasValue) return true; + + var expected = expectedOccurrences ?? 0; + if (expected > 0) { - var expected = expectedOccurrences.GetValueOrDefault(); - if (expected > 0) - { - _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount >= expected, timeout); - return matchedEventHandler.ReceivedCount == expected; - } - else - { - // if expecting no events to arrive - assert that given condition will never match - var foundEvent = _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount > 0, timeout); - return foundEvent == false; - } + _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount >= expected, timeout); + + //wait for late tail messages to arrive + if (matchedEventHandler.ReceivedCount == expected) + TimeSpan.FromMilliseconds(100); + + return matchedEventHandler.ReceivedCount == expected; + } + else + { + // if expecting no events to arrive - assert that given condition will never match + var foundEvent = _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount > 0, timeout); + return foundEvent == false; } - return true; } /// diff --git a/src/core/Akka/Event/Logging.cs b/src/core/Akka/Event/Logging.cs index a1104ed3dd5..21f37eb32d8 100644 --- a/src/core/Akka/Event/Logging.cs +++ b/src/core/Akka/Event/Logging.cs @@ -105,9 +105,12 @@ public static string FromActor(IActorContext actor, ActorSystem system) public static string FromActorRef(IActorRef a, ActorSystem system) { + var extendedSystem = system as ExtendedActorSystem + ?? throw new ArgumentException("instance of ExtendedActorSystem required", nameof(system)); + try { - return a.Path.ToStringWithAddress(system.AsInstanceOf().Provider.DefaultAddress); + return a.Path.ToStringWithAddress(extendedSystem.Provider.DefaultAddress); } catch // can fail if the ActorSystem (remoting) is not completely started yet { From 07fc952a9503172d0a6afc6dd64c5b81f99604e2 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 11:12:23 +0100 Subject: [PATCH 09/22] fix thread sleep --- .../Internal/EventFilterApplier.cs | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs b/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs index 75a2ab15a99..f897259d823 100644 --- a/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs +++ b/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs @@ -395,25 +395,21 @@ protected async Task InterceptAsync(Func> func, ActorSystem system /// TBD protected bool AwaitDone(TimeSpan timeout, int? expectedOccurrences, MatchedEventHandler matchedEventHandler) { - if (!expectedOccurrences.HasValue) return true; - - var expected = expectedOccurrences ?? 0; - if (expected > 0) - { - _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount >= expected, timeout); - - //wait for late tail messages to arrive - if (matchedEventHandler.ReceivedCount == expected) - TimeSpan.FromMilliseconds(100); + if (!expectedOccurrences.HasValue) + return true; - return matchedEventHandler.ReceivedCount == expected; - } - else - { - // if expecting no events to arrive - assert that given condition will never match - var foundEvent = _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount > 0, timeout); - return foundEvent == false; - } + // if expecting no events to arrive - assert that given condition will never match + if (expectedOccurrences.Value == 0) + return !_testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount > 0, timeout); + + var expected = expectedOccurrences.Value; + _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount >= expected, timeout); + + //wait for late tail messages to arrive + if (matchedEventHandler.ReceivedCount == expected) + Thread.Sleep(100); + + return matchedEventHandler.ReceivedCount == expected; } /// From 171a6bc3ed62dfb6b04a81525737121d8f092fee Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 11:22:07 +0100 Subject: [PATCH 10/22] lower condition interval --- src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs index 609c5ddd14f..6b23961ec10 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs @@ -249,7 +249,7 @@ public void TaskSource_must_fail_when_the_task_source_materialization_fails() var (innerSourceMat, outerSinkMat) = Source.FromTaskSource(inner).ToMaterialized(Sink.Seq(), Keep.Both).Run(_materializer); // wait until the underlying tasks are completed - AwaitCondition(() => outerSinkMat.IsFaulted && innerSourceMat.IsFaulted, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(100)); + AwaitCondition(() => outerSinkMat.IsFaulted && innerSourceMat.IsFaulted, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(100)); outerSinkMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED")); innerSourceMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED")); From bf73061a4669ed148f3777178eaab9c5fbb6f120 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 12:18:38 +0100 Subject: [PATCH 11/22] fix await done async --- .../Internal/EventFilterApplier.cs | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs b/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs index f897259d823..d83b30530b8 100644 --- a/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs +++ b/src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs @@ -398,11 +398,12 @@ protected bool AwaitDone(TimeSpan timeout, int? expectedOccurrences, MatchedEven if (!expectedOccurrences.HasValue) return true; + var expected = expectedOccurrences.Value; + // if expecting no events to arrive - assert that given condition will never match - if (expectedOccurrences.Value == 0) + if (expected == 0) return !_testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount > 0, timeout); - var expected = expectedOccurrences.Value; _testkit.AwaitConditionNoThrow(() => matchedEventHandler.ReceivedCount >= expected, timeout); //wait for late tail messages to arrive @@ -417,22 +418,21 @@ protected bool AwaitDone(TimeSpan timeout, int? expectedOccurrences, MatchedEven /// protected async Task AwaitDoneAsync(TimeSpan timeout, int? expectedOccurrences, MatchedEventHandler matchedEventHandler) { - if(expectedOccurrences.HasValue) - { - var expected = expectedOccurrences.GetValueOrDefault(); - if (expected > 0) - { - await _testkit.AwaitConditionNoThrowAsync(() => matchedEventHandler.ReceivedCount >= expected, timeout); - return matchedEventHandler.ReceivedCount == expected; - } - else - { - // if expecting no events to arrive - assert that given condition will never match - var foundEvent = await _testkit.AwaitConditionNoThrowAsync(() => matchedEventHandler.ReceivedCount > 0, timeout); - return foundEvent == false; - } - } - return true; + if (!expectedOccurrences.HasValue) + return true; + + var expected = expectedOccurrences.GetValueOrDefault(); + + if (expected == 0) + return !(await _testkit.AwaitConditionNoThrowAsync(() => matchedEventHandler.ReceivedCount > 0, timeout)); + + await _testkit.AwaitConditionNoThrowAsync(() => matchedEventHandler.ReceivedCount >= expected, timeout); + + //wait for late tail messages to arrive + if (matchedEventHandler.ReceivedCount == expected) + await Task.Delay(100); + + return matchedEventHandler.ReceivedCount == expected; } /// From 71bac1535d8481d164ea125bcc086fbe9de57403 Mon Sep 17 00:00:00 2001 From: zetanova Date: Sun, 5 Dec 2021 12:49:55 +0100 Subject: [PATCH 12/22] improve logging startup check --- src/core/Akka/Event/Logging.cs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/core/Akka/Event/Logging.cs b/src/core/Akka/Event/Logging.cs index 21f37eb32d8..7191a109fff 100644 --- a/src/core/Akka/Event/Logging.cs +++ b/src/core/Akka/Event/Logging.cs @@ -105,17 +105,11 @@ public static string FromActor(IActorContext actor, ActorSystem system) public static string FromActorRef(IActorRef a, ActorSystem system) { - var extendedSystem = system as ExtendedActorSystem - ?? throw new ArgumentException("instance of ExtendedActorSystem required", nameof(system)); - - try - { + if (system is ExtendedActorSystem extendedSystem && !(extendedSystem.Provider is null)) return a.Path.ToStringWithAddress(extendedSystem.Provider.DefaultAddress); - } - catch // can fail if the ActorSystem (remoting) is not completely started yet - { - return a.Path.ToString(); - } + + // the ActorSystem (remoting) is not completely started yet + return a.Path.ToString(); } } From a47d5811bf2f555f8fa6c0382264e43ff4c886e3 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 12:22:02 +0100 Subject: [PATCH 13/22] enable inline execution --- .../ActorCellKeepingSynchronizationContext.cs | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs b/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs index 6fcd50a9cd1..c85a0ce6b77 100644 --- a/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs +++ b/src/core/Akka.TestKit/ActorCellKeepingSynchronizationContext.cs @@ -19,7 +19,7 @@ namespace Akka.TestKit /// /// TBD /// - class ActorCellKeepingSynchronizationContext : SynchronizationContext + sealed class ActorCellKeepingSynchronizationContext : SynchronizationContext { private readonly ActorCell _cell; @@ -41,23 +41,25 @@ public ActorCellKeepingSynchronizationContext(ActorCell cell) /// TBD public override void Post(SendOrPostCallback d, object state) { - ThreadPool.QueueUserWorkItem(_ => + ThreadPool.QueueUserWorkItem(s => { + var t = ((SendOrPostCallback, object, ActorCellKeepingSynchronizationContext, ActorCell))s; + var oldCell = InternalCurrentActorCellKeeper.Current; var oldContext = Current; - SetSynchronizationContext(this); - InternalCurrentActorCellKeeper.Current = AsyncCache ?? _cell; + SetSynchronizationContext(t.Item3); + InternalCurrentActorCellKeeper.Current = t.Item4; try { - d(state); + t.Item1(t.Item2); } finally { InternalCurrentActorCellKeeper.Current = oldCell; SetSynchronizationContext(oldContext); } - }, state); + }, (d, state, this, AsyncCache ?? _cell)); } /// @@ -67,6 +69,21 @@ public override void Post(SendOrPostCallback d, object state) /// TBD public override void Send(SendOrPostCallback d, object state) { + if(ReferenceEquals(Current, this)) + { + var oldCell = InternalCurrentActorCellKeeper.Current; + InternalCurrentActorCellKeeper.Current = AsyncCache ?? _cell; + try + { + d(state); + } + finally + { + InternalCurrentActorCellKeeper.Current = oldCell; + } + return; + } + var tcs = new TaskCompletionSource(); Post(_ => { From af98f790e43ce8e667fb97a4dedea7378b112511 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 12:23:59 +0100 Subject: [PATCH 14/22] fix racy sync-over-async-over-sync call --- src/core/Akka/Pattern/CircuitBreaker.cs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs index 0f5084b5534..0dcaddee899 100644 --- a/src/core/Akka/Pattern/CircuitBreaker.cs +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -226,15 +226,12 @@ public Task WithCircuitBreaker(Func body) /// TBD public void WithSyncCircuitBreaker(Action body) { - var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body)); - if (!cbTask.Wait(CallTimeout)) - { - //throw new TimeoutException( string.Format( "Execution did not complete within the time allotted {0} ms", CallTimeout.TotalMilliseconds ) ); - } - if (cbTask.Exception != null) - { - ExceptionDispatchInfo.Capture(cbTask.Exception).Throw(); - } + var cts = new CancellationTokenSource(CallTimeout); + var task = new Task(body, cts.Token); + var cbTask = WithCircuitBreaker(() => task); + task.RunSynchronously(); + if (!task.IsCanceled) + cbTask.GetAwaiter().GetResult(); } /// @@ -253,8 +250,11 @@ public void WithSyncCircuitBreaker(Action body) /// or default() public T WithSyncCircuitBreaker(Func body) { - var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body)); - return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T); + var cts = new CancellationTokenSource(CallTimeout); + var task = new Task(body, cts.Token); + var cbTask = WithCircuitBreaker(() => task); + task.RunSynchronously(); + return !task.IsCanceled ? cbTask.GetAwaiter().GetResult() : default; } /// From 57d9c43099f89f46e18f7791512acfcaa32643a7 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 14:03:54 +0100 Subject: [PATCH 15/22] add actor path address validation --- src/core/Akka.Tests/Actor/ActorPathSpec.cs | 13 ++++++++++++- src/core/Akka/Actor/ActorPath.cs | 12 ++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Tests/Actor/ActorPathSpec.cs b/src/core/Akka.Tests/Actor/ActorPathSpec.cs index b22f2eedc12..66ec1b4a2d7 100644 --- a/src/core/Akka.Tests/Actor/ActorPathSpec.cs +++ b/src/core/Akka.Tests/Actor/ActorPathSpec.cs @@ -193,7 +193,6 @@ public void Create_correct_ToString_with_address() (root / "user").ToStringWithAddress(c).ShouldBe("akka.tcp://mysys@cccc:2552/user"); (root / "user" / "foo").ToStringWithAddress(c).ShouldBe("akka.tcp://mysys@cccc:2552/user/foo"); - root.ToStringWithAddress(d).ShouldBe("akka.tcp://mysys@192.168.107.1:2552/"); (root / "user").ToStringWithAddress(d).ShouldBe("akka.tcp://mysys@192.168.107.1:2552/user"); (root / "user" / "foo").ToStringWithAddress(d).ShouldBe("akka.tcp://mysys@192.168.107.1:2552/user/foo"); @@ -202,7 +201,19 @@ public void Create_correct_ToString_with_address() rootA.ToStringWithAddress(b).ShouldBe("akka.tcp://mysys@aaa:2552/"); (rootA / "user").ToStringWithAddress(b).ShouldBe("akka.tcp://mysys@aaa:2552/user"); (rootA / "user" / "foo").ToStringWithAddress(b).ShouldBe("akka.tcp://mysys@aaa:2552/user/foo"); + } + + [Fact] + public void Should_throw_on_null_parts() + { + var local = new Address("akka.tcp", "mysys"); + var root = new RootActorPath(local); + var user = root / "user"; + Assert.Throws(() => new RootActorPath(null)); + Assert.Throws(() => new ChildActorPath(null, "user", 0)); + Assert.Throws(() => new ChildActorPath(root, null, 0)); + Assert.Throws(() => user.ToStringWithAddress(null)); } /// diff --git a/src/core/Akka/Actor/ActorPath.cs b/src/core/Akka/Actor/ActorPath.cs index 8ddc08ec71f..e8d9a48a602 100644 --- a/src/core/Akka/Actor/ActorPath.cs +++ b/src/core/Akka/Actor/ActorPath.cs @@ -155,10 +155,10 @@ private static bool Validate(string chars) /// The name. protected ActorPath(Address address, string name) { - _address = address; + _address = address ?? throw new ArgumentNullException(nameof(address)); _parent = null; _depth = 0; - _name = name; + _name = name ?? string.Empty; _uid = ActorCell.UndefinedUid; } @@ -170,11 +170,13 @@ protected ActorPath(Address address, string name) /// The uid. protected ActorPath(ActorPath parentPath, string name, long uid) { - _parent = parentPath; + if (string.IsNullOrEmpty(name)) + throw new ArgumentNullException(nameof(name)); + _parent = parentPath ?? throw new ArgumentNullException(nameof(parentPath)); _address = parentPath._address; _depth = parentPath._depth + 1; _name = name; - _uid = uid; + _uid = uid; //maybe test for positive number } /// @@ -727,6 +729,8 @@ private string AppendUidFragment(string withAddress) /// System.String. public string ToStringWithAddress(Address address) { + if (address is null) + throw new ArgumentNullException(nameof(address)); if (IgnoreActorRef.IsIgnoreRefPath(this)) { // we never change address for IgnoreActorRef From 292ec8a62d9b8a05a3b79b61ef49ab792f80c66f Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 14:06:56 +0100 Subject: [PATCH 16/22] fix racy starup in logging --- src/core/Akka/Event/Logging.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/Akka/Event/Logging.cs b/src/core/Akka/Event/Logging.cs index 7191a109fff..bcafcfd14b4 100644 --- a/src/core/Akka/Event/Logging.cs +++ b/src/core/Akka/Event/Logging.cs @@ -105,11 +105,12 @@ public static string FromActor(IActorContext actor, ActorSystem system) public static string FromActorRef(IActorRef a, ActorSystem system) { + Address address = null; if (system is ExtendedActorSystem extendedSystem && !(extendedSystem.Provider is null)) - return a.Path.ToStringWithAddress(extendedSystem.Provider.DefaultAddress); + address = extendedSystem.Provider.DefaultAddress; - // the ActorSystem (remoting) is not completely started yet - return a.Path.ToString(); + // maybe the ActorSystem (remoting) is not completely started yet + return address is null ? a.Path.ToString() : a.Path.ToStringWithAddress(address); } } From eb01377f05659e3d61da0de3a2b3bcd665ec4066 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 14:37:36 +0100 Subject: [PATCH 17/22] fix DefaultAddress NRE --- src/core/Akka.Remote/RemoteActorRefProvider.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.Remote/RemoteActorRefProvider.cs b/src/core/Akka.Remote/RemoteActorRefProvider.cs index b512d74fbdc..ee64c7e25c3 100644 --- a/src/core/Akka.Remote/RemoteActorRefProvider.cs +++ b/src/core/Akka.Remote/RemoteActorRefProvider.cs @@ -189,7 +189,7 @@ public ActorPath RootPath public Deployer Deployer { get; protected set; } /// - public Address DefaultAddress { get { return Transport.DefaultAddress; } } + public Address DefaultAddress { get { return Transport?.DefaultAddress; } } private Information _serializationInformationCache; From c17b320ce921c5ef334e67d4e3c59d32114187b1 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 19:04:48 +0100 Subject: [PATCH 18/22] wait on provider ready --- src/core/Akka/Actor/Internal/ActorSystemImpl.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs index f7540c33f04..34d329ecb4a 100644 --- a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs +++ b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs @@ -225,6 +225,17 @@ public void Start() { _log.Info(Settings.ToString()); } + + //HACK: ensure provider started + { + if (_provider.DefaultAddress is null) + Thread.Yield(); + var i = 1; + while(i < 10 && _provider.DefaultAddress is null) + Thread.Sleep(i++ * 16); + if (i == 10 && _provider.DefaultAddress is null) + throw new TimeoutException($"Provider '{_provider.GetType()}' startup timeout"); + } } catch (Exception) { From 0c93b18d995cbf90d22abb7ad80c7f78693aa136 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 7 Dec 2021 22:28:17 +0100 Subject: [PATCH 19/22] remove useless memory barrier --- .../Scheduler/HashedWheelTimerScheduler.cs | 53 +++++++------------ 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs b/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs index 5a7a1aa64ed..5068758ef09 100644 --- a/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs +++ b/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs @@ -13,7 +13,6 @@ using Akka.Configuration; using Akka.Dispatch; using Akka.Event; -using Akka.Util; // ReSharper disable NotResolvedInText @@ -32,7 +31,7 @@ namespace Akka.Actor /// Further reading: http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf /// Presentation: http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt /// - public class HashedWheelTimerScheduler : SchedulerBase, IDateTimeOffsetNowTimeProvider, IDisposable + public sealed class HashedWheelTimerScheduler : SchedulerBase, IDateTimeOffsetNowTimeProvider, IDisposable { private readonly TimeSpan _shutdownTimeout; private readonly long _tickDuration; // a timespan expressed as ticks @@ -82,7 +81,7 @@ public HashedWheelTimerScheduler(Config scheduler, ILoggingAdapter log) : base(s /// /// 0 - init, 1 - started, 2 - shutdown /// - private volatile int _workerState = WORKER_STATE_INIT; + private int _workerState = WORKER_STATE_INIT; private static Bucket[] CreateWheel(int ticksPerWheel, ILoggingAdapter log) { @@ -121,16 +120,14 @@ private void Start() if (_workerState == WORKER_STATE_STARTED) { } // do nothing else if (_workerState == WORKER_STATE_INIT) { - _worker = new Thread(Run) { IsBackground = true }; -#pragma warning disable 420 - if (Interlocked.CompareExchange(ref _workerState, WORKER_STATE_STARTED, WORKER_STATE_INIT) == -#pragma warning restore 420 - WORKER_STATE_INIT) + if (Interlocked.CompareExchange(ref _workerState, WORKER_STATE_STARTED, WORKER_STATE_INIT) == WORKER_STATE_INIT) { + _worker = new Thread(Run) { IsBackground = true }; _worker.Start(); + if (_startTime == 0) + _workerInitialized.Wait(TimeSpan.FromSeconds(3)); } } - else if (_workerState == WORKER_STATE_SHUTDOWN) { throw new SchedulerException("cannot enqueue after timer shutdown"); @@ -139,11 +136,6 @@ private void Start() { throw new InvalidOperationException($"Worker in invalid state: {_workerState}"); } - - while (_startTime == 0) - { - _workerInitialized.Wait(); - } } /// @@ -152,12 +144,8 @@ private void Start() private void Run() { // Initialize the clock - _startTime = HighResMonotonicClock.Ticks; - if (_startTime == 0) - { - // 0 means it's an uninitialized value, so bump to 1 to indicate it's started - _startTime = 1; - } + // 0 means it's an uninitialized value, so bump to 1 to indicate it's started + Volatile.Write(ref _startTime, Math.Max(1, HighResMonotonicClock.Ticks)); _workerInitialized.Signal(); @@ -189,7 +177,7 @@ private void Run() } // return the list of unprocessedRegistrations and signal that we're finished - _stopped.Value.TrySetResult(_unprocessedRegistrations); + _stopped.TrySetResult(_unprocessedRegistrations); } private void ProcessReschedule() @@ -208,7 +196,7 @@ private long WaitForNextTick() var deadline = _tickDuration * (_tick + 1); unchecked // just to avoid trouble with long-running applications { - for (;;) + for (; ; ) { long currentTime = HighResMonotonicClock.Ticks - _startTime; var sleepMs = ((deadline - currentTime + TimeSpan.TicksPerMillisecond - 1) / TimeSpan.TicksPerMillisecond); @@ -351,22 +339,19 @@ protected override void InternalScheduleRepeatedly(TimeSpan initialDelay, TimeSp InternalSchedule(initialDelay, interval, action, cancelable); } - private AtomicReference>> _stopped = new AtomicReference>>(); + private TaskCompletionSource> _stopped = null; private static readonly Task> Completed = Task.FromResult((IEnumerable)new List()); private Task> Stop() { - var p = new TaskCompletionSource>(); + if (_stopped is null) + _stopped = new TaskCompletionSource>(); - if (_stopped.CompareAndSet(null, p) -#pragma warning disable 420 - && Interlocked.CompareExchange(ref _workerState, WORKER_STATE_SHUTDOWN, WORKER_STATE_STARTED) == WORKER_STATE_STARTED) -#pragma warning restore 420 - { + if (Interlocked.CompareExchange(ref _workerState, WORKER_STATE_SHUTDOWN, WORKER_STATE_STARTED) == WORKER_STATE_STARTED) // Let remaining work that is already being processed finished. The termination task will complete afterwards - return p.Task; - } + return _stopped.Task; + return Completed; } @@ -432,7 +417,7 @@ public override string ToString() } } - private class SchedulerRegistration + private sealed class SchedulerRegistration { /// /// The cancellation handle, if any @@ -565,7 +550,7 @@ public void Reschedule(SchedulerRegistration reg) /// A set of registrations to populate. public void ClearRegistrations(HashSet registrations) { - for (;;) + for (; ; ) { var reg = Poll(); if (reg == null) @@ -582,7 +567,7 @@ public void ClearRegistrations(HashSet registrations) /// A set of registrations to populate. public void ClearReschedule(HashSet registrations) { - for (;;) + for (; ; ) { var reg = PollReschedule(); if (reg == null) From c5744a390304e7652edcd9f8330674322311ec6c Mon Sep 17 00:00:00 2001 From: zetanova Date: Wed, 8 Dec 2021 00:49:25 +0100 Subject: [PATCH 20/22] update api --- src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 7848ff9a9f1..9e1d07ec398 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -897,7 +897,7 @@ namespace Akka.Actor protected override void PreStart() { } protected override bool Receive(object message) { } } - public class HashedWheelTimerScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider, System.IDisposable + public sealed class HashedWheelTimerScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider, System.IDisposable { public HashedWheelTimerScheduler(Akka.Configuration.Config scheduler, Akka.Event.ILoggingAdapter log) { } public override System.TimeSpan HighResMonotonicClock { get; } From fcf18c24996d8df2b836386e7155a0ce002398ef Mon Sep 17 00:00:00 2001 From: zetanova Date: Wed, 8 Dec 2021 09:58:45 +0100 Subject: [PATCH 21/22] add testkit log test on startup --- src/core/Akka.TestKit/TestKitBase.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index b7a43dd9128..67772ea952d 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -187,6 +187,12 @@ protected void InitializeTest(ActorSystem system, ActorSystemSetup config, strin new ActorCellKeepingSynchronizationContext(InternalCurrentActorCellKeeper.Current)); _testState.TestActor = testActor; + + //HACK: await one log message + { + var msg = $"testprope-{Guid.NewGuid()}"; + EventFilter.Warning(contains: msg).ExpectOne(TimeSpan.FromSeconds(3), () => system.Log.Warning(msg)); + } } /// From 0171d9908f252791ca66ae80eef988c1ebd518e5 Mon Sep 17 00:00:00 2001 From: zetanova Date: Wed, 8 Dec 2021 19:52:08 +0100 Subject: [PATCH 22/22] remove await log message hack --- src/core/Akka.TestKit/TestKitBase.cs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/core/Akka.TestKit/TestKitBase.cs b/src/core/Akka.TestKit/TestKitBase.cs index 67772ea952d..922452dbae9 100644 --- a/src/core/Akka.TestKit/TestKitBase.cs +++ b/src/core/Akka.TestKit/TestKitBase.cs @@ -186,13 +186,7 @@ protected void InitializeTest(ActorSystem system, ActorSystemSetup config, strin SynchronizationContext.SetSynchronizationContext( new ActorCellKeepingSynchronizationContext(InternalCurrentActorCellKeeper.Current)); - _testState.TestActor = testActor; - - //HACK: await one log message - { - var msg = $"testprope-{Guid.NewGuid()}"; - EventFilter.Warning(contains: msg).ExpectOne(TimeSpan.FromSeconds(3), () => system.Log.Warning(msg)); - } + _testState.TestActor = testActor; } ///