From c6f5969afbad78759068b2ebe3ee7ed434700a51 Mon Sep 17 00:00:00 2001 From: Jin Qiao Date: Tue, 26 May 2020 14:18:50 +0800 Subject: [PATCH 1/3] Inventory messagestream optimization --- src/neo/Consensus/ConsensusService.cs | 14 ++++++++------ src/neo/Ledger/Blockchain.cs | 1 - src/neo/NeoSystem.cs | 2 +- src/neo/Network/P2P/LocalNode.cs | 17 ----------------- .../Network/P2P/RemoteNode.ProtocolHandler.cs | 4 +++- tests/neo.UnitTests/Consensus/UT_Consensus.cs | 8 +++----- tests/neo.UnitTests/Ledger/UT_Blockchain.cs | 3 --- 7 files changed, 15 insertions(+), 34 deletions(-) diff --git a/src/neo/Consensus/ConsensusService.cs b/src/neo/Consensus/ConsensusService.cs index 5b6661dc08..4199bb13a0 100644 --- a/src/neo/Consensus/ConsensusService.cs +++ b/src/neo/Consensus/ConsensusService.cs @@ -26,6 +26,7 @@ internal class Timer { public uint Height; public byte ViewNumber; } private readonly ConsensusContext context; private readonly IActorRef localNode; private readonly IActorRef taskManager; + private readonly IActorRef blockchain; private ICancelable timer_token; private DateTime block_received_time; private bool started = false; @@ -46,15 +47,16 @@ internal class Timer { public uint Height; public byte ViewNumber; } /// private bool isRecovering = false; - public ConsensusService(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet) - : this(localNode, taskManager, new ConsensusContext(wallet, store)) + public ConsensusService(IActorRef localNode, IActorRef taskManager, IActorRef blockchain, IStore store, Wallet wallet) + : this(localNode, taskManager, blockchain, new ConsensusContext(wallet, store)) { } - internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusContext context) + internal ConsensusService(IActorRef localNode, IActorRef taskManager, IActorRef blockchain, ConsensusContext context) { this.localNode = localNode; this.taskManager = taskManager; + this.blockchain = blockchain; this.context = context; Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult)); @@ -124,7 +126,7 @@ private void CheckCommits() { Block block = context.CreateBlock(); Log($"relay block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); - localNode.Tell(new LocalNode.Relay { Inventory = block }); + blockchain.Tell(block); } } @@ -602,9 +604,9 @@ protected override void PostStop() base.PostStop(); } - public static Props Props(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet) + public static Props Props(IActorRef localNode, IActorRef taskManager, IActorRef blockchain, IStore store, Wallet wallet) { - return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox"); + return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, blockchain, store, wallet)).WithMailbox("consensus-service-mailbox"); } private void RequestChangeView(ChangeViewReason reason) diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs index 55b7cf85aa..2b2dcf8ab9 100644 --- a/src/neo/Ledger/Blockchain.cs +++ b/src/neo/Ledger/Blockchain.cs @@ -304,7 +304,6 @@ private void OnInventory(IInventory inventory, bool relay = true) }; if (relay && rr.Result == VerifyResult.Succeed) system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = inventory }); - Sender.Tell(rr); Context.System.EventStream.Publish(rr); } diff --git a/src/neo/NeoSystem.cs b/src/neo/NeoSystem.cs index d666a5d1a0..88cc93ebcd 100644 --- a/src/neo/NeoSystem.cs +++ b/src/neo/NeoSystem.cs @@ -81,7 +81,7 @@ internal void ResumeNodeStartup() public void StartConsensus(Wallet wallet, IStore consensus_store = null, bool ignoreRecoveryLogs = false) { - Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet)); + Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, this.Blockchain, consensus_store ?? store, wallet)); Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain); } diff --git a/src/neo/Network/P2P/LocalNode.cs b/src/neo/Network/P2P/LocalNode.cs index f7b9093134..c1e7046725 100644 --- a/src/neo/Network/P2P/LocalNode.cs +++ b/src/neo/Network/P2P/LocalNode.cs @@ -15,7 +15,6 @@ namespace Neo.Network.P2P { public class LocalNode : Peer { - public class Relay { public IInventory Inventory; } internal class RelayDirectly { public IInventory Inventory; } internal class SendDirectly { public IInventory Inventory; } @@ -169,9 +168,6 @@ protected override void OnReceive(object message) case Message msg: BroadcastMessage(msg); break; - case Relay relay: - OnRelay(relay.Inventory); - break; case RelayDirectly relay: OnRelayDirectly(relay.Inventory); break; @@ -181,19 +177,6 @@ protected override void OnReceive(object message) } } - /// - /// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus. - /// Otherwise, tell the inventory to the actor of Blockchain. - /// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload. - /// - /// The inventory to be relayed. - private void OnRelay(IInventory inventory) - { - if (inventory is Transaction transaction) - system.Consensus?.Tell(transaction); - system.Blockchain.Tell(inventory); - } - private void OnRelayDirectly(IInventory inventory) { var message = new RemoteNode.Relay { Inventory = inventory }; diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index 432eba0b35..d3147193f6 100644 --- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -298,7 +298,9 @@ private void OnHeadersMessageReceived(HeadersPayload payload) private void OnInventoryReceived(IInventory inventory) { system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); - system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory }); + if (inventory is Transaction transaction) + system.Consensus?.Tell(transaction); + system.Blockchain.Tell(inventory); pendingKnownHashes.Remove(inventory.Hash); knownHashes.Add(inventory.Hash); } diff --git a/tests/neo.UnitTests/Consensus/UT_Consensus.cs b/tests/neo.UnitTests/Consensus/UT_Consensus.cs index 1e8c3f079f..ee26c3ca85 100644 --- a/tests/neo.UnitTests/Consensus/UT_Consensus.cs +++ b/tests/neo.UnitTests/Consensus/UT_Consensus.cs @@ -100,7 +100,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm timestampVal.Should().Be(defaultTimestamp); TestProbe subscriber = CreateTestProbe(); TestActorRef actorConsensus = ActorOfAsTestActorRef( - Akka.Actor.Props.Create(() => (ConsensusService)Activator.CreateInstance(typeof(ConsensusService), BindingFlags.Instance | BindingFlags.NonPublic, null, new object[] { subscriber, subscriber, mockContext.Object }, null)) + Akka.Actor.Props.Create(() => (ConsensusService)Activator.CreateInstance(typeof(ConsensusService), BindingFlags.Instance | BindingFlags.NonPublic, null, new object[] { subscriber, subscriber, subscriber, mockContext.Object }, null)) ); var testPersistCompleted = new Blockchain.PersistCompleted @@ -361,10 +361,8 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm Console.WriteLine("\nCN4 simulation time - Final needed signatures"); TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData())); - Console.WriteLine("\nWait for subscriber Local.Node Relay"); - var onBlockRelay = subscriber.ExpectMsg(); - Console.WriteLine("\nAsserting time was Block..."); - var utBlock = (Block)onBlockRelay.Inventory; + Console.WriteLine("\nWait for subscriber Block"); + var utBlock = subscriber.ExpectMsg(); Console.WriteLine("\nAsserting CountCommitted is 5..."); mockContext.Object.CountCommitted.Should().Be(5); diff --git a/tests/neo.UnitTests/Ledger/UT_Blockchain.cs b/tests/neo.UnitTests/Ledger/UT_Blockchain.cs index c200bec7c8..0636e2ccae 100644 --- a/tests/neo.UnitTests/Ledger/UT_Blockchain.cs +++ b/tests/neo.UnitTests/Ledger/UT_Blockchain.cs @@ -131,10 +131,7 @@ public void TestValidTransaction() var tx = CreateValidTx(walletA, acc.ScriptHash, 0); senderProbe.Send(system.Blockchain, tx); - senderProbe.ExpectMsg(p => p.Result == VerifyResult.Succeed); - senderProbe.Send(system.Blockchain, tx); - senderProbe.ExpectMsg(p => p.Result == VerifyResult.AlreadyExists); } } From cb4a0a6145bc1b6a580c4f1b1630deeb1883056b Mon Sep 17 00:00:00 2001 From: Jin Qiao Date: Tue, 26 May 2020 17:18:43 +0800 Subject: [PATCH 2/3] Restore sender message --- src/neo/Ledger/Blockchain.cs | 1 + src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs | 2 +- tests/neo.UnitTests/Ledger/UT_Blockchain.cs | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs index 2b2dcf8ab9..0896ccacce 100644 --- a/src/neo/Ledger/Blockchain.cs +++ b/src/neo/Ledger/Blockchain.cs @@ -304,6 +304,7 @@ private void OnInventory(IInventory inventory, bool relay = true) }; if (relay && rr.Result == VerifyResult.Succeed) system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = inventory }); + if (!Sender.IsNobody()) Sender.Tell(rr); Context.System.EventStream.Publish(rr); } diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index d3147193f6..bdf235bdc0 100644 --- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -300,7 +300,7 @@ private void OnInventoryReceived(IInventory inventory) system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); if (inventory is Transaction transaction) system.Consensus?.Tell(transaction); - system.Blockchain.Tell(inventory); + system.Blockchain.Tell(inventory, ActorRefs.NoSender); pendingKnownHashes.Remove(inventory.Hash); knownHashes.Add(inventory.Hash); } diff --git a/tests/neo.UnitTests/Ledger/UT_Blockchain.cs b/tests/neo.UnitTests/Ledger/UT_Blockchain.cs index 0636e2ccae..c200bec7c8 100644 --- a/tests/neo.UnitTests/Ledger/UT_Blockchain.cs +++ b/tests/neo.UnitTests/Ledger/UT_Blockchain.cs @@ -131,7 +131,10 @@ public void TestValidTransaction() var tx = CreateValidTx(walletA, acc.ScriptHash, 0); senderProbe.Send(system.Blockchain, tx); + senderProbe.ExpectMsg(p => p.Result == VerifyResult.Succeed); + senderProbe.Send(system.Blockchain, tx); + senderProbe.ExpectMsg(p => p.Result == VerifyResult.AlreadyExists); } } From cee7feb6f66867c881440a146df552addb5d3992 Mon Sep 17 00:00:00 2001 From: erikzhang Date: Tue, 26 May 2020 20:37:09 +0800 Subject: [PATCH 3/3] Remove redundant if. --- src/neo/Ledger/Blockchain.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs index 08b0c6bf10..811db655a6 100644 --- a/src/neo/Ledger/Blockchain.cs +++ b/src/neo/Ledger/Blockchain.cs @@ -302,7 +302,7 @@ private void OnInventory(IInventory inventory, bool relay = true) }; if (relay && rr.Result == VerifyResult.Succeed) system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = inventory }); - if (!Sender.IsNobody()) Sender.Tell(rr); + Sender.Tell(rr); Context.System.EventStream.Publish(rr); }