From c46cca82eb5c86593115396e03d3641f44c5e677 Mon Sep 17 00:00:00 2001 From: Olivier Coanet Date: Thu, 27 Jan 2022 15:46:37 +0100 Subject: [PATCH] Cleanup PeerDirectoryClient - Remove test-only method - Extract DirectoryPeerSelector --- .../Directory/DirectoryPeerSelectorTests.cs | 40 +++++++++++++++++ .../Directory/PeerDirectoryClientTests.cs | 36 ++++++++------- .../Directory/DirectoryPeerSelector.cs | 44 +++++++++++++++++++ .../Directory/PeerDirectoryClient.cs | 24 +++------- 4 files changed, 109 insertions(+), 35 deletions(-) create mode 100644 src/Abc.Zebus.Tests/Directory/DirectoryPeerSelectorTests.cs create mode 100644 src/Abc.Zebus/Directory/DirectoryPeerSelector.cs diff --git a/src/Abc.Zebus.Tests/Directory/DirectoryPeerSelectorTests.cs b/src/Abc.Zebus.Tests/Directory/DirectoryPeerSelectorTests.cs new file mode 100644 index 00000000..ad85b300 --- /dev/null +++ b/src/Abc.Zebus.Tests/Directory/DirectoryPeerSelectorTests.cs @@ -0,0 +1,40 @@ +using System.Collections.Generic; +using System.Linq; +using Abc.Zebus.Core; +using Abc.Zebus.Directory; +using Abc.Zebus.Testing.Extensions; +using NUnit.Framework; + +namespace Abc.Zebus.Tests.Directory +{ + [TestFixture] + public class DirectoryPeerSelectorTests + { + private BusConfiguration _configuration; + private DirectoryPeerSelector _selector; + + [SetUp] + public void SetUp() + { + _configuration = new BusConfiguration(new[] { "tcp://dir1:129", "tcp://dir2:129" }); + _selector = new DirectoryPeerSelector(_configuration); + } + + [Test] + public void should_get_peers_randomly() + { + // Arrange + var results = new List(); + + // Act + for (int i = 0; i < 25; i++) + { + results.Add(_selector.GetPeers().ToArray()); + } + + // Assert + var firstResult = results[0]; + results.Any(x => x[0] != firstResult[0]).ShouldBeTrue(); + } + } +} diff --git a/src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs b/src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs index 5866df16..fe19186d 100644 --- a/src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs +++ b/src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs @@ -97,6 +97,26 @@ public async Task should_re_register_peer() _directory.GetPeer(peer2.Id).ShouldNotBeNull(); } + [Test] + public async Task should_re_register_peer_using_random_directory_endpoint() + { + _configuration.IsDirectoryPickedRandomly = true; + + for (var i = 0; i < 25; i++) + { + await _directory.RegisterAsync(_bus, _self, Array.Empty()); + await _directory.UnregisterAsync(_bus); + } + + var commands = _bus.Commands.OfType().ToList(); + commands.ShouldHaveSize(25); + + var first = commands.ExpectedFirst(); + var firstRecipient = _bus.GetRecipientPeer(first)!; + + commands.Any(x => _bus.GetRecipientPeer(x)!.Id != firstRecipient.Id).ShouldBeTrue(); + } + [Test] public void should_not_register_existing_peer() { @@ -492,22 +512,6 @@ public async Task should_connect_to_next_directory_if_first_is_failing() contactedPeers.ShouldContain(new PeerId("Abc.Zebus.DirectoryService.1")); } - [Test] - public void should_order_directory_peers_randomly() - { - _configuration.IsDirectoryPickedRandomly = true; - - for (var i = 0; i < 100; i++) - { - var directoryPeers = _directory.GetDirectoryPeers(); - if (directoryPeers.First().EndPoint == "tcp://backup-directory:777") - return; - Thread.Sleep(1); // Ensures that the underlying Random changes seed between tries - } - - Assert.Fail("100 tries didn't succeed in returning a shuffled version of the directory peers"); - } - [Test] public async Task should_remove_peer_from_cache_when_decommission() { diff --git a/src/Abc.Zebus/Directory/DirectoryPeerSelector.cs b/src/Abc.Zebus/Directory/DirectoryPeerSelector.cs new file mode 100644 index 00000000..0a79f368 --- /dev/null +++ b/src/Abc.Zebus/Directory/DirectoryPeerSelector.cs @@ -0,0 +1,44 @@ +using System.Collections.Generic; +using System.Linq; +using Abc.Zebus.Util.Extensions; + +namespace Abc.Zebus.Directory +{ + internal class DirectoryPeerSelector + { + private readonly IBusConfiguration _configuration; + private string[]? _cachedEndPoints; + + public DirectoryPeerSelector(IBusConfiguration configuration) + { + _configuration = configuration; + } + + public IEnumerable GetPeers() + { + _cachedEndPoints = _configuration.DirectoryServiceEndPoints; + + return GetPeersImpl(_cachedEndPoints); + } + + public IEnumerable GetPeersFromCache() + { + var endPoints = _cachedEndPoints ?? _configuration.DirectoryServiceEndPoints; + + return GetPeersImpl(endPoints); + } + + private IEnumerable GetPeersImpl(string[] endPoints) + { + var peers = endPoints.Select(CreateDirectoryPeer); + + return _configuration.IsDirectoryPickedRandomly ? peers.Shuffle() : peers; + } + + private static Peer CreateDirectoryPeer(string endPoint, int index) + { + var peerId = new PeerId("Abc.Zebus.DirectoryService." + index); + return new Peer(peerId, endPoint); + } + } +} diff --git a/src/Abc.Zebus/Directory/PeerDirectoryClient.cs b/src/Abc.Zebus/Directory/PeerDirectoryClient.cs index 88f9d9de..066daca3 100644 --- a/src/Abc.Zebus/Directory/PeerDirectoryClient.cs +++ b/src/Abc.Zebus/Directory/PeerDirectoryClient.cs @@ -3,9 +3,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Threading; using System.Threading.Tasks; -using Abc.Zebus.Routing; using Abc.Zebus.Util; using Abc.Zebus.Util.Extensions; using log4net; @@ -29,14 +27,15 @@ public partial class PeerDirectoryClient : IPeerDirectory, private readonly UniqueTimestampProvider _timestampProvider = new UniqueTimestampProvider(10); private readonly IBusConfiguration _configuration; private readonly Stopwatch _pingStopwatch = new Stopwatch(); + private readonly DirectoryPeerSelector _directorySelector; private BlockingCollection _messagesReceivedDuringRegister; - private IEnumerable _directoryPeers = Enumerable.Empty(); private Peer _self = default!; private volatile HashSet _observedSubscriptionMessageTypes = new HashSet(); public PeerDirectoryClient(IBusConfiguration configuration) { _configuration = configuration; + _directorySelector = new DirectoryPeerSelector(configuration); _messagesReceivedDuringRegister = new BlockingCollection(); _messagesReceivedDuringRegister.CompleteAdding(); @@ -126,7 +125,7 @@ private PeerDescriptor CreateSelfDescriptor(IEnumerable subscripti private async Task TryRegisterOnDirectoryAsync(IBus bus, PeerDescriptor selfDescriptor) { - var directoryPeers = GetDirectoryPeers().ToList(); + var directoryPeers = _directorySelector.GetPeers().ToList(); foreach (var directoryPeer in directoryPeers) { @@ -182,7 +181,7 @@ public async Task UpdateSubscriptionsAsync(IBus bus, IEnumerable types) public IEnumerable GetPeerDescriptors() => _peers.Values.Select(x => x.ToPeerDescriptor()).ToList(); - // Only internal for testing purposes - internal IEnumerable GetDirectoryPeers() - { - _directoryPeers = _configuration.DirectoryServiceEndPoints.Select(CreateDirectoryPeer); - return _configuration.IsDirectoryPickedRandomly ? _directoryPeers.Shuffle() : _directoryPeers; - } - - private static Peer CreateDirectoryPeer(string endPoint, int index) - { - var peerId = new PeerId("Abc.Zebus.DirectoryService." + index); - return new Peer(peerId, endPoint); - } - private void AddOrUpdatePeerEntry(PeerDescriptor peerDescriptor, bool shouldRaisePeerUpdated) { var subscriptions = peerDescriptor.Subscriptions ?? Array.Empty();