Skip to content

Commit

Permalink
SyncPeerPool to Channels (#7844)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Dec 1, 2024
1 parent 8fed53f commit 6adf106
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Find;
Expand Down Expand Up @@ -37,7 +38,7 @@ public class SyncPeerPool : ISyncPeerPool, IPeerDifficultyRefreshPool

private readonly IBlockTree _blockTree;
private readonly ILogger _logger;
private readonly BlockingCollection<RefreshTotalDiffTask> _peerRefreshQueue = new();
private readonly Channel<RefreshTotalDiffTask> _peerRefreshQueue = Channel.CreateUnbounded<RefreshTotalDiffTask>();

private readonly ConcurrentDictionary<PublicKey, PeerInfo> _peers = new();

Expand Down Expand Up @@ -218,7 +219,7 @@ internal IEnumerable<SyncPeerAllocation> ReplaceableAllocations
public void RefreshTotalDifficulty(ISyncPeer syncPeer, Hash256 blockHash)
{
RefreshTotalDiffTask task = new(blockHash, syncPeer);
_peerRefreshQueue.Add(task);
_peerRefreshQueue.Writer.TryWrite(task);
}

public void AddPeer(ISyncPeer syncPeer)
Expand Down Expand Up @@ -257,7 +258,7 @@ public void AddPeer(ISyncPeer syncPeer)
{
if (_logger.IsDebug) _logger.Debug($"Adding {syncPeer.Node:c} to refresh queue");
if (NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportInterestingEvent(syncPeer.Node.Address, "adding node to refresh queue");
_peerRefreshQueue.Add(new RefreshTotalDiffTask(syncPeer));
_peerRefreshQueue.Writer.TryWrite(new RefreshTotalDiffTask(syncPeer));
}
}

Expand Down Expand Up @@ -354,7 +355,6 @@ public async Task<SyncPeerAllocation> Allocate(
}
}


private bool TryAllocateOnce(IPeerAllocationStrategy peerAllocationStrategy, AllocationContexts allocationContexts, SyncPeerAllocation allocation)
{
lock (_isAllocatedChecks)
Expand Down Expand Up @@ -392,7 +392,7 @@ public void Free(SyncPeerAllocation syncPeerAllocation)

private async Task RunRefreshPeerLoop()
{
foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.GetConsumingEnumerable(_refreshLoopCancellation.Token))
await foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.Reader.ReadAllAsync(_refreshLoopCancellation.Token))
{
ISyncPeer syncPeer = refreshTask.SyncPeer;
if (_logger.IsTrace) _logger.Trace($"Refreshing info for {syncPeer}.");
Expand Down Expand Up @@ -713,7 +713,7 @@ public async ValueTask DisposeAsync()
await (_refreshLoopTask ?? Task.CompletedTask);
Parallel.ForEach(_peers, p => { p.Value.SyncPeer.Disconnect(DisconnectReason.AppClosing, "App Close"); });

_peerRefreshQueue.Dispose();
_peerRefreshQueue.Writer.TryComplete();
_refreshLoopCancellation.Dispose();
_refreshLoopTask?.Dispose();
_signals.Dispose();
Expand Down

0 comments on commit 6adf106

Please sign in to comment.