Skip to content

Commit

Permalink
Add Suspended status to store (stratisproject#185)
Browse files Browse the repository at this point in the history
* Add `Suspended` status to store

* Add the ability to undo the ProcessTransaction operation

* Optimize RecordLatestMatureDepositsAsync and add RemoveTransaction to rollback

* Update RemoveTransaction method

* Call RemoveTransaction when setting Rejected status

* Add StoringDepositsWhenWalletBalanceInSufficientSucceedsWithSuspendStatus test case

* Extend test case and implemt fix
  • Loading branch information
quantumagi authored Nov 21, 2018
1 parent bafeb9a commit 3427ece
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Stratis.FederatedPeg.Features.FederationGateway.Interfaces
/// </summary>
public enum CrossChainTransferStatus
{
Suspended = 'U',
Partial = 'P',
FullySigned = 'F',
SeenInBlock = 'S',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface ICrossChainTransferStore : IDisposable
/// Records the mature deposits from <see cref="NextMatureDepositHeight"/> on the counter-chain.
/// The value of <see cref="NextMatureDepositHeight"/> is incremented at the end of this call.
/// </summary>
/// <param name="deposits">The deposits.</param>
/// <param name="deposits">The deposits in order of occurrence on the source chain.</param>
/// <remarks>
/// The transfers are set to <see cref="CrossChainTransfer.Status"/> of <see cref="CrossChainTransferStatus.Partial"/>
/// or <see cref="CrossChainTransferStatus.Rejected"/> depending on whether enough funds are available in the federation wallet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public interface IFederationWalletManager
/// <returns>A value indicating whether this transaction affects the wallet.</returns>
bool ProcessTransaction(Transaction transaction, int? blockHeight = null, Block block = null, bool isPropagated = true);

/// <summary>
/// Removes a transaction not yet broadcasted or included in a block.
/// </summary>
/// <param name="transaction">The transaction to remove.</param>
/// <returns>A value indicating whether this transaction affects the wallet.</returns>
bool RemoveTransaction(Transaction transaction);

/// <summary>
/// Saves the wallet into the file system.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public void ReadWrite(BitcoinStream stream)
stream.ReadWrite(ref this.depositAmount);
stream.ReadWrite(ref this.partialTransaction);

if (!stream.Serializing && this.partialTransaction.Inputs.Count == 0 && this.partialTransaction.Outputs.Count == 0)
this.partialTransaction = null;

if (this.status == CrossChainTransferStatus.SeenInBlock)
{
stream.ReadWrite(ref this.blockHash);
Expand All @@ -98,7 +101,13 @@ public void ReadWrite(BitcoinStream stream)
/// <inheritdoc />
public bool IsValid()
{
if (this.depositTransactionId == null || this.PartialTransaction == null || this.depositTargetAddress == null || this.depositAmount == 0)
if (this.depositTransactionId == null || this.depositTargetAddress == null || this.depositAmount == 0)
return false;

if (this.status == CrossChainTransferStatus.Suspended)
return true;

if (this.PartialTransaction == null)
return false;

if (this.status == CrossChainTransferStatus.SeenInBlock && this.blockHash == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,15 @@ private ICrossChainTransfer[] ValidateCrossChainTransfers(ICrossChainTransfer[]
var tracker = new StatusChangeTracker();
foreach (CrossChainTransfer partialTransfer in crossChainTransfers)
{
// Verify that the transaction input UTXO's have been reserved by the wallet.
if (partialTransfer.Status == CrossChainTransferStatus.Partial || partialTransfer.Status == CrossChainTransferStatus.FullySigned)
if (partialTransfer != null)
{
if (!ValidateTransaction(partialTransfer.PartialTransaction, wallet, partialTransfer.Status == CrossChainTransferStatus.FullySigned))
// Verify that the transaction input UTXO's have been reserved by the wallet.
if (partialTransfer.Status == CrossChainTransferStatus.Partial || partialTransfer.Status == CrossChainTransferStatus.FullySigned)
{
tracker.SetTransferStatus(partialTransfer, CrossChainTransferStatus.Rejected);
if (!ValidateTransaction(partialTransfer.PartialTransaction, wallet, partialTransfer.Status == CrossChainTransferStatus.FullySigned))
{
tracker.SetTransferStatus(partialTransfer, CrossChainTransferStatus.Rejected);
}
}
}
}
Expand All @@ -217,6 +220,17 @@ private ICrossChainTransfer[] ValidateCrossChainTransfers(ICrossChainTransfer[]

this.UpdateLookups(tracker);

// Remove any remnants of the transaction from the wallet.
foreach (KeyValuePair<ICrossChainTransfer, CrossChainTransferStatus?> kv in tracker)
{
if (kv.Value == CrossChainTransferStatus.Rejected)
{
this.federationWalletManager.RemoveTransaction(kv.Key.PartialTransaction);
}
}

this.federationWalletManager.SaveWallet();

return crossChainTransfers;
}
catch (Exception err)
Expand Down Expand Up @@ -290,65 +304,122 @@ public Task RecordLatestMatureDepositsAsync(IDeposit[] deposits)

return Task.Run(() =>
{
this.logger.LogTrace("()");

this.Synchronize();

FederationWallet wallet = this.federationWalletManager.GetWallet();

this.logger.LogTrace("()");
ICrossChainTransfer[] transfers = this.ValidateCrossChainTransfers(this.Get(deposits.Select(d => d.Id).ToArray()));

using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction())
{
dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName);
var tracker = new StatusChangeTracker();
bool walletUpdated = false;

// Check if the deposits already exist which could happen if it was found on the chain.
CrossChainTransfer[] transfers = this.Get(dbreezeTransaction, deposits.Select(d => d.Id).ToArray());
int currentDepositHeight = this.NextMatureDepositHeight;
// Deposits are assumed to be in order of occurrence on the source chain.
// If we fail to build a transacion the transfer and subsequent transfers
// in the orderd list will be set to suspended.
bool haveSuspendedTransfers = false;

try
for (int i = 0; i < deposits.Length; i++)
{
// Check if the deposits already exist which could happen if it was found on the chain.
if (transfers[i] == null || transfers[i].Status == CrossChainTransferStatus.Suspended)
{
var tracker = new StatusChangeTracker();
IDeposit deposit = deposits[i];

for (int i = 0; i < deposits.Length; i++)
{
IDeposit deposit = deposits[i];
Script scriptPubKey = BitcoinAddress.Create(deposit.TargetAddress, this.network).ScriptPubKey;
Transaction transaction = null;
CrossChainTransferStatus status = CrossChainTransferStatus.Suspended;
Script scriptPubKey = BitcoinAddress.Create(deposit.TargetAddress, this.network).ScriptPubKey;

if (!haveSuspendedTransfers)
{
var recipient = new Recipient
{
Amount = deposit.Amount,
ScriptPubKey = scriptPubKey
};

if (transfers[i] == null)
transaction = BuildDeterministicTransaction(deposit.Id, recipient);

if (transaction != null)
{
Transaction transaction = BuildDeterministicTransaction(deposit.Id, recipient);
// Reserve the UTXOs before building the next transaction.
walletUpdated |= this.federationWalletManager.ProcessTransaction(transaction, isPropagated: false);

if (transaction == null)
throw new Exception("Failed to build transaction");
status = CrossChainTransferStatus.Partial;
}
else
{
haveSuspendedTransfers = true;
}
}

transfers[i] = new CrossChainTransfer((transaction != null) ? CrossChainTransferStatus.Partial : CrossChainTransferStatus.Rejected,
deposit.Id, scriptPubKey, deposit.Amount, transaction, 0, -1 /* Unknown */);
if (transfers[i] == null)
{
transfers[i] = new CrossChainTransfer(status, deposit.Id, scriptPubKey, deposit.Amount, transaction, 0, -1 /* Unknown */);

tracker.SetTransferStatus(transfers[i]);
tracker.SetTransferStatus(transfers[i]);
}
else if (transaction != null)
{
transfers[i].SetPartialTransaction(transaction);
tracker.SetTransferStatus(transfers[i], CrossChainTransferStatus.Partial);
}
else
{
// If we can't fix suspended transfers then exit this loop now.
break;
}
}
}

this.PutTransfer(dbreezeTransaction, transfers[i]);
if (tracker.Count != 0)
{
using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction())
{
dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName);

// Reserve the UTXOs before building the next transaction.
this.federationWalletManager.ProcessTransaction(transaction, isPropagated: false);
int currentDepositHeight = this.NextMatureDepositHeight;

try
{
// Update new or modified transfers.
foreach (KeyValuePair<ICrossChainTransfer, CrossChainTransferStatus?> kv in tracker)
{
this.PutTransfer(dbreezeTransaction, kv.Key);
}
}

// Commit additions
this.SaveNextMatureHeight(dbreezeTransaction, this.NextMatureDepositHeight + 1);
// Ensure we get called for a retry by NOT advancing the chain A tip if the block
// contained any suspended transfers.
if (!haveSuspendedTransfers)
{
this.SaveNextMatureHeight(dbreezeTransaction, this.NextMatureDepositHeight + 1);
}

dbreezeTransaction.Commit();
dbreezeTransaction.Commit();

// Do this last to maintain DB integrity. We are assuming that this won't throw.
this.UpdateLookups(tracker);
}
catch (Exception err)
{
// Restore expected store state in case the calling code retries / continues using the store.
this.NextMatureDepositHeight = currentDepositHeight;
this.RollbackAndThrowTransactionError(dbreezeTransaction, err, "DEPOSIT_ERROR");
this.UpdateLookups(tracker);
}
catch (Exception err)
{
// Undo reserved UTXO's.
if (walletUpdated)
{
foreach (KeyValuePair<ICrossChainTransfer, CrossChainTransferStatus?> kv in tracker)
{
if (kv.Value == CrossChainTransferStatus.Partial)
{
this.federationWalletManager.RemoveTransaction(kv.Key.PartialTransaction);
}
}

this.federationWalletManager.SaveWallet();
}

// Restore expected store state in case the calling code retries / continues using the store.
this.NextMatureDepositHeight = currentDepositHeight;
this.RollbackAndThrowTransactionError(dbreezeTransaction, err, "DEPOSIT_ERROR");
}
}
}

Expand Down Expand Up @@ -889,7 +960,7 @@ public void UpdateLookups(StatusChangeTracker tracker)
{
this.TransferStatusUpdated(kv.Key, kv.Value);

if (kv.Key.BlockHash != 0)
if (kv.Key.BlockHash != null)
{
if (!this.depositIdsByBlockHash[kv.Key.BlockHash].Contains(kv.Key.DepositTransactionId))
this.depositIdsByBlockHash[kv.Key.BlockHash].Add(kv.Key.DepositTransactionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,55 @@ public bool ProcessTransaction(Transaction transaction, int? blockHeight = null,
return foundSendingTrx || foundReceivingTrx;
}

/// <inheritdoc />
public bool RemoveTransaction(Transaction transaction)
{
Guard.NotNull(transaction, nameof(transaction));
uint256 hash = transaction.GetHash();
this.logger.LogTrace("({0}:'{1}')", nameof(transaction), hash);

bool updatedWallet = false;

// Check the inputs - include those that have a reference to a transaction containing one of our scripts and the same index.
foreach (TxIn input in transaction.Inputs)
{
if (!this.outpointLookup.TryGetValue(input.PrevOut, out TransactionData tTx))
{
continue;
}

// Get the transaction being spent and unspend it.
TransactionData spentTransaction = this.Wallet.MultiSigAddress.Transactions.SingleOrDefault(t => (t.Id == tTx.Id) && (t.Index == tTx.Index));
if (spentTransaction != null)
{
spentTransaction.SpendingDetails = null;
spentTransaction.MerkleProof = null;
updatedWallet = true;
}
}

foreach (TxOut utxo in transaction.Outputs)
{
// Check if the outputs contain one of our addresses.
if (this.Wallet.MultiSigAddress.ScriptPubKey == utxo.ScriptPubKey)
{
int index = transaction.Outputs.IndexOf(utxo);

// Remove any UTXO's that were provided by this transaction from wallet.
TransactionData foundTransaction = this.Wallet.MultiSigAddress.Transactions.FirstOrDefault(t => (t.Id == hash) && (t.Index == index));
if (foundTransaction != null)
{
this.RemoveInputKeysLookupLock(foundTransaction);
this.Wallet.MultiSigAddress.Transactions.Remove(foundTransaction);
updatedWallet = true;
}
}
}

this.logger.LogTrace("(-)");

return updatedWallet;
}

/// <summary>
/// Adds a transaction that credits the wallet with new coins.
Expand Down Expand Up @@ -664,6 +713,19 @@ private void AddInputKeysLookupLock(TransactionData transactionData)
}
}

/// <summary>
/// Remove from the list of unspent outputs kept in memory for faster lookups.
/// </summary>
private void RemoveInputKeysLookupLock(TransactionData transactionData)
{
Guard.NotNull(transactionData, nameof(transactionData));

lock (this.lockObject)
{
this.outpointLookup.Remove(new OutPoint(transactionData.Id, transactionData.Index));
}
}

public void TransactionFoundInternal(Script script)
{
this.logger.LogTrace("()");
Expand Down
Loading

0 comments on commit 3427ece

Please sign in to comment.