Skip to content

Commit

Permalink
just send target id (#1696)
Browse files Browse the repository at this point in the history
* only pass target id
* only pass sender address if different from remote
* seal apis
  • Loading branch information
rogeralsing authored Jul 2, 2022
1 parent 82a135e commit af14b11
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/Proto.Remote/Endpoints/BlockedEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Proto.Remote;

public class BlockedEndpoint : IEndpoint
public sealed class BlockedEndpoint : IEndpoint
{
private readonly ActorSystem _system;
public BlockedEndpoint(ActorSystem system) => _system = system;
Expand Down
17 changes: 12 additions & 5 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,28 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
{
var droppedMessageCount = 0;


switch (remoteMessage.MessageTypeCase)
{
case RemoteMessage.MessageTypeOneofCase.DisconnectRequest:
_logger.LogWarning("[{SystemAddress}] Dropping disconnect request for {Address}", System.Address, Address);
break;
case RemoteMessage.MessageTypeOneofCase.MessageBatch: {
var batch = remoteMessage.MessageBatch;
var targets = new PID[batch.Targets.Count];

for (var i = 0; i < batch.Targets.Count; i++)
{
batch.Targets[i].Ref(System);
var target = new PID(System.Address, batch.Targets[i]);
target.Ref(System);
targets[i] = target;
}

var typeNames = batch.TypeNames.ToArray();

foreach (var envelope in batch.Envelopes)
{
var target = batch.Targets[envelope.Target];
var target = targets[envelope.Target];

if (envelope.TargetRequestId != default)
{
Expand Down Expand Up @@ -329,7 +333,7 @@ private MessageBatch CreateBatch(IReadOnlyCollection<RemoteDeliver> m)
var envelopes = new List<MessageEnvelope>(m.Count);
var typeNames = new Dictionary<string, int>();
var targets = new Dictionary<(string address, string id), int>();
var targetList = new List<PID>();
var targetList = new List<string>();
var typeNameList = new List<string>();
var senders = new Dictionary<(string address, string id), int>();
var senderList = new List<PID>();
Expand All @@ -343,7 +347,7 @@ private MessageBatch CreateBatch(IReadOnlyCollection<RemoteDeliver> m)
if (!targets.TryGetValue(targetKey, out var targetId))
{
targetId = targets[targetKey] = targets.Count;
targetList.Add(target);
targetList.Add(target.Id);
}

var senderId = 0;
Expand All @@ -357,7 +361,10 @@ private MessageBatch CreateBatch(IReadOnlyCollection<RemoteDeliver> m)
if (!senders.TryGetValue(senderKey, out senderId))
{
senderId = senders[senderKey] = senders.Count + 1;
senderList.Add(PID.FromAddress(sender.Address, sender.Id));

senderList.Add(sender.Address == System.Address ?
PID.FromAddress("", sender.Id) :
PID.FromAddress(sender.Address, sender.Id));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Remote/Endpoints/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Proto.Remote;

public class EndpointManager
public sealed class EndpointManager
{
public const string ActivatorActorName = "$activator";

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Remote/Endpoints/EndpointReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Proto.Remote;

public class EndpointReader : Remoting.RemotingBase
public sealed class EndpointReader : Remoting.RemotingBase
{
private static readonly ILogger Logger = Log.CreateLogger<EndpointReader>();
private readonly EndpointManager _endpointManager;
Expand Down Expand Up @@ -215,7 +215,7 @@ await responseStream.WriteAsync(new RemoteMessage
if (_endpointManager.CancellationToken.IsCancellationRequested)
continue;

_endpointManager.RemoteMessageHandler.HandleRemoteMessage(currentMessage);
_endpointManager.RemoteMessageHandler.HandleRemoteMessage(currentMessage, address!);
}
}
finally
Expand Down
58 changes: 34 additions & 24 deletions src/Proto.Remote/Endpoints/RemoteMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,68 @@

namespace Proto.Remote;

public class RemoteMessageHandler
public sealed class RemoteMessageHandler
{
private readonly ILogger _logger = Log.CreateLogger<RemoteMessageHandler>();
private readonly EndpointManager _endpointManager;
protected readonly ActorSystem System;
protected readonly Serialization Serialization;
private readonly ActorSystem _system;
private readonly Serialization _serialization;
private readonly LogLevel _deserializationErrorLogLevel;

public RemoteMessageHandler(EndpointManager endpointManager, ActorSystem system, Serialization serialization, RemoteConfigBase remoteConfig)
{
_endpointManager = endpointManager;
System = system;
Serialization = serialization;
_system = system;
_serialization = serialization;
_deserializationErrorLogLevel = remoteConfig.DeserializationErrorLogLevel;
}

public void HandleRemoteMessage(RemoteMessage currentMessage)
public void HandleRemoteMessage(RemoteMessage currentMessage, string remoteAddress)
{
switch (currentMessage.MessageTypeCase)
{
case RemoteMessage.MessageTypeOneofCase.MessageBatch: {
var batch = currentMessage.MessageBatch;

var targets = new PID[batch.Targets.Count];
for (var i = 0; i < batch.Targets.Count; i++)
{
if (batch.Targets[i].TryTranslateToLocalClientPID(out var pid))
var target = new PID(_system.Address, batch.Targets[i]);

if (target.TryTranslateToLocalClientPID(out var pid))
{
batch.Targets[i] = pid;
targets[i] = pid;
}
else
{
batch.Targets[i].Ref(System);
targets[i] = target;
target.Ref(_system);
}
}

for (var i = 0; i < batch.Senders.Count; i++)
{
batch.Senders[i].Ref(System);
var s = batch.Senders[i];

if (string.IsNullOrEmpty(s.Address))
{
s.Address = remoteAddress;
}

s.Ref(_system);
}

var typeNames = batch.TypeNames.ToArray();


Counter<long>? m = null;
if (System.Metrics.Enabled)
if (_system.Metrics.Enabled)
{
m = RemoteMetrics.RemoteDeserializedMessageCount;
}

foreach (var envelope in batch.Envelopes)
{
var target = batch.Targets[envelope.Target];
var target = targets[envelope.Target];

if (envelope.TargetRequestId != default)
{
Expand All @@ -80,20 +90,20 @@ public void HandleRemoteMessage(RemoteMessage currentMessage)

var typeName = typeNames[envelope.TypeId];

if (System.Metrics.Enabled)
if (_system.Metrics.Enabled)
{
m!.Add(1, new("id", System.Id), new("address", System.Address), new("messagetype", typeName));
m!.Add(1, new("id", _system.Id), new("address", _system.Address), new("messagetype", typeName));
}

object message;

try
{
message = Serialization.Deserialize(typeName, envelope.MessageData, envelope.SerializerId);
message = _serialization.Deserialize(typeName, envelope.MessageData, envelope.SerializerId);

//translate from on-the-wire representation to in-process representation
//this only applies to root level messages, and never on nested child messages
if (message is IRootSerialized serialized) message = serialized.Deserialize(System);
if (message is IRootSerialized serialized) message = serialized.Deserialize(_system);
}
catch (Exception ex)
{
Expand All @@ -103,7 +113,7 @@ public void HandleRemoteMessage(RemoteMessage currentMessage)
_deserializationErrorLogLevel,
ex,
"[{SystemAddress}] Unable to deserialize message with {Type}",
System.Address,
_system.Address,
typeName);
continue;
}
Expand All @@ -112,20 +122,20 @@ public void HandleRemoteMessage(RemoteMessage currentMessage)
{
case Terminated msg:
if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace("[{SystemAddress}] Received message {MessageType} {Message} for {Target}", System.Address,
_logger.LogTrace("[{SystemAddress}] Received message {MessageType} {Message} for {Target}", _system.Address,
msg.GetType().Name, msg, target
);
var endpoint = msg.Who.TryGetSystemId(System, out var systemId)
var endpoint = msg.Who.TryGetSystemId(_system, out var systemId)
? _endpointManager.GetClientEndpoint(systemId)
: _endpointManager.GetServerEndpoint(msg.Who.Address);
endpoint.RemoteTerminate(target, msg);
break;
case SystemMessage sys:
if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace("[{SystemAddress}] Received system message {MessageType} {Message} for {Target}", System.Address,
_logger.LogTrace("[{SystemAddress}] Received system message {MessageType} {Message} for {Target}", _system.Address,
sys.GetType().Name, sys, target
);
target.SendSystemMessage(System, sys);
target.SendSystemMessage(_system, sys);
break;
default:
Proto.MessageHeader? header = null;
Expand All @@ -144,9 +154,9 @@ public void HandleRemoteMessage(RemoteMessage currentMessage)

if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace("[{SystemAddress}] Received user message {MessageType} {Message} for {Target} from {Sender}",
System.Address, message.GetType().Name, message, target, sender
_system.Address, message.GetType().Name, message, target, sender
);
System.Root.Send(target, messageOrEnvelope);
_system.Root.Send(target, messageOrEnvelope);
break;
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/Proto.Remote/Endpoints/ServerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace Proto.Remote;

public class ServerConnector
public sealed class ServerConnector
{
public enum Type
{
Expand All @@ -35,7 +35,6 @@ public enum Type
private readonly TimeSpan _backoff;
private readonly int _maxNrOfRetries;
private readonly Random _random = new();
private readonly TimeSpan? _withinTimeSpan;
private readonly Task _runner;
private readonly CancellationTokenSource _cts = new();
private readonly KeyValuePair<string, object?>[] _metricTags = Array.Empty<KeyValuePair<string, object?>>();
Expand All @@ -55,7 +54,6 @@ public ServerConnector(string address, Type connectorType, IEndpoint endpoint, I
_connectorType = connectorType;
_endpoint = endpoint;
_maxNrOfRetries = remoteConfig.EndpointWriterOptions.MaxRetries;
_withinTimeSpan = remoteConfig.EndpointWriterOptions.RetryTimeSpan;
_backoff = remoteConfig.EndpointWriterOptions.RetryBackOff;
_runner = Task.Run(() => RunAsync());
if (_system.Metrics.Enabled)
Expand Down Expand Up @@ -212,7 +210,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage
if (_connectorType == Type.ServerSide)
_logger.LogWarning("[ServerConnector][{SystemAddress}] Received {Message} from {_address}", _system.Address, currentMessage, _address);
else
_remoteMessageHandler.HandleRemoteMessage(currentMessage);
_remoteMessageHandler.HandleRemoteMessage(currentMessage,_address);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Remote/Endpoints/ServerEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Proto.Remote;
/// <summary>
/// Handles a connection to a remote endpoint.
/// </summary>
public class ServerEndpoint : Endpoint
public sealed class ServerEndpoint : Endpoint
{
public ServerEndpoint(ActorSystem system, RemoteConfigBase remoteConfig, string address, IChannelProvider channelProvider, ServerConnector.Type type, RemoteMessageHandler remoteMessageHandler) : base(address, system, remoteConfig)
=> Connector = new ServerConnector(Address, type, this, channelProvider, System, RemoteConfig, remoteMessageHandler);
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Remote/Endpoints/ServerSideClientEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Proto.Remote;
/// <summary>
/// Handles connection to a client actor system.
/// </summary>
public class ServerSideClientEndpoint : Endpoint
public sealed class ServerSideClientEndpoint : Endpoint
{
public ServerSideClientEndpoint(ActorSystem system, RemoteConfigBase remoteConfig, string address) : base(address, system, remoteConfig) { }
}
2 changes: 1 addition & 1 deletion src/Proto.Remote/Protos.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ message RemoteMessage {

message MessageBatch {
repeated string type_names = 1;
repeated actor.PID targets = 2;
repeated string targets = 2;
repeated MessageEnvelope envelopes = 3;
repeated actor.PID senders = 4;
}
Expand Down

0 comments on commit af14b11

Please sign in to comment.