diff --git a/catchup/service.go b/catchup/service.go index 1ebaf0fd3b..1043718fe5 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -124,6 +124,7 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode func (s *Service) Start() { s.done = make(chan struct{}) s.ctx, s.cancel = context.WithCancel(context.Background()) + atomic.StoreUint32(&s.initialSyncNotified, 0) s.InitialSyncDone = make(chan struct{}) go s.periodicSync() } diff --git a/rpcs/txService_test.go b/rpcs/txService_test.go index 4db86d10cd..5ced5b1a3b 100644 --- a/rpcs/txService_test.go +++ b/rpcs/txService_test.go @@ -17,6 +17,7 @@ package rpcs import ( + "context" "net" "net/http" "net/url" @@ -149,6 +150,8 @@ func TestTxSync(t *testing.T) { syncTimeout := time.Second syncerPool := makeMockPendingTxAggregate(0) syncer := MakeTxSyncer(syncerPool, nodeB, &handler, syncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) require.NoError(t, syncer.sync()) require.Equal(t, int32(3), atomic.LoadInt32(&handler.messageCounter)) } diff --git a/rpcs/txSyncer.go b/rpcs/txSyncer.go index cbeeb27fc5..3996371690 100644 --- a/rpcs/txSyncer.go +++ b/rpcs/txSyncer.go @@ -59,13 +59,10 @@ type TxSyncer struct { // MakeTxSyncer returns a TxSyncer func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHandler data.SolicitedTxHandler, syncInterval time.Duration, syncTimeout time.Duration, serverResponseSize int) *TxSyncer { - ctx, cancel := context.WithCancel(context.Background()) return &TxSyncer{ pool: pool, clientSource: clientSource, handler: txHandler, - ctx: ctx, - cancel: cancel, syncInterval: syncInterval, syncTimeout: syncTimeout, log: logging.Base(), @@ -76,6 +73,7 @@ func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHa // Start begins periodically syncing after the canStart chanel 
indicates it can begin func (syncer *TxSyncer) Start(canStart chan struct{}) { syncer.wg.Add(1) + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) go func() { defer syncer.wg.Done() select { diff --git a/rpcs/txSyncer_test.go b/rpcs/txSyncer_test.go index 0d09bb0878..377080ae8d 100644 --- a/rpcs/txSyncer_test.go +++ b/rpcs/txSyncer_test.go @@ -196,6 +196,8 @@ func TestSyncFromClient(t *testing.T) { clientAgg := mockClientAggregator{peers: []network.Peer{&client}} handler := mockHandler{} syncer := MakeTxSyncer(clientPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.NoError(t, syncer.syncFromClient(&client)) @@ -211,6 +213,8 @@ func TestSyncFromUnsupportedClient(t *testing.T) { clientAgg := mockClientAggregator{peers: []network.Peer{&client}} handler := mockHandler{} syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.Error(t, syncer.syncFromClient(&client)) @@ -226,6 +230,8 @@ func TestSyncFromClientAndQuit(t *testing.T) { clientAgg := mockClientAggregator{peers: []network.Peer{&client}} handler := mockHandler{} syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) syncer.cancel() require.Error(t, syncer.syncFromClient(&client)) @@ -241,6 +247,8 @@ func TestSyncFromClientAndError(t *testing.T) { clientAgg := 
mockClientAggregator{peers: []network.Peer{&client}} handler := mockHandler{} syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.Error(t, syncer.syncFromClient(&client)) require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) @@ -256,6 +264,8 @@ func TestSyncFromClientAndTimeout(t *testing.T) { handler := mockHandler{} syncTimeout := time.Duration(0) syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.Error(t, syncer.syncFromClient(&client)) require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) @@ -277,6 +287,8 @@ func TestSync(t *testing.T) { handler := mockHandler{} syncerPool := makeMockPendingTxAggregate(3) syncer := MakeTxSyncer(syncerPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.NoError(t, syncer.sync()) @@ -290,6 +302,8 @@ func TestNoClientsSync(t *testing.T) { clientAgg := mockClientAggregator{peers: []network.Peer{}} handler := mockHandler{} syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) + // Since syncer is not Started, set the context here + syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.NoError(t, syncer.sync()) diff --git 
a/test/e2e-go/features/catchup/catchpointCatchup_test.go b/test/e2e-go/features/catchup/catchpointCatchup_test.go
index 7d70247d02..f745c9e759 100644
--- a/test/e2e-go/features/catchup/catchpointCatchup_test.go
+++ b/test/e2e-go/features/catchup/catchpointCatchup_test.go
@@ -536,3 +536,163 @@ outer:
 	_, err = fixture.WaitForConfirmedTxn(status.LastRound+50, addrs2[0], tx.ID().String())
 	a.NoError(err)
 }
+
+// TestNodeTxSyncRestart starts a network of two nodes and one relay, and checks
+// that a transaction missed by a stopped node is confirmed after the restart:
+//  1. moves most of the stake to the primary node, so that the second node
+//     cannot propose a block on its own;
+//  2. waits until a catchpoint is created;
+//  3. stops the primary node, so that it misses the next transaction;
+//  4. sends a transaction from the second node;
+//  5. starts the primary node and immediately starts the catchpoint catchup.
+//
+// The transaction will be confirmed only when the TxSync of the pools passes
+// the transaction to the primary node.
+func TestNodeTxSyncRestart(t *testing.T) {
+	partitiontest.PartitionTest(t)
+	defer fixtures.ShutdownSynchronizedTest(t)
+
+	if testing.Short() {
+		t.Skip()
+	}
+	a := require.New(fixtures.SynchronizedTest(t))
+
+	consensus := make(config.ConsensusProtocols)
+	protoVersion := protocol.ConsensusCurrentVersion
+	catchpointCatchupProtocol := config.Consensus[protoVersion]
+	catchpointCatchupProtocol.ApprovedUpgrades = map[protocol.ConsensusVersion]uint64{}
+	// MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback
+	// ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md
+	catchpointCatchupProtocol.SeedLookback = 2
+	catchpointCatchupProtocol.SeedRefreshInterval = 2
+	catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval
+	catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback
+	catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true
+	catchpointCatchupProtocol.StateProofInterval = 0
+	if runtime.GOOS == "darwin" || runtime.GOARCH == "amd64" {
+		// amd64/macos platforms are generally quite capable, so accelerate the round times to make the test run faster.
+		catchpointCatchupProtocol.AgreementFilterTimeoutPeriod0 = 1 * time.Second
+		catchpointCatchupProtocol.AgreementFilterTimeout = 1 * time.Second
+	}
+	consensus[protoVersion] = catchpointCatchupProtocol
+
+	var fixture fixtures.RestClientFixture
+	fixture.SetConsensus(consensus)
+	fixture.SetupNoStart(t, filepath.Join("nettemplates", "TwoNodes50EachWithRelay.json"))
+
+	// Get primary node
+	primaryNode, err := fixture.GetNodeController("Node1")
+	a.NoError(err)
+	// Get secondary node
+	secondNode, err := fixture.GetNodeController("Node2")
+	a.NoError(err)
+	// Get the relay
+	relayNode, err := fixture.GetNodeController("Relay")
+	a.NoError(err)
+
+	// prepare the nodes' configuration file to generate a catchpoint every 16 rounds.
+	cfg, err := config.LoadConfigFromDisk(primaryNode.GetDataDir())
+	a.NoError(err)
+	const catchpointInterval = 16
+	cfg.CatchpointInterval = catchpointInterval
+	cfg.CatchpointTracking = 2
+	cfg.MaxAcctLookback = 2
+	cfg.Archival = false
+
+	// Shorten the txn sync interval so the test can run faster
+	cfg.TxSyncIntervalSeconds = 4
+
+	a.NoError(cfg.SaveToDisk(primaryNode.GetDataDir()))
+	a.NoError(cfg.SaveToDisk(secondNode.GetDataDir()))
+
+	cfg, err = config.LoadConfigFromDisk(relayNode.GetDataDir())
+	a.NoError(err)
+	cfg.TxSyncIntervalSeconds = 4
+	a.NoError(cfg.SaveToDisk(relayNode.GetDataDir()))
+
+	fixture.Start()
+	defer fixture.LibGoalFixture.Shutdown()
+
+	client1 := fixture.GetLibGoalClientFromNodeController(primaryNode)
+	client2 := fixture.GetLibGoalClientFromNodeController(secondNode)
+	wallet1, err := client1.GetUnencryptedWalletHandle()
+	a.NoError(err)
+	wallet2, err := client2.GetUnencryptedWalletHandle()
+	a.NoError(err)
+	addrs1, err := client1.ListAddresses(wallet1)
+	a.NoError(err)
+	addrs2, err := client2.ListAddresses(wallet2)
+	a.NoError(err)
+
+	// let the second node have insufficient stake for proposing a block
+	tx, err := client2.SendPaymentFromUnencryptedWallet(addrs2[0], addrs1[0], 1000, 4999999999000000, nil)
+	a.NoError(err)
+	status, err := client1.Status()
+	a.NoError(err)
+	_, err = fixture.WaitForConfirmedTxn(status.LastRound+100, addrs1[0], tx.ID().String())
+	a.NoError(err)
+	targetCatchpointRound := status.LastRound
+
+	// ensure the catchpoint is created for targetCatchpointRound
+	timer := time.NewTimer(100 * time.Second)
+	defer timer.Stop()
+outer:
+	for {
+		status, err = client1.Status()
+		a.NoError(err)
+
+		var round basics.Round
+		if status.LastCatchpoint != nil && len(*status.LastCatchpoint) > 0 {
+			round, _, err = ledgercore.ParseCatchpointLabel(*status.LastCatchpoint)
+			a.NoError(err)
+			if uint64(round) >= targetCatchpointRound {
+				break outer
+			}
+		}
+		select {
+		case <-timer.C:
+			a.Failf("timeout waiting a catchpoint", "target: %d, got %d", targetCatchpointRound, round)
+			break outer
+		default:
+			time.Sleep(250 * time.Millisecond)
+		}
+	}
+
+	// stop the primary node
+	a.NoError(client1.FullStop())
+
+	// let the 2nd client send a transaction
+	tx, err = client2.SendPaymentFromUnencryptedWallet(addrs2[0], addrs1[0], 1000, 50000, nil)
+	a.NoError(err)
+
+	// now that the primary missed the transaction, start it, and let it catchup
+	_, err = fixture.StartNode(primaryNode.GetDataDir())
+	a.NoError(err)
+	// let the primary node catchup
+	err = client1.Catchup(*status.LastCatchpoint)
+	a.NoError(err)
+
+	// the transaction should not be confirmed yet
+	_, err = fixture.WaitForConfirmedTxn(0, addrs2[0], tx.ID().String())
+	a.Error(err)
+
+	// Wait for the catchup (best effort: gives up silently after 10 attempts)
+	for attempt := 0; attempt < 10; attempt++ {
+		status1, err := client1.Status()
+		a.NoError(err)
+		status2, err := client2.Status()
+		a.NoError(err)
+
+		if status1.LastRound+1 >= status2.LastRound {
+			// if the primary node is within 1 round of the secondary node, then it has
+			// caught up
+			break
+		}
+		time.Sleep(catchpointCatchupProtocol.AgreementFilterTimeout)
+	}
+
+	status, err = client2.Status()
+	a.NoError(err)
+	_, err = fixture.WaitForConfirmedTxn(status.LastRound+50, addrs2[0], tx.ID().String())
+	a.NoError(err)
+}