diff --git a/src/neo/Consensus/ConsensusService.cs b/src/neo/Consensus/ConsensusService.cs index 788719ae14..5b6661dc08 100644 --- a/src/neo/Consensus/ConsensusService.cs +++ b/src/neo/Consensus/ConsensusService.cs @@ -57,6 +57,7 @@ internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusC this.taskManager = taskManager; this.context = context; Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); + Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult)); } private bool AddTransaction(Transaction tx, bool verify) @@ -506,15 +507,16 @@ protected override void OnReceive(object message) case Timer timer: OnTimer(timer); break; - case ConsensusPayload payload: - OnConsensusPayload(payload); - break; case Transaction transaction: OnTransaction(transaction); break; case Blockchain.PersistCompleted completed: OnPersistCompleted(completed.Block); break; + case Blockchain.RelayResult rr: + if (rr.Result == VerifyResult.Succeed && rr.Inventory is ConsensusPayload payload) + OnConsensusPayload(payload); + break; } } } diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs index a0153063d1..c5b80b8eec 100644 --- a/src/neo/Ledger/Blockchain.cs +++ b/src/neo/Ledger/Blockchain.cs @@ -64,7 +64,7 @@ public class RelayResult { public IInventory Inventory; public VerifyResult Resu private uint stored_header_count = 0; private readonly Dictionary block_cache = new Dictionary(); private readonly Dictionary> block_cache_unverified = new Dictionary>(); - internal readonly RelayCache ConsensusRelayCache = new RelayCache(100); + internal readonly RelayCache RelayCache = new RelayCache(100); private SnapshotView currentSnapshot; public IStore Store { get; } @@ -299,8 +299,7 @@ private void OnInventory(IInventory inventory, bool relay = true) { Block block => OnNewBlock(block), Transaction transaction => OnNewTransaction(transaction), - ConsensusPayload payload => OnNewConsensus(payload), - _ => VerifyResult.Unknown + _ => OnNewInventory(inventory) } }; if (relay && rr.Result == VerifyResult.Succeed) @@ -388,14 +387,6 @@ private VerifyResult OnNewBlock(Block block) return VerifyResult.Succeed; } - private VerifyResult OnNewConsensus(ConsensusPayload payload) - { - if (!payload.Verify(currentSnapshot)) return VerifyResult.Invalid; - system.Consensus?.Tell(payload); - ConsensusRelayCache.Add(payload); - return VerifyResult.Succeed; - } - private void OnNewHeaders(Header[] headers) { using (SnapshotView snapshot = GetSnapshot()) @@ -417,6 +408,13 @@ private void OnNewHeaders(Header[] headers) system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender); } + private VerifyResult OnNewInventory(IInventory inventory) + { + if (!inventory.Verify(currentSnapshot)) return VerifyResult.Invalid; + RelayCache.Add(inventory); + return VerifyResult.Succeed; + } + private VerifyResult OnNewTransaction(Transaction transaction) { if (ContainsTransaction(transaction.Hash)) return VerifyResult.AlreadyExists; diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index 2146501e85..432eba0b35 100644 --- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -248,9 +248,9 @@ private void OnGetDataMessageReceived(InvPayload payload) notFound.Add(hash); } break; - case InventoryType.Consensus: - if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus)) - EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus)); + default: + if (Blockchain.Singleton.RelayCache.TryGet(hash, out IInventory inventory)) + EnqueueMessage(Message.Create((MessageCommand)payload.Type, inventory)); break; } } diff --git a/tests/neo.UnitTests/Consensus/UT_Consensus.cs b/tests/neo.UnitTests/Consensus/UT_Consensus.cs index 1baf64acb8..1e8c3f079f 100644 --- a/tests/neo.UnitTests/Consensus/UT_Consensus.cs +++ b/tests/neo.UnitTests/Consensus/UT_Consensus.cs @@ -1,3 +1,4 @@ +using Akka.Actor; using Akka.TestKit; using Akka.TestKit.Xunit2; using FluentAssertions; @@ -186,7 +187,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.PreparationPayloads[prepReq.ValidatorIndex] = null; Console.WriteLine("will tell prepare request!"); - actorConsensus.Tell(prepReq); + TellConsensusPayload(actorConsensus, prepReq); Console.WriteLine("Waiting for something related to the PrepRequest...\nNothing happens...Recovery will come due to failed nodes"); var backupOnRecoveryDueToFailedNodesII = subscriber.ExpectMsg(); var recoveryPayloadII = (ConsensusPayload)backupOnRecoveryDueToFailedNodesII.Inventory; @@ -201,7 +202,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm // cleaning old try with Self ValidatorIndex mockContext.Object.PreparationPayloads[mockContext.Object.MyIndex] = null; - actorConsensus.Tell(prepReq); + TellConsensusPayload(actorConsensus, prepReq); var OnPrepResponse = subscriber.ExpectMsg(); var prepResponsePayload = (ConsensusPayload)OnPrepResponse.Inventory; PrepareResponse prm = (PrepareResponse)prepResponsePayload.ConsensusMessage; @@ -212,7 +213,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.CountFailed.Should().Be(5); // Simulating CN 3 - actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 2)); + TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 2)); //Waiting for RecoveryRequest for a more deterministic UT backupOnRecoveryDueToFailedNodes = subscriber.ExpectMsg(); recoveryPayload = (ConsensusPayload)backupOnRecoveryDueToFailedNodes.Inventory; @@ -225,7 +226,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.CountFailed.Should().Be(4); // Simulating CN 5 - actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 4)); + TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 4)); //Waiting for RecoveryRequest for a more deterministic UT backupOnRecoveryDueToFailedNodes = subscriber.ExpectMsg(); recoveryPayload = (ConsensusPayload)backupOnRecoveryDueToFailedNodes.Inventory; @@ -238,7 +239,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.CountFailed.Should().Be(3); // Simulating CN 4 - actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 3)); + TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 3)); var onCommitPayload = subscriber.ExpectMsg(); var commitPayload = (ConsensusPayload)onCommitPayload.Inventory; Commit cm = (Commit)commitPayload.ConsensusMessage; @@ -300,7 +301,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm Console.WriteLine("\n=========================="); Console.WriteLine("\nCN7 simulation time"); - actorConsensus.Tell(cmPayloadTemp); + TellConsensusPayload(actorConsensus, cmPayloadTemp); var tempPayloadToBlockAndWait = subscriber.ExpectMsg(); var rmPayload = (ConsensusPayload)tempPayloadToBlockAndWait.Inventory; RecoveryMessage rmm = (RecoveryMessage)rmPayload.ConsensusMessage; @@ -310,7 +311,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.CountFailed.Should().Be(1); Console.WriteLine("\nCN6 simulation time"); - actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 5, kp_array[5], updatedBlockHashData)); + TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 5, kp_array[5], updatedBlockHashData)); tempPayloadToBlockAndWait = subscriber.ExpectMsg(); rmPayload = (ConsensusPayload)tempPayloadToBlockAndWait.Inventory; rmm = (RecoveryMessage)rmPayload.ConsensusMessage; @@ -320,7 +321,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.CountFailed.Should().Be(0); Console.WriteLine("\nCN5 simulation time"); - actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 4, kp_array[4], updatedBlockHashData)); + TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 4, kp_array[4], updatedBlockHashData)); tempPayloadToBlockAndWait = subscriber.ExpectMsg(); Console.WriteLine("\nAsserting CountCommitted is 4..."); mockContext.Object.CountCommitted.Should().Be(4); @@ -329,7 +330,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm // Testing commit with wrong signature not valid // It will be invalid signature because we did not change ECPoint Console.WriteLine("\nCN4 simulation time. Wrong signature, KeyPair is not known"); - actorConsensus.Tell(GetPayloadAndModifyValidator(commitPayload, 3)); + TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(commitPayload, 3)); Console.WriteLine("\nWaiting for recovery due to failed nodes... "); var backupOnRecoveryMessageAfterCommit = subscriber.ExpectMsg(); rmPayload = (ConsensusPayload)backupOnRecoveryMessageAfterCommit.Inventory; @@ -358,7 +359,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm Console.WriteLine($"\nNew Hash is {mockContext.Object.Block.GetHashData().ToScriptHash()}"); Console.WriteLine("\nCN4 simulation time - Final needed signatures"); - actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData())); + TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData())); Console.WriteLine("\nWait for subscriber Local.Node Relay"); var onBlockRelay = subscriber.ExpectMsg(); @@ -388,7 +389,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm mockContext.Object.LastSeenMessage = new int[] { -1, -1, -1, -1, -1, -1, -1 }; mockContext.Object.CountFailed.Should().Be(7); - actorConsensus.Tell(rmPayload); + TellConsensusPayload(actorConsensus, rmPayload); Console.WriteLine("\nWaiting for RecoveryRequest before final asserts..."); var onRecoveryRequestAfterRecovery = subscriber.ExpectMsg(); @@ -921,5 +922,14 @@ private StorageKey CreateStorageKeyForNativeNeo(byte prefix) storageKey.Key[0] = prefix; return storageKey; } + + private void TellConsensusPayload(IActorRef actor, ConsensusPayload payload) + { + actor.Tell(new Blockchain.RelayResult + { + Inventory = payload, + Result = VerifyResult.Succeed + }); + } } }