Skip to content

Commit

Permalink
fix isse with dropping remote messages (#1709)
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Jul 8, 2022
1 parent 2c7e796 commit 3d46810
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 16 deletions.
44 changes: 31 additions & 13 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ namespace Proto.Remote;

public abstract class Endpoint : IEndpoint
{
internal Endpoint(string address, ActorSystem system, RemoteConfigBase remoteConfig)
internal Endpoint(string remoteAddress, ActorSystem system, RemoteConfigBase remoteConfig)
{
Address = address;
RemoteAddress = remoteAddress;
System = system;
RemoteConfig = remoteConfig;
_sender = Task.Run(RunAsync);
Expand All @@ -31,7 +31,7 @@ internal Endpoint(string address, ActorSystem system, RemoteConfigBase remoteCon
public Channel<RemoteMessage> Outgoing { get; } = Channel.CreateBounded<RemoteMessage>(3);
public ConcurrentStack<RemoteMessage> OutgoingStash { get; } = new();
protected readonly ActorSystem System;
protected readonly string Address;
protected readonly string RemoteAddress;
protected readonly RemoteConfigBase RemoteConfig;
private readonly ILogger _logger = Log.CreateLogger<Endpoint>();
private readonly Dictionary<string, HashSet<PID>> _watchedActors = new();
Expand All @@ -44,15 +44,15 @@ internal Endpoint(string address, ActorSystem system, RemoteConfigBase remoteCon

public virtual async ValueTask DisposeAsync()
{
_logger.LogDebug("[{SystemAddress}] Disposing endpoint {Address}", System.Address, Address);
_logger.LogDebug("[{SystemAddress}] Disposing endpoint {Address}", System.Address, RemoteAddress);
_remoteDelivers.Writer.TryComplete();
_cancellationTokenSource.Cancel();
Outgoing.Writer.TryComplete();
TerminateEndpoint();
await _sender.ConfigureAwait(false);
_cancellationTokenSource.Dispose();
GC.SuppressFinalize(this);
_logger.LogDebug("[{SystemAddress}] Disposed endpoint {Address}", System.Address, Address);
_logger.LogDebug("[{SystemAddress}] Disposed endpoint {Address}", System.Address, RemoteAddress);
}

public bool IsActive { get; private set; } = true;
Expand Down Expand Up @@ -80,7 +80,7 @@ private void TerminateEndpoint()
}

if (droppedMessageCount > 0)
_logger.LogInformation("[{SystemAddress}] Dropped {Count} messages for {Address}", System.Address, droppedMessageCount, Address);
_logger.LogInformation("[{SystemAddress}] Dropped {Count} messages for {Address}", System.Address, droppedMessageCount, RemoteAddress);
}

private int DropMessagesInBatch(RemoteMessage remoteMessage)
Expand All @@ -91,17 +91,37 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
switch (remoteMessage.MessageTypeCase)
{
case RemoteMessage.MessageTypeOneofCase.DisconnectRequest:
_logger.LogWarning("[{SystemAddress}] Dropping disconnect request for {Address}", System.Address, Address);
_logger.LogWarning("[{SystemAddress}] Dropping disconnect request for {Address}", System.Address, RemoteAddress);
break;
case RemoteMessage.MessageTypeOneofCase.MessageBatch: {
var batch = remoteMessage.MessageBatch;
var targets = new PID[batch.Targets.Count];

var targets = new PID[batch.Targets.Count];
for (var i = 0; i < batch.Targets.Count; i++)
{
var target = new PID(System.Address, batch.Targets[i]);
target.Ref(System);
targets[i] = target;

if (target.TryTranslateToLocalClientPID(out var pid))
{
targets[i] = pid;
}
else
{
targets[i] = target;
target.Ref(System);
}
}

for (var i = 0; i < batch.Senders.Count; i++)
{
var s = batch.Senders[i];

if (string.IsNullOrEmpty(s.Address))
{
s.Address = RemoteAddress;
}

s.Ref(System);
}

var typeNames = batch.TypeNames.ToArray();
Expand Down Expand Up @@ -176,8 +196,6 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
}
}
break;
default:
break;
}

return droppedMessageCount;
Expand Down Expand Up @@ -250,7 +268,7 @@ public void SendMessage(PID target, object msg)
);
}

if (sender is not null && sender.TryTranslateToProxyPID(System, Address, out var clientPID))
if (sender is not null && sender.TryTranslateToProxyPID(System, RemoteAddress, out var clientPID))
sender = clientPID;
var env = new RemoteDeliver(header, message, target, sender);

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Remote/Endpoints/ServerEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ namespace Proto.Remote;
/// </summary>
public sealed class ServerEndpoint : Endpoint
{
public ServerEndpoint(ActorSystem system, RemoteConfigBase remoteConfig, string address, IChannelProvider channelProvider, ServerConnector.Type type, RemoteMessageHandler remoteMessageHandler) : base(address, system, remoteConfig)
=> Connector = new ServerConnector(Address, type, this, channelProvider, System, RemoteConfig, remoteMessageHandler);
public ServerEndpoint(ActorSystem system, RemoteConfigBase remoteConfig, string remoteAddress, IChannelProvider channelProvider, ServerConnector.Type type, RemoteMessageHandler remoteMessageHandler) : base(remoteAddress, system, remoteConfig)
=> Connector = new ServerConnector(RemoteAddress, type, this, channelProvider, System, RemoteConfig, remoteMessageHandler);

public ServerConnector Connector { get; }

Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Remote/Endpoints/ServerSideClientEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ namespace Proto.Remote;
/// </summary>
public sealed class ServerSideClientEndpoint : Endpoint
{
public ServerSideClientEndpoint(ActorSystem system, RemoteConfigBase remoteConfig, string address) : base(address, system, remoteConfig) { }
public ServerSideClientEndpoint(ActorSystem system, RemoteConfigBase remoteConfig, string remoteAddress) : base(remoteAddress, system, remoteConfig) { }
}

0 comments on commit 3d46810

Please sign in to comment.