Skip to content

Commit

Permalink
Merge pull request #13068 from kiminuo/feature/2024-05-22-fixInifiteL…
Browse files Browse the repository at this point in the history
…oopFetchTx

Fix infinite await in `FetchTransactionsAsync` in case of missing txs (take 2)
  • Loading branch information
Turbolay authored May 25, 2024
2 parents 1096572 + 1e0bc48 commit 13b5b0b
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions WalletWasabi.Backend/Controllers/BlockchainController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public async Task<IActionResult> GetTransactionsAsync([FromQuery, Required] IEnu
/// <summary>
/// Fetches transactions from cache if possible and missing transactions are fetched using RPC.
/// </summary>
/// <exception cref="AggregateException">If RPC client succeeds in getting some transactions but not all.</exception>
private async Task<Transaction[]> FetchTransactionsAsync(uint256[] txIds, CancellationToken cancellationToken)
{
int requestCount = txIds.Length;
Expand Down Expand Up @@ -204,12 +205,20 @@ private async Task<Transaction[]> FetchTransactionsAsync(uint256[] txIds, Cancel
{
// Ask to get missing transactions over RPC.
IEnumerable<Transaction> txs = await RpcClient.GetRawTransactionsAsync(txIdsRetrieve.Keys, cancellationToken).ConfigureAwait(false);

Dictionary<uint256, Transaction> rpcBatch = txs.ToDictionary(x => x.GetHash(), x => x);

foreach (KeyValuePair<uint256, Transaction> kvp in rpcBatch)
{
txIdsRetrieve[kvp.Key].TrySetResult(kvp.Value);
}

// RPC client does not throw if a transaction is missing, so we need to account for this case.
if (rpcBatch.Count < txIdsRetrieve.Count)
{
IReadOnlyList<Exception> exceptions = MarkNotFinishedTasksAsFailed(txIdsRetrieve);
throw new AggregateException(exceptions);
}
}

Transaction[] result = new Transaction[requestCount];
Expand All @@ -227,18 +236,30 @@ private async Task<Transaction[]> FetchTransactionsAsync(uint256[] txIds, Cancel
{
if (txIdsRetrieve.Count > 0)
{
// It's necessary to always set a result to the task completion sources. Otherwise, cache can get corrupted.
Exception ex = new InvalidOperationException("Failed to get the transaction.");
foreach ((uint256 txid, TaskCompletionSource<Transaction> tcs) in txIdsRetrieve)
MarkNotFinishedTasksAsFailed(txIdsRetrieve);
}
}

IReadOnlyList<Exception> MarkNotFinishedTasksAsFailed(Dictionary<uint256, TaskCompletionSource<Transaction>> txIdsRetrieve)
{
List<Exception>? exceptions = null;

// It's necessary to always set a result to the task completion sources. Otherwise, cache can get corrupted.
foreach ((uint256 txid, TaskCompletionSource<Transaction> tcs) in txIdsRetrieve)
{
if (!tcs.Task.IsCompleted)
{
if (!tcs.Task.IsCompleted)
{
// Prefer new cache requests to try again rather than getting the exception. The window is small though.
Cache.Remove(txid);
tcs.SetException(ex);
}
exceptions ??= new();

// Prefer new cache requests to try again rather than getting the exception. The window is small though.
Exception e = new InvalidOperationException($"Failed to get the transaction '{txid}'.");
exceptions.Add(e);
Cache.Remove($"{nameof(GetTransactionsAsync)}#{txid}");
tcs.SetException(e);
}
}

return exceptions ?? [];
}
}

Expand Down Expand Up @@ -489,11 +510,19 @@ private async Task<Dictionary<uint256, UnconfirmedTransactionChainItem>> BuildUn

private async Task<UnconfirmedTransactionChainItem> ComputeUnconfirmedTransactionChainItemAsync(uint256 currentTxId, IEnumerable<uint256> mempoolHashes, CancellationToken cancellationToken)
{
var currentTx = (await FetchTransactionsAsync([currentTxId], cancellationToken).ConfigureAwait(false)).FirstOrDefault() ?? throw new InvalidOperationException("Tx not found");
var currentTx = (await FetchTransactionsAsync([currentTxId], cancellationToken).ConfigureAwait(false)).First();

var txsToFetch = currentTx.Inputs.Select(input => input.PrevOut.Hash).Distinct().ToArray();

var parentTxs = await FetchTransactionsAsync(txsToFetch, cancellationToken).ConfigureAwait(false);
Transaction[] parentTxs;
try
{
parentTxs = await FetchTransactionsAsync(txsToFetch, cancellationToken).ConfigureAwait(false);
}
catch(AggregateException ex)
{
throw new InvalidOperationException($"Some transactions part of the chain were not found: {ex}");
}

// Get unconfirmed parents and children
var unconfirmedParents = parentTxs.Where(x => mempoolHashes.Contains(x.GetHash())).ToHashSet();
Expand Down

0 comments on commit 13b5b0b

Please sign in to comment.