diff --git a/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs b/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs index 263617effbe..119c37b5c27 100644 --- a/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs +++ b/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Blockchain.Find; @@ -37,7 +38,7 @@ public class SyncPeerPool : ISyncPeerPool, IPeerDifficultyRefreshPool private readonly IBlockTree _blockTree; private readonly ILogger _logger; - private readonly BlockingCollection _peerRefreshQueue = new(); + private readonly Channel _peerRefreshQueue = Channel.CreateUnbounded(); private readonly ConcurrentDictionary _peers = new(); @@ -218,7 +219,7 @@ internal IEnumerable ReplaceableAllocations public void RefreshTotalDifficulty(ISyncPeer syncPeer, Hash256 blockHash) { RefreshTotalDiffTask task = new(blockHash, syncPeer); - _peerRefreshQueue.Add(task); + _peerRefreshQueue.Writer.TryWrite(task); } public void AddPeer(ISyncPeer syncPeer) @@ -257,7 +258,7 @@ public void AddPeer(ISyncPeer syncPeer) { if (_logger.IsDebug) _logger.Debug($"Adding {syncPeer.Node:c} to refresh queue"); if (NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportInterestingEvent(syncPeer.Node.Address, "adding node to refresh queue"); - _peerRefreshQueue.Add(new RefreshTotalDiffTask(syncPeer)); + _peerRefreshQueue.Writer.TryWrite(new RefreshTotalDiffTask(syncPeer)); } } @@ -354,7 +355,6 @@ public async Task Allocate( } } - private bool TryAllocateOnce(IPeerAllocationStrategy peerAllocationStrategy, AllocationContexts allocationContexts, SyncPeerAllocation allocation) { lock (_isAllocatedChecks) @@ -392,7 +392,7 @@ public void Free(SyncPeerAllocation syncPeerAllocation) private async Task RunRefreshPeerLoop() { - foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.GetConsumingEnumerable(_refreshLoopCancellation.Token)) + await foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.Reader.ReadAllAsync(_refreshLoopCancellation.Token)) { ISyncPeer syncPeer = refreshTask.SyncPeer; if (_logger.IsTrace) _logger.Trace($"Refreshing info for {syncPeer}."); @@ -713,7 +713,7 @@ public async ValueTask DisposeAsync() await (_refreshLoopTask ?? Task.CompletedTask); Parallel.ForEach(_peers, p => { p.Value.SyncPeer.Disconnect(DisconnectReason.AppClosing, "App Close"); }); - _peerRefreshQueue.Dispose(); + _peerRefreshQueue.Writer.TryComplete(); _refreshLoopCancellation.Dispose(); _refreshLoopTask?.Dispose(); _signals.Dispose();