From 90d219819cb885ddad44dee69ca70ed6e5411667 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 17 Mar 2022 05:03:52 +0100 Subject: [PATCH] Feat: cleaned up code a but more --- packages/refactored/ledger/ledger.go | 51 ++++++++++++--------------- packages/refactored/ledger/storage.go | 21 ++--------- 2 files changed, 24 insertions(+), 48 deletions(-) diff --git a/packages/refactored/ledger/ledger.go b/packages/refactored/ledger/ledger.go index bc1e18f22c..f21959ead0 100644 --- a/packages/refactored/ledger/ledger.go +++ b/packages/refactored/ledger/ledger.go @@ -1,10 +1,8 @@ package ledger import ( - "github.com/cockroachdb/errors" "github.com/iotaledger/hive.go/generics/dataflow" "github.com/iotaledger/hive.go/generics/event" - "github.com/iotaledger/hive.go/generics/walker" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/goshimmer/packages/refactored/syncutils" @@ -14,8 +12,9 @@ import ( // region Ledger /////////////////////////////////////////////////////////////////////////////////////////////////////// type Ledger struct { - ErrorEvent *event.Event[error] + TransactionStoredEvent *event.Event[*TransactionStoredEvent] TransactionProcessedEvent *event.Event[*TransactionProcessedEvent] + ErrorEvent *event.Event[error] *Storage *AvailabilityManager @@ -36,33 +35,32 @@ func New(store kvstore.KVStore, vm utxo.VM) (ledger *Ledger) { func (l *Ledger) Setup() { // Attach = run async - l.TransactionProcessedEvent.Attach(event.NewClosure[*TransactionProcessedEvent](l.processFutureCone)) + l.TransactionProcessedEvent.Attach(event.NewClosure[*TransactionProcessedEvent](l.processConsumers)) // attach sync = run in scope of event (while locks are still held) l.TransactionStoredEvent.AttachSync(event.NewClosure[*TransactionStoredEvent](l.processStoredTransaction)) } // StoreAndProcessTransaction is the only public facing api -func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) (success bool, err error) { +func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) (solid bool) { // use computeifabsent as a mutex to only store things once cachedTransactionMetadata := l.CachedTransactionMetadata(transaction.ID(), func(transactionID utxo.TransactionID) *TransactionMetadata { l.transactionStorage.Store(transaction).Release() // TODO: STORE CONSUMERS - success = true + solid = true return NewTransactionMetadata(transactionID) }) // if we didn't store ourselves then we consider this call to be a success if this transaction was processed already - // before (e.g. by a reattachment) - // TODO: maybe rename solid to "processed" - if !success { + // (e.g. by a reattachment) + if !solid { cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) { - success = metadata.Solid() + solid = metadata.Solid() }) - return success, nil + return solid } cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) { @@ -71,14 +69,14 @@ func (l *Ledger) StoreAndProcessTransaction(transaction utxo.Transaction) (succe TransactionMetadata: metadata, }}) - success = metadata.solid + solid = metadata.Solid() }) - return success, nil + return solid } -func (l *Ledger) processTransaction(tx utxo.Transaction, meta *TransactionMetadata) (success bool, err error) { - err = dataflow.New[*DataFlowParams]( +func (l *Ledger) processTransaction(tx utxo.Transaction, meta *TransactionMetadata) (success bool) { + err := dataflow.New[*DataFlowParams]( l.lockTransactionStep, l.CheckSolidity, /* @@ -100,34 +98,29 @@ func (l *Ledger) processTransaction(tx utxo.Transaction, meta *TransactionMetada if err != nil { // TODO: mark Transaction as invalid and trigger invalid event // eventually trigger generic errors if its not due to tx invalidity - return false, err + return false } - return success, nil + return success } -func (l *Ledger) processFutureCone(event *TransactionProcessedEvent) { +func (l *Ledger) processConsumers(event *TransactionProcessedEvent) { // TODO: FILL WITH ACTUAL CONSUMERS _ = event.Inputs consumers := []utxo.TransactionID{} - for consumersWalker := walker.New[utxo.TransactionID](true).PushAll(consumers...); consumersWalker.HasNext(); { - transactionID := consumersWalker.Next() - - l.CachedTransactionMetadata(transactionID).Consume(func(consumerMetadata *TransactionMetadata) { - l.CachedTransaction(transactionID).Consume(func(consumerTransaction utxo.Transaction) { - if _, err := l.processTransaction(consumerTransaction, consumerMetadata); err != nil { - l.ErrorEvent.Trigger(errors.Errorf("failed to process Transaction with %s: %w", transactionID, err)) - } + // we don't need a walker because the events will do the "walking for us" + for _, consumerTransactionId := range consumers { + l.CachedTransactionMetadata(consumerTransactionId).Consume(func(consumerMetadata *TransactionMetadata) { + l.CachedTransaction(consumerTransactionId).Consume(func(consumerTransaction utxo.Transaction) { + l.processTransaction(consumerTransaction, consumerMetadata) }) }) } } func (l *Ledger) processStoredTransaction(event *TransactionStoredEvent) { - if _, err := l.processTransaction(event.Transaction, event.TransactionMetadata); err != nil { - l.ErrorEvent.Trigger(errors.Errorf("failed to process stored Transaction with %s: %w", event.Transaction.ID(), err)) - } + l.processTransaction(event.Transaction, event.TransactionMetadata) } func (l *Ledger) lockTransactionStep(params *DataFlowParams, next dataflow.Next[*DataFlowParams]) error { diff --git a/packages/refactored/ledger/storage.go b/packages/refactored/ledger/storage.go index d58ea97c82..2e0a6fecc1 100644 --- a/packages/refactored/ledger/storage.go +++ b/packages/refactored/ledger/storage.go @@ -10,7 +10,6 @@ import ( // region Storage ////////////////////////////////////////////////////////////////////////////////////////////////////// type Storage struct { - TransactionStoredEvent *event.Event[*TransactionStoredEvent] transactionStorage *objectstorage.ObjectStorage[utxo.Transaction] transactionMetadataStorage *objectstorage.ObjectStorage[*TransactionMetadata] consumerStorage *objectstorage.ObjectStorage[*Consumer] @@ -29,22 +28,6 @@ func NewStorage(ledger *Ledger) (newStorage *Storage) { func (s *Storage) Setup() { } -// Store adds a new Transaction to the ledger state. It returns a boolean that indicates whether the -// Transaction was stored. -func (s *Storage) Store(transaction utxo.Transaction) (cachedTransactionMetadata *objectstorage.CachedObject[*TransactionMetadata], stored bool) { - cachedTransactionMetadata = s.CachedTransactionMetadata(transaction.ID(), func(transactionID utxo.TransactionID) *TransactionMetadata { - s.transactionStorage.Store(transaction).Release() - stored = true - return NewTransactionMetadata(transactionID) - }) - - if !stored { - return - } - - return -} - // CachedTransaction retrieves the Transaction with the given TransactionID from the object storage. func (s *Storage) CachedTransaction(transactionID utxo.TransactionID) (cachedTransaction *objectstorage.CachedObject[utxo.Transaction]) { return s.transactionStorage.Load(transactionID.Bytes()) @@ -67,9 +50,9 @@ func (s *Storage) CachedOutput(outputID utxo.OutputID) (cachedOutput *objectstor } // CachedConsumers retrieves the Consumers of the given OutputID from the object storage. -func (u *Storage) CachedConsumers(outputID utxo.OutputID) (cachedConsumers objectstorage.CachedObjects[*Consumer]) { +func (s *Storage) CachedConsumers(outputID utxo.OutputID) (cachedConsumers objectstorage.CachedObjects[*Consumer]) { cachedConsumers = make(objectstorage.CachedObjects[*Consumer], 0) - u.consumerStorage.ForEach(func(key []byte, cachedObject *objectstorage.CachedObject[*Consumer]) bool { + s.consumerStorage.ForEach(func(key []byte, cachedObject *objectstorage.CachedObject[*Consumer]) bool { cachedConsumers = append(cachedConsumers, cachedObject) return true }, objectstorage.WithIteratorPrefix(outputID.Bytes()))