Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[routing] Akka.Routing refactoring #2719

Merged
merged 1 commit into from
Jun 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 15 additions & 25 deletions src/core/Akka/Routing/Broadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,28 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)

private RouterConfig OverrideUnsetConfig(RouterConfig other)
{
if (other is NoRouter)
if (other is Pool pool)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything that isn't a Pool returned this , so this is the only real check we need. Same with the other routers.

{
return this;
}
else
{
var pool = other as Pool;
if (pool != null)
BroadcastPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(DefaultSupervisorStrategy)
&& !pool.SupervisorStrategy.Equals(DefaultSupervisorStrategy))
{
BroadcastPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)
&& !(pool.SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)))
{
wssConf = this.WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
wssConf = WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
return this;
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
}

return this;
}

/// <summary>
Expand Down
62 changes: 33 additions & 29 deletions src/core/Akka/Routing/ConsistentHash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,35 +289,39 @@ public ISurrogated FromSurrogate(ActorSystem system)
/// <returns>The object encoded into bytes - in the case of custom classes, the hashcode may be used.</returns>
internal static object ToBytesOrObject(object obj)
{
if (obj == null)
return new byte[] { 0 };
if (obj is byte[])
return (byte[])obj;
if (obj is int)
return BitConverter.GetBytes((int)obj);
if (obj is uint)
return BitConverter.GetBytes((uint)obj);
if (obj is short)
return BitConverter.GetBytes((short)obj);
if (obj is ushort)
return BitConverter.GetBytes((ushort)obj);
if (obj is bool)
return BitConverter.GetBytes((bool)obj);
if (obj is long)
return BitConverter.GetBytes((long)obj);
if (obj is ulong)
return BitConverter.GetBytes((ulong)obj);
if (obj is char)
return BitConverter.GetBytes((char)obj);
if (obj is float)
return BitConverter.GetBytes((float)obj);
if (obj is double)
return BitConverter.GetBytes((double)obj);
if (obj is decimal)
return new BitArray(decimal.GetBits((decimal)obj)).ToBytes();
if (obj is Guid)
return ((Guid)obj).ToByteArray();
return obj;
switch (obj)
{
case null:
return new byte[] { 0 };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know the context, and how often this one called and maybe it's super micro, but seems like a constant?

case byte[] bytes:
return bytes;
case int @int:
return BitConverter.GetBytes(@int);
case uint @uint:
return BitConverter.GetBytes(@uint);
case short @short:
return BitConverter.GetBytes(@short);
case ushort @ushort:
return BitConverter.GetBytes(@ushort);
case bool @bool:
return BitConverter.GetBytes(@bool);
case long @long:
return BitConverter.GetBytes(@long);
case ulong @ulong:
return BitConverter.GetBytes(@ulong);
case char @char:
return BitConverter.GetBytes(@char);
case float @float:
return BitConverter.GetBytes(@float);
case double @double:
return BitConverter.GetBytes(@double);
case decimal @decimal:
return new BitArray(decimal.GetBits(@decimal)).ToBytes();
case Guid guid:
return guid.ToByteArray();
default:
return obj;
}
}

/// <summary>
Expand Down
110 changes: 46 additions & 64 deletions src/core/Akka/Routing/ConsistentHashRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public override Routee Select(object message, Routee[] routees)
if (message == null || routees == null || routees.Length == 0)
return Routee.NoRoutee;

Func<ConsistentHash<ConsistentRoutee>> updateConsistentHash = () =>
ConsistentHash<ConsistentRoutee> UpdateConsistentHash()
{
// update consistentHash when routees are changed
// changes to routees are rare when no changes this is a quick operation
Expand All @@ -185,42 +185,43 @@ public override Routee Select(object message, Routee[] routees)
return consistentHash;
}
return oldConsistentHash;
};
}

Func<object, Routee> target = hashData =>
Routee Target(object hashData)
{
try
{
var currentConsistentHash = updateConsistentHash();
var currentConsistentHash = UpdateConsistentHash();
if (currentConsistentHash.IsEmpty) return Routee.NoRoutee;
else
{
if (hashData is byte[])
return currentConsistentHash.NodeFor(hashData as byte[]).Routee;
if (hashData is string)
return currentConsistentHash.NodeFor(hashData as string).Routee;
return
currentConsistentHash.NodeFor(
_system.Serialization.FindSerializerFor(hashData).ToBinary(hashData)).Routee;
switch (hashData)
{
case byte[] bytes:
return currentConsistentHash.NodeFor(bytes).Routee;
case string data:
return currentConsistentHash.NodeFor(data).Routee;
default:
return currentConsistentHash.NodeFor(_system.Serialization.FindSerializerFor(hashData).ToBinary(hashData)).Routee;
}
}
}
catch (Exception ex)
{
//serialization failed
_log.Value.Warning("Couldn't route message with consistent hash key [{0}] due to [{1}]", hashData,
ex.Message);
_log.Value.Warning("Couldn't route message with consistent hash key [{0}] due to [{1}]", hashData, ex.Message);
return Routee.NoRoutee;
}
};
}

if (_hashMapping(message) != null)
var hashMapping = _hashMapping(message);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This delegate was being called twice which should return the same value each time. So I stored it in a variable so it only needed to be called once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

if (hashMapping != null)
{
return target(ConsistentHash.ToBytesOrObject(_hashMapping(message)));
return Target(ConsistentHash.ToBytesOrObject(hashMapping));
}
else if (message is IConsistentHashable)
else if (message is IConsistentHashable hashable)
{
var hashable = (IConsistentHashable) message;
return target(ConsistentHash.ToBytesOrObject(hashable.ConsistentHashKey));
return Target(ConsistentHash.ToBytesOrObject(hashable.ConsistentHashKey));
}
else
{
Expand Down Expand Up @@ -283,25 +284,18 @@ public ConsistentRoutee(Routee routee, Address selfAddress)
/// </summary>
public Address SelfAddress { get; private set; }

/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
/// <inheritdoc/>
public override string ToString()
{
if (Routee is ActorRefRoutee)
switch (Routee)
{
var actorRef = Routee as ActorRefRoutee;
return ToStringWithFullAddress(actorRef.Actor.Path);
}
else if (Routee is ActorSelectionRoutee)
{
var selection = Routee as ActorSelectionRoutee;
return ToStringWithFullAddress(selection.Selection.Anchor.Path) + selection.Selection.PathString;
}
else
{
return Routee.ToString();
case ActorRefRoutee actorRef:
return ToStringWithFullAddress(actorRef.Actor.Path);
case ActorSelectionRoutee selection:
return ToStringWithFullAddress(selection.Selection.Anchor.Path) +
selection.Selection.PathString;
default:
return Routee.ToString();
}
}

Expand Down Expand Up @@ -511,9 +505,8 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)
{
return OverrideUnsetConfig(routerConfig);
}
else if (routerConfig is ConsistentHashingPool)
else if (routerConfig is ConsistentHashingPool other)
{
var other = routerConfig as ConsistentHashingPool;
return WithHashMapping(other._hashMapping).OverrideUnsetConfig(other);
}
else
Expand All @@ -524,38 +517,28 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)

private RouterConfig OverrideUnsetConfig(RouterConfig other)
{
if (other is NoRouter)
if (other is Pool pool)
{
return this;
}
else
{
var pool = other as Pool;
if (pool != null)
{
ConsistentHashingPool wssConf;
ConsistentHashingPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)
&& !(pool.SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)))
{
wssConf = this.WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(DefaultSupervisorStrategy)
&& !pool.SupervisorStrategy.Equals(DefaultSupervisorStrategy))
{
wssConf = WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
return this;
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
}

return this;
}

/// <summary>
Expand Down Expand Up @@ -784,9 +767,8 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)
{
return base.WithFallback(routerConfig);
}
else if (routerConfig is ConsistentHashingGroup)
else if (routerConfig is ConsistentHashingGroup other)
{
var other = routerConfig as ConsistentHashingGroup;
return WithHashMapping(other._hashMapping);
}
else
Expand Down
24 changes: 14 additions & 10 deletions src/core/Akka/Routing/Listeners.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,21 @@ public Receive ListenerReceive
{
return message =>
{
var match=PatternMatch.Match(message)
.With<Listen>(m => Add(m.Listener))
.With<Deafen>(d => Remove(d.Listener))
.With<WithListeners>(f =>
{
switch (message)
{
case Listen listen:
Add(listen.Listener);
return true;
case Deafen deafen:
Remove(deafen.Listener);
return true;
case WithListeners listeners:
foreach (var listener in Listeners)
{
f.ListenerFunction(listener);
}
});
return match.WasHandled;
listeners.ListenerFunction(listener);
return true;
default:
return false;
}
};
}
}
Expand Down
40 changes: 15 additions & 25 deletions src/core/Akka/Routing/Random.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,38 +154,28 @@ public override RouterConfig WithFallback(RouterConfig routerConfig)

private RouterConfig OverrideUnsetConfig(RouterConfig other)
{
if (other is NoRouter)
if (other is Pool pool)
{
return this;
}
else
{
var pool = other as Pool;
if (pool != null)
{
RandomPool wssConf;

if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)
&& !(pool.SupervisorStrategy.Equals(Pool.DefaultSupervisorStrategy)))
{
wssConf = this.WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);
RandomPool wssConf;

return wssConf;
if (SupervisorStrategy != null
&& SupervisorStrategy.Equals(DefaultSupervisorStrategy)
&& !pool.SupervisorStrategy.Equals(DefaultSupervisorStrategy))
{
wssConf = WithSupervisorStrategy(pool.SupervisorStrategy);
}
else
{
return this;
wssConf = this;
}

if (wssConf.Resizer == null && pool.Resizer != null)
return wssConf.WithResizer(pool.Resizer);

return wssConf;
}

return this;
}

/// <summary>
Expand Down
Loading