diff --git a/packages/refactored/ledger/dataflow.go b/packages/refactored/ledger/dataflow.go index a2ab875520..04a938c4ce 100644 --- a/packages/refactored/ledger/dataflow.go +++ b/packages/refactored/ledger/dataflow.go @@ -2,6 +2,8 @@ package ledger import ( "github.com/iotaledger/hive.go/generics/dataflow" + + "github.com/iotaledger/goshimmer/packages/refactored/utxo" ) type DataFlow struct { @@ -20,15 +22,6 @@ func (d *DataFlow) unlockTransaction(params *DataFlowParams, next dataflow.Next[ return next(params) } -func (d *DataFlow) LockedCommand(command Command) (lockedCommand Command) { - return dataflow.New[*DataFlowParams]( - d.lockTransaction, - command, - ).WithTerminationCallback(func(params *DataFlowParams) { - d.Unlock(params.Transaction, false) - }).ChainedCommand -} - func (d *DataFlow) CheckSolidityCommand(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) (err error) { if params.Inputs = d.CheckSolidity(params.Transaction, params.TransactionMetadata); params.Inputs == nil { return nil @@ -37,19 +30,30 @@ func (d *DataFlow) CheckSolidityCommand(params *DataFlowParams, next dataflow.Ne return next(params) } -func (d *DataFlow) SolidifyTransactionCommand() (solidificationCommand Command) { - return d.LockedCommand( - dataflow.New[*DataFlowParams]( - d.CheckSolidityCommand, - d.ValidatePastCone, - d.ExecuteTransaction, - d.BookTransaction, - ).WithErrorCallback(func(err error, params *DataFlowParams) { - // mark Transaction as invalid and trigger invalid event - - // eventually trigger generic errors if its not due to tx invalidity - }), - ) +func (d *DataFlow) SolidifyTransaction(tx utxo.Transaction, meta *TransactionMetadata) (success bool, consumers []utxo.TransactionID, err error) { + err = dataflow.New[*DataFlowParams]( + d.lockTransaction, + d.CheckSolidityCommand, + d.ValidatePastCone, + d.ExecuteTransaction, + d.BookTransaction, + ).WithSuccessCallback(func(params *DataFlowParams) { + success = true + // TODO: fill consumers from outputs + }).WithTerminationCallback(func(params *DataFlowParams) { + d.Unlock(params.Transaction, false) + }).Run(&DataFlowParams{ + Transaction: tx, + TransactionMetadata: meta, + }) + + if err != nil { + // TODO: mark Transaction as invalid and trigger invalid event + // eventually trigger generic errors if its not due to tx invalidity + return false, nil, err + } + + return success, consumers, nil } type Command dataflow.ChainedCommand[*DataFlowParams] diff --git a/packages/refactored/ledger/ledger.go b/packages/refactored/ledger/ledger.go index d2e6e74fa4..f5cceeaee0 100644 --- a/packages/refactored/ledger/ledger.go +++ b/packages/refactored/ledger/ledger.go @@ -49,58 +49,26 @@ func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) { l.ProcessTransaction(transaction, cachedTransactionMetadata.Get()) } -func (l *Ledger) ProcessTransaction(transaction utxo.Transaction, metadata *TransactionMetadata) { - err := l.SolidifyTransactionCommand()(&DataFlowParams{ - Transaction: transaction, - TransactionMetadata: metadata, - }) - approversWalker := walker.New[utxo.TransactionID](true) - for approversWalker.HasNext() { - approversWalker.Next() +func (l *Ledger) ProcessTransaction(tx utxo.Transaction, meta *TransactionMetadata) { + success, consumers, err := l.SolidifyTransaction(tx, meta) + if !success { + return } - l.solidifyTransactionAndCollectApprovers - - _ = l.solidify()(&DataFlowParams{Transaction: transaction, TransactionMetadata: metadata}, nil) - - return -} - -func (l *Ledger) solidify() (solidifyTransactionCommand dataflow.ChainedCommand[*DataFlowParams]) { + // TODO: async event + consumersWalker := walker.New[utxo.TransactionID](true).PushAll(consumers...) + for consumersWalker.HasNext() { + l.CachedTransactionMetadata(consumersWalker.Next()).Consume(func(consumerMetadata *TransactionMetadata) { + l.CachedTransaction(consumerMetadata.ID()).Consume(func(consumerTransaction utxo.Transaction) { + _, consumers, err := l.SolidifyTransaction(tx, meta) - return dataflow.New[*DataFlowParams]( - l.solidifyTransactionAndCollectApprovers(approversWalker), - l.propagateSolidityToFutureCone(approversWalker), - ).ChainedCommand -} - -func (l *Ledger) solidifyTransactionAndCollectApprovers(approversWalker *walker.Walker[utxo.TransactionID]) dataflow.ChainedCommand[*DataFlowParams] { - return dataflow.New[*DataFlowParams]( - l.SolidifyTransactionCommand(), - func(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) error { - // queue parents - approversWalker.PushAll() - - return next(params) - }, - ).ChainedCommand -} - -func (l *Ledger) propagateSolidityToFutureCone(approversWalker *walker.Walker[utxo.TransactionID]) Command { - return func(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) error { - for approversWalker.HasNext() { - l.CachedTransactionMetadata(approversWalker.Next()).Consume(func(consumerMetadata *TransactionMetadata) { - l.CachedTransaction(consumerMetadata.ID()).Consume(func(consumerTransaction utxo.Transaction) { - _ = l.solidifyTransactionAndCollectApprovers(approversWalker)(&DataFlowParams{ - Transaction: consumerTransaction, - TransactionMetadata: consumerMetadata, - }, nil) - }) }) - } + }) - return next(params) + consumersWalker.Next() } + + return } func (l *Ledger) forkSingleTransactionCommand() (solidificationCommand dataflow.ChainedCommand[*DataFlowParams]) {