Skip to content

Commit

Permalink
Subscribe to RelayResult messages in ConsensusService (#1647)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikzhang authored May 15, 2020
1 parent b90aedf commit b8fcb94
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 28 deletions.
8 changes: 5 additions & 3 deletions src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusC
this.taskManager = taskManager;
this.context = context;
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted));
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult));
}

private bool AddTransaction(Transaction tx, bool verify)
Expand Down Expand Up @@ -506,15 +507,16 @@ protected override void OnReceive(object message)
case Timer timer:
OnTimer(timer);
break;
case ConsensusPayload payload:
OnConsensusPayload(payload);
break;
case Transaction transaction:
OnTransaction(transaction);
break;
case Blockchain.PersistCompleted completed:
OnPersistCompleted(completed.Block);
break;
case Blockchain.RelayResult rr:
if (rr.Result == VerifyResult.Succeed && rr.Inventory is ConsensusPayload payload)
OnConsensusPayload(payload);
break;
}
}
}
Expand Down
20 changes: 9 additions & 11 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RelayResult { public IInventory Inventory; public VerifyResult Resu
private uint stored_header_count = 0;
private readonly Dictionary<UInt256, Block> block_cache = new Dictionary<UInt256, Block>();
private readonly Dictionary<uint, LinkedList<Block>> block_cache_unverified = new Dictionary<uint, LinkedList<Block>>();
internal readonly RelayCache ConsensusRelayCache = new RelayCache(100);
internal readonly RelayCache RelayCache = new RelayCache(100);
private SnapshotView currentSnapshot;

public IStore Store { get; }
Expand Down Expand Up @@ -299,8 +299,7 @@ private void OnInventory(IInventory inventory, bool relay = true)
{
Block block => OnNewBlock(block),
Transaction transaction => OnNewTransaction(transaction),
ConsensusPayload payload => OnNewConsensus(payload),
_ => VerifyResult.Unknown
_ => OnNewInventory(inventory)
}
};
if (relay && rr.Result == VerifyResult.Succeed)
Expand Down Expand Up @@ -388,14 +387,6 @@ private VerifyResult OnNewBlock(Block block)
return VerifyResult.Succeed;
}

private VerifyResult OnNewConsensus(ConsensusPayload payload)
{
if (!payload.Verify(currentSnapshot)) return VerifyResult.Invalid;
system.Consensus?.Tell(payload);
ConsensusRelayCache.Add(payload);
return VerifyResult.Succeed;
}

private void OnNewHeaders(Header[] headers)
{
using (SnapshotView snapshot = GetSnapshot())
Expand All @@ -417,6 +408,13 @@ private void OnNewHeaders(Header[] headers)
system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender);
}

private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(currentSnapshot)) return VerifyResult.Invalid;
RelayCache.Add(inventory);
return VerifyResult.Succeed;
}

private VerifyResult OnNewTransaction(Transaction transaction)
{
if (ContainsTransaction(transaction.Hash)) return VerifyResult.AlreadyExists;
Expand Down
6 changes: 3 additions & 3 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ private void OnGetDataMessageReceived(InvPayload payload)
notFound.Add(hash);
}
break;
case InventoryType.Consensus:
if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus))
EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus));
default:
if (Blockchain.Singleton.RelayCache.TryGet(hash, out IInventory inventory))
EnqueueMessage(Message.Create((MessageCommand)payload.Type, inventory));
break;
}
}
Expand Down
32 changes: 21 additions & 11 deletions tests/neo.UnitTests/Consensus/UT_Consensus.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using FluentAssertions;
Expand Down Expand Up @@ -186,7 +187,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.PreparationPayloads[prepReq.ValidatorIndex] = null;

Console.WriteLine("will tell prepare request!");
actorConsensus.Tell(prepReq);
TellConsensusPayload(actorConsensus, prepReq);
Console.WriteLine("Waiting for something related to the PrepRequest...\nNothing happens...Recovery will come due to failed nodes");
var backupOnRecoveryDueToFailedNodesII = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var recoveryPayloadII = (ConsensusPayload)backupOnRecoveryDueToFailedNodesII.Inventory;
Expand All @@ -201,7 +202,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
// cleaning old try with Self ValidatorIndex
mockContext.Object.PreparationPayloads[mockContext.Object.MyIndex] = null;

actorConsensus.Tell(prepReq);
TellConsensusPayload(actorConsensus, prepReq);
var OnPrepResponse = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var prepResponsePayload = (ConsensusPayload)OnPrepResponse.Inventory;
PrepareResponse prm = (PrepareResponse)prepResponsePayload.ConsensusMessage;
Expand All @@ -212,7 +213,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(5);

// Simulating CN 3
actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 2));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 2));
//Waiting for RecoveryRequest for a more deterministic UT
backupOnRecoveryDueToFailedNodes = subscriber.ExpectMsg<LocalNode.SendDirectly>();
recoveryPayload = (ConsensusPayload)backupOnRecoveryDueToFailedNodes.Inventory;
Expand All @@ -225,7 +226,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(4);

// Simulating CN 5
actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 4));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 4));
//Waiting for RecoveryRequest for a more deterministic UT
backupOnRecoveryDueToFailedNodes = subscriber.ExpectMsg<LocalNode.SendDirectly>();
recoveryPayload = (ConsensusPayload)backupOnRecoveryDueToFailedNodes.Inventory;
Expand All @@ -238,7 +239,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(3);

// Simulating CN 4
actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 3));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 3));
var onCommitPayload = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var commitPayload = (ConsensusPayload)onCommitPayload.Inventory;
Commit cm = (Commit)commitPayload.ConsensusMessage;
Expand Down Expand Up @@ -300,7 +301,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm

Console.WriteLine("\n==========================");
Console.WriteLine("\nCN7 simulation time");
actorConsensus.Tell(cmPayloadTemp);
TellConsensusPayload(actorConsensus, cmPayloadTemp);
var tempPayloadToBlockAndWait = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var rmPayload = (ConsensusPayload)tempPayloadToBlockAndWait.Inventory;
RecoveryMessage rmm = (RecoveryMessage)rmPayload.ConsensusMessage;
Expand All @@ -310,7 +311,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(1);

Console.WriteLine("\nCN6 simulation time");
actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 5, kp_array[5], updatedBlockHashData));
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 5, kp_array[5], updatedBlockHashData));
tempPayloadToBlockAndWait = subscriber.ExpectMsg<LocalNode.SendDirectly>();
rmPayload = (ConsensusPayload)tempPayloadToBlockAndWait.Inventory;
rmm = (RecoveryMessage)rmPayload.ConsensusMessage;
Expand All @@ -320,7 +321,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(0);

Console.WriteLine("\nCN5 simulation time");
actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 4, kp_array[4], updatedBlockHashData));
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 4, kp_array[4], updatedBlockHashData));
tempPayloadToBlockAndWait = subscriber.ExpectMsg<LocalNode.SendDirectly>();
Console.WriteLine("\nAsserting CountCommitted is 4...");
mockContext.Object.CountCommitted.Should().Be(4);
Expand All @@ -329,7 +330,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
// Testing commit with wrong signature not valid
// It will be invalid signature because we did not change ECPoint
Console.WriteLine("\nCN4 simulation time. Wrong signature, KeyPair is not known");
actorConsensus.Tell(GetPayloadAndModifyValidator(commitPayload, 3));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(commitPayload, 3));
Console.WriteLine("\nWaiting for recovery due to failed nodes... ");
var backupOnRecoveryMessageAfterCommit = subscriber.ExpectMsg<LocalNode.SendDirectly>();
rmPayload = (ConsensusPayload)backupOnRecoveryMessageAfterCommit.Inventory;
Expand Down Expand Up @@ -358,7 +359,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
Console.WriteLine($"\nNew Hash is {mockContext.Object.Block.GetHashData().ToScriptHash()}");

Console.WriteLine("\nCN4 simulation time - Final needed signatures");
actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData()));
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData()));

Console.WriteLine("\nWait for subscriber Local.Node Relay");
var onBlockRelay = subscriber.ExpectMsg<LocalNode.Relay>();
Expand Down Expand Up @@ -388,7 +389,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.LastSeenMessage = new int[] { -1, -1, -1, -1, -1, -1, -1 };
mockContext.Object.CountFailed.Should().Be(7);

actorConsensus.Tell(rmPayload);
TellConsensusPayload(actorConsensus, rmPayload);

Console.WriteLine("\nWaiting for RecoveryRequest before final asserts...");
var onRecoveryRequestAfterRecovery = subscriber.ExpectMsg<LocalNode.SendDirectly>();
Expand Down Expand Up @@ -921,5 +922,14 @@ private StorageKey CreateStorageKeyForNativeNeo(byte prefix)
storageKey.Key[0] = prefix;
return storageKey;
}

private void TellConsensusPayload(IActorRef actor, ConsensusPayload payload)
{
actor.Tell(new Blockchain.RelayResult
{
Inventory = payload,
Result = VerifyResult.Succeed
});
}
}
}

0 comments on commit b8fcb94

Please sign in to comment.