Skip to content

Commit

Permalink
Feat: cleaned up code a but more
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Mar 17, 2022
1 parent a58e3c6 commit 90d2198
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 48 deletions.
51 changes: 22 additions & 29 deletions packages/refactored/ledger/ledger.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package ledger

import (
"github.com/cockroachdb/errors"
"github.com/iotaledger/hive.go/generics/dataflow"
"github.com/iotaledger/hive.go/generics/event"
"github.com/iotaledger/hive.go/generics/walker"
"github.com/iotaledger/hive.go/kvstore"

"github.com/iotaledger/goshimmer/packages/refactored/syncutils"
Expand All @@ -14,8 +12,9 @@ import (
// region Ledger ///////////////////////////////////////////////////////////////////////////////////////////////////////

type Ledger struct {
ErrorEvent *event.Event[error]
TransactionStoredEvent *event.Event[*TransactionStoredEvent]
TransactionProcessedEvent *event.Event[*TransactionProcessedEvent]
ErrorEvent *event.Event[error]

*Storage
*AvailabilityManager
Expand All @@ -36,33 +35,32 @@ func New(store kvstore.KVStore, vm utxo.VM) (ledger *Ledger) {

func (l *Ledger) Setup() {
// Attach = run async
l.TransactionProcessedEvent.Attach(event.NewClosure[*TransactionProcessedEvent](l.processFutureCone))
l.TransactionProcessedEvent.Attach(event.NewClosure[*TransactionProcessedEvent](l.processConsumers))

// attach sync = run in scope of event (while locks are still held)
l.TransactionStoredEvent.AttachSync(event.NewClosure[*TransactionStoredEvent](l.processStoredTransaction))
}

// StoreAndProcessTransaction is the only public facing api
func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) (success bool, err error) {
func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) (solid bool) {
// use computeifabsent as a mutex to only store things once
cachedTransactionMetadata := l.CachedTransactionMetadata(transaction.ID(), func(transactionID utxo.TransactionID) *TransactionMetadata {
l.transactionStorage.Store(transaction).Release()

// TODO: STORE CONSUMERS

success = true
solid = true
return NewTransactionMetadata(transactionID)
})

// if we didn't store ourselves then we consider this call to be a success if this transaction was processed already
// before (e.g. by a reattachment)
// TODO: maybe rename solid to "processed"
if !success {
// (e.g. by a reattachment)
if !solid {
cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) {
success = metadata.Solid()
solid = metadata.Solid()
})

return success, nil
return solid
}

cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) {
Expand All @@ -71,14 +69,14 @@ func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) (succe
TransactionMetadata: metadata,
}})

success = metadata.solid
solid = metadata.Solid()
})

return success, nil
return solid
}

func (l *Ledger) processTransaction(tx utxo.Transaction, meta *TransactionMetadata) (success bool, err error) {
err = dataflow.New[*DataFlowParams](
func (l *Ledger) processTransaction(tx utxo.Transaction, meta *TransactionMetadata) (success bool) {
err := dataflow.New[*DataFlowParams](
l.lockTransactionStep,
l.CheckSolidity,
/*
Expand All @@ -100,34 +98,29 @@ func (l *Ledger) processTransaction(tx utxo.Transaction, meta *TransactionMetada
if err != nil {
// TODO: mark Transaction as invalid and trigger invalid event
// eventually trigger generic errors if its not due to tx invalidity
return false, err
return false
}

return success, nil
return success
}

func (l *Ledger) processFutureCone(event *TransactionProcessedEvent) {
func (l *Ledger) processConsumers(event *TransactionProcessedEvent) {
// TODO: FILL WITH ACTUAL CONSUMERS
_ = event.Inputs
consumers := []utxo.TransactionID{}

for consumersWalker := walker.New[utxo.TransactionID](true).PushAll(consumers...); consumersWalker.HasNext(); {
transactionID := consumersWalker.Next()

l.CachedTransactionMetadata(transactionID).Consume(func(consumerMetadata *TransactionMetadata) {
l.CachedTransaction(transactionID).Consume(func(consumerTransaction utxo.Transaction) {
if _, err := l.processTransaction(consumerTransaction, consumerMetadata); err != nil {
l.ErrorEvent.Trigger(errors.Errorf("failed to process Transaction with %s: %w", transactionID, err))
}
// we don't need a walker because the events will do the "walking for us"
for _, consumerTransactionId := range consumers {
l.CachedTransactionMetadata(consumerTransactionId).Consume(func(consumerMetadata *TransactionMetadata) {
l.CachedTransaction(consumerTransactionId).Consume(func(consumerTransaction utxo.Transaction) {
l.processTransaction(consumerTransaction, consumerMetadata)
})
})
}
}

func (l *Ledger) processStoredTransaction(event *TransactionStoredEvent) {
if _, err := l.processTransaction(event.Transaction, event.TransactionMetadata); err != nil {
l.ErrorEvent.Trigger(errors.Errorf("failed to process stored Transaction with %s: %w", event.Transaction.ID(), err))
}
l.processTransaction(event.Transaction, event.TransactionMetadata)
}

func (l *Ledger) lockTransactionStep(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) error {
Expand Down
21 changes: 2 additions & 19 deletions packages/refactored/ledger/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
// region Storage //////////////////////////////////////////////////////////////////////////////////////////////////////

type Storage struct {
TransactionStoredEvent *event.Event[*TransactionStoredEvent]
transactionStorage *objectstorage.ObjectStorage[utxo.Transaction]
transactionMetadataStorage *objectstorage.ObjectStorage[*TransactionMetadata]
consumerStorage *objectstorage.ObjectStorage[*Consumer]
Expand All @@ -29,22 +28,6 @@ func NewStorage(ledger *Ledger) (newStorage *Storage) {
func (s *Storage) Setup() {
}

// Store adds a new Transaction to the ledger state. It returns a boolean that indicates whether the
// Transaction was stored.
func (s *Storage) Store(transaction utxo.Transaction) (cachedTransactionMetadata *objectstorage.CachedObject[*TransactionMetadata], stored bool) {
cachedTransactionMetadata = s.CachedTransactionMetadata(transaction.ID(), func(transactionID utxo.TransactionID) *TransactionMetadata {
s.transactionStorage.Store(transaction).Release()
stored = true
return NewTransactionMetadata(transactionID)
})

if !stored {
return
}

return
}

// CachedTransaction retrieves the Transaction with the given TransactionID from the object storage.
func (s *Storage) CachedTransaction(transactionID utxo.TransactionID) (cachedTransaction *objectstorage.CachedObject[utxo.Transaction]) {
return s.transactionStorage.Load(transactionID.Bytes())
Expand All @@ -67,9 +50,9 @@ func (s *Storage) CachedOutput(outputID utxo.OutputID) (cachedOutput *objectstor
}

// CachedConsumers retrieves the Consumers of the given OutputID from the object storage.
func (u *Storage) CachedConsumers(outputID utxo.OutputID) (cachedConsumers objectstorage.CachedObjects[*Consumer]) {
func (s *Storage) CachedConsumers(outputID utxo.OutputID) (cachedConsumers objectstorage.CachedObjects[*Consumer]) {
cachedConsumers = make(objectstorage.CachedObjects[*Consumer], 0)
u.consumerStorage.ForEach(func(key []byte, cachedObject *objectstorage.CachedObject[*Consumer]) bool {
s.consumerStorage.ForEach(func(key []byte, cachedObject *objectstorage.CachedObject[*Consumer]) bool {
cachedConsumers = append(cachedConsumers, cachedObject)
return true
}, objectstorage.WithIteratorPrefix(outputID.Bytes()))
Expand Down

0 comments on commit 90d2198

Please sign in to comment.