Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskManager optimization #2615

Merged
merged 10 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,15 @@ private void OnNewHeaders(Header[] headers)
{
if (header.Index > headerHeight + 1) break;
if (header.Index < headerHeight + 1) continue;
if (!header.Verify(system.Settings, snapshot, system.HeaderCache)) break;
if (!header.Verify(system.Settings, snapshot, system.HeaderCache))
{
Sender.Tell(VerifyResult.Invalid);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Sender.Tell(VerifyResult.Invalid);
Sender.Tell(VerifyResult.Invalid);
break;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why send VerifyResult.Invalid to RemoteNode? It doesn't process the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to some other places where akka replies are sent even not processed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove it. VerifyResult is not used for this purpose.

break;
}
system.HeaderCache.Add(header);
++headerHeight;
}
system.TaskManager.Tell(headers, Sender);
}

private VerifyResult OnNewExtensiblePayload(ExtensiblePayload payload)
Expand Down
3 changes: 1 addition & 2 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,14 @@ private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)
private void OnHeadersMessageReceived(HeadersPayload payload)
{
UpdateLastBlockIndex(payload.Headers[^1].Index);
system.TaskManager.Tell(payload.Headers);
system.Blockchain.Tell(payload.Headers);
}

private void OnInventoryReceived(IInventory inventory)
{
knownHashes.Add(inventory.Hash);
pendingKnownHashes.Remove(inventory.Hash);
system.TaskManager.Tell(inventory);
switch (inventory)
{
case Transaction transaction:
Expand All @@ -322,7 +322,6 @@ private void OnInventoryReceived(IInventory inventory)
system.Blockchain.Tell(inventory);
break;
}
system.TaskManager.Tell(inventory);
}

private void OnInvMessageReceived(InvPayload payload)
Expand Down
14 changes: 10 additions & 4 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private class Timer { }
private readonly Dictionary<uint, int> globalIndexTasks = new();
private readonly Dictionary<IActorRef, TaskSession> sessions = new();
private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);
private uint lastSeenPersistedIndex = 0;

private bool HasHeaderTask => globalInvTasks.ContainsKey(HeaderTaskHash);

Expand Down Expand Up @@ -136,6 +137,8 @@ private void OnNewTasks(InvPayload payload)

private void OnPersistCompleted(Block block)
{
lastSeenPersistedIndex = block.Index;

foreach (var (actor, session) in sessions)
if (session.ReceivedBlock.Remove(block.Index, out Block receivedBlock))
{
Expand Down Expand Up @@ -236,7 +239,10 @@ private void OnTaskCompleted(IInventory inventory)
session.ReceivedBlock.Add(block.Index, block);
}
}
RequestTasks(Sender, session);
if (inventory is not Block)
shargon marked this conversation as resolved.
Show resolved Hide resolved
{
RequestTasks(Sender, session);
}
}
}

Expand Down Expand Up @@ -372,7 +378,7 @@ private void RequestTasks(IActorRef remoteNode, TaskSession session)
}

uint currentHeight = NativeContract.Ledger.CurrentIndex(snapshot);
uint headerHeight = system.HeaderCache.Last?.Index ?? currentHeight;
uint headerHeight = (system.HeaderCache.Last?.Index ?? currentHeight) + 1;
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
// When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers
// If not HeaderTask pending to be processed it should ask for more Blocks
if ((!HasHeaderTask || globalInvTasks[HeaderTaskHash] < MaxConncurrentTasks) && headerHeight < session.LastBlockIndex && !system.HeaderCache.Full)
Expand All @@ -383,8 +389,8 @@ private void RequestTasks(IActorRef remoteNode, TaskSession session)
}
else if (currentHeight < session.LastBlockIndex)
{
uint startHeight = currentHeight;
while (globalIndexTasks.ContainsKey(++startHeight)) { }
uint startHeight = Math.Max(currentHeight, lastSeenPersistedIndex + 1);
while (globalIndexTasks.ContainsKey(startHeight) || session.ReceivedBlock.ContainsKey(startHeight)) { startHeight++; }
if (startHeight > session.LastBlockIndex || startHeight >= currentHeight + InvPayload.MaxHashesCount) return;
uint endHeight = startHeight;
while (!globalIndexTasks.ContainsKey(++endHeight) && endHeight <= session.LastBlockIndex && endHeight <= currentHeight + InvPayload.MaxHashesCount) { }
Expand Down