Skip to content

Commit

Permalink
Continue dataflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jonastheis committed Mar 16, 2022
1 parent d818774 commit 8ad7369
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 68 deletions.
48 changes: 26 additions & 22 deletions packages/refactored/ledger/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ledger

import (
"github.com/iotaledger/hive.go/generics/dataflow"

"github.com/iotaledger/goshimmer/packages/refactored/utxo"
)

type DataFlow struct {
Expand All @@ -20,15 +22,6 @@ func (d *DataFlow) unlockTransaction(params *DataFlowParams, next dataflow.Next[
return next(params)
}

func (d *DataFlow) LockedCommand(command Command) (lockedCommand Command) {
return dataflow.New[*DataFlowParams](
d.lockTransaction,
command,
).WithTerminationCallback(func(params *DataFlowParams) {
d.Unlock(params.Transaction, false)
}).ChainedCommand
}

func (d *DataFlow) CheckSolidityCommand(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) (err error) {
if params.Inputs = d.CheckSolidity(params.Transaction, params.TransactionMetadata); params.Inputs == nil {
return nil
Expand All @@ -37,19 +30,30 @@ func (d *DataFlow) CheckSolidityCommand(params *DataFlowParams, next dataflow.Ne
return next(params)
}

func (d *DataFlow) SolidifyTransactionCommand() (solidificationCommand Command) {
return d.LockedCommand(
dataflow.New[*DataFlowParams](
d.CheckSolidityCommand,
d.ValidatePastCone,
d.ExecuteTransaction,
d.BookTransaction,
).WithErrorCallback(func(err error, params *DataFlowParams) {
// mark Transaction as invalid and trigger invalid event

// eventually trigger generic errors if its not due to tx invalidity
}),
)
func (d *DataFlow) SolidifyTransaction(tx utxo.Transaction, meta *TransactionMetadata) (success bool, consumers []utxo.TransactionID, err error) {
err = dataflow.New[*DataFlowParams](
d.lockTransaction,
d.CheckSolidityCommand,
d.ValidatePastCone,
d.ExecuteTransaction,
d.BookTransaction,
).WithSuccessCallback(func(params *DataFlowParams) {
success = true
// TODO: fill consumers from outputs
}).WithTerminationCallback(func(params *DataFlowParams) {
d.Unlock(params.Transaction, false)
}).Run(&DataFlowParams{
Transaction: tx,
TransactionMetadata: meta,
})

if err != nil {
// TODO: mark Transaction as invalid and trigger invalid event
// eventually trigger generic errors if its not due to tx invalidity
return false, nil, err
}

return success, consumers, nil
}

type Command dataflow.ChainedCommand[*DataFlowParams]
60 changes: 14 additions & 46 deletions packages/refactored/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,58 +49,26 @@ func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) {
l.ProcessTransaction(transaction, cachedTransactionMetadata.Get())
}

func (l *Ledger) ProcessTransaction(transaction utxo.Transaction, metadata *TransactionMetadata) {
err := l.SolidifyTransactionCommand()(&DataFlowParams{
Transaction: transaction,
TransactionMetadata: metadata,
})
approversWalker := walker.New[utxo.TransactionID](true)
for approversWalker.HasNext() {
approversWalker.Next()
func (l *Ledger) ProcessTransaction(tx utxo.Transaction, meta *TransactionMetadata) {
success, consumers, err := l.SolidifyTransaction(tx, meta)
if !success {
return
}

l.solidifyTransactionAndCollectApprovers

_ = l.solidify()(&DataFlowParams{Transaction: transaction, TransactionMetadata: metadata}, nil)

return
}

func (l *Ledger) solidify() (solidifyTransactionCommand dataflow.ChainedCommand[*DataFlowParams]) {
// TODO: async event
consumersWalker := walker.New[utxo.TransactionID](true).PushAll(consumers...)
for consumersWalker.HasNext() {
l.CachedTransactionMetadata(consumersWalker.Next()).Consume(func(consumerMetadata *TransactionMetadata) {
l.CachedTransaction(consumerMetadata.ID()).Consume(func(consumerTransaction utxo.Transaction) {
_, consumers, err := l.SolidifyTransaction(tx, meta)

return dataflow.New[*DataFlowParams](
l.solidifyTransactionAndCollectApprovers(approversWalker),
l.propagateSolidityToFutureCone(approversWalker),
).ChainedCommand
}

func (l *Ledger) solidifyTransactionAndCollectApprovers(approversWalker *walker.Walker[utxo.TransactionID]) dataflow.ChainedCommand[*DataFlowParams] {
return dataflow.New[*DataFlowParams](
l.SolidifyTransactionCommand(),
func(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) error {
// queue parents
approversWalker.PushAll()

return next(params)
},
).ChainedCommand
}

func (l *Ledger) propagateSolidityToFutureCone(approversWalker *walker.Walker[utxo.TransactionID]) Command {
return func(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) error {
for approversWalker.HasNext() {
l.CachedTransactionMetadata(approversWalker.Next()).Consume(func(consumerMetadata *TransactionMetadata) {
l.CachedTransaction(consumerMetadata.ID()).Consume(func(consumerTransaction utxo.Transaction) {
_ = l.solidifyTransactionAndCollectApprovers(approversWalker)(&DataFlowParams{
Transaction: consumerTransaction,
TransactionMetadata: consumerMetadata,
}, nil)
})
})
}
})

return next(params)
consumersWalker.Next()
}

return
}

func (l *Ledger) forkSingleTransactionCommand() (solidificationCommand dataflow.ChainedCommand[*DataFlowParams]) {
Expand Down

0 comments on commit 8ad7369

Please sign in to comment.