Skip to content

Commit

Permalink
Merge pull request lightninglabs#132 from halseth/unconfirmed-batches
Browse files Browse the repository at this point in the history
auctioneer: republish all unconfirmed batches on startup
  • Loading branch information
Roasbeef authored Jul 24, 2020
2 parents 0ba148e + 6b951f9 commit ca2365e
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 63 deletions.
75 changes: 50 additions & 25 deletions auctioneer.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,42 +505,67 @@ func DecrementBatchKey(batchKey *btcec.PublicKey) *btcec.PublicKey {
func (a *Auctioneer) rebroadcastPendingBatches() error {
// First, we'll fetch the current batch key, as we'll use this to walk
// backwards to find all the batches that are still pending.
//
// TODO(roasbeef): instead block until batch confirmed?
ctxb := context.Background()
currentBatchKey, err := a.cfg.DB.BatchKey(ctxb)
if err != nil {
return err
}

// Now that we have the current batch key, we'll walk "backwards" by
// decrementing the batch key by -G each time.
priorBatchKey := DecrementBatchKey(currentBatchKey)

// Now for this batch key, we'll check to see if the batch has been
// marked as finalized on disk or not.
var batchID orderT.BatchID
copy(batchID[:], priorBatchKey.SerializeCompressed())
batchConfirmed, err := a.cfg.DB.BatchConfirmed(ctxb, batchID)
if err != nil && err != subastadb.ErrNoBatchExists {
return err
// We keep a list of the batches to re-publish. To ensure propagation,
// we publish them in the order they were created.
type batch struct {
tx *wire.MsgTx
id orderT.BatchID
}
var batches []batch

for {
// Now that we have the current batch key, we'll walk
// "backwards" by decrementing the batch key by -G each time.
currentBatchKey = DecrementBatchKey(currentBatchKey)

// Now for this batch key, we'll check to see if the batch has
// been marked as finalized on disk or not.
var batchID orderT.BatchID
copy(batchID[:], currentBatchKey.SerializeCompressed())
batchConfirmed, err := a.cfg.DB.BatchConfirmed(ctxb, batchID)
if err != nil && err != subastadb.ErrNoBatchExists {
return err
}

// If this batch is confirmed (or a batch has never existed),
// then we can exit early.
if batchConfirmed || err == subastadb.ErrNoBatchExists {
break
}

// Now that we know this batch isn't finalized, we'll fetch the
// batch transaction from disk so we can rebroadcast it.
var priorBatchID orderT.BatchID
copy(priorBatchID[:], currentBatchKey.SerializeCompressed())
_, batchTx, err := a.cfg.DB.GetBatchSnapshot(ctxb, priorBatchID)
if err != nil {
return err
}

// If this batch is confirmed (or a batch has never existed), then we
// can exit early.
if batchConfirmed || err == subastadb.ErrNoBatchExists {
return nil
batches = append(batches, batch{
tx: batchTx,
id: batchID,
})
}

// Now that we know this batch isn't finalized, we'll fetch the batch
// transaction from disk so we can rebroadcast it.
var priorBatchID orderT.BatchID
copy(priorBatchID[:], priorBatchKey.SerializeCompressed())
_, batchTx, err := a.cfg.DB.GetBatchSnapshot(ctxb, priorBatchID)
if err != nil {
return err
log.Infof("Rebroadcasting %d unconfirmed batch transactions",
len(batches))

// Publish them in the order they were originally created.
for i := len(batches) - 1; i >= 0; i-- {
b := batches[i]
if err = a.publishBatchTx(ctxb, b.tx, b.id); err != nil {
return err
}
}
return a.publishBatchTx(ctxb, batchTx, batchID)

return nil
}

// blockFeeder is a dedicated goroutine that listens for updates to the chain
Expand Down
130 changes: 92 additions & 38 deletions auctioneer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ type mockWallet struct {

balance btcutil.Amount

lastTx *wire.MsgTx
lastTxs []*wire.MsgTx
}

func (m *mockWallet) SendOutputs(cctx context.Context, outputs []*wire.TxOut,
Expand All @@ -232,13 +232,15 @@ func (m *mockWallet) SendOutputs(cctx context.Context, outputs []*wire.TxOut,
m.Lock()
defer m.Unlock()

m.lastTx = wire.NewMsgTx(2)
m.lastTx.TxIn = []*wire.TxIn{
lastTx := wire.NewMsgTx(2)
lastTx.TxIn = []*wire.TxIn{
{},
}
m.lastTx.TxOut = outputs
lastTx.TxOut = outputs

return m.lastTx, nil
m.lastTxs = append(m.lastTxs, lastTx)

return lastTx, nil
}

func (m *mockWallet) ConfirmedWalletBalance(context.Context) (btcutil.Amount, error) {
Expand All @@ -252,18 +254,14 @@ func (m *mockWallet) ListTransactions(context.Context) ([]*wire.MsgTx, error) {
m.RLock()
defer m.RUnlock()

if m.lastTx == nil {
return nil, nil
}

return []*wire.MsgTx{m.lastTx}, nil
return m.lastTxs, nil
}

func (m *mockWallet) PublishTransaction(ctx context.Context, tx *wire.MsgTx) error {
m.Lock()
defer m.Unlock()

m.lastTx = tx
m.lastTxs = append(m.lastTxs, tx)
return nil
}

Expand Down Expand Up @@ -505,7 +503,7 @@ func (a *auctioneerTestHarness) AssertTxBroadcast() *wire.MsgTx {
a.wallet.RLock()
defer a.wallet.RUnlock()

if a.wallet.lastTx == nil {
if len(a.wallet.lastTxs) != 1 {
return fmt.Errorf("no tx broadcast")
}

Expand All @@ -519,7 +517,30 @@ func (a *auctioneerTestHarness) AssertTxBroadcast() *wire.MsgTx {

a.wallet.RLock()
defer a.wallet.RUnlock()
return a.wallet.lastTx
return a.wallet.lastTxs[0]
}

func (a *auctioneerTestHarness) AssertNTxsBroadcast(n int) []*wire.MsgTx {
var txs []*wire.MsgTx
checkBroadcast := func() error {
a.wallet.RLock()
defer a.wallet.RUnlock()

if len(a.wallet.lastTxs) != n {
return fmt.Errorf("%d txs broadcast",
len(a.wallet.lastTxs))
}

txs = a.wallet.lastTxs
return nil
}

err := wait.NoError(checkBroadcast, time.Second*5)
if err != nil {
a.t.Fatal(err)
}

return txs
}

func (a *auctioneerTestHarness) RestartAuctioneer() {
Expand All @@ -540,9 +561,11 @@ func (a *auctioneerTestHarness) RestartAuctioneer() {
a.StartAuctioneer()
}

func (a *auctioneerTestHarness) SendConf(tx *wire.MsgTx) {
a.notifier.ConfChan <- &chainntnfs.TxConfirmation{
Tx: tx,
func (a *auctioneerTestHarness) SendConf(txs ...*wire.MsgTx) {
for _, tx := range txs {
a.notifier.ConfChan <- &chainntnfs.TxConfirmation{
Tx: tx,
}
}
}

Expand Down Expand Up @@ -973,45 +996,76 @@ func TestAuctioneerLoadDiskOrders(t *testing.T) {
}

// TestAuctioneerPendingBatchRebroadcast tests that if we come online, and
// there's a pending batch on disk, then we'll broadcast the pending batch.
// there are pending batches on disk, then we'll re-broadcast them.
func TestAuctioneerPendingBatchRebroadcast(t *testing.T) {
t.Parallel()

const numPending = 3

// First, we'll start up the auctioneer as normal.
testHarness := newAuctioneerTestHarness(t)

// We'll grab the current batch key, then increment it by one. The new
// batch key will be the current batch key from the PoV of the
// auctioneer.
startingBatchKey := testHarness.db.batchKey
nextBatchKey := clmscript.IncrementKey(startingBatchKey)
testHarness.db.batchKey = nextBatchKey

batchTx := &wire.MsgTx{
TxOut: []*wire.TxOut{
{
PkScript: key[:],
// We'll grab the current batch key, then increment it by every time we
// create a new batch below.
currentBatchKey := testHarness.db.batchKey

var batchKeys []*btcec.PublicKey
for i := 0; i < numPending; i++ {
// Create a batch tx, using the script to encode what batch
// this corresponds to.
var s [32]byte
copy(s[:], key[:])
s[0] = byte(i)
batchTx := &wire.MsgTx{
TxOut: []*wire.TxOut{
{
PkScript: s[:],
},
},
},
}
}

// We'll now insert a pending batch snapshot and transaction, marking
// it as unconfirmed.
testHarness.MarkBatchUnconfirmed(startingBatchKey, batchTx)
// We'll now insert a pending batch snapshot and transaction,
// marking it as unconfirmed.
testHarness.MarkBatchUnconfirmed(currentBatchKey, batchTx)
batchKeys = append(batchKeys, currentBatchKey)

// Now that the new batch has been marked unconfirmed, we
// increment the current batch key by one. The new batch key
// will be the current batch key from the PoV of the
// auctioneer.
currentBatchKey = clmscript.IncrementKey(currentBatchKey)
testHarness.db.batchKey = currentBatchKey
}

// Now that the batch is marked unconfirmed, we'll start up the
// auctioneer. It should recognize this batch is still unconfirmed, and
// publish the transaction again.
// publish the unconfirmed batch transactions again.
testHarness.StartAuctioneer()
broadcastTx := testHarness.AssertTxBroadcast()
broadcastTxs := testHarness.AssertNTxsBroadcast(numPending)

if len(broadcastTxs) != numPending {
t.Fatalf("expected %d transactions, found %d",
numPending, len(broadcastTxs))
}

// The auctioneer should now be waiting for this transaction to
// The order of the broadcasted transactions should be the same as the
// original ordering.
for i, tx := range broadcastTxs {
b := tx.TxOut[0].PkScript[0]
if b != byte(i) {
t.Fatalf("tx %d had script byte %d", i, b)
}
}

// The auctioneer should now be waiting for these transactions to
// confirm, so we'll dispatch a confirmation.
testHarness.SendConf(broadcastTx)
testHarness.SendConf(broadcastTxs...)

// At this point, the batch that was marked unconfirmed, should now
// show up as being confirmed.
testHarness.AssertBatchConfirmed(startingBatchKey)
for _, batchKey := range batchKeys {
testHarness.AssertBatchConfirmed(batchKey)
}
}

// TestAuctioneerBatchTickNoop tests that if a master account is present, and
Expand Down

0 comments on commit ca2365e

Please sign in to comment.