Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORDictionary with POCO value missing items, add MultiNode spec #4910

Prev Previous commit
Next Next commit
Change Replicator class to UntypedActor
Arkatufus committed Apr 22, 2021
commit 56a2bd4b0f1c17e21c24591611eaf540a4c8e33b
116 changes: 62 additions & 54 deletions src/contrib/cluster/Akka.DistributedData/Replicator.cs
Original file line number Diff line number Diff line change
@@ -261,7 +261,7 @@ namespace Akka.DistributedData
/// </list>
/// </para>
/// </summary>
internal sealed class Replicator : ReceiveActor, IWithUnboundedStash
internal sealed class Replicator : UntypedActor, IWithUnboundedStash
{
public static Props Props(ReplicatorSettings settings) =>
Actor.Props.Create(() => new Replicator(settings)).WithDeploy(Deploy.Local).WithDispatcher(settings.Dispatcher);
@@ -354,6 +354,9 @@ private bool IsKnownNode(Address node) => _nodes.Contains(node) || _weaklyUpNode
private readonly ICancelable _deltaPropagationTask;
private readonly int _maxDeltaSize;

private int _count;
private DateTime _startTime;

public Replicator(ReplicatorSettings settings)
{
_settings = settings;
@@ -407,7 +410,12 @@ public Replicator(ReplicatorSettings settings)
TimeSpan.TicksPerMillisecond * 200));
_deltaPropagationTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(deltaPropagationInterval, deltaPropagationInterval, Self, DeltaPropagationTick.Instance, Self);

if (_hasDurableKeys) Become(Load);
if (_hasDurableKeys)
{
_count = 0;
_startTime = DateTime.UtcNow;
Become(Load);
}
else Become(NormalReceive);
}

@@ -444,56 +452,63 @@ protected override void PostStop()
else return Actor.SupervisorStrategy.DefaultDecider.Decide(e);
});

private void Load()
private bool Load(object message)
{
var startTime = DateTime.UtcNow;
var count = 0;

Receive<LoadData>(load =>
switch (message)
{
count += load.Data.Count;
foreach (var entry in load.Data)
{
var envelope = entry.Value.DataEnvelope;
var newEnvelope = Write(entry.Key, envelope);
if (!ReferenceEquals(newEnvelope, envelope))
case LoadData load:
_count += load.Data.Count;
foreach (var entry in load.Data)
{
_durableStore.Tell(new Store(entry.Key, new DurableDataEnvelope(newEnvelope), null));
var envelope = entry.Value.DataEnvelope;
var newEnvelope = Write(entry.Key, envelope);
if (!ReferenceEquals(newEnvelope, envelope))
{
_durableStore.Tell(new Store(entry.Key, new DurableDataEnvelope(newEnvelope), null));
}
}
}
});
Receive<LoadAllCompleted>(_ =>
{
_log.Debug("Loading {0} entries from durable store took {1} ms", count,
(DateTime.UtcNow - startTime).TotalMilliseconds);
Become(NormalReceive);
Stash.UnstashAll();
Self.Tell(FlushChanges.Instance);
});
Receive<GetReplicaCount>(_ =>
{
// 0 until durable data has been loaded, used by test
Sender.Tell(new ReplicaCount(0));
});

// ignore scheduled ticks when loading durable data
Receive(new Action<RemovedNodePruningTick>(Ignore));
Receive(new Action<FlushChanges>(Ignore));
Receive(new Action<GossipTick>(Ignore));

// ignore gossip and replication when loading durable data
Receive(new Action<Read>(IgnoreDebug));
Receive(new Action<Write>(IgnoreDebug));
Receive(new Action<Status>(IgnoreDebug));
Receive(new Action<Gossip>(IgnoreDebug));

Receive<ClusterEvent.IClusterDomainEvent>(msg =>
{
if(!NormalReceive(msg))
Unhandled(msg);
});
return true;

case LoadAllCompleted _:
_log.Debug("Loading {0} entries from durable store took {1} ms", _count,
(DateTime.UtcNow - _startTime).TotalMilliseconds);
Become(NormalReceive);
Stash.UnstashAll();
Self.Tell(FlushChanges.Instance);
return true;

case GetReplicaCount _:
// 0 until durable data has been loaded, used by test
Sender.Tell(new ReplicaCount(0));
return true;

// ignore scheduled ticks when loading durable data
case RemovedNodePruningTick _:
case FlushChanges _:
case GossipTick _:
// Ignored
return true;

// ignore gossip and replication when loading durable data
case Read _:
case Write _:
case Status _:
case Gossip _:
_log.Debug("ignoring message [{0}] when loading durable data", message.GetType());
return true;

case ClusterEvent.IClusterDomainEvent msg:
return NormalReceive(msg);

default:
Stash.Stash();
return true;
}
}

ReceiveAny(_ => Stash.Stash());
protected override void OnReceive(object message)
{
throw new NotImplementedException();
}

private bool NormalReceive(object message)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is practically the same, needed to change it to untyped receive so that we can call it from somewhere else and checks if it handled the message or not (partial function in scala)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And that works even with a ReceiveActor ? Never tried that before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I should probably change the base class to UntypedActor instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me

@@ -573,13 +588,6 @@ private bool NormalReceive(object message)
return false;
}

private void IgnoreDebug<T>(T msg)
{
_log.Debug("ignoring message [{0}] when loading durable data", typeof(T));
}

private static void Ignore<T>(T msg) { }

private void ReceiveGet(IKey key, IReadConsistency consistency, object req)
{
var localValue = GetData(key.Id);