Skip to content

Commit

Permalink
[routing] Akka.Routing refactoring
Browse files Browse the repository at this point in the history
This PR refactors the classes in the Akka.Routing namespace to take
advantage of newer C# features.  It also simplifies some of the logic in
the various routers.
  • Loading branch information
sean-gilliam committed Jun 1, 2017
1 parent b1eeba6 commit 85335bf
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 428 deletions.
40 changes: 15 additions & 25 deletions src/core/Akka/Routing/Broadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,28 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)

private RouterConfig OverrideUnsetConfig(RouterConfig other)
{
if (other is NoRouter)
if (other is Pool pool)
{
return this;
}
else
{
var pool = other as Pool;
if (pool != null)
BroadcastPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(DefaultSupervisorStrategy)
&& !pool.SupervisorStrategy.Equals(DefaultSupervisorStrategy))
{
BroadcastPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)
&& !(pool.SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)))
{
wssConf = this.WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
wssConf = WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
return this;
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
}

return this;
}

/// <summary>
Expand Down
62 changes: 33 additions & 29 deletions src/core/Akka/Routing/ConsistentHash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,35 +289,39 @@ public ISurrogated FromSurrogate(ActorSystem system)
/// <returns>The object encoded into bytes - in the case of custom classes, the hashcode may be used.</returns>
internal static object ToBytesOrObject(object obj)
{
if (obj == null)
return new byte[] { 0 };
if (obj is byte[])
return (byte[])obj;
if (obj is int)
return BitConverter.GetBytes((int)obj);
if (obj is uint)
return BitConverter.GetBytes((uint)obj);
if (obj is short)
return BitConverter.GetBytes((short)obj);
if (obj is ushort)
return BitConverter.GetBytes((ushort)obj);
if (obj is bool)
return BitConverter.GetBytes((bool)obj);
if (obj is long)
return BitConverter.GetBytes((long)obj);
if (obj is ulong)
return BitConverter.GetBytes((ulong)obj);
if (obj is char)
return BitConverter.GetBytes((char)obj);
if (obj is float)
return BitConverter.GetBytes((float)obj);
if (obj is double)
return BitConverter.GetBytes((double)obj);
if (obj is decimal)
return new BitArray(decimal.GetBits((decimal)obj)).ToBytes();
if (obj is Guid)
return ((Guid)obj).ToByteArray();
return obj;
switch (obj)
{
case null:
return new byte[] { 0 };
case byte[] bytes:
return bytes;
case int @int:
return BitConverter.GetBytes(@int);
case uint @uint:
return BitConverter.GetBytes(@uint);
case short @short:
return BitConverter.GetBytes(@short);
case ushort @ushort:
return BitConverter.GetBytes(@ushort);
case bool @bool:
return BitConverter.GetBytes(@bool);
case long @long:
return BitConverter.GetBytes(@long);
case ulong @ulong:
return BitConverter.GetBytes(@ulong);
case char @char:
return BitConverter.GetBytes(@char);
case float @float:
return BitConverter.GetBytes(@float);
case double @double:
return BitConverter.GetBytes(@double);
case decimal @decimal:
return new BitArray(decimal.GetBits(@decimal)).ToBytes();
case Guid guid:
return guid.ToByteArray();
default:
return obj;
}
}

/// <summary>
Expand Down
110 changes: 46 additions & 64 deletions src/core/Akka/Routing/ConsistentHashRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public override Routee Select(object message, Routee[] routees)
if (message == null || routees == null || routees.Length == 0)
return Routee.NoRoutee;

Func<ConsistentHash<ConsistentRoutee>> updateConsistentHash = () =>
ConsistentHash<ConsistentRoutee> UpdateConsistentHash()
{
// update consistentHash when routees are changed
// changes to routees are rare when no changes this is a quick operation
Expand All @@ -185,42 +185,43 @@ public override Routee Select(object message, Routee[] routees)
return consistentHash;
}
return oldConsistentHash;
};
}

Func<object, Routee> target = hashData =>
Routee Target(object hashData)
{
try
{
var currentConsistentHash = updateConsistentHash();
var currentConsistentHash = UpdateConsistentHash();
if (currentConsistentHash.IsEmpty) return Routee.NoRoutee;
else
{
if (hashData is byte[])
return currentConsistentHash.NodeFor(hashData as byte[]).Routee;
if (hashData is string)
return currentConsistentHash.NodeFor(hashData as string).Routee;
return
currentConsistentHash.NodeFor(
_system.Serialization.FindSerializerFor(hashData).ToBinary(hashData)).Routee;
switch (hashData)
{
case byte[] bytes:
return currentConsistentHash.NodeFor(bytes).Routee;
case string data:
return currentConsistentHash.NodeFor(data).Routee;
default:
return currentConsistentHash.NodeFor(_system.Serialization.FindSerializerFor(hashData).ToBinary(hashData)).Routee;
}
}
}
catch (Exception ex)
{
//serialization failed
_log.Value.Warning("Couldn't route message with consistent hash key [{0}] due to [{1}]", hashData,
ex.Message);
_log.Value.Warning("Couldn't route message with consistent hash key [{0}] due to [{1}]", hashData, ex.Message);
return Routee.NoRoutee;
}
};
}

if (_hashMapping(message) != null)
var hashMapping = _hashMapping(message);
if (hashMapping != null)
{
return target(ConsistentHash.ToBytesOrObject(_hashMapping(message)));
return Target(ConsistentHash.ToBytesOrObject(hashMapping));
}
else if (message is IConsistentHashable)
else if (message is IConsistentHashable hashable)
{
var hashable = (IConsistentHashable) message;
return target(ConsistentHash.ToBytesOrObject(hashable.ConsistentHashKey));
return Target(ConsistentHash.ToBytesOrObject(hashable.ConsistentHashKey));
}
else
{
Expand Down Expand Up @@ -283,25 +284,18 @@ public ConsistentRoutee(Routee routee, Address selfAddress)
/// </summary>
public Address SelfAddress { get; private set; }

/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
/// <inheritdoc/>
public override string ToString()
{
if (Routee is ActorRefRoutee)
switch (Routee)
{
var actorRef = Routee as ActorRefRoutee;
return ToStringWithFullAddress(actorRef.Actor.Path);
}
else if (Routee is ActorSelectionRoutee)
{
var selection = Routee as ActorSelectionRoutee;
return ToStringWithFullAddress(selection.Selection.Anchor.Path) + selection.Selection.PathString;
}
else
{
return Routee.ToString();
case ActorRefRoutee actorRef:
return ToStringWithFullAddress(actorRef.Actor.Path);
case ActorSelectionRoutee selection:
return ToStringWithFullAddress(selection.Selection.Anchor.Path) +
selection.Selection.PathString;
default:
return Routee.ToString();
}
}

Expand Down Expand Up @@ -511,9 +505,8 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)
{
return OverrideUnsetConfig(routerConfig);
}
else if (routerConfig is ConsistentHashingPool)
else if (routerConfig is ConsistentHashingPool other)
{
var other = routerConfig as ConsistentHashingPool;
return WithHashMapping(other._hashMapping).OverrideUnsetConfig(other);
}
else
Expand All @@ -524,38 +517,28 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)

private RouterConfig OverrideUnsetConfig(RouterConfig other)
{
if (other is NoRouter)
if (other is Pool pool)
{
return this;
}
else
{
var pool = other as Pool;
if (pool != null)
{
ConsistentHashingPool wssConf;
ConsistentHashingPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)
&& !(pool.SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)))
{
wssConf = this.WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(DefaultSupervisorStrategy)
&& !pool.SupervisorStrategy.Equals(DefaultSupervisorStrategy))
{
wssConf = WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
return this;
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
}

return this;
}

/// <summary>
Expand Down Expand Up @@ -784,9 +767,8 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)
{
return base.WithFallback(routerConfig);
}
else if (routerConfig is ConsistentHashingGroup)
else if (routerConfig is ConsistentHashingGroup other)
{
var other = routerConfig as ConsistentHashingGroup;
return WithHashMapping(other._hashMapping);
}
else
Expand Down
24 changes: 14 additions & 10 deletions src/core/Akka/Routing/Listeners.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,21 @@ public Receive ListenerReceive
{
return message =>
{
var match=PatternMatch.Match(message)
.With<Listen>(m => Add(m.Listener))
.With<Deafen>(d => Remove(d.Listener))
.With<WithListeners>(f =>
{
switch (message)
{
case Listen listen:
Add(listen.Listener);
return true;
case Deafen deafen:
Remove(deafen.Listener);
return true;
case WithListeners listeners:
foreach (var listener in Listeners)
{
f.ListenerFunction(listener);
}
});
return match.WasHandled;
listeners.ListenerFunction(listener);
return true;
default:
return false;
}
};
}
}
Expand Down
40 changes: 15 additions & 25 deletions src/core/Akka/Routing/Random.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,38 +154,28 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)

private RouterConfig OverrideUnsetConfig(RouterConfig other)
{
if (other is NoRouter)
if (other is Pool pool)
{
return this;
}
else
{
var pool = other as Pool;
if (pool != null)
{
RandomPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)
&& !(pool.SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)))
{
wssConf = this.WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);
RandomPool wssConf;

return wssConf;
if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(DefaultSupervisorStrategy)
&& !pool.SupervisorStrategy.Equals(DefaultSupervisorStrategy))
{
wssConf = WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
return this;
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
}

return this;
}

/// <summary>
Expand Down
Loading

0 comments on commit 85335bf

Please sign in to comment.