Skip to content

Commit

Permalink
Cleanup PeerDirectoryClient
Browse files Browse the repository at this point in the history
- Remove test-only method
- Extract DirectoryPeerSelector
  • Loading branch information
ocoanet committed Jan 27, 2022
1 parent 2a679a7 commit c46cca8
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 35 deletions.
40 changes: 40 additions & 0 deletions src/Abc.Zebus.Tests/Directory/DirectoryPeerSelectorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Collections.Generic;
using System.Linq;
using Abc.Zebus.Core;
using Abc.Zebus.Directory;
using Abc.Zebus.Testing.Extensions;
using NUnit.Framework;

namespace Abc.Zebus.Tests.Directory
{
[TestFixture]
public class DirectoryPeerSelectorTests
{
private BusConfiguration _configuration;
private DirectoryPeerSelector _selector;

[SetUp]
public void SetUp()
{
_configuration = new BusConfiguration(new[] { "tcp://dir1:129", "tcp://dir2:129" });
_selector = new DirectoryPeerSelector(_configuration);
}

[Test]
public void should_get_peers_randomly()
{
// Arrange
var results = new List<Peer[]>();

// Act
for (int i = 0; i < 25; i++)
{
results.Add(_selector.GetPeers().ToArray());
}

// Assert
var firstResult = results[0];
results.Any(x => x[0] != firstResult[0]).ShouldBeTrue();
}
}
}
36 changes: 20 additions & 16 deletions src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ public async Task should_re_register_peer()
_directory.GetPeer(peer2.Id).ShouldNotBeNull();
}

[Test]
public async Task should_re_register_peer_using_random_directory_endpoint()
{
_configuration.IsDirectoryPickedRandomly = true;

for (var i = 0; i < 25; i++)
{
await _directory.RegisterAsync(_bus, _self, Array.Empty<Subscription>());
await _directory.UnregisterAsync(_bus);
}

var commands = _bus.Commands.OfType<RegisterPeerCommand>().ToList();
commands.ShouldHaveSize(25);

var first = commands.ExpectedFirst();
var firstRecipient = _bus.GetRecipientPeer(first)!;

commands.Any(x => _bus.GetRecipientPeer(x)!.Id != firstRecipient.Id).ShouldBeTrue();
}

[Test]
public void should_not_register_existing_peer()
{
Expand Down Expand Up @@ -492,22 +512,6 @@ public async Task should_connect_to_next_directory_if_first_is_failing()
contactedPeers.ShouldContain(new PeerId("Abc.Zebus.DirectoryService.1"));
}

[Test]
public void should_order_directory_peers_randomly()
{
_configuration.IsDirectoryPickedRandomly = true;

for (var i = 0; i < 100; i++)
{
var directoryPeers = _directory.GetDirectoryPeers();
if (directoryPeers.First().EndPoint == "tcp://backup-directory:777")
return;
Thread.Sleep(1); // Ensures that the underlying Random changes seed between tries
}

Assert.Fail("100 tries didn't succeed in returning a shuffled version of the directory peers");
}

[Test]
public async Task should_remove_peer_from_cache_when_decommission()
{
Expand Down
44 changes: 44 additions & 0 deletions src/Abc.Zebus/Directory/DirectoryPeerSelector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.Collections.Generic;
using System.Linq;
using Abc.Zebus.Util.Extensions;

namespace Abc.Zebus.Directory
{
internal class DirectoryPeerSelector
{
private readonly IBusConfiguration _configuration;
private string[]? _cachedEndPoints;

public DirectoryPeerSelector(IBusConfiguration configuration)
{
_configuration = configuration;
}

public IEnumerable<Peer> GetPeers()
{
_cachedEndPoints = _configuration.DirectoryServiceEndPoints;

return GetPeersImpl(_cachedEndPoints);
}

public IEnumerable<Peer> GetPeersFromCache()
{
var endPoints = _cachedEndPoints ?? _configuration.DirectoryServiceEndPoints;

return GetPeersImpl(endPoints);
}

private IEnumerable<Peer> GetPeersImpl(string[] endPoints)
{
var peers = endPoints.Select(CreateDirectoryPeer);

return _configuration.IsDirectoryPickedRandomly ? peers.Shuffle() : peers;
}

private static Peer CreateDirectoryPeer(string endPoint, int index)
{
var peerId = new PeerId("Abc.Zebus.DirectoryService." + index);

This comment has been minimized.

Copy link
@MendelMonteiro

MendelMonteiro Jan 30, 2022

Collaborator

This is a bit of a hack that relies that could break things if the configuration returns the directory services in a different order.

This comment has been minimized.

Copy link
@ocoanet

ocoanet Jan 31, 2022

Author Member

Yes, it is cleared a bad solution. There is an open issue about it. I might try to fix this during the next days or weeks.

return new Peer(peerId, endPoint);
}
}
}
24 changes: 5 additions & 19 deletions src/Abc.Zebus/Directory/PeerDirectoryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Abc.Zebus.Routing;
using Abc.Zebus.Util;
using Abc.Zebus.Util.Extensions;
using log4net;
Expand All @@ -29,14 +27,15 @@ public partial class PeerDirectoryClient : IPeerDirectory,
private readonly UniqueTimestampProvider _timestampProvider = new UniqueTimestampProvider(10);
private readonly IBusConfiguration _configuration;
private readonly Stopwatch _pingStopwatch = new Stopwatch();
private readonly DirectoryPeerSelector _directorySelector;
private BlockingCollection<IEvent> _messagesReceivedDuringRegister;
private IEnumerable<Peer> _directoryPeers = Enumerable.Empty<Peer>();
private Peer _self = default!;
private volatile HashSet<Type> _observedSubscriptionMessageTypes = new HashSet<Type>();

public PeerDirectoryClient(IBusConfiguration configuration)
{
_configuration = configuration;
_directorySelector = new DirectoryPeerSelector(configuration);

_messagesReceivedDuringRegister = new BlockingCollection<IEvent>();
_messagesReceivedDuringRegister.CompleteAdding();
Expand Down Expand Up @@ -126,7 +125,7 @@ private PeerDescriptor CreateSelfDescriptor(IEnumerable<Subscription> subscripti

private async Task TryRegisterOnDirectoryAsync(IBus bus, PeerDescriptor selfDescriptor)
{
var directoryPeers = GetDirectoryPeers().ToList();
var directoryPeers = _directorySelector.GetPeers().ToList();

foreach (var directoryPeer in directoryPeers)
{
Expand Down Expand Up @@ -182,7 +181,7 @@ public async Task UpdateSubscriptionsAsync(IBus bus, IEnumerable<SubscriptionsFo

var command = new UpdatePeerSubscriptionsForTypesCommand(_self.Id, _timestampProvider.NextUtcTimestamp(), subscriptions);

foreach (var directoryPeer in GetDirectoryPeers())
foreach (var directoryPeer in _directorySelector.GetPeers())
{
try
{
Expand All @@ -203,7 +202,7 @@ public async Task UnregisterAsync(IBus bus)
var command = new UnregisterPeerCommand(_self, _timestampProvider.NextUtcTimestamp());

// Using a cache of the directory peers in case of the underlying configuration proxy values changed before stopping
foreach (var directoryPeer in _directoryPeers)
foreach (var directoryPeer in _directorySelector.GetPeersFromCache())
{
try
{
Expand Down Expand Up @@ -254,19 +253,6 @@ public void EnableSubscriptionsUpdatedFor(IEnumerable<Type> types)
public IEnumerable<PeerDescriptor> GetPeerDescriptors()
=> _peers.Values.Select(x => x.ToPeerDescriptor()).ToList();

// Only internal for testing purposes
internal IEnumerable<Peer> GetDirectoryPeers()
{
_directoryPeers = _configuration.DirectoryServiceEndPoints.Select(CreateDirectoryPeer);
return _configuration.IsDirectoryPickedRandomly ? _directoryPeers.Shuffle() : _directoryPeers;
}

private static Peer CreateDirectoryPeer(string endPoint, int index)
{
var peerId = new PeerId("Abc.Zebus.DirectoryService." + index);
return new Peer(peerId, endPoint);
}

private void AddOrUpdatePeerEntry(PeerDescriptor peerDescriptor, bool shouldRaisePeerUpdated)
{
var subscriptions = peerDescriptor.Subscriptions ?? Array.Empty<Subscription>();
Expand Down

0 comments on commit c46cca8

Please sign in to comment.