Skip to content

Commit

Permalink
Inventory message stream optimization (neo-project#1667)
Browse files Browse the repository at this point in the history
  • Loading branch information
Qiao-Jin authored and Tommo-L committed Jun 22, 2020
1 parent 9eaf7b4 commit 0aa75cf
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 30 deletions.
14 changes: 8 additions & 6 deletions src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class Timer { public uint Height; public byte ViewNumber; }
private readonly ConsensusContext context;
private readonly IActorRef localNode;
private readonly IActorRef taskManager;
private readonly IActorRef blockchain;
private ICancelable timer_token;
private DateTime block_received_time;
private bool started = false;
Expand All @@ -46,15 +47,16 @@ internal class Timer { public uint Height; public byte ViewNumber; }
/// </summary>
private bool isRecovering = false;

public ConsensusService(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet)
: this(localNode, taskManager, new ConsensusContext(wallet, store))
public ConsensusService(IActorRef localNode, IActorRef taskManager, IActorRef blockchain, IStore store, Wallet wallet)
: this(localNode, taskManager, blockchain, new ConsensusContext(wallet, store))
{
}

internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusContext context)
internal ConsensusService(IActorRef localNode, IActorRef taskManager, IActorRef blockchain, ConsensusContext context)
{
this.localNode = localNode;
this.taskManager = taskManager;
this.blockchain = blockchain;
this.context = context;
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted));
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult));
Expand Down Expand Up @@ -124,7 +126,7 @@ private void CheckCommits()
{
Block block = context.CreateBlock();
Log($"relay block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}");
localNode.Tell(new LocalNode.Relay { Inventory = block });
blockchain.Tell(block);
}
}

Expand Down Expand Up @@ -602,9 +604,9 @@ protected override void PostStop()
base.PostStop();
}

public static Props Props(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet)
public static Props Props(IActorRef localNode, IActorRef taskManager, IActorRef blockchain, IStore store, Wallet wallet)
{
return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox");
return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, blockchain, store, wallet)).WithMailbox("consensus-service-mailbox");
}

private void RequestChangeView(ChangeViewReason reason)
Expand Down
2 changes: 1 addition & 1 deletion src/neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal void ResumeNodeStartup()

public void StartConsensus(Wallet wallet, IStore consensus_store = null, bool ignoreRecoveryLogs = false)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, this.Blockchain, consensus_store ?? store, wallet));
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain);
}

Expand Down
17 changes: 0 additions & 17 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Neo.Network.P2P
{
public class LocalNode : Peer
{
public class Relay { public IInventory Inventory; }
internal class RelayDirectly { public IInventory Inventory; }
internal class SendDirectly { public IInventory Inventory; }

Expand Down Expand Up @@ -169,9 +168,6 @@ protected override void OnReceive(object message)
case Message msg:
BroadcastMessage(msg);
break;
case Relay relay:
OnRelay(relay.Inventory);
break;
case RelayDirectly relay:
OnRelayDirectly(relay.Inventory);
break;
Expand All @@ -181,19 +177,6 @@ protected override void OnReceive(object message)
}
}

/// <summary>
/// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus.
/// Otherwise, tell the inventory to the actor of Blockchain.
/// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload.
/// </summary>
/// <param name="inventory">The inventory to be relayed.</param>
private void OnRelay(IInventory inventory)
{
if (inventory is Transaction transaction)
system.Consensus?.Tell(transaction);
system.Blockchain.Tell(inventory);
}

private void OnRelayDirectly(IInventory inventory)
{
var message = new RemoteNode.Relay { Inventory = inventory };
Expand Down
4 changes: 3 additions & 1 deletion src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ private void OnHeadersMessageReceived(HeadersPayload payload)
private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash });
system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory });
if (inventory is Transaction transaction)
system.Consensus?.Tell(transaction);
system.Blockchain.Tell(inventory, ActorRefs.NoSender);
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
}
Expand Down
8 changes: 3 additions & 5 deletions tests/neo.UnitTests/Consensus/UT_Consensus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
timestampVal.Should().Be(defaultTimestamp);
TestProbe subscriber = CreateTestProbe();
TestActorRef<ConsensusService> actorConsensus = ActorOfAsTestActorRef<ConsensusService>(
Akka.Actor.Props.Create(() => (ConsensusService)Activator.CreateInstance(typeof(ConsensusService), BindingFlags.Instance | BindingFlags.NonPublic, null, new object[] { subscriber, subscriber, mockContext.Object }, null))
Akka.Actor.Props.Create(() => (ConsensusService)Activator.CreateInstance(typeof(ConsensusService), BindingFlags.Instance | BindingFlags.NonPublic, null, new object[] { subscriber, subscriber, subscriber, mockContext.Object }, null))
);

var testPersistCompleted = new Blockchain.PersistCompleted
Expand Down Expand Up @@ -361,10 +361,8 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
Console.WriteLine("\nCN4 simulation time - Final needed signatures");
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData()));

Console.WriteLine("\nWait for subscriber Local.Node Relay");
var onBlockRelay = subscriber.ExpectMsg<LocalNode.Relay>();
Console.WriteLine("\nAsserting time was Block...");
var utBlock = (Block)onBlockRelay.Inventory;
Console.WriteLine("\nWait for subscriber Block");
var utBlock = subscriber.ExpectMsg<Block>();
Console.WriteLine("\nAsserting CountCommitted is 5...");
mockContext.Object.CountCommitted.Should().Be(5);

Expand Down

0 comments on commit 0aa75cf

Please sign in to comment.