Skip to content

Commit

Permalink
close akkadotnet#2288 - verified that all IScheduler implementation…
Browse files Browse the repository at this point in the history
…s which support `IDisposable` will be disposed before `ActorSystem.WhenTerminated` completes.

close akkadotnet#1593 - significantly improved upon the `DedicatedThreadScheduler` performance

Signed-off-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Aaronontheweb committed Sep 8, 2016
1 parent 3db8e49 commit 9433e38
Show file tree
Hide file tree
Showing 28 changed files with 1,716 additions and 339 deletions.
10 changes: 9 additions & 1 deletion src/SharedAssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
// <auto-generated/>
//-----------------------------------------------------------------------
// <copyright file="SharedAssemblyInfo.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

// <auto-generated/>
using System.Reflection;

[assembly: AssemblyCompanyAttribute("Akka.NET Team")]
[assembly: AssemblyCopyrightAttribute("Copyright © 2013-2016 Akka.NET Team")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyVersionAttribute("1.1.2.0")]
[assembly: AssemblyFileVersionAttribute("1.1.2.0")]

Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
using System.Collections.Generic;
//-----------------------------------------------------------------------
// <copyright file="ClusterClientMessageSerializerSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
Expand Down Expand Up @@ -46,3 +53,4 @@ public void ClusterClientMessages_must_be_serializable()
}
}
}

26 changes: 23 additions & 3 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,13 @@ namespace Akka.Actor
public static Akka.Actor.DeployableDecider From(Akka.Actor.Directive defaultDirective, System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<System.Type, Akka.Actor.Directive>> pairs) { }
public static Akka.Actor.LocalOnlyDecider From(System.Func<System.Exception, Akka.Actor.Directive> localOnlyDecider) { }
}
public class DedicatedThreadScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider
public class DedicatedThreadScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider, System.IDisposable
{
public DedicatedThreadScheduler(Akka.Actor.ActorSystem system) { }
public DedicatedThreadScheduler(Akka.Configuration.Config config, Akka.Event.ILoggingAdapter log) { }
public override System.TimeSpan HighResMonotonicClock { get; }
public override System.TimeSpan MonotonicClock { get; }
protected override System.DateTimeOffset TimeNow { get; }
public void Dispose() { }
protected override void InternalScheduleOnce(System.TimeSpan delay, System.Action action, Akka.Actor.ICancelable cancelable) { }
protected override void InternalScheduleRepeatedly(System.TimeSpan initialDelay, System.TimeSpan interval, System.Action action, Akka.Actor.ICancelable cancelable) { }
protected override void InternalScheduleTellOnce(System.TimeSpan delay, Akka.Actor.ICanTell receiver, object message, Akka.Actor.IActorRef sender, Akka.Actor.ICancelable cancelable) { }
Expand Down Expand Up @@ -844,6 +845,18 @@ namespace Akka.Actor
protected override void PreStart() { }
protected override bool Receive(object message) { }
}
public class HashedWheelTimerScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider, System.IDisposable
{
public HashedWheelTimerScheduler(Akka.Configuration.Config scheduler, Akka.Event.ILoggingAdapter log) { }
public override System.TimeSpan HighResMonotonicClock { get; }
public override System.TimeSpan MonotonicClock { get; }
protected override System.DateTimeOffset TimeNow { get; }
public void Dispose() { }
protected override void InternalScheduleOnce(System.TimeSpan delay, System.Action action, Akka.Actor.ICancelable cancelable) { }
protected override void InternalScheduleRepeatedly(System.TimeSpan initialDelay, System.TimeSpan interval, System.Action action, Akka.Actor.ICancelable cancelable) { }
protected override void InternalScheduleTellOnce(System.TimeSpan delay, Akka.Actor.ICanTell receiver, object message, Akka.Actor.IActorRef sender, Akka.Actor.ICancelable cancelable) { }
protected override void InternalScheduleTellRepeatedly(System.TimeSpan initialDelay, System.TimeSpan interval, Akka.Actor.ICanTell receiver, object message, Akka.Actor.IActorRef sender, Akka.Actor.ICancelable cancelable) { }
}
public interface IActionScheduler
{
void ScheduleOnce(System.TimeSpan delay, System.Action action, Akka.Actor.ICancelable cancelable);
Expand Down Expand Up @@ -1470,7 +1483,9 @@ namespace Akka.Actor
}
public abstract class SchedulerBase : Akka.Actor.IActionScheduler, Akka.Actor.IAdvancedScheduler, Akka.Actor.IScheduler, Akka.Actor.ITellScheduler, Akka.Actor.ITimeProvider
{
protected SchedulerBase() { }
protected readonly Akka.Event.ILoggingAdapter Log;
protected readonly Akka.Configuration.Config SchedulerConfig;
protected SchedulerBase(Akka.Configuration.Config scheduler, Akka.Event.ILoggingAdapter log) { }
public abstract System.TimeSpan HighResMonotonicClock { get; }
public abstract System.TimeSpan MonotonicClock { get; }
protected abstract System.DateTimeOffset TimeNow { get; }
Expand All @@ -1481,6 +1496,10 @@ namespace Akka.Actor
protected static void ValidateDelay(System.TimeSpan delay, string parameterName) { }
protected static void ValidateInterval(System.TimeSpan interval, string parameterName) { }
}
public sealed class SchedulerException : Akka.Actor.AkkaException
{
public SchedulerException(string message) { }
}
public class static SchedulerExtensions
{
public static void ScheduleOnce(this Akka.Actor.IActionScheduler scheduler, int millisecondsDelay, System.Action action, Akka.Actor.ICancelable cancelable = null) { }
Expand Down Expand Up @@ -1550,6 +1569,7 @@ namespace Akka.Actor
public string LogLevel { get; }
public string ProviderClass { get; }
public string SchedulerClass { get; }
public System.TimeSpan SchedulerShutdownTimeout { get; }
public bool SerializeAllCreators { get; }
public bool SerializeAllMessages { get; }
public string StdoutLogLevel { get; }
Expand Down
12 changes: 10 additions & 2 deletions src/core/Akka.Cluster.Tests/DowningProviderSpec.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
using System;
//-----------------------------------------------------------------------
// <copyright file="DowningProviderSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -108,4 +115,5 @@ public void Downing_provider_should_stop_the_cluster_if_the_downing_provider_thr
}
}
}
}
}

12 changes: 10 additions & 2 deletions src/core/Akka.Cluster/DowningProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
using System;
//-----------------------------------------------------------------------
// <copyright file="DowningProvider.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Configuration;

Expand Down Expand Up @@ -64,4 +71,5 @@ public static IDowningProvider Load(Type downingProviderType, ActorSystem system
}
}

}
}

10 changes: 9 additions & 1 deletion src/core/Akka.FSharp/Properties/AssemblyInfo.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
namespace System
//-----------------------------------------------------------------------
// <copyright file="AssemblyInfo.fs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

namespace System
open System
open System.Reflection
open System.Runtime.InteropServices
Expand All @@ -16,3 +23,4 @@ do ()

module internal AssemblyVersionInformation =
let [<Literal>] Version = "1.1.2.0"

4 changes: 3 additions & 1 deletion src/core/Akka.TestKit/TestScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;

namespace Akka.TestKit
{
Expand All @@ -19,7 +21,7 @@ public class TestScheduler : IScheduler,
private DateTimeOffset _now;
private readonly ConcurrentDictionary<long, Queue<ScheduledItem>> _scheduledWork;

public TestScheduler(ActorSystem system)
public TestScheduler(Config schedulerConfig, ILoggingAdapter log)
{
_now = DateTimeOffset.UtcNow;
_scheduledWork = new ConcurrentDictionary<long, Queue<ScheduledItem>>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//-----------------------------------------------------------------------
// <copyright file="DefaultSchedulerPerformanceTests.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using NBench;

namespace Akka.Tests.Performance.Actor.Scheduler
{
/// <summary>
/// Designed to measure how many operations per-second the scheduler can perform in a fixed window of time.
///
/// Meant to measure the efficiency of the default <see cref="IScheduler"/> and to see if there's much contention
/// around the
/// </summary>
public class DefaultSchedulerPerformanceTests
{
private ActorSystem _actorSystem;
public const string ScheduledOps = "ScheduleInvokes";
private Counter _scheduledOpsCounter;
public const string ScheduledJobs = "ScheduledJobs";
private Counter _jobsScheduled;
private ICancelable _cancelSignal;

/// <summary>
/// number of concurrent schedulers
/// </summary>
private const int DegreeOfParallelism = 10;

private const int SchedulePerBatch = 200;
private static readonly TimeSpan RunTime = TimeSpan.FromSeconds(20);
private Action _eventLoop;
private Action _counterIncrement;

public const int IterationCount = 1; // these are LONG-running benchmarks

[PerfSetup]
public void SetUp(BenchmarkContext context)
{
_scheduledOpsCounter = context.GetCounter(ScheduledOps);
_jobsScheduled = context.GetCounter(ScheduledJobs);
_actorSystem = ActorSystem.Create("SchedulerPerformanceSpecs");
_cancelSignal = new Cancelable(_actorSystem.Scheduler);
_counterIncrement = () => _scheduledOpsCounter.Increment();

_eventLoop = () =>
{
while (!_cancelSignal.IsCancellationRequested)
{
for (var i = 0; i < SchedulePerBatch; i++)
{
_actorSystem.Scheduler.Advanced.ScheduleRepeatedly(0, 10, _counterIncrement, _cancelSignal);
_jobsScheduled.Increment();
}
Thread.Sleep(40); // wait a bit, then keep going
}
};
}

[PerfBenchmark(Description = "Tests to see how many concurrent jobs we can schedule and what the effects are on throughput", RunMode = RunMode.Iterations, NumberOfIterations = IterationCount)]
[CounterMeasurement(ScheduledJobs)]
[CounterMeasurement(ScheduledOps)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
[GcMeasurement(GcMetric.TotalCollections, GcGeneration.AllGc)]
public void SchedulerThroughputStressTest()
{
for (var i = 0; i < DegreeOfParallelism; i++)
{
Task.Factory.StartNew(_eventLoop);
}
Task.Delay(RunTime).Wait();
_cancelSignal.Cancel(false);
}

[PerfCleanup]
public void CleanUp()
{
_actorSystem.Terminate().Wait();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<Compile Include="Actor\MinimalActorRefThroughputSpec.cs" />
<Compile Include="Actor\Pattern\AskSpec.cs" />
<Compile Include="Actor\ReceiveActorThroughputSpec.cs" />
<Compile Include="Actor\Scheduler\DefaultSchedulerPerformanceTests.cs" />
<Compile Include="Actor\UntypedActorThroughputSpec.cs" />
<Compile Include="Dispatch\CallingThreadExecutor.cs" />
<Compile Include="Dispatch\DefaultDispatcherColdThroughputSpec.cs" />
Expand Down
7 changes: 4 additions & 3 deletions src/core/Akka.Tests/Actor/ActorSystemSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System.Threading;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;
using FluentAssertions.Execution;

namespace Akka.Tests.Actor
Expand Down Expand Up @@ -234,11 +235,11 @@ public void Handle_extensions_that_fail_to_initialize()
[Fact]
public void Setup_the_default_scheduler()
{
Assert.True(Sys.Scheduler.GetType() == typeof(DedicatedThreadScheduler));
Assert.True(Sys.Scheduler.GetType() == typeof(HashedWheelTimerScheduler));
}

[Fact]
public void Support_using_a_customer_scheduler()
public void Support_using_a_custom_scheduler()
{
var actorSystem = ActorSystem.Create(Guid.NewGuid().ToString(), DefaultConfig.WithFallback("akka.scheduler.implementation = \"Akka.Tests.Actor.TestScheduler, Akka.Tests\""));
Assert.True(actorSystem.Scheduler.GetType() == typeof(TestScheduler));
Expand Down Expand Up @@ -352,7 +353,7 @@ public FailingTestExtensionImpl(ActorSystem system)

public class TestScheduler : IScheduler
{
public TestScheduler(ActorSystem system)
public TestScheduler(Config config, ILoggingAdapter log)
{

}
Expand Down
Loading

0 comments on commit 9433e38

Please sign in to comment.