Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node health #1811

Closed
wants to merge 15 commits into from
1 change: 1 addition & 0 deletions src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ private void SendPrepareRequest()

if (context.TransactionHashes.Length > 0)
{
taskManager.Tell(new TaskManager.InvHashes { Hashes = context.TransactionHashes });
foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes))
localNode.Tell(Message.Create(MessageCommand.Inv, payload));
}
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class LocalNode : Peer
{
internal class RelayDirectly { public IInventory Inventory; }
internal class SendDirectly { public IInventory Inventory; }
internal class MaliciousNode { public IActorRef actor; }

public const uint ProtocolVersion = 0;
private const int MaxCountFromSeedList = 5;
Expand Down Expand Up @@ -196,6 +197,9 @@ protected override void OnReceive(object message)
case SendDirectly send:
OnSendDirectly(send.Inventory);
break;
case MaliciousNode actor:
RemoveMaliciousNode(actor.actor);
break;
}
}

Expand Down
30 changes: 29 additions & 1 deletion src/neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p
private ICancelable timer;
protected ActorSelection Connections => Context.ActorSelection("connection_*");

private static readonly TimeSpan BlacklistTime = TimeSpan.FromMinutes(10);
private static readonly HashSet<IPAddress> localAddresses = new HashSet<IPAddress>();
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
private readonly Dictionary<IPEndPoint, DateTime> IPAddressBlacklist = new Dictionary<IPEndPoint, DateTime>();
/// <summary>
/// A dictionary that stores the connected nodes.
/// </summary>
Expand Down Expand Up @@ -89,12 +91,21 @@ protected void AddPeers(IEnumerable<IPEndPoint> peers)
}
}

private void AddToBlacklist(IPEndPoint endPoint)
{
if (IPAddressBlacklist.ContainsKey(endPoint))
IPAddressBlacklist[endPoint] = TimeProvider.Current.UtcNow;
else
IPAddressBlacklist.TryAdd(endPoint, TimeProvider.Current.UtcNow);
}

protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false)
{
endPoint = endPoint.Unmap();
// If the address is the same, the ListenerTcpPort should be different, otherwise, return
if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return;

// If the address is in blacklist, return
if (IPAddressBlacklist.ContainsKey(endPoint)) return;
if (isTrusted) TrustedIpAddresses.Add(endPoint.Address);
// If connections with the peer greater than or equal to MaxConnectionsPerAddress, return.
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
Expand All @@ -121,6 +132,16 @@ private static bool IsIntranetAddress(IPAddress address)
/// <param name="count">Number of peers that are being requested.</param>
protected abstract void NeedMorePeers(int count);

public void RemoveMaliciousNode(IActorRef actor)
{
if (ConnectedPeers.TryRemove(actor, out IPEndPoint ipEndPoint))
{
AddToBlacklist(ipEndPoint);
UnconnectedPeers.Remove(ipEndPoint);
actor.Tell(Tcp.Abort.Instance);
}
}

protected override void OnReceive(object message)
{
switch (message)
Expand Down Expand Up @@ -264,6 +285,13 @@ private void OnTerminated(IActorRef actorRef)

private void OnTimer()
{
var now = DateTime.Now;
foreach (var item in IPAddressBlacklist.ToList())
{
if (now - item.Value > BlacklistTime)
IPAddressBlacklist.Remove(item.Key);
}

// Check if the number of desired connections is already enough
if (ConnectedPeers.Count >= MinDesiredConnections) return;

Expand Down
45 changes: 31 additions & 14 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection();
private readonly HashSetCache<UInt256> knownHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private readonly HashSetCache<UInt256> sentHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private readonly HashSet<uint> receivedBlcokIndex = new HashSet<uint>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private bool verack = false;
private BloomFilter bloom_filter;

Expand Down Expand Up @@ -208,13 +209,24 @@ private void OnGetBlockByIndexMessageReceived(GetBlockByIndexPayload payload)
}
}

private void OnGetDataMessageReceived(InvPayload payload)
{
if (payload.Type is InventoryType.TX)
{
system.TaskManager.Tell(new TaskManager.GetDataHashes { Hashes = payload.Hashes });
return;
}
else
OnGetData(payload);
}

/// <summary>
/// Will be triggered when a MessageCommand.GetData message is received.
/// The payload includes an array of hash values.
/// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor.
/// </summary>
/// <param name="payload">The payload containing the requested information.</param>
private void OnGetDataMessageReceived(InvPayload payload)
private void OnGetData(InvPayload payload)
{
var notFound = new List<UInt256>();
foreach (UInt256 hash in payload.Hashes.Where(p => sentHashes.Add(p)))
Expand Down Expand Up @@ -270,7 +282,7 @@ private void OnGetDataMessageReceived(InvPayload payload)
private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)
{
uint index = payload.IndexStart;
uint count = payload.Count == -1 ? HeadersPayload.MaxHeadersCount : (uint)payload.Count;
uint count = payload.Count == -1 ? HeadersPayload.MaxHeadersCount : Math.Min((uint)payload.Count, HeadersPayload.MaxHeadersCount);
if (index > Blockchain.Singleton.HeaderHeight)
return;
List<Header> headers = new List<Header>();
Expand All @@ -286,19 +298,17 @@ private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)

private void OnInventoryReceived(IInventory inventory)
{
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
system.TaskManager.Tell(inventory);
system.Blockchain.Tell(inventory, ActorRefs.NoSender);
switch (inventory)
if (inventory is Block block)
{
case Transaction transaction:
system.Consensus?.Tell(transaction);
break;
case Block block:
UpdateLastBlockIndex(block.Index, false);
break;
if (receivedBlcokIndex.Contains(block.Index)) return;
receivedBlcokIndex.Add(block.Index);
UpdateLastBlockIndex(block.Index, false);
}

system.TaskManager.Tell(inventory);
system.Blockchain.Tell(inventory, ActorRefs.NoSender);
if (inventory is Transaction transaction)
system.Consensus?.Tell(transaction);
}

private void OnInvMessageReceived(InvPayload payload)
Expand Down Expand Up @@ -371,7 +381,7 @@ private void OnVersionMessageReceived(VersionPayload payload)
SendMessage(Message.Create(MessageCommand.Verack));
}

private void RefreshPendingKnownHashes()
private void OnTimer()
{
while (pendingKnownHashes.Count > 0)
{
Expand All @@ -380,6 +390,13 @@ private void RefreshPendingKnownHashes()
break;
pendingKnownHashes.RemoveAt(0);
}

var height = Blockchain.Singleton.Height;
foreach (var item in receivedBlcokIndex.ToArray())
{
if (item <= height)
receivedBlcokIndex.Remove(item);
}
}

private void UpdateLastBlockIndex(uint lastBlockIndex, bool requestTasks)
Expand Down
23 changes: 22 additions & 1 deletion src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Neo.Ledger;
using Neo.Network.P2P.Capabilities;
using Neo.Network.P2P.Payloads;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -24,6 +25,9 @@ internal class Relay { public IInventory Inventory; }
private readonly Queue<Message> message_queue_low = new Queue<Message>();
private ByteString msg_buffer = ByteString.Empty;
private bool ack = true;
private int message_count = 0;
private DateTime start_message_time;
private readonly double time_threshold = 100;

public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort);
public int ListenerTcpPort { get; private set; } = 0;
Expand Down Expand Up @@ -112,7 +116,21 @@ protected override void OnData(ByteString data)
msg_buffer = msg_buffer.Concat(data);

for (Message message = TryParseMessage(); message != null; message = TryParseMessage())
{
if (message_count == 0)
start_message_time = TimeProvider.Current.UtcNow;
else if (message_count > 1000)
ShawnYun marked this conversation as resolved.
Show resolved Hide resolved
{
message_count = 0;
if ((TimeProvider.Current.UtcNow - start_message_time).TotalMilliseconds < time_threshold)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a threshold for receive, but not for send, what happens if it's a normal behaviour be fast?

{
Disconnect(true);
return;
}
}
OnMessage(message);
message_count++;
}
}

protected override void OnReceive(object message)
Expand All @@ -121,7 +139,7 @@ protected override void OnReceive(object message)
switch (message)
{
case Timer _:
RefreshPendingKnownHashes();
OnTimer();
break;
case Message msg:
EnqueueMessage(msg);
Expand All @@ -135,6 +153,9 @@ protected override void OnReceive(object message)
case StartProtocol _:
OnStartProtocol();
break;
case InvPayload payload:
OnGetData(payload);
break;
}
}

Expand Down
Loading