Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel transaction execution #2919

Closed
wants to merge 11 commits into from
66 changes: 53 additions & 13 deletions src/Neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Ledger
{
Expand Down Expand Up @@ -414,26 +416,47 @@ private void Persist(Block block)
all_application_executed.Add(application_executed);
transactionStates = engine.GetState<TransactionState[]>();
}
DataCache clonedSnapshot = snapshot.CreateSnapshot();
// Warning: Do not write into variable snapshot directly. Write into variable clonedSnapshot and commit instead.

var tasks = new List<Task<(TransactionState, DataCache, ApplicationExecuted)>>();

// Warning: Do not write into variable snapshot directly.
// Write into variable clonedSnapshot and commit instead.
foreach (TransactionState transactionState in transactionStates)
{
Transaction tx = transactionState.Transaction;
using ApplicationEngine engine = ApplicationEngine.Create(TriggerType.Application, tx, clonedSnapshot, block, system.Settings, tx.SystemFee);
engine.LoadScript(tx.Script);
transactionState.State = engine.Execute();
if (transactionState.State == VMState.HALT)
DataCache txSnapshot = snapshot.CreateSnapshot();
var task = OnExecuteTransactionAsync(system, block, txSnapshot, transactionState);
tasks.Add(task);
}

// var readSet = new HashSet<StorageKey>();
var writeSet = new HashSet<StorageKey>();

var randomUsed = false;
foreach (var task in tasks)
Comment on lines 424 to +435
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you combine these two loops? I see you using OnExecuteTransactionAsync to do the loading. But I think if you were to batch or even just have a callback method for OnExecuteTransactionAsync you should be good and maybe speed up some more with one loop instead of two loops. Or you could use Task.WhenAll to keep sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are different, parallel execution, sequential commit.

{
var (transactionState, txSnapshot, executed) = task.Result;

if (txSnapshot.GetReadSet().Overlaps(writeSet) || (randomUsed && txSnapshot.isRandomNumberCalled))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I have tx A reading nothing and writing into K some V1 and tx B reading nothing and writing into K some V2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then tx B will reexecute

Copy link
Contributor Author

@Jim8y Jim8y Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will, this is a feature anyway, for potential ecosystem boost~. I know this is useful only if we push to the neo limit. In reality, of not much use considering those 0 tx blocks.

{
clonedSnapshot.Commit();
DataCache tmp = snapshot.CreateSnapshot();
var task2 = OnExecuteTransactionAsync(system, block, tmp, transactionState);
tasks.Add(task2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why adding to the list if you're waiting for the result below anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parallel execution, sequential commit.

In this way, we can start processing commit in the original order on txs that are alrady finished without waiting for the whole execution to finish.

(transactionState, txSnapshot, executed) = task.Result;
}
else

if (transactionState.State == VMState.HALT)
{
clonedSnapshot = snapshot.CreateSnapshot();
txSnapshot.Commit();
writeSet.UnionWith(txSnapshot.GetWriteSet());
randomUsed = txSnapshot.isRandomNumberCalled;
}
ApplicationExecuted application_executed = new(engine);
Context.System.EventStream.Publish(application_executed);
all_application_executed.Add(application_executed);

// ApplicationExecuted application_executed = new(engine);
Context.System.EventStream.Publish(executed);
all_application_executed.Add(executed);

}

using (ApplicationEngine engine = ApplicationEngine.Create(TriggerType.PostPersist, null, snapshot, block, system.Settings, 0))
{
engine.LoadScript(postPersistScript);
Expand Down Expand Up @@ -491,6 +514,23 @@ private static ImmutableHashSet<UInt160> UpdateExtensibleWitnessWhiteList(Protoc
}
return builder.ToImmutable();
}

private static async Task<(TransactionState, DataCache, ApplicationExecuted)> OnExecuteTransactionAsync(NeoSystem system, Block block, DataCache clonedSnapshot, TransactionState transactionState)
{
ApplicationExecuted application_executed = null;

await Task.Run(() =>
{
Transaction tx = transactionState.Transaction;
using ApplicationEngine engine = ApplicationEngine.Create(TriggerType.Application, tx, clonedSnapshot, block, system.Settings, tx.SystemFee);
engine.LoadScript(tx.Script);
transactionState.State = engine.Execute();
application_executed = new(engine);
});

return (transactionState, clonedSnapshot, application_executed);

}
}

internal class BlockchainMailbox : PriorityMailbox
Expand Down
65 changes: 59 additions & 6 deletions src/Neo/Persistence/DataCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public class Trackable
private readonly Dictionary<StorageKey, Trackable> dictionary = new();
private readonly HashSet<StorageKey> changeSet = new();

private readonly HashSet<StorageKey> readSet = new();

public bool isRandomNumberCalled = false;

/// <summary>
/// Reads a specified entry from the cache. If the entry is not in the cache, it will be automatically loaded from the underlying storage.
/// </summary>
Expand All @@ -67,7 +71,7 @@ public StorageItem this[StorageKey key]
trackable = new Trackable
{
Key = key,
Item = GetInternal(key),
Item = GetInternalWrapper(key),
State = TrackState.None
};
dictionary.Add(key, trackable);
Expand Down Expand Up @@ -179,7 +183,7 @@ public void Delete(StorageKey key)
}
else
{
StorageItem item = TryGetInternal(key);
StorageItem item = TryGetInternalWrapper(key);
if (item == null) return;
dictionary.Add(key, new Trackable
{
Expand Down Expand Up @@ -273,6 +277,11 @@ public IEnumerable<Trackable> GetChangeSet()
}
}

public HashSet<StorageKey> GetWriteSet()
{
return changeSet;
}

/// <summary>
/// Determines whether the cache contains the specified entry.
/// </summary>
Expand All @@ -284,7 +293,7 @@ public bool Contains(StorageKey key)
{
if (dictionary.TryGetValue(key, out Trackable trackable))
return trackable.State != TrackState.Deleted && trackable.State != TrackState.NotFound;
return ContainsInternal(key);
return ContainsInternalWrapper(key);
}
}

Expand All @@ -295,13 +304,25 @@ public bool Contains(StorageKey key)
/// <returns><see langword="true"/> if the underlying storage contains an entry with the specified key; otherwise, <see langword="false"/>.</returns>
protected abstract bool ContainsInternal(StorageKey key);

private bool ContainsInternalWrapper(StorageKey key)
{
RecordRead(key);
return ContainsInternal(key);
}

/// <summary>
/// Reads a specified entry from the underlying storage.
/// </summary>
/// <param name="key">The key of the entry.</param>
/// <returns>The data of the entry. Or <see langword="null"/> if the entry doesn't exist.</returns>
protected abstract StorageItem GetInternal(StorageKey key);

private StorageItem GetInternalWrapper(StorageKey key)
{
RecordRead(key);
return GetInternal(key);
}

/// <summary>
/// Reads a specified entry from the cache, and mark it as <see cref="TrackState.Changed"/>. If the entry is not in the cache, it will be automatically loaded from the underlying storage.
/// </summary>
Expand Down Expand Up @@ -339,7 +360,7 @@ public StorageItem GetAndChange(StorageKey key, Func<StorageItem> factory = null
trackable = new Trackable
{
Key = key,
Item = TryGetInternal(key)
Item = TryGetInternalWrapper(key)
};
if (trackable.Item == null)
{
Expand Down Expand Up @@ -389,7 +410,7 @@ public StorageItem GetOrAdd(StorageKey key, Func<StorageItem> factory)
trackable = new Trackable
{
Key = key,
Item = TryGetInternal(key)
Item = TryGetInternalWrapper(key)
};
if (trackable.Item == null)
{
Expand Down Expand Up @@ -472,6 +493,16 @@ public StorageItem GetOrAdd(StorageKey key, Func<StorageItem> factory)
/// <returns>An enumerator containing all the entries after seeking.</returns>
protected abstract IEnumerable<(StorageKey Key, StorageItem Value)> SeekInternal(byte[] keyOrPrefix, SeekDirection direction);


private IEnumerable<(StorageKey Key, StorageItem Value)> SeekInternalWrapper(byte[] keyOrPrefix, SeekDirection direction)
{
foreach (var (key, value) in SeekInternal(keyOrPrefix, direction))
{
RecordRead(key);
yield return (key, value);
}
}

/// <summary>
/// Reads a specified entry from the cache. If the entry is not in the cache, it will be automatically loaded from the underlying storage.
/// </summary>
Expand All @@ -487,7 +518,7 @@ public StorageItem TryGet(StorageKey key)
return null;
return trackable.Item;
}
StorageItem value = TryGetInternal(key);
StorageItem value = TryGetInternalWrapper(key);
if (value == null) return null;
dictionary.Add(key, new Trackable
{
Expand All @@ -506,11 +537,33 @@ public StorageItem TryGet(StorageKey key)
/// <returns>The data of the entry. Or <see langword="null"/> if it doesn't exist.</returns>
protected abstract StorageItem TryGetInternal(StorageKey key);

private StorageItem TryGetInternalWrapper(StorageKey key)
{
RecordRead(key);
return TryGetInternal(key);
}

/// <summary>
/// Updates an entry in the underlying storage.
/// </summary>
/// <param name="key">The key of the entry.</param>
/// <param name="value">The data of the entry.</param>
protected abstract void UpdateInternal(StorageKey key, StorageItem value);

private void RecordRead(StorageKey key)
{
lock (dictionary)
{
readSet.Add(key);
}
}

public HashSet<StorageKey> GetReadSet()
{
lock (dictionary)
{
return readSet;
}
}
}
}
5 changes: 5 additions & 0 deletions src/Neo/SmartContract/ApplicationEngine.Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ protected internal BigInteger GetRandom()
price = 1 << 4;
}
AddGas(price * ExecFeeFactor);
if (Snapshot != null)
{
Snapshot.isRandomNumberCalled = true;
}

return new BigInteger(buffer, isUnsigned: true);
}

Expand Down