diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs
index 2c52deed2e3..8007c4621e5 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs
@@ -41,10 +41,25 @@ public sealed class ClusterClient : ActorBase
[Serializable]
public sealed class Send
{
+ ///
+ /// TBD
+ ///
public string Path { get; }
+ ///
+ /// TBD
+ ///
public object Message { get; }
+ ///
+ /// TBD
+ ///
public bool LocalAffinity { get; }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public Send(string path, object message, bool localAffinity = false)
{
Path = path;
@@ -59,9 +74,20 @@ public Send(string path, object message, bool localAffinity = false)
[Serializable]
public sealed class SendToAll
{
+ ///
+ /// TBD
+ ///
public string Path { get; }
+ ///
+ /// TBD
+ ///
public object Message { get; }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public SendToAll(string path, object message)
{
Path = path;
@@ -76,9 +102,20 @@ public SendToAll(string path, object message)
[Serializable]
public sealed class Publish
{
+ ///
+ /// TBD
+ ///
public string Topic { get; }
+ ///
+ /// TBD
+ ///
public object Message { get; }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public Publish(string topic, object message)
{
Topic = topic;
@@ -86,23 +123,41 @@ public Publish(string topic, object message)
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class RefreshContactsTick
{
+ ///
+ /// TBD
+ ///
public static readonly RefreshContactsTick Instance = new RefreshContactsTick();
private RefreshContactsTick() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class HeartbeatTick
{
+ ///
+ /// TBD
+ ///
public static readonly HeartbeatTick Instance = new HeartbeatTick();
private HeartbeatTick() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class ReconnectTimeout
{
+ ///
+ /// TBD
+ ///
public static readonly ReconnectTimeout Instance = new ReconnectTimeout();
private ReconnectTimeout() { }
}
@@ -112,6 +167,9 @@ private ReconnectTimeout() { }
///
/// Factory method for .
///
+ /// TBD
+ /// TBD
+ /// TBD
public static Props Props(ClusterClientSettings settings)
{
if (settings == null)
@@ -132,6 +190,12 @@ public static Props Props(ClusterClientSettings settings)
private ICancelable _refreshContactsCancelable;
private readonly Queue> _buffer;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public ClusterClient(ClusterClientSettings settings)
{
if (settings.InitialContacts.Count == 0)
@@ -181,6 +245,9 @@ private void ScheduleRefreshContactsTick(TimeSpan interval)
Self);
}
+ ///
+ /// TBD
+ ///
protected override void PostStop()
{
base.PostStop();
@@ -193,6 +260,11 @@ protected override void PostStop()
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected override bool Receive(object message)
{
return Establishing(message);
@@ -448,6 +520,9 @@ private void PublishContactPoints()
///
public interface IContactPointChange
{
+ ///
+ /// TBD
+ ///
ActorPath ContactPoint { get; }
}
@@ -457,11 +532,18 @@ public interface IContactPointChange
///
public sealed class ContactPointAdded : IContactPointChange
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public ContactPointAdded(ActorPath contactPoint)
{
ContactPoint = contactPoint;
}
+ ///
+ /// TBD
+ ///
public ActorPath ContactPoint { get; }
}
@@ -471,14 +553,24 @@ public ContactPointAdded(ActorPath contactPoint)
///
public sealed class ContactPointRemoved : IContactPointChange
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public ContactPointRemoved(ActorPath contactPoint)
{
ContactPoint = contactPoint;
}
+ ///
+ /// TBD
+ ///
public ActorPath ContactPoint { get; }
}
+ ///
+ /// TBD
+ ///
public interface ISubscribeContactPoints
{
}
@@ -493,10 +585,16 @@ public interface ISubscribeContactPoints
///
public sealed class SubscribeContactPoints : ISubscribeContactPoints
{
+ ///
+ /// TBD
+ ///
public static readonly SubscribeContactPoints Instance = new SubscribeContactPoints();
private SubscribeContactPoints() { }
}
+ ///
+ /// TBD
+ ///
public interface IUnsubscribeContactPoints
{
}
@@ -506,10 +604,16 @@ public interface IUnsubscribeContactPoints
///
public sealed class UnsubscribeContactPoints : IUnsubscribeContactPoints
{
+ ///
+ /// TBD
+ ///
public static readonly UnsubscribeContactPoints Instance = new UnsubscribeContactPoints();
private UnsubscribeContactPoints() { }
}
+ ///
+ /// TBD
+ ///
public interface IGetContactPoints
{
}
@@ -520,6 +624,9 @@ public interface IGetContactPoints
///
public sealed class GetContactPoints : IGetContactPoints
{
+ ///
+ /// TBD
+ ///
public static readonly GetContactPoints Instance = new GetContactPoints();
private GetContactPoints() { }
}
@@ -529,11 +636,18 @@ private GetContactPoints() { }
///
public sealed class ContactPoints
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public ContactPoints(ImmutableHashSet contactPoints)
{
ContactPointsList = contactPoints;
}
+ ///
+ /// TBD
+ ///
public ImmutableHashSet ContactPointsList { get; }
}
}
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs
index f53f409486e..48ddea940cb 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientReceptionist.cs
@@ -19,11 +19,20 @@ namespace Akka.Cluster.Tools.Client
///
public sealed class ClusterClientReceptionist : IExtension
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public static Config DefaultConfig()
{
return ConfigurationFactory.FromResource("Akka.Cluster.Tools.Client.reference.conf");
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public static ClusterClientReceptionist Get(ActorSystem system)
{
return system.WithExtension();
@@ -34,6 +43,10 @@ public static ClusterClientReceptionist Get(ActorSystem system)
private readonly Config _config;
private readonly IActorRef _receptionist;
+ ///
+ /// TBD
+ ///
+ /// TBD
public ClusterClientReceptionist(ExtendedActorSystem system)
{
_system = system;
@@ -70,6 +83,7 @@ internal IActorRef PubSubMediator
/// or using the path elements
/// of the , e.g. "/user/myservice".
///
+ /// TBD
public void RegisterService(IActorRef actorRef)
{
PubSubMediator.Tell(new PublishSubscribe.Put(actorRef));
@@ -79,6 +93,7 @@ public void RegisterService(IActorRef actorRef)
/// A registered actor will be automatically unregistered when terminated,
/// but it can also be explicitly unregistered before termination.
///
+ /// TBD
public void UnregisterService(IActorRef actorRef)
{
PubSubMediator.Tell(new PublishSubscribe.Remove(actorRef.Path.ToStringWithoutAddress()));
@@ -90,6 +105,8 @@ public void UnregisterService(IActorRef actorRef)
/// published messages.
/// The client can publish messages to this topic with .
///
+ /// TBD
+ /// TBD
public void RegisterSubscriber(string topic, IActorRef actorRef)
{
PubSubMediator.Tell(new PublishSubscribe.Subscribe(topic, actorRef));
@@ -99,6 +116,8 @@ public void RegisterSubscriber(string topic, IActorRef actorRef)
/// A registered subscriber will be automatically unregistered when terminated,
/// but it can also be explicitly unregistered before termination.
///
+ /// TBD
+ /// TBD
public void UnregisterSubscriber(string topic, IActorRef actorRef)
{
PubSubMediator.Tell(new PublishSubscribe.Unsubscribe(topic, actorRef));
@@ -133,8 +152,16 @@ private IActorRef CreateReceptionist()
public IActorRef Underlying => _receptionist;
}
+ ///
+ /// TBD
+ ///
public class ClusterClientReceptionistExtensionProvider : ExtensionIdProvider
{
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override ClusterClientReceptionist CreateExtension(ExtendedActorSystem system)
{
return new ClusterClientReceptionist(system);
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs
index d65d695e457..0bee45a9f86 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs
@@ -15,11 +15,17 @@
namespace Akka.Cluster.Tools.Client
{
+ ///
+ /// TBD
+ ///
public sealed class ClusterClientSettings : INoSerializationVerificationNeeded
{
///
/// Create settings from the default configuration 'akka.cluster.client'.
///
+ /// TBD
+ /// TBD
+ /// TBD
public static ClusterClientSettings Create(ActorSystem system)
{
system.Settings.InjectTopLevelFallback(ClusterClientReceptionist.DefaultConfig());
@@ -34,6 +40,8 @@ public static ClusterClientSettings Create(ActorSystem system)
///
/// Java API: Create settings from a configuration with the same layout as the default configuration 'akka.cluster.client'.
///
+ /// TBD
+ /// TBD
public static ClusterClientSettings Create(Config config)
{
var initialContacts = config.GetStringList("initial-contacts").Select(ActorPath.Parse).ToImmutableSortedSet();
@@ -93,6 +101,17 @@ public static ClusterClientSettings Create(Config config)
///
public readonly TimeSpan? ReconnectTimeout;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public ClusterClientSettings(
IImmutableSet initialContacts,
TimeSpan establishingGetContactsInterval,
@@ -116,6 +135,12 @@ public ClusterClientSettings(
ReconnectTimeout = reconnectTimeout;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public ClusterClientSettings WithInitialContacts(IImmutableSet initialContacts)
{
if (initialContacts.Count == 0)
@@ -126,32 +151,63 @@ public ClusterClientSettings WithInitialContacts(IImmutableSet initia
return Copy(initialContacts: initialContacts);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
[Obsolete("Use WithInitialContacts(IImmutableSet initialContacts) instead")]
public ClusterClientSettings WithInitialContacts(IEnumerable initialContacts)
{
return WithInitialContacts(initialContacts.ToImmutableHashSet());
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterClientSettings WithEstablishingGetContactsInterval(TimeSpan value)
{
return Copy(establishingGetContactsInterval: value);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterClientSettings WithRefreshContactsInterval(TimeSpan value)
{
return Copy(refreshContactsInterval: value);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterClientSettings WithHeartbeatInterval(TimeSpan value)
{
return Copy(heartbeatInterval: value);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterClientSettings WithBufferSize(int bufferSize)
{
return Copy(bufferSize: bufferSize);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterClientSettings WithReconnectTimeout(TimeSpan? reconnectTimeout)
{
return Copy(reconnectTimeout: reconnectTimeout);
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionist.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionist.cs
index 2a25ee3b9a4..cfa3ad2c65f 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionist.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionist.cs
@@ -31,6 +31,9 @@ public interface IClusterClientMessage { }
///
public interface IClusterClientInteraction
{
+ ///
+ /// TBD
+ ///
IActorRef ClusterClient { get; }
}
@@ -40,11 +43,18 @@ public interface IClusterClientInteraction
///
public sealed class ClusterClientUp : IClusterClientInteraction
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public ClusterClientUp(IActorRef clusterClient)
{
ClusterClient = clusterClient;
}
+ ///
+ /// TBD
+ ///
public IActorRef ClusterClient { get; }
}
@@ -54,11 +64,18 @@ public ClusterClientUp(IActorRef clusterClient)
///
public sealed class ClusterClientUnreachable : IClusterClientInteraction
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public ClusterClientUnreachable(IActorRef clusterClient)
{
ClusterClient = clusterClient;
}
+ ///
+ /// TBD
+ ///
public IActorRef ClusterClient { get; }
}
@@ -72,6 +89,9 @@ public ClusterClientUnreachable(IActorRef clusterClient)
///
public sealed class SubscribeClusterClients
{
+ ///
+ /// TBD
+ ///
public static readonly SubscribeClusterClients Instance = new SubscribeClusterClients();
private SubscribeClusterClients() { }
}
@@ -81,6 +101,9 @@ private SubscribeClusterClients() { }
///
public sealed class UnsubscribeClusterClients
{
+ ///
+ /// TBD
+ ///
public static readonly UnsubscribeClusterClients Instance = new UnsubscribeClusterClients();
private UnsubscribeClusterClients() { }
}
@@ -91,6 +114,9 @@ private UnsubscribeClusterClients() { }
///
public sealed class GetClusterClients
{
+ ///
+ /// TBD
+ ///
public static readonly GetClusterClients Instance = new GetClusterClients();
private GetClusterClients() { }
}
@@ -109,6 +135,9 @@ public ClusterClients(ImmutableHashSet clusterClientsList)
ClusterClientsList = clusterClientsList;
}
+ ///
+ /// TBD
+ ///
public ImmutableHashSet ClusterClientsList { get; }
}
@@ -138,23 +167,44 @@ public sealed class ClusterReceptionist : ActorBase
{
#region Messages
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class GetContacts : IClusterClientMessage, IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
public static readonly GetContacts Instance = new GetContacts();
private GetContacts() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Contacts : IClusterClientMessage
{
+ ///
+ /// TBD
+ ///
public readonly ImmutableList ContactPoints;
+ ///
+ /// TBD
+ ///
+ /// TBD
public Contacts(ImmutableList contactPoints)
{
ContactPoints = contactPoints;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
@@ -166,6 +216,10 @@ public override bool Equals(object obj)
return ContactPoints.SequenceEqual(other.ContactPoints);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -180,29 +234,53 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Heartbeat : IClusterClientMessage, IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
public static readonly Heartbeat Instance = new Heartbeat();
private Heartbeat() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class HeartbeatRsp : IClusterClientMessage, IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
public static readonly HeartbeatRsp Instance = new HeartbeatRsp();
private HeartbeatRsp() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Ping : IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
public static readonly Ping Instance = new Ping();
private Ping() { }
}
+ ///
+ /// TBD
+ ///
internal sealed class CheckDeadlines
{
+ ///
+ /// TBD
+ ///
public static readonly CheckDeadlines Instance = new CheckDeadlines();
private CheckDeadlines() { }
}
@@ -211,6 +289,9 @@ private CheckDeadlines() { }
///
/// Factory method for .
///
+ /// TBD
+ /// TBD
+ /// TBD
public static Props Props(IActorRef pubSubMediator, ClusterReceptionistSettings settings)
{
return Actor.Props.Create(() => new ClusterReceptionist(
@@ -220,11 +301,23 @@ public static Props Props(IActorRef pubSubMediator, ClusterReceptionistSettings
#region RingOrdering
+ ///
+ /// TBD
+ ///
internal class RingOrdering : IComparer
{
+ ///
+ /// TBD
+ ///
public static readonly RingOrdering Instance = new RingOrdering();
private RingOrdering() { }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public static int HashFor(Address node)
{
// cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
@@ -234,6 +327,12 @@ public static int HashFor(Address node)
throw new IllegalStateException("Unexpected address without host/port: " + node);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public int Compare(Address a, Address b)
{
var ha = HashFor(a);
@@ -258,6 +357,12 @@ public int Compare(Address a, Address b)
private ImmutableList _subscribers;
private ICancelable _checkDeadlinesTask;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public ClusterReceptionist(IActorRef pubSubMediator, ClusterReceptionistSettings settings)
{
_log = Context.GetLogger();
@@ -285,6 +390,10 @@ public ClusterReceptionist(IActorRef pubSubMediator, ClusterReceptionistSettings
Self);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
protected override void PreStart()
{
base.PreStart();
@@ -295,6 +404,9 @@ protected override void PreStart()
_cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));
}
+ ///
+ /// TBD
+ ///
protected override void PostStop()
{
base.PostStop();
@@ -317,6 +429,11 @@ private IActorRef ResponseTunnel(IActorRef client)
: Context.ActorOf(Actor.Props.Create(() => new ClientResponseTunnel(client, _settings.ResponseTunnelReceiveTimeout)), encName);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected override bool Receive(object message)
{
if (message is PublishSubscribe.Send
@@ -480,6 +597,11 @@ internal class ClientResponseTunnel : ActorBase
private readonly IActorRef _client;
private readonly ILoggingAdapter _log;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClientResponseTunnel(IActorRef client, TimeSpan timeout)
{
_client = client;
@@ -487,6 +609,11 @@ public ClientResponseTunnel(IActorRef client, TimeSpan timeout)
Context.SetReceiveTimeout(timeout);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected override bool Receive(object message)
{
if (message is ClusterReceptionist.Ping)
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionistSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionistSettings.cs
index 5db9a37a777..99f5e3853aa 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionistSettings.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterReceptionistSettings.cs
@@ -11,11 +11,17 @@
namespace Akka.Cluster.Tools.Client
{
+ ///
+ /// TBD
+ ///
public sealed class ClusterReceptionistSettings : INoSerializationVerificationNeeded
{
///
/// Create settings from the default configuration "akka.cluster.client.receptionist".
///
+ /// TBD
+ /// TBD
+ /// TBD
public static ClusterReceptionistSettings Create(ActorSystem system)
{
system.Settings.InjectTopLevelFallback(ClusterClientReceptionist.DefaultConfig());
@@ -30,6 +36,8 @@ public static ClusterReceptionistSettings Create(ActorSystem system)
///
/// Create settings from a configuration with the same layout as the default configuration "akka.cluster.client.receptionist".
///
+ /// TBD
+ /// TBD
public static ClusterReceptionistSettings Create(Config config)
{
var role = config.GetString("role");
@@ -79,6 +87,15 @@ public static ClusterReceptionistSettings Create(Config config)
///
public readonly TimeSpan FailureDetectionInterval;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public ClusterReceptionistSettings(
string role,
int numberOfContacts,
@@ -95,26 +112,52 @@ public ClusterReceptionistSettings(
FailureDetectionInterval = failureDetectionInterval;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterReceptionistSettings WithRole(string role)
{
return Copy(role: role);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public ClusterReceptionistSettings WithoutRole()
{
return Copy(role: "");
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterReceptionistSettings WithNumberOfContacts(int numberOfContacts)
{
return Copy(numberOfContacts: numberOfContacts);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ClusterReceptionistSettings WithResponseTunnelReceiveTimeout(TimeSpan responseTunnelReceiveTimeout)
{
return Copy(responseTunnelReceiveTimeout: responseTunnelReceiveTimeout);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public ClusterReceptionistSettings WithHeartbeat(TimeSpan heartbeatInterval, TimeSpan acceptableHeartbeatPause, TimeSpan failureDetectionInterval)
{
return Copy(
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/Serialization/ClusterClientMessageSerializer.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/Serialization/ClusterClientMessageSerializer.cs
index 1b98956fc22..ad844788e2f 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Client/Serialization/ClusterClientMessageSerializer.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/Serialization/ClusterClientMessageSerializer.cs
@@ -18,8 +18,14 @@
namespace Akka.Cluster.Tools.Client.Serialization
{
+ ///
+ /// TBD
+ ///
internal class ClusterClientMessageSerializer : SerializerWithStringManifest
{
+ ///
+ /// TBD
+ ///
public const int BufferSize = 1024 * 4;
private const string ContactsManifest = "A";
@@ -30,8 +36,15 @@ internal class ClusterClientMessageSerializer : SerializerWithStringManifest
private static readonly byte[] EmptyBytes = new byte[0];
private readonly IDictionary> _fromBinaryMap;
+ ///
+ /// TBD
+ ///
public override int Identifier { get; }
+ ///
+ /// TBD
+ ///
+ /// TBD
public ClusterClientMessageSerializer(ExtendedActorSystem system) : base(system)
{
Identifier = SerializerIdentifierHelper.GetSerializerIdentifierFromConfig(GetType(), system);
@@ -44,6 +57,12 @@ public ClusterClientMessageSerializer(ExtendedActorSystem system) : base(system)
};
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public override byte[] ToBinary(object obj)
{
if (obj is ClusterReceptionist.Contacts) return Compress(ContactsToProto(obj as ClusterReceptionist.Contacts));
@@ -54,6 +73,13 @@ public override byte[] ToBinary(object obj)
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{GetType()}]");
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public override object FromBinary(byte[] bytes, string manifestString)
{
Func deserializer;
@@ -65,6 +91,12 @@ public override object FromBinary(byte[] bytes, string manifestString)
throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifestString}] in serializer {GetType()}");
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public override string Manifest(object o)
{
if (o is ClusterReceptionist.Contacts) return ContactsManifest;
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs
index 03bbb4cf13b..c1320c95f56 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs
@@ -11,16 +11,31 @@
namespace Akka.Cluster.Tools.PublishSubscribe
{
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class Put : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly IActorRef Ref;
+ ///
+ /// TBD
+ ///
+ /// TBD
public Put(IActorRef @ref)
{
Ref = @ref;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Put other)
{
if (ReferenceEquals(other, null)) return false;
@@ -28,32 +43,60 @@ public bool Equals(Put other)
return Equals(Ref, other.Ref);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Put);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
return (Ref != null ? Ref.GetHashCode() : 0);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("Put", Ref);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class Remove : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string Path;
+ ///
+ /// TBD
+ ///
+ /// TBD
public Remove(string path)
{
Path = path;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Remove other)
{
if (ReferenceEquals(other, null)) return false;
@@ -61,29 +104,61 @@ public bool Equals(Remove other)
return Equals(Path, other.Path);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Remove);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
return (Path != null ? Path.GetHashCode() : 0);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("Remove", Path);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class Subscribe : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string Topic;
+ ///
+ /// TBD
+ ///
public readonly string Group;
+ ///
+ /// TBD
+ ///
public readonly IActorRef Ref;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public Subscribe(string topic, IActorRef @ref, string @group = null)
{
if (string.IsNullOrEmpty(topic)) throw new ArgumentException("topic must be defined");
@@ -93,6 +168,11 @@ public Subscribe(string topic, IActorRef @ref, string @group = null)
Ref = @ref;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Subscribe other)
{
if (ReferenceEquals(other, null)) return false;
@@ -102,11 +182,20 @@ public bool Equals(Subscribe other)
Equals(Ref, other.Ref);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Subscribe);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -118,19 +207,42 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("Subscribe", Topic, Group, Ref);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class Unsubscribe : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string Topic;
+ ///
+ /// TBD
+ ///
public readonly string Group;
+ ///
+ /// TBD
+ ///
public readonly IActorRef Ref;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public Unsubscribe(string topic, IActorRef @ref, string @group = null)
{
if (string.IsNullOrEmpty(topic)) throw new ArgumentException("topic must be defined");
@@ -140,6 +252,11 @@ public Unsubscribe(string topic, IActorRef @ref, string @group = null)
Ref = @ref;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Unsubscribe other)
{
if (ReferenceEquals(other, null)) return false;
@@ -149,11 +266,20 @@ public bool Equals(Unsubscribe other)
Equals(Ref, other.Ref);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Unsubscribe);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -165,22 +291,42 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("Unsubscribe", Topic, Group, Ref);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class SubscribeAck : IEquatable, IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
public readonly Subscribe Subscribe;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public SubscribeAck(Subscribe subscribe)
{
Subscribe = subscribe;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(SubscribeAck other)
{
if (ReferenceEquals(other, null)) return false;
@@ -188,32 +334,59 @@ public bool Equals(SubscribeAck other)
return Equals(Subscribe, other.Subscribe);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as SubscribeAck);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
return (Subscribe != null ? Subscribe.GetHashCode() : 0);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("SubscribeAck<{0}>", Subscribe);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class UnsubscribeAck : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly Unsubscribe Unsubscribe;
+ ///
+ /// TBD
+ ///
public UnsubscribeAck(Unsubscribe unsubscribe)
{
Unsubscribe = unsubscribe;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(UnsubscribeAck other)
{
if (ReferenceEquals(other, null)) return false;
@@ -221,29 +394,60 @@ public bool Equals(UnsubscribeAck other)
return Equals(Unsubscribe, other.Unsubscribe);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as UnsubscribeAck);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
return (Unsubscribe != null ? Unsubscribe.GetHashCode() : 0);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("UnsubscribeAck<{0}>", Unsubscribe);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class Publish : IDistributedPubSubMessage, IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string Topic;
+ ///
+ /// TBD
+ ///
public readonly object Message;
+ ///
+ /// TBD
+ ///
public readonly bool SendOneMessageToEachGroup;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public Publish(string topic, object message, bool sendOneMessageToEachGroup = false)
{
Topic = topic;
@@ -251,6 +455,11 @@ public Publish(string topic, object message, bool sendOneMessageToEachGroup = fa
SendOneMessageToEachGroup = sendOneMessageToEachGroup;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Publish other)
{
if (ReferenceEquals(other, null)) return false;
@@ -260,11 +469,20 @@ public bool Equals(Publish other)
Equals(Message, other.Message);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Publish);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -276,19 +494,41 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("Publish", Topic, SendOneMessageToEachGroup, Message);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class Send : IDistributedPubSubMessage, IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string Path;
+ ///
+ /// TBD
+ ///
public readonly object Message;
+ ///
+ /// TBD
+ ///
public readonly bool LocalAffinity;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public Send(string path, object message, bool localAffinity = false)
{
Path = path;
@@ -296,6 +536,11 @@ public Send(string path, object message, bool localAffinity = false)
LocalAffinity = localAffinity;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Send other)
{
if (ReferenceEquals(other, null)) return false;
@@ -305,11 +550,20 @@ public bool Equals(Send other)
Equals(Message, other.Message);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Send);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -321,19 +575,41 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("Send", Path, LocalAffinity, Message);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class SendToAll : IDistributedPubSubMessage, IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string Path;
+ ///
+ /// TBD
+ ///
public readonly object Message;
+ ///
+ /// TBD
+ ///
public readonly bool ExcludeSelf;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public SendToAll(string path, object message, bool excludeSelf = false)
{
Path = path;
@@ -341,6 +617,11 @@ public SendToAll(string path, object message, bool excludeSelf = false)
ExcludeSelf = excludeSelf;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(SendToAll other)
{
if (ReferenceEquals(other, null)) return false;
@@ -350,11 +631,19 @@ public bool Equals(SendToAll other)
Equals(Message, other.Message);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as SendToAll);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -366,45 +655,84 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("SendToAll", Path, ExcludeSelf, Message);
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class GetTopics : IEquatable
{
+ ///
+ /// TBD
+ ///
public static readonly GetTopics Instance = new GetTopics();
private GetTopics() { }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(GetTopics other)
{
if (other == null) return false;
return true;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as GetTopics);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return "GetTopics<>";
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
public sealed class CurrentTopics : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly string[] Topics;
+ ///
+ /// TBD
+ ///
+ /// TBD
public CurrentTopics(string[] topics)
{
Topics = topics ?? new string[0];
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(CurrentTopics other)
{
if (ReferenceEquals(other, null)) return false;
@@ -413,16 +741,29 @@ public bool Equals(CurrentTopics other)
return Equals(Topics, other.Topics);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as CurrentTopics);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
return (Topics != null ? Topics.GetHashCode() : 0);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override string ToString()
{
return string.Format("CurrentTopics<{0}>", string.Join(",", Topics));
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs
index cb09c458b0c..658fc564c17 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSub.cs
@@ -17,8 +17,16 @@ namespace Akka.Cluster.Tools.PublishSubscribe
///
public interface IDistributedPubSubMessage { }
+ ///
+ /// TBD
+ ///
public class DistributedPubSubExtensionProvider : ExtensionIdProvider
{
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override DistributedPubSub CreateExtension(ExtendedActorSystem system)
{
return new DistributedPubSub(system);
@@ -36,16 +44,29 @@ public class DistributedPubSub : IExtension
private readonly Cluster _cluster;
private readonly IActorRef _mediatorRef;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public static DistributedPubSub Get(ActorSystem system)
{
return system.WithExtension();
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public static Config DefaultConfig()
{
return ConfigurationFactory.FromResource("Akka.Cluster.Tools.PublishSubscribe.reference.conf");
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public DistributedPubSub(ExtendedActorSystem system)
{
_system = system;
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
index a2b3d8d4f5d..cf537c6f8c4 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
@@ -102,6 +102,11 @@ namespace Akka.Cluster.Tools.PublishSubscribe
///
public class DistributedPubSubMediator : ReceiveActor
{
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public static Props Props(DistributedPubSubSettings settings)
{
return Actor.Props.Create(() => new DistributedPubSubMediator(settings)).WithDeploy(Deploy.Local);
@@ -119,8 +124,14 @@ public static Props Props(DistributedPubSubSettings settings)
private ILoggingAdapter _log;
private IDictionary _registry = new Dictionary();
+ ///
+ /// TBD
+ ///
public ILoggingAdapter Log { get { return _log ?? (_log = Context.GetLogger()); } }
+ ///
+ /// TBD
+ ///
public IDictionary OwnVersions
{
get
@@ -131,6 +142,12 @@ public IDictionary OwnVersions
}
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
if (settings.RoutingLogic is ConsistentHashingRoutingLogic)
@@ -387,11 +404,11 @@ private bool OtherHasNewerVersions(IDictionary versions)
return versions.Any(entry =>
{
Bucket bucket;
- if (_registry.TryGetValue(entry.Key, out bucket))
- {
- return entry.Value > bucket.Version;
- }
- return entry.Value > 0L;
+ if (_registry.TryGetValue(entry.Key, out bucket))
+ {
+ return entry.Value > bucket.Version;
+ }
+ return entry.Value > 0L;
});
}
@@ -587,6 +604,9 @@ private Address SelectRandomNode(IList addresses)
return addresses[ThreadLocalRandom.Current.Next(addresses.Count)];
}
+ ///
+ /// TBD
+ ///
protected override void PreStart()
{
base.PreStart();
@@ -594,6 +614,9 @@ protected override void PreStart()
_cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));
}
+ ///
+ /// TBD
+ ///
protected override void PostStop()
{
base.PostStop();
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs
index 314fff16ed9..a5be3ab58ec 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubSettings.cs
@@ -12,11 +12,17 @@
namespace Akka.Cluster.Tools.PublishSubscribe
{
+ ///
+ /// TBD
+ ///
public class DistributedPubSubSettings
{
///
/// Creates cluster publish/subscribe settings from the default configuration `akka.cluster.pub-sub`.
///
+ /// TBD
+ /// TBD
+ /// TBD
public static DistributedPubSubSettings Create(ActorSystem system)
{
system.Settings.InjectTopLevelFallback(DistributedPubSub.DefaultConfig());
@@ -29,6 +35,9 @@ public static DistributedPubSubSettings Create(ActorSystem system)
///
/// Creates cluster publish subscribe settings from provided configuration with the same layout as `akka.cluster.pub-sub`.
///
+ /// TBD
+ /// TBD
+ /// TBD
public static DistributedPubSubSettings Create(Config config)
{
RoutingLogic routingLogic = null;
@@ -86,8 +95,14 @@ public static DistributedPubSubSettings Create(Config config)
public readonly int MaxDeltaElements;
///
- /// Creates a new instance of the .
+ /// Creates a new instance of the .
///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public DistributedPubSubSettings(string role, RoutingLogic routingLogic, TimeSpan gossipInterval, TimeSpan removedTimeToLive, int maxDeltaElements)
{
if (routingLogic is ConsistentHashingRoutingLogic)
@@ -102,26 +117,51 @@ public DistributedPubSubSettings(string role, RoutingLogic routingLogic, TimeSpa
MaxDeltaElements = maxDeltaElements;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public DistributedPubSubSettings WithRole(string role)
{
return new DistributedPubSubSettings(role, RoutingLogic, GossipInterval, RemovedTimeToLive, MaxDeltaElements);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public DistributedPubSubSettings WithRoutingLogic(RoutingLogic routingLogic)
{
return new DistributedPubSubSettings(Role, routingLogic, GossipInterval, RemovedTimeToLive, MaxDeltaElements);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public DistributedPubSubSettings WithGossipInterval(TimeSpan gossipInterval)
{
return new DistributedPubSubSettings(Role, RoutingLogic, gossipInterval, RemovedTimeToLive, MaxDeltaElements);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public DistributedPubSubSettings WithRemovedTimeToLive(TimeSpan removedTtl)
{
return new DistributedPubSubSettings(Role, RoutingLogic, GossipInterval, removedTtl, MaxDeltaElements);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public DistributedPubSubSettings WithMaxDeltaElements(int maxDeltaElements)
{
return new DistributedPubSubSettings(Role, RoutingLogic, GossipInterval, RemovedTimeToLive, maxDeltaElements);
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs
index b6da54e001f..225e88c2d3b 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/TopicMessages.cs
@@ -15,31 +15,65 @@
namespace Akka.Cluster.Tools.PublishSubscribe.Internal
{
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Prune
{
+ ///
+ /// TBD
+ ///
public static readonly Prune Instance = new Prune();
private Prune() { }
}
// Only for testing purposes, to poll/await replication
+ ///
+ /// TBD
+ ///
internal sealed class Count
{
+ ///
+ /// TBD
+ ///
public static readonly Count Instance = new Count();
private Count() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal class Bucket : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly Address Owner;
+ ///
+ /// TBD
+ ///
public readonly long Version;
+ ///
+ /// TBD
+ ///
public readonly IImmutableDictionary Content;
+ ///
+ /// TBD
+ ///
+ /// TBD
public Bucket(Address owner) : this(owner, 0L, ImmutableDictionary.Empty)
{
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
public Bucket(Address owner, long version, IImmutableDictionary content)
{
Owner = owner;
@@ -47,6 +81,11 @@ public Bucket(Address owner, long version, IImmutableDictionary
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Bucket other)
{
if (ReferenceEquals(other, null)) return false;
@@ -57,11 +96,20 @@ public bool Equals(Bucket other)
&& Content.SequenceEqual(other.Content);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Bucket);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -74,23 +122,45 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class ValueHolder : IEquatable
{
+ ///
+ /// TBD
+ ///
public readonly long Version;
+ ///
+ /// TBD
+ ///
public readonly IActorRef Ref;
[NonSerialized]
private Routee _routee;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public ValueHolder(long version, IActorRef @ref)
{
Version = version;
Ref = @ref;
}
+ ///
+ /// TBD
+ ///
public Routee Routee { get { return _routee ?? (_routee = Ref != null ? new ActorRefRoutee(Ref) : null); } }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(ValueHolder other)
{
if (ReferenceEquals(other, null)) return false;
@@ -99,11 +169,20 @@ public bool Equals(ValueHolder other)
Equals(Ref, other.Ref);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as ValueHolder);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -115,19 +194,38 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Status : IDistributedPubSubMessage, IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public Status(IDictionary versions, bool isReplyToStatus)
{
Versions = versions ?? new Dictionary(0);
IsReplyToStatus = isReplyToStatus;
}
+ ///
+ /// TBD
+ ///
public IDictionary Versions { get; }
+ ///
+ /// TBD
+ ///
public bool IsReplyToStatus { get; }
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
if (ReferenceEquals(obj, null)) return false;
@@ -141,6 +239,10 @@ public override bool Equals(object obj)
&& IsReplyToStatus.Equals(other.IsReplyToStatus);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
unchecked
@@ -158,16 +260,31 @@ public override int GetHashCode()
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Delta : IDistributedPubSubMessage, IEquatable, IDeadLetterSuppression
{
+ ///
+ /// TBD
+ ///
public readonly Bucket[] Buckets;
+ ///
+ /// TBD
+ ///
+ /// TBD
public Delta(Bucket[] buckets)
{
Buckets = buckets ?? new Bucket[0];
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public bool Equals(Delta other)
{
if (ReferenceEquals(other, null)) return false;
@@ -176,11 +293,20 @@ public bool Equals(Delta other)
return Buckets.SequenceEqual(other.Buckets);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public override bool Equals(object obj)
{
return Equals(obj as Delta);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
public override int GetHashCode()
{
return Buckets != null ? Buckets.GetHashCode() : 0;
@@ -188,39 +314,75 @@ public override int GetHashCode()
}
// Only for testing purposes, to verify replication
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class DeltaCount
{
+ ///
+ /// TBD
+ ///
public static readonly DeltaCount Instance = new DeltaCount();
private DeltaCount() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class GossipTick
{
+ ///
+ /// TBD
+ ///
public static readonly GossipTick Instance = new GossipTick();
private GossipTick() { }
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class RegisterTopic
{
+ ///
+ /// TBD
+ ///
public readonly IActorRef TopicRef;
+ ///
+ /// TBD
+ ///
+ /// TBD
public RegisterTopic(IActorRef topicRef)
{
TopicRef = topicRef;
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Subscribed
{
+ ///
+ /// TBD
+ ///
public readonly SubscribeAck Ack;
+ ///
+ /// TBD
+ ///
public readonly IActorRef Subscriber;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public Subscribed(SubscribeAck ack, IActorRef subscriber)
{
Ack = ack;
@@ -228,12 +390,26 @@ public Subscribed(SubscribeAck ack, IActorRef subscriber)
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class Unsubscribed
{
+ ///
+ /// TBD
+ ///
public readonly UnsubscribeAck Ack;
+ ///
+ /// TBD
+ ///
public readonly IActorRef Subscriber;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public Unsubscribed(UnsubscribeAck ack, IActorRef subscriber)
{
Ack = ack;
@@ -241,11 +417,21 @@ public Unsubscribed(UnsubscribeAck ack, IActorRef subscriber)
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class SendToOneSubscriber
{
+ ///
+ /// TBD
+ ///
public readonly object Message;
+ ///
+ /// TBD
+ ///
+ /// TBD
public SendToOneSubscriber(object message)
{
Message = message;
@@ -269,6 +455,9 @@ internal interface IChildActorTerminationProtocol
///
internal class NoMoreSubscribers : IChildActorTerminationProtocol
{
+ ///
+ /// TBD
+ ///
public static NoMoreSubscribers Instance = new NoMoreSubscribers();
private NoMoreSubscribers()
{
@@ -281,6 +470,9 @@ private NoMoreSubscribers()
///
internal class TerminateRequest : IChildActorTerminationProtocol
{
+ ///
+ /// TBD
+ ///
public static TerminateRequest Instance = new TerminateRequest();
private TerminateRequest()
{
@@ -294,15 +486,25 @@ private TerminateRequest()
///
internal class NewSubscriberArrived : IChildActorTerminationProtocol
{
+ ///
+ /// TBD
+ ///
public static NewSubscriberArrived Instance = new NewSubscriberArrived();
private NewSubscriberArrived()
{
}
}
+ ///
+ /// TBD
+ ///
[Serializable]
internal sealed class MediatorRouterEnvelope : RouterEnvelope
{
+ ///
+ /// TBD
+ ///
+ /// TBD
public MediatorRouterEnvelope(object message) : base(message) { }
}
}
\ No newline at end of file
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs
index 3f9d7ffed3d..19f798fc7b7 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Internal/Topics.cs
@@ -15,15 +15,37 @@
namespace Akka.Cluster.Tools.PublishSubscribe.Internal
{
+ ///
+ /// TBD
+ ///
internal abstract class TopicLike : ActorBase
{
+ ///
+ /// TBD
+ ///
protected readonly TimeSpan PruneInterval;
+ ///
+ /// TBD
+ ///
protected readonly ICancelable PruneCancelable;
+ ///
+ /// TBD
+ ///
protected readonly ISet Subscribers;
+ ///
+ /// TBD
+ ///
protected readonly TimeSpan EmptyTimeToLive;
+ ///
+ /// TBD
+ ///
protected Deadline PruneDeadline = null;
+ ///
+ /// TBD
+ ///
+ /// TBD
protected TopicLike(TimeSpan emptyTimeToLive)
{
Subscribers = new HashSet();
@@ -32,12 +54,20 @@ protected TopicLike(TimeSpan emptyTimeToLive)
PruneCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(PruneInterval, PruneInterval, Self, Prune.Instance, Self);
}
+ ///
+ /// TBD
+ ///
protected override void PostStop()
{
base.PostStop();
PruneCancelable.Cancel();
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected bool DefaultReceive(object message)
{
if (message is Subscribe)
@@ -90,8 +120,18 @@ protected bool DefaultReceive(object message)
return true;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected abstract bool Business(object message);
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected override bool Receive(object message)
{
return Business(message) || DefaultReceive(message);
@@ -108,17 +148,30 @@ private void Remove(IActorRef actorRef)
}
}
+ ///
+ /// TBD
+ ///
internal class Topic : TopicLike
{
private readonly RoutingLogic _routingLogic;
private readonly PerGroupingBuffer _buffer;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public Topic(TimeSpan emptyTimeToLive, RoutingLogic routingLogic) : base(emptyTimeToLive)
{
_routingLogic = routingLogic;
_buffer = new PerGroupingBuffer();
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected override bool Business(object message)
{
Subscribe subscribe;
@@ -194,15 +247,28 @@ private IActorRef NewGroupActor(string encodedGroup)
}
}
+ ///
+ /// TBD
+ ///
internal class Group : TopicLike
{
private readonly RoutingLogic _routingLogic;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public Group(TimeSpan emptyTimeToLive, RoutingLogic routingLogic) : base(emptyTimeToLive)
{
_routingLogic = routingLogic;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
protected override bool Business(object message)
{
if (message is SendToOneSubscriber)
@@ -219,6 +285,9 @@ protected override bool Business(object message)
}
}
+ ///
+ /// TBD
+ ///
internal static class Utils
{
///
@@ -233,21 +302,38 @@ internal static class Utils
/// user message.
///
///
+ /// TBD
+ /// TBD
public static object WrapIfNeeded(object message)
{
return message is RouterEnvelope ? new MediatorRouterEnvelope(message) : message;
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public static string MakeKey(IActorRef actorRef)
{
return MakeKey(actorRef.Path);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public static string EncodeName(string name)
{
return name == null ? null : Uri.EscapeDataString(name);
}
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
public static string MakeKey(ActorPath path)
{
return path.ToStringWithoutAddress();
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/PerGroupingBuffer.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/PerGroupingBuffer.cs
index 607f26f3160..dac14406a2e 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/PerGroupingBuffer.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/PerGroupingBuffer.cs
@@ -12,11 +12,21 @@
namespace Akka.Cluster.Tools.PublishSubscribe
{
+ ///
+ /// TBD
+ ///
internal class PerGroupingBuffer
{
private readonly Dictionary _buffers = new Dictionary();
private int _totalBufferSize = 0;
+ ///
+ /// TBD
+ ///
+ /// TBD
+ /// TBD
+ /// TBD
+ /// TBD
public void BufferOr(string grouping, object message, IActorRef originalSender, Action action)
{
BufferedMessages messages = null;
@@ -25,10 +35,15 @@ public void BufferOr(string grouping, object message, IActorRef originalSender,
_buffers[grouping].Add(new KeyValuePair