Skip to content

Commit

Permalink
Node/Acct: Remove obsolete pending transfers from db
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jul 10, 2024
1 parent 6236a9a commit 8057ffa
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 27 deletions.
18 changes: 6 additions & 12 deletions node/pkg/db/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication
})
}

// See if we have any old format pending transfers.
// Any pending transfers in the old format are long since obsolete. Just delete them.
if err == nil {
oldPendingTransfers := []*common.MessagePublication{}
oldPendingTransfers := []string{}
prefixBytes := []byte(acctOldPendingTransfer)
err = d.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
Expand All @@ -116,7 +116,7 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication
continue
}

oldPendingTransfers = append(oldPendingTransfers, pt)
oldPendingTransfers = append(oldPendingTransfers, pt.MessageIDString())
} else {
return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key))
}
Expand All @@ -126,20 +126,14 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication
})

if err == nil && len(oldPendingTransfers) != 0 {
pendingTransfers = append(pendingTransfers, oldPendingTransfers...)
for _, pt := range oldPendingTransfers {
logger.Info("updating format of database entry for pending vaa", zap.String("msgId", pt.MessageIDString()))
err := d.AcctStorePendingTransfer(pt)
if err != nil {
return pendingTransfers, fmt.Errorf("failed to write new pending msg for key [%v]: %w", pt.MessageIDString(), err)
}

key := acctOldPendingTransferMsgID(pt.MessageIDString())
key := acctOldPendingTransferMsgID(pt)
logger.Info("deleting obsolete pending transfer", zap.String("msgId", pt), zap.String("key", string(key)))
if err := d.db.Update(func(txn *badger.Txn) error {
err := txn.Delete(key)
return err
}); err != nil {
return pendingTransfers, fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pt.MessageIDString(), err)
return pendingTransfers, fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pt, err)
}
}
}
Expand Down
24 changes: 9 additions & 15 deletions node/pkg/db/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/binary"
"os"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -201,7 +200,7 @@ func TestAcctGetData(t *testing.T) {
assert.Equal(t, *msg2, *pendings[1])
}

func TestAcctLoadingOldPendings(t *testing.T) {
func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) {
dbPath := t.TempDir()
db, err := Open(dbPath)
if err != nil {
Expand All @@ -210,6 +209,8 @@ func TestAcctLoadingOldPendings(t *testing.T) {
defer db.Close()
defer os.Remove(dbPath)

logger := zap.NewNop()

tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)

Expand Down Expand Up @@ -249,27 +250,20 @@ func TestAcctLoadingOldPendings(t *testing.T) {
err = db.AcctStorePendingTransfer(pending2)
require.Nil(t, err)

logger := zap.NewNop()
// When we reload the data, the first one should get dropped, so we should get back only one.
pendings, err := db.AcctGetData(logger)
require.NoError(t, err)
require.Equal(t, 2, len(pendings))

// Updated old pending events get placed at the end, so we need to sort into timestamp order.
sort.SliceStable(pendings, func(i, j int) bool {
return pendings[i].Timestamp.Before(pendings[j].Timestamp)
})
require.Equal(t, 1, len(pendings))

assert.Equal(t, *pending1, *pendings[0])
assert.Equal(t, *pending2, *pendings[1])
assert.Equal(t, *pending2, *pendings[0])

// Make sure we can reload the updated pendings.
// Make sure we can still reload things after deleting the old one.
pendings2, err := db.AcctGetData(logger)

require.Nil(t, err)
require.Equal(t, 2, len(pendings2))
require.Equal(t, 1, len(pendings2))

assert.Equal(t, pending1, pendings2[0])
assert.Equal(t, pending2, pendings2[1])
assert.Equal(t, pending2, pendings2[0])
}

func (d *Database) acctStoreOldPendingTransfer(t *testing.T, msg *common.MessagePublication) {
Expand Down

0 comments on commit 8057ffa

Please sign in to comment.