Skip to content

Commit

Permalink
Added an UnboundStablePriorityMailbox (akkadotnet#3536)
Browse files Browse the repository at this point in the history
* Added an UnboundStablePriorityMailbox, sending messages according to priority. Messages with the same priority will be send using the same order as they appear. akkadotnet#2652 is the related issue.

* added PropertyTest for StableListPriorityQueue

* forgot to add spec

* verified that the UnboundedStablePriorityMailbox can be loaded and used

* validate4d that UnboundedStablePriorityMailbox supports stashing

* added API approval for core
  • Loading branch information
AndreSteenbergen authored and madmonkey committed Jul 12, 2019
1 parent c84fb5b commit 5366dd8
Show file tree
Hide file tree
Showing 8 changed files with 492 additions and 9 deletions.
26 changes: 26 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2612,6 +2612,14 @@ namespace Akka.Dispatch
public virtual Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
protected abstract int PriorityGenerator(object message);
}
public abstract class UnboundedStablePriorityMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue<Akka.Dispatch.MessageQueues.UnboundedStablePriorityMessageQueue>
{
public const int DefaultCapacity = 11;
protected UnboundedStablePriorityMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { }
public int InitialCapacity { get; }
public virtual Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
protected abstract int PriorityGenerator(object message);
}
}
namespace Akka.Dispatch.MessageQueues
{
Expand Down Expand Up @@ -2687,6 +2695,14 @@ namespace Akka.Dispatch.MessageQueues
protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { }
protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { }
}
public class UnboundedStablePriorityMessageQueue : Akka.Dispatch.MessageQueues.BlockingMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics
{
public UnboundedStablePriorityMessageQueue(System.Func<object, int> priorityGenerator, int initialCapacity) { }
protected override int LockedCount { get; }
public void EnqueueFirst(Akka.Actor.Envelope envelope) { }
protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { }
protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { }
}
}
namespace Akka.Dispatch.SysMsg
{
Expand Down Expand Up @@ -4773,6 +4789,16 @@ namespace Akka.Util
public static readonly bool IsMono;
public static readonly bool IsWindows;
}
public sealed class StableListPriorityQueue
{
public StableListPriorityQueue(int initialCapacity, System.Func<object, int> priorityCalculator) { }
public int Count() { }
public Akka.Actor.Envelope Dequeue() { }
public void Enqueue(Akka.Actor.Envelope item) { }
public bool IsConsistent() { }
public Akka.Actor.Envelope Peek() { }
public override string ToString() { }
}
public class static StandardOutWriter
{
public static void Write(string message, System.Nullable<System.ConsoleColor> foregroundColor = null, System.Nullable<System.ConsoleColor> backgroundColor = null) { }
Expand Down
107 changes: 106 additions & 1 deletion src/core/Akka.Tests/Dispatch/MailboxesSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,27 @@ public TestPriorityMailbox(Settings settings, Config config) : base(settings, co
}
}

public class TestStablePriorityMailbox : UnboundedStablePriorityMailbox
{
protected override int PriorityGenerator(object message)
{
if (message is string)
return 1;
if (message is bool)
return 2;
if (message is int)
return 3;
if (message is double)
return 4;

return 5;
}

public TestStablePriorityMailbox(Settings settings, Config config) : base(settings, config)
{
}
}

public class IntPriorityMailbox : UnboundedPriorityMailbox
{
protected override int PriorityGenerator(object message)
Expand Down Expand Up @@ -138,6 +159,9 @@ private static string GetConfig()
int-prio-mailbox {
mailbox-type : """ + typeof(IntPriorityMailbox).AssemblyQualifiedName + @"""
}
stable-prio-mailbox{
mailbox-type : """ + typeof(TestStablePriorityMailbox).AssemblyQualifiedName + @"""
}
";
}

Expand Down Expand Up @@ -204,7 +228,45 @@ public void Can_use_unbounded_priority_mailbox()
ExpectMsg(2.0);

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
}
}

[Fact]
public void Can_use_unbounded_stable_priority_mailbox()
{
var actor = (IInternalActorRef)Sys.ActorOf(EchoActor.Props(this).WithMailbox("stable-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

// wait until we can confirm that the mailbox is suspended before we begin sending messages
AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());

actor.Tell(true);
for (var i = 0; i < 30; i++)
{
actor.Tell(i);
}
actor.Tell("a");
actor.Tell(2.0);
for (var i = 0; i < 30; i++)
{
actor.Tell(i + 30);
}
actor.SendSystemMessage(new Resume(null));

//resume mailbox, this prevents the mailbox from running to early
//priority mailbox is best effort only

ExpectMsg("a");
ExpectMsg(true);
for (var i = 0; i < 60; i++)
{
ExpectMsg(i);
}
ExpectMsg(2.0);

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
}

[Fact]
public void Priority_mailbox_keeps_ordering_with_many_priority_values()
Expand Down Expand Up @@ -286,6 +348,49 @@ public void Unbounded_Priority_Mailbox_Supports_Unbounded_Stashing()

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
}

[Fact]
public void Unbounded_Stable_Priority_Mailbox_Supports_Unbounded_Stashing()
{
var actor = (IInternalActorRef)Sys.ActorOf(StashingActor.Props(this).WithMailbox("stable-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());

var values = new int[10];
var increment = (int)(UInt32.MaxValue / values.Length);

for (var i = 0; i < values.Length; i++)
values[i] = Int32.MinValue + increment * i;

// tell the actor in order
foreach (var value in values)
{
actor.Tell(value);
actor.Tell(value);
actor.Tell(value);
}

actor.Tell(new StashingActor.Start());

//resume mailbox, this prevents the mailbox from running to early
actor.SendSystemMessage(new Resume(null));

this.Within(5.Seconds(), () =>
{
// expect the messages in the original order
foreach (var value in values)
{
ExpectMsg(value);
ExpectMsg(value);
ExpectMsg(value);
}
});

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
}
}
}

55 changes: 55 additions & 0 deletions src/core/Akka.Tests/Util/StableListPriorityQueueSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// //-----------------------------------------------------------------------
// // <copyright file="StableListPriorityQueueSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Util;
using FsCheck;
using FsCheck.Xunit;

namespace Akka.Tests.Util
{
public class StableListPriorityQueueSpec
{
public static int Priority(object obj)
{
switch (obj)
{
case int i:
return i;
case string str:
return str.Length;
default:
return 1;
}
}

[Property(MaxTest = 1000)]
public Property StableListPriorityQueue_must_be_stable(NonEmptyString[] values)
{
var sortedValues = values
.Select(x => x.Item)
.GroupBy(x => x.Length)
.OrderBy(x => x.Key)
.SelectMany(x => x).ToList();
var pq = new StableListPriorityQueue(10, Priority);

foreach (var i in values.Select(x => x.Item)) pq.Enqueue(new Envelope(i, ActorRefs.NoSender));

var isConsistent = pq.IsConsistent().ToProperty().Label("Expected queue to be consistent, but was not.");

var queueValues = new List<string>();
while (pq.Count() > 0) queueValues.Add((string)pq.Dequeue().Message);

var sequenceEqual = queueValues.SequenceEqual(sortedValues).ToProperty().Label(
$"Expected output to be [{string.Join(",", sortedValues)}] but was [{string.Join(",", queueValues)}]");

return sequenceEqual.And(isConsistent);
}
}
}
42 changes: 40 additions & 2 deletions src/core/Akka/Dispatch/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system)
/// Priority mailbox base class; unbounded mailbox that allows for prioritization of its contents.
/// Extend this class and implement the <see cref="PriorityGenerator"/> method with your own prioritization.
/// The value returned by the <see cref="PriorityGenerator"/> method will be used to order the message in the mailbox.
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order.
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivered in undefined order.
/// </summary>
public abstract class UnboundedPriorityMailbox : MailboxType, IProducesMessageQueue<UnboundedPriorityMessageQueue>
{
Expand Down Expand Up @@ -716,7 +716,45 @@ protected UnboundedPriorityMailbox(Settings settings, Config config) : base(sett
}
}

//todo: bounded priority mailbox; stable priority mailboxes
//todo: bounded priority mailbox;

/// <summary>
/// Priority mailbox - an unbounded mailbox that allows for prioritization of its contents.
/// Extend this class and implement the <see cref="PriorityGenerator"/> method with your own prioritization.
/// The value returned by the <see cref="PriorityGenerator"/> method will be used to order the message in the mailbox.
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order.
/// </summary>
public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue<UnboundedStablePriorityMessageQueue>
{
/// <summary>
/// Function responsible for generating the priority value of a message based on its type and content.
/// </summary>
/// <param name="message">The message to inspect.</param>
/// <returns>An integer. The lower the value, the higher the priority.</returns>
protected abstract int PriorityGenerator(object message);

/// <summary>
/// The initial capacity of the unbounded mailbox.
/// </summary>
public int InitialCapacity { get; }

/// <summary>
/// The default capacity of an unbounded priority mailbox.
/// </summary>
public const int DefaultCapacity = 11;

/// <inheritdoc cref="MailboxType"/>
public sealed override IMessageQueue Create(IActorRef owner, ActorSystem system)
{
return new UnboundedStablePriorityMessageQueue(PriorityGenerator, InitialCapacity);
}

/// <inheritdoc cref="MailboxType"/>
protected UnboundedStablePriorityMailbox(Settings settings, Config config) : base(settings, config)
{
InitialCapacity = DefaultCapacity;
}
}

/// <summary>
/// UnboundedDequeBasedMailbox is an unbounded <see cref="MailboxType"/> backed by a double-ended queue. Used for stashing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,4 @@ public void EnqueueFirst(Envelope envelope)
_prependBuffer.Push(envelope);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//-----------------------------------------------------------------------
// <copyright file="UnboundedPriorityMessageQueue.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Util;

namespace Akka.Dispatch.MessageQueues
{
/// <summary>
/// Base class for a message queue that uses a priority generator for messages
/// </summary>
public class UnboundedStablePriorityMessageQueue : BlockingMessageQueue, IUnboundedDequeBasedMessageQueueSemantics
{
private readonly StableListPriorityQueue _prioQueue;
// doesn't need to be threadsafe - only called from within actor
private readonly Stack<Envelope> _prependBuffer = new Stack<Envelope>();


/// <summary>
/// Creates a new unbounded priority message queue.
/// </summary>
/// <param name="priorityGenerator">The calculator function for determining the priority of inbound messages.</param>
/// <param name="initialCapacity">The initial capacity of the queue.</param>
public UnboundedStablePriorityMessageQueue(Func<object, int> priorityGenerator, int initialCapacity)
{
_prioQueue = new StableListPriorityQueue(initialCapacity, priorityGenerator);
}

/// <summary>
/// Unsafe method for computing the underlying message count.
/// </summary>
/// <remarks>
/// Called from within a synchronization mechanism.
/// </remarks>
protected override int LockedCount
{
get { return _prioQueue.Count(); }
}

/// <summary>
/// Unsafe method for enqueuing a new message to the queue.
/// </summary>
/// <param name="envelope">The message to enqueue.</param>
/// <remarks>
/// Called from within a synchronization mechanism.
/// </remarks>
protected override void LockedEnqueue(Envelope envelope)
{
_prioQueue.Enqueue(envelope);
}

/// <summary>
/// Unsafe method for attempting to dequeue a message.
/// </summary>
/// <param name="envelope">The message that might be dequeued.</param>
/// <returns><c>true</c> if a message was available to be dequeued, <c>false</c> otherwise.</returns>
/// <remarks>
/// Called from within a synchronization mechanism.
/// </remarks>
protected override bool LockedTryDequeue(out Envelope envelope)
{
if (_prependBuffer.Count > 0)
{
envelope = _prependBuffer.Pop();
return true;
}

if (_prioQueue.Count() > 0)
{
envelope = _prioQueue.Dequeue();
return true;
}
envelope = default(Envelope);
return false;
}

public void EnqueueFirst(Envelope envelope)
{
_prependBuffer.Push(envelope);
}
}
}

Loading

0 comments on commit 5366dd8

Please sign in to comment.