diff --git a/src/neo/Consensus/ConsensusService.cs b/src/neo/Consensus/ConsensusService.cs index 8054147eb5..9e33202299 100644 --- a/src/neo/Consensus/ConsensusService.cs +++ b/src/neo/Consensus/ConsensusService.cs @@ -657,6 +657,7 @@ private void SendPrepareRequest() if (context.TransactionHashes.Length > 0) { + taskManager.Tell(new TaskManager.InvHashes { Hashes = context.TransactionHashes }); foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes)) localNode.Tell(Message.Create(MessageCommand.Inv, payload)); } diff --git a/src/neo/Network/P2P/LocalNode.cs b/src/neo/Network/P2P/LocalNode.cs index c8bc937001..1b7f0388fd 100644 --- a/src/neo/Network/P2P/LocalNode.cs +++ b/src/neo/Network/P2P/LocalNode.cs @@ -17,6 +17,7 @@ public class LocalNode : Peer { internal class RelayDirectly { public IInventory Inventory; } internal class SendDirectly { public IInventory Inventory; } + internal class MaliciousNode { public IActorRef actor; } public const uint ProtocolVersion = 0; private const int MaxCountFromSeedList = 5; @@ -196,6 +197,9 @@ protected override void OnReceive(object message) case SendDirectly send: OnSendDirectly(send.Inventory); break; + case MaliciousNode actor: + RemoveMaliciousNode(actor.actor); + break; } } diff --git a/src/neo/Network/P2P/Peer.cs b/src/neo/Network/P2P/Peer.cs index 106f530b71..e0e32a5885 100644 --- a/src/neo/Network/P2P/Peer.cs +++ b/src/neo/Network/P2P/Peer.cs @@ -34,8 +34,10 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p private ICancelable timer; protected ActorSelection Connections => Context.ActorSelection("connection_*"); + private static readonly TimeSpan BlacklistTime = TimeSpan.FromMinutes(10); private static readonly HashSet localAddresses = new HashSet(); private readonly Dictionary ConnectedAddresses = new Dictionary(); + private readonly Dictionary IPAddressBlacklist = new Dictionary(); /// /// A dictionary that stores the connected nodes. /// @@ -89,12 +91,21 @@ protected void AddPeers(IEnumerable peers) } } + private void AddToBlacklist(IPEndPoint endPoint) + { + if (IPAddressBlacklist.ContainsKey(endPoint)) + IPAddressBlacklist[endPoint] = TimeProvider.Current.UtcNow; + else + IPAddressBlacklist.TryAdd(endPoint, TimeProvider.Current.UtcNow); + } + protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false) { endPoint = endPoint.Unmap(); // If the address is the same, the ListenerTcpPort should be different, otherwise, return if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return; - + // If the address is in blacklist, return + if (IPAddressBlacklist.ContainsKey(endPoint)) return; if (isTrusted) TrustedIpAddresses.Add(endPoint.Address); // If connections with the peer greater than or equal to MaxConnectionsPerAddress, return. if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress) @@ -121,6 +132,16 @@ private static bool IsIntranetAddress(IPAddress address) /// Number of peers that are being requested. protected abstract void NeedMorePeers(int count); + public void RemoveMaliciousNode(IActorRef actor) + { + if (ConnectedPeers.TryRemove(actor, out IPEndPoint ipEndPoint)) + { + AddToBlacklist(ipEndPoint); + UnconnectedPeers.Remove(ipEndPoint); + actor.Tell(Tcp.Abort.Instance); + } + } + protected override void OnReceive(object message) { switch (message) @@ -264,6 +285,13 @@ private void OnTerminated(IActorRef actorRef) private void OnTimer() { + var now = DateTime.Now; + foreach (var item in IPAddressBlacklist.ToList()) + { + if (now - item.Value > BlacklistTime) + IPAddressBlacklist.Remove(item.Key); + } + // Check if the number of desired connections is already enough if (ConnectedPeers.Count >= MinDesiredConnections) return; diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index 0cddbd49d7..25c8fa24ac 100644 --- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -29,6 +29,7 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item) private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection(); private readonly HashSetCache knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); private readonly HashSetCache sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); + private readonly HashSet receivedBlcokIndex = new HashSet(Blockchain.Singleton.MemPool.Capacity * 2 / 5); private bool verack = false; private BloomFilter bloom_filter; @@ -208,13 +209,24 @@ private void OnGetBlockByIndexMessageReceived(GetBlockByIndexPayload payload) } } + private void OnGetDataMessageReceived(InvPayload payload) + { + if (payload.Type is InventoryType.TX) + { + system.TaskManager.Tell(new TaskManager.GetDataHashes { Hashes = payload.Hashes }); + return; + } + else + OnGetData(payload); + } + /// /// Will be triggered when a MessageCommand.GetData message is received. /// The payload includes an array of hash values. /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor. /// /// The payload containing the requested information. - private void OnGetDataMessageReceived(InvPayload payload) + private void OnGetData(InvPayload payload) { var notFound = new List(); foreach (UInt256 hash in payload.Hashes.Where(p => sentHashes.Add(p))) @@ -270,7 +282,7 @@ private void OnGetDataMessageReceived(InvPayload payload) private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload) { uint index = payload.IndexStart; - uint count = payload.Count == -1 ? HeadersPayload.MaxHeadersCount : (uint)payload.Count; + uint count = payload.Count == -1 ? HeadersPayload.MaxHeadersCount : Math.Min((uint)payload.Count, HeadersPayload.MaxHeadersCount); if (index > Blockchain.Singleton.HeaderHeight) return; List
headers = new List
(); @@ -286,19 +298,17 @@ private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload) private void OnInventoryReceived(IInventory inventory) { - pendingKnownHashes.Remove(inventory.Hash); - knownHashes.Add(inventory.Hash); - system.TaskManager.Tell(inventory); - system.Blockchain.Tell(inventory, ActorRefs.NoSender); - switch (inventory) + if (inventory is Block block) { - case Transaction transaction: - system.Consensus?.Tell(transaction); - break; - case Block block: - UpdateLastBlockIndex(block.Index, false); - break; + if (receivedBlcokIndex.Contains(block.Index)) return; + receivedBlcokIndex.Add(block.Index); + UpdateLastBlockIndex(block.Index, false); } + + system.TaskManager.Tell(inventory); + system.Blockchain.Tell(inventory, ActorRefs.NoSender); + if (inventory is Transaction transaction) + system.Consensus?.Tell(transaction); } private void OnInvMessageReceived(InvPayload payload) @@ -371,7 +381,7 @@ private void OnVersionMessageReceived(VersionPayload payload) SendMessage(Message.Create(MessageCommand.Verack)); } - private void RefreshPendingKnownHashes() + private void OnTimer() { while (pendingKnownHashes.Count > 0) { @@ -380,6 +390,13 @@ private void RefreshPendingKnownHashes() break; pendingKnownHashes.RemoveAt(0); } + + var height = Blockchain.Singleton.Height; + foreach (var item in receivedBlcokIndex.ToArray()) + { + if (item <= height) + receivedBlcokIndex.Remove(item); + } } private void UpdateLastBlockIndex(uint lastBlockIndex, bool requestTasks) diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index b1457a8522..807781e89b 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -7,6 +7,7 @@ using Neo.Ledger; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; +using System; using System.Collections; using System.Collections.Generic; using System.Linq; @@ -24,6 +25,9 @@ internal class Relay { public IInventory Inventory; } private readonly Queue message_queue_low = new Queue(); private ByteString msg_buffer = ByteString.Empty; private bool ack = true; + private int message_count = 0; + private DateTime start_message_time; + private readonly double time_threshold = 100; public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort); public int ListenerTcpPort { get; private set; } = 0; @@ -112,7 +116,21 @@ protected override void OnData(ByteString data) msg_buffer = msg_buffer.Concat(data); for (Message message = TryParseMessage(); message != null; message = TryParseMessage()) + { + if (message_count == 0) + start_message_time = TimeProvider.Current.UtcNow; + else if (message_count > 1000) + { + message_count = 0; + if ((TimeProvider.Current.UtcNow - start_message_time).TotalMilliseconds < time_threshold) + { + Disconnect(true); + return; + } + } OnMessage(message); + message_count++; + } } protected override void OnReceive(object message) @@ -121,7 +139,7 @@ protected override void OnReceive(object message) switch (message) { case Timer _: - RefreshPendingKnownHashes(); + OnTimer(); break; case Message msg: EnqueueMessage(msg); @@ -135,6 +153,9 @@ protected override void OnReceive(object message) case StartProtocol _: OnStartProtocol(); break; + case InvPayload payload: + OnGetData(payload); + break; } } diff --git a/src/neo/Network/P2P/TaskManager.cs b/src/neo/Network/P2P/TaskManager.cs index 5bea005a36..a73965d4b3 100644 --- a/src/neo/Network/P2P/TaskManager.cs +++ b/src/neo/Network/P2P/TaskManager.cs @@ -18,14 +18,18 @@ public class Register { public VersionPayload Version; } public class Update { public uint LastBlockIndex; public bool RequestTasks; } public class NewTasks { public InvPayload Payload; } public class RestartTasks { public InvPayload Payload; } + public class InvHashes { public UInt256[] Hashes; } + public class GetDataHashes { public UInt256[] Hashes; } private class Timer { } private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); private static readonly TimeSpan TaskTimeout = TimeSpan.FromMinutes(1); + private static readonly TimeSpan InvHashTimeout = TimeSpan.FromMinutes(30); + private static readonly TimeSpan ReduceTimeoutRecord = TimeSpan.FromDays(1); private static readonly UInt256 MemPoolTaskHash = UInt256.Parse("0x0000000000000000000000000000000000000000000000000000000000000001"); private const int MaxConncurrentTasks = 3; - private const int MaxSyncTasksCount = 50; + private const int MaxSyncTasksCount = 500; private const int PingCoolingOffPeriod = 60_000; // in ms. private readonly NeoSystem system; @@ -33,6 +37,7 @@ private class Timer { } /// A set of known hashes, of inventories or payloads, already received. /// private readonly HashSetCache knownHashes; + private readonly Dictionary sentInvHashes = new Dictionary(); private readonly Dictionary globalTasks = new Dictionary(); private readonly Dictionary receivedBlockIndex = new Dictionary(); private readonly HashSet failedSyncTasks = new HashSet(); @@ -53,10 +58,8 @@ private bool AssignSyncTask(uint index, TaskSession filterSession = null) { if (index <= Blockchain.Singleton.Height || sessions.Values.Any(p => p != filterSession && p.IndexTasks.ContainsKey(index))) return true; - Random rand = new Random(); KeyValuePair remoteNode = sessions.Where(p => p.Value != filterSession && p.Value.LastBlockIndex >= index) - .OrderBy(p => p.Value.IndexTasks.Count) - .ThenBy(s => rand.Next()) + .OrderBy(p => p.Value.Weight) .FirstOrDefault(); if (remoteNode.Value == null) { @@ -65,6 +68,7 @@ private bool AssignSyncTask(uint index, TaskSession filterSession = null) } TaskSession session = remoteNode.Value; session.IndexTasks.TryAdd(index, TimeProvider.Current.UtcNow); + session.UpdateWeight(); remoteNode.Key.Tell(Message.Create(MessageCommand.GetBlockByIndex, GetBlockByIndexPayload.Create(index, 1))); failedSyncTasks.Remove(index); return true; @@ -74,19 +78,29 @@ private void OnBlock(Block block) { var session = sessions.Values.FirstOrDefault(p => p.IndexTasks.ContainsKey(block.Index)); if (session is null) return; + var newRTT = (TimeProvider.Current.UtcNow - session.IndexTasks[block.Index]).TotalMilliseconds; + session.UpdateRTT(newRTT); session.IndexTasks.Remove(block.Index); + session.UpdateWeight(); receivedBlockIndex.TryAdd(block.Index, session); RequestTasks(); } private void OnInvalidBlock(Block invalidBlock) { - receivedBlockIndex.TryGetValue(invalidBlock.Index, out TaskSession session); - if (session is null) return; - session.InvalidBlockCount++; - session.IndexTasks.Remove(invalidBlock.Index); + if (!receivedBlockIndex.TryGetValue(invalidBlock.Index, out TaskSession session)) + { + AssignSyncTask(invalidBlock.Index); + return; + } receivedBlockIndex.Remove(invalidBlock.Index); - AssignSyncTask(invalidBlock.Index, session); + var actor = sessions.Where(p => p.Value == session).FirstOrDefault().Key; + if (actor != null) + { + OnTerminated(actor); + system.LocalNode.Tell(new LocalNode.MaliciousNode { actor = actor }); + AssignSyncTask(invalidBlock.Index); + } } private void OnNewTasks(InvPayload payload) @@ -139,6 +153,7 @@ protected override void OnReceive(object message) break; case Block block: OnBlock(block); + OnTaskCompleted(block.Hash); break; case IInventory inventory: OnTaskCompleted(inventory.Hash); @@ -150,6 +165,12 @@ protected override void OnReceive(object message) if (rr.Inventory is Block invalidBlock && rr.Result == VerifyResult.Invalid) OnInvalidBlock(invalidBlock); break; + case InvHashes invHashes: + OnInvHashes(invHashes.Hashes); + break; + case GetDataHashes getDataHashes: + OnGetDataHashes(getDataHashes.Hashes); + break; case Timer _: OnTimer(); break; @@ -195,6 +216,33 @@ private void OnTaskCompleted(UInt256 hash) session.InvTasks.Remove(hash); } + private void OnInvHashes(UInt256[] hashes) + { + foreach (var hash in hashes) + { + if (sentInvHashes.ContainsKey(hash)) + sentInvHashes[hash] = TimeProvider.Current.UtcNow; + else + sentInvHashes.TryAdd(hash, TimeProvider.Current.UtcNow); + } + } + + private void OnGetDataHashes(UInt256[] hashes) + { + var invalidHashesCount = 0; + + foreach (var hash in hashes) + { + if (!sentInvHashes.ContainsKey(hash)) + invalidHashesCount++; + } + + if (invalidHashesCount < 3) + Sender.Tell(InvPayload.Create(InventoryType.TX, hashes)); + else + system.LocalNode.Tell(new LocalNode.MaliciousNode { actor = Sender }); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void DecrementGlobalTask(UInt256 hash) { @@ -243,20 +291,39 @@ private void OnTimer() if (TimeProvider.Current.UtcNow - kvp.Value > TaskTimeout) { session.IndexTasks.Remove(kvp.Key); - session.TimeoutTimes++; - AssignSyncTask(kvp.Key, session); + session.TimeoutRecord.Add(TimeProvider.Current.UtcNow); + if (session.TimeoutRecord.Count > 3) + OnTerminated(sessions.Where(p => p.Value == session).First().Key); + else + { + session.UpdateWeight(); + AssignSyncTask(kvp.Key, session); + } } } foreach (var task in session.InvTasks.ToArray()) { - if (DateTime.UtcNow - task.Value > TaskTimeout) + if (TimeProvider.Current.UtcNow - task.Value > TaskTimeout) { if (session.InvTasks.Remove(task.Key)) DecrementGlobalTask(task.Key); } } + + for (var i = session.TimeoutRecord.Count; i > 0; i--) + { + if (TimeProvider.Current.UtcNow - session.TimeoutRecord[i] > ReduceTimeoutRecord) + session.TimeoutRecord.RemoveAt(i); + } } + + foreach (var hash in sentInvHashes) + { + if (TimeProvider.Current.UtcNow - hash.Value > InvHashTimeout) + sentInvHashes.Remove(hash.Key); + } + RequestTasks(); } @@ -277,22 +344,27 @@ private void RequestTasks() SendPingMessage(); + int taskCounts = sessions.Values.Sum(p => p.IndexTasks.Count); + while (failedSyncTasks.Count() > 0) { - var failedTask = failedSyncTasks.First(); + if (taskCounts >= MaxSyncTasksCount) return; + var failedTask = failedSyncTasks.OrderBy(p => p).First(); if (failedTask <= Blockchain.Singleton.Height) { failedSyncTasks.Remove(failedTask); continue; } - if (!AssignSyncTask(failedTask)) return; + if (AssignSyncTask(failedTask)) + taskCounts++; + else + return; } - int taskCounts = sessions.Values.Sum(p => p.IndexTasks.Count); var highestBlockIndex = sessions.Values.Max(p => p.LastBlockIndex); for (; taskCounts < MaxSyncTasksCount; taskCounts++) { - if (lastTaskIndex >= highestBlockIndex) break; + if (lastTaskIndex >= highestBlockIndex || lastTaskIndex > Blockchain.Singleton.HeaderHeight + 2000) break; if (!AssignSyncTask(++lastTaskIndex)) break; } } diff --git a/src/neo/Network/P2P/TaskSession.cs b/src/neo/Network/P2P/TaskSession.cs index b8a78bd5f6..2ebe153903 100644 --- a/src/neo/Network/P2P/TaskSession.cs +++ b/src/neo/Network/P2P/TaskSession.cs @@ -10,12 +10,23 @@ internal class TaskSession { public readonly Dictionary InvTasks = new Dictionary(); public readonly Dictionary IndexTasks = new Dictionary(); + public List TimeoutRecord = new List(); public bool IsFullNode { get; } public uint LastBlockIndex { get; set; } - public uint TimeoutTimes = 0; - public uint InvalidBlockCount = 0; public DateTime ExpireTime = DateTime.MinValue; + public double RTT = 100.0; + public double Weight = 1000.0; + + public void UpdateRTT(double newRTT) + { + RTT = 0.9 * RTT + 0.1 * newRTT; + } + + public void UpdateWeight() + { + Weight = RTT * (1.0 / Math.Pow(2, TimeoutRecord.Count)) * (IndexTasks.Count); + } public TaskSession(VersionPayload version) { diff --git a/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs b/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs index 6ae403a946..b01c6d36de 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_TaskSession.cs @@ -21,8 +21,7 @@ public void CreateTest() Assert.AreEqual((uint)123, ses.LastBlockIndex); Assert.AreEqual(0, ses.IndexTasks.Count); Assert.AreEqual(0, ses.InvTasks.Count); - Assert.AreEqual((uint)0, ses.TimeoutTimes); - Assert.AreEqual((uint)0, ses.InvalidBlockCount); + Assert.AreEqual((int)0, ses.TimeoutRecord.Count); ses = new TaskSession(new VersionPayload() { Capabilities = new NodeCapability[0] }); @@ -30,8 +29,7 @@ public void CreateTest() Assert.AreEqual((uint)0, ses.LastBlockIndex); Assert.AreEqual(0, ses.IndexTasks.Count); Assert.AreEqual(0, ses.InvTasks.Count); - Assert.AreEqual((uint)0, ses.TimeoutTimes); - Assert.AreEqual((uint)0, ses.InvalidBlockCount); + Assert.AreEqual((int)0, ses.TimeoutRecord.Count); } } }