txSyncer: service does not start after node catching up (#4768)
algonautshant authored Nov 18, 2022
1 parent fe580fd commit 9791d64
Showing 5 changed files with 174 additions and 3 deletions.
1 change: 1 addition & 0 deletions catchup/service.go
@@ -124,6 +124,7 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode
 func (s *Service) Start() {
 	s.done = make(chan struct{})
 	s.ctx, s.cancel = context.WithCancel(context.Background())
+	atomic.StoreUint32(&s.initialSyncNotified, 0)
 	s.InitialSyncDone = make(chan struct{})
 	go s.periodicSync()
 }
3 changes: 3 additions & 0 deletions rpcs/txService_test.go
@@ -17,6 +17,7 @@
 package rpcs
 
 import (
+	"context"
 	"net"
 	"net/http"
 	"net/url"
@@ -149,6 +150,8 @@ func TestTxSync(t *testing.T) {
 	syncTimeout := time.Second
 	syncerPool := makeMockPendingTxAggregate(0)
 	syncer := MakeTxSyncer(syncerPool, nodeB, &handler, syncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	require.NoError(t, syncer.sync())
 	require.Equal(t, int32(3), atomic.LoadInt32(&handler.messageCounter))
 }
4 changes: 1 addition & 3 deletions rpcs/txSyncer.go
@@ -59,13 +59,10 @@ type TxSyncer struct {
 
 // MakeTxSyncer returns a TxSyncer
 func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHandler data.SolicitedTxHandler, syncInterval time.Duration, syncTimeout time.Duration, serverResponseSize int) *TxSyncer {
-	ctx, cancel := context.WithCancel(context.Background())
 	return &TxSyncer{
 		pool:         pool,
 		clientSource: clientSource,
 		handler:      txHandler,
-		ctx:          ctx,
-		cancel:       cancel,
 		syncInterval: syncInterval,
 		syncTimeout:  syncTimeout,
 		log:          logging.Base(),
@@ -76,6 +73,7 @@ func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHa
 // Start begins periodically syncing after the canStart channel indicates it can begin
 func (syncer *TxSyncer) Start(canStart chan struct{}) {
 	syncer.wg.Add(1)
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	go func() {
 		defer syncer.wg.Done()
 		select {
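The substance of the fix is in this file: MakeTxSyncer previously created the syncer's context once, at construction time, so a syncer that was stopped while the node performed catchup kept its cancelled context and could not resume when Start was called again. Moving the context creation into Start gives every Start/Stop cycle a fresh context (the catchup service change above similarly re-initializes per-start state, and the test changes set syncer.ctx by hand because the unit tests never call Start). Below is a minimal, self-contained sketch of this lifecycle pattern; the syncer type here is a hypothetical stand-in, not the real TxSyncer:

	package main

	import (
		"context"
		"fmt"
		"sync"
	)

	// syncer is a stripped-down stand-in for a component with a Start/Stop lifecycle.
	type syncer struct {
		ctx    context.Context
		cancel context.CancelFunc
		wg     sync.WaitGroup
	}

	// Start creates a fresh context on every call, so an instance whose previous
	// context was cancelled by Stop can be restarted cleanly.
	func (s *syncer) Start() {
		s.ctx, s.cancel = context.WithCancel(context.Background())
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			<-s.ctx.Done() // stand-in for the periodic sync loop
		}()
	}

	// Stop cancels the current context and waits for the worker goroutine to exit.
	func (s *syncer) Stop() {
		s.cancel()
		s.wg.Wait()
	}

	func main() {
		s := &syncer{}
		s.Start()
		s.Stop()
		s.Start() // works: Start reinitialized the context
		fmt.Println("restarted, ctx err:", s.ctx.Err()) // prints: restarted, ctx err: <nil>
		s.Stop()
	}

Had the context been created only in the constructor, as MakeTxSyncer did before this commit, the second Start would reuse the already-cancelled context and the worker would exit immediately.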
14 changes: 14 additions & 0 deletions rpcs/txSyncer_test.go
@@ -196,6 +196,8 @@ func TestSyncFromClient(t *testing.T) {
 	clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
 	handler := mockHandler{}
 	syncer := MakeTxSyncer(clientPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 
 	require.NoError(t, syncer.syncFromClient(&client))
@@ -211,6 +213,8 @@ func TestSyncFromUnsupportedClient(t *testing.T) {
 	clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
 	handler := mockHandler{}
 	syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 
 	require.Error(t, syncer.syncFromClient(&client))
@@ -226,6 +230,8 @@ func TestSyncFromClientAndQuit(t *testing.T) {
 	clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
 	handler := mockHandler{}
 	syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 	syncer.cancel()
 	require.Error(t, syncer.syncFromClient(&client))
@@ -241,6 +247,8 @@ func TestSyncFromClientAndError(t *testing.T) {
 	clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
 	handler := mockHandler{}
 	syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 	require.Error(t, syncer.syncFromClient(&client))
 	require.Zero(t, atomic.LoadInt32(&handler.messageCounter))
@@ -256,6 +264,8 @@ func TestSyncFromClientAndTimeout(t *testing.T) {
 	handler := mockHandler{}
 	syncTimeout := time.Duration(0)
 	syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 	require.Error(t, syncer.syncFromClient(&client))
 	require.Zero(t, atomic.LoadInt32(&handler.messageCounter))
@@ -277,6 +287,8 @@ func TestSync(t *testing.T) {
 	handler := mockHandler{}
 	syncerPool := makeMockPendingTxAggregate(3)
 	syncer := MakeTxSyncer(syncerPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 
 	require.NoError(t, syncer.sync())
@@ -290,6 +302,8 @@ func TestNoClientsSync(t *testing.T) {
 	clientAgg := mockClientAggregator{peers: []network.Peer{}}
 	handler := mockHandler{}
 	syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
+	// Since syncer is not Started, set the context here
+	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
 	syncer.log = logging.TestingLog(t)
 
 	require.NoError(t, syncer.sync())
155 changes: 155 additions & 0 deletions test/e2e-go/features/catchup/catchpointCatchup_test.go
@@ -536,3 +536,158 @@ outer:
_, err = fixture.WaitForConfirmedTxn(status.LastRound+50, addrs2[0], tx.ID().String())
a.NoError(err)
}

// TestNodeTxSyncRestart starts a network of two nodes and one relay,
// waits until a catchpoint is created,
// lets the primary node have the majority of the stake,
// stops the primary node so that it misses the next transaction,
// sends a transaction from the second node,
// then starts the primary node and immediately starts the catchup.
// The transaction is confirmed only once the transaction pool TxSyncer passes it to the primary node.
func TestNodeTxSyncRestart(t *testing.T) {
partitiontest.PartitionTest(t)
defer fixtures.ShutdownSynchronizedTest(t)

if testing.Short() {
t.Skip()
}
a := require.New(fixtures.SynchronizedTest(t))

consensus := make(config.ConsensusProtocols)
protoVersion := protocol.ConsensusCurrentVersion
catchpointCatchupProtocol := config.Consensus[protoVersion]
catchpointCatchupProtocol.ApprovedUpgrades = map[protocol.ConsensusVersion]uint64{}
// MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback
// ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md
catchpointCatchupProtocol.SeedLookback = 2
catchpointCatchupProtocol.SeedRefreshInterval = 2
catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval
catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback
catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true
catchpointCatchupProtocol.StateProofInterval = 0
if runtime.GOOS == "darwin" || runtime.GOARCH == "amd64" {
// amd64/macos platforms are generally quite capable, so accelerate the round times to make the test run faster.
catchpointCatchupProtocol.AgreementFilterTimeoutPeriod0 = 1 * time.Second
catchpointCatchupProtocol.AgreementFilterTimeout = 1 * time.Second
}
consensus[protoVersion] = catchpointCatchupProtocol

var fixture fixtures.RestClientFixture
fixture.SetConsensus(consensus)
fixture.SetupNoStart(t, filepath.Join("nettemplates", "TwoNodes50EachWithRelay.json"))

// Get primary node
primaryNode, err := fixture.GetNodeController("Node1")
a.NoError(err)
// Get secondary node
secondNode, err := fixture.GetNodeController("Node2")
a.NoError(err)
// Get the relay
relayNode, err := fixture.GetNodeController("Relay")
a.NoError(err)

// prepare its configuration file to have it generate a catchpoint every 16 rounds.
cfg, err := config.LoadConfigFromDisk(primaryNode.GetDataDir())
a.NoError(err)
const catchpointInterval = 16
cfg.CatchpointInterval = catchpointInterval
cfg.CatchpointTracking = 2
cfg.MaxAcctLookback = 2
cfg.Archival = false

// Shorten the txn sync interval so the test can run faster
cfg.TxSyncIntervalSeconds = 4

cfg.SaveToDisk(primaryNode.GetDataDir())
cfg.SaveToDisk(secondNode.GetDataDir())

cfg, err = config.LoadConfigFromDisk(relayNode.GetDataDir())
a.NoError(err)
cfg.TxSyncIntervalSeconds = 4
cfg.SaveToDisk(relayNode.GetDataDir())

fixture.Start()
defer fixture.LibGoalFixture.Shutdown()

client1 := fixture.GetLibGoalClientFromNodeController(primaryNode)
client2 := fixture.GetLibGoalClientFromNodeController(secondNode)
wallet1, err := client1.GetUnencryptedWalletHandle()
a.NoError(err)
wallet2, err := client2.GetUnencryptedWalletHandle()
a.NoError(err)
addrs1, err := client1.ListAddresses(wallet1)
a.NoError(err)
addrs2, err := client2.ListAddresses(wallet2)
a.NoError(err)

// let the second node have insufficient stake for proposing a block
tx, err := client2.SendPaymentFromUnencryptedWallet(addrs2[0], addrs1[0], 1000, 4999999999000000, nil)
a.NoError(err)
status, err := client1.Status()
a.NoError(err)
_, err = fixture.WaitForConfirmedTxn(status.LastRound+100, addrs1[0], tx.ID().String())
a.NoError(err)
targetCatchpointRound := status.LastRound

// ensure the catchpoint is created for targetCatchpointRound
timer := time.NewTimer(100 * time.Second)
outer:
for {
status, err = client1.Status()
a.NoError(err)

var round basics.Round
if status.LastCatchpoint != nil && len(*status.LastCatchpoint) > 0 {
round, _, err = ledgercore.ParseCatchpointLabel(*status.LastCatchpoint)
a.NoError(err)
if uint64(round) >= targetCatchpointRound {
break
}
}
select {
case <-timer.C:
a.Failf("timeout waiting a catchpoint", "target: %d, got %d", targetCatchpointRound, round)
break outer
default:
time.Sleep(250 * time.Millisecond)
}
}

// stop the primary node
client1.FullStop()

// let the 2nd client send a transaction
tx, err = client2.SendPaymentFromUnencryptedWallet(addrs2[0], addrs1[0], 1000, 50000, nil)
a.NoError(err)

// now that the primary has missed the transaction, start it and let it catch up
_, err = fixture.StartNode(primaryNode.GetDataDir())
a.NoError(err)
// let the primary node catch up
err = client1.Catchup(*status.LastCatchpoint)
a.NoError(err)

// the transaction should not be confirmed yet
_, err = fixture.WaitForConfirmedTxn(0, addrs2[0], tx.ID().String())
a.Error(err)

// Wait for the catchup
for t := 0; t < 10; t++ {
status1, err := client1.Status()
a.NoError(err)
status2, err := client2.Status()
a.NoError(err)

if status1.LastRound+1 >= status2.LastRound {
// if the primary node is within 1 round of the secondary node, then it has
// caught up
break
}
time.Sleep(catchpointCatchupProtocol.AgreementFilterTimeout)
}

status, err = client2.Status()
a.NoError(err)
_, err = fixture.WaitForConfirmedTxn(status.LastRound+50, addrs2[0], tx.ID().String())
a.NoError(err)
}
