txSyncer: service does not start after node catching up #4768

Merged
1 change: 1 addition & 0 deletions catchup/service.go
@@ -124,6 +124,7 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode
func (s *Service) Start() {
s.done = make(chan struct{})
s.ctx, s.cancel = context.WithCancel(context.Background())
atomic.StoreUint32(&s.initialSyncNotified, 0)
s.InitialSyncDone = make(chan struct{})
go s.periodicSync()
}
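Note on the catchup/service.go change: Start() already recreates the InitialSyncDone channel on every start, but the initialSyncNotified flag kept its value from the previous run, so after a stop/start cycle the new channel would presumably never be closed and callers waiting on it would block. A minimal, generic sketch of the reset pattern (names are illustrative, not the project's API):

package main

import (
	"fmt"
	"sync/atomic"
)

type svc struct {
	notified uint32
	done     chan struct{}
}

func (s *svc) Start() {
	atomic.StoreUint32(&s.notified, 0) // the fix: reset the flag on every start
	s.done = make(chan struct{})       // fresh channel for this run
}

func (s *svc) finishInitialSync() {
	if atomic.CompareAndSwapUint32(&s.notified, 0, 1) {
		close(s.done) // only closes if the flag was freshly reset
	}
}

func main() {
	s := &svc{}
	s.Start()
	s.finishInitialSync()

	s.Start() // second run; without the reset in Start(), the CAS below would fail
	s.finishInitialSync()
	<-s.done // would block forever if the flag had not been reset
	fmt.Println("second run notified correctly")
}
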
3 changes: 3 additions & 0 deletions rpcs/txService_test.go
@@ -17,6 +17,7 @@
package rpcs

import (
"context"
"net"
"net/http"
"net/url"
@@ -149,6 +150,8 @@ func TestTxSync(t *testing.T) {
syncTimeout := time.Second
syncerPool := makeMockPendingTxAggregate(0)
syncer := MakeTxSyncer(syncerPool, nodeB, &handler, syncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
require.NoError(t, syncer.sync())
require.Equal(t, int32(3), atomic.LoadInt32(&handler.messageCounter))
}
4 changes: 1 addition & 3 deletions rpcs/txSyncer.go
@@ -59,13 +59,10 @@ type TxSyncer struct {

// MakeTxSyncer returns a TxSyncer
func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHandler data.SolicitedTxHandler, syncInterval time.Duration, syncTimeout time.Duration, serverResponseSize int) *TxSyncer {
ctx, cancel := context.WithCancel(context.Background())
return &TxSyncer{
pool: pool,
clientSource: clientSource,
handler: txHandler,
ctx: ctx,
cancel: cancel,
syncInterval: syncInterval,
syncTimeout: syncTimeout,
log: logging.Base(),
@@ -76,6 +73,7 @@ func MakeTxSyncer(pool PendingTxAggregate, clientSource network.GossipNode, txHa
// Start begins periodically syncing after the canStart channel indicates it can begin
func (syncer *TxSyncer) Start(canStart chan struct{}) {
syncer.wg.Add(1)
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
go func() {
defer syncer.wg.Done()
select {
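The substantive change in rpcs/txSyncer.go: the context/cancel pair moves from MakeTxSyncer into Start(), so the constructor no longer creates them and each Start() installs a fresh context. This matches the PR title: once the syncer had been stopped (cancelling the constructor-time context), a later Start() appears to have reused the cancelled context, so sync requests were aborted immediately. A generic, runnable sketch of the pattern, assuming a Stop() that simply cancels (names are illustrative, not the project's API):

package main

import (
	"context"
	"fmt"
)

type worker struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (w *worker) Start() {
	// Create the context at start time rather than at construction time,
	// so a Stop()/Start() cycle does not keep reusing a cancelled context.
	w.ctx, w.cancel = context.WithCancel(context.Background())
}

func (w *worker) Stop() { w.cancel() }

func main() {
	w := &worker{}
	w.Start()
	w.Stop()
	w.Start()
	fmt.Println("context live after restart:", w.ctx.Err() == nil) // prints true
}
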
14 changes: 14 additions & 0 deletions rpcs/txSyncer_test.go
@@ -196,6 +196,8 @@ func TestSyncFromClient(t *testing.T) {
clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
handler := mockHandler{}
syncer := MakeTxSyncer(clientPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)

require.NoError(t, syncer.syncFromClient(&client))
@@ -211,6 +213,8 @@ func TestSyncFromUnsupportedClient(t *testing.T) {
clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
handler := mockHandler{}
syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)

require.Error(t, syncer.syncFromClient(&client))
@@ -226,6 +230,8 @@ func TestSyncFromClientAndQuit(t *testing.T) {
clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
handler := mockHandler{}
syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)
syncer.cancel()
require.Error(t, syncer.syncFromClient(&client))
@@ -241,6 +247,8 @@ func TestSyncFromClientAndError(t *testing.T) {
clientAgg := mockClientAggregator{peers: []network.Peer{&client}}
handler := mockHandler{}
syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)
require.Error(t, syncer.syncFromClient(&client))
require.Zero(t, atomic.LoadInt32(&handler.messageCounter))
@@ -256,6 +264,8 @@ func TestSyncFromClientAndTimeout(t *testing.T) {
handler := mockHandler{}
syncTimeout := time.Duration(0)
syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, syncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)
require.Error(t, syncer.syncFromClient(&client))
require.Zero(t, atomic.LoadInt32(&handler.messageCounter))
@@ -277,6 +287,8 @@ func TestSync(t *testing.T) {
handler := mockHandler{}
syncerPool := makeMockPendingTxAggregate(3)
syncer := MakeTxSyncer(syncerPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)

require.NoError(t, syncer.sync())
@@ -290,6 +302,8 @@ func TestNoClientsSync(t *testing.T) {
clientAgg := mockClientAggregator{peers: []network.Peer{}}
handler := mockHandler{}
syncer := MakeTxSyncer(pool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
// Since syncer is not Started, set the context here
syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
syncer.log = logging.TestingLog(t)

require.NoError(t, syncer.sync())
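All of the direct-call tests above now set syncer.ctx and syncer.cancel themselves, because MakeTxSyncer no longer creates them and the tests invoke sync()/syncFromClient() without calling Start(). If the repetition ever becomes a nuisance, a small test helper along these lines could factor it out (hypothetical sketch, not part of this PR):

func makeTestTxSyncer(t *testing.T, pool PendingTxAggregate, clients network.GossipNode, handler data.SolicitedTxHandler) *TxSyncer {
	syncer := MakeTxSyncer(pool, clients, handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize)
	// mirror what Start() would do, for tests that call sync()/syncFromClient() directly
	syncer.ctx, syncer.cancel = context.WithCancel(context.Background())
	syncer.log = logging.TestingLog(t)
	return syncer
}
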
156 changes: 155 additions & 1 deletion test/e2e-go/features/catchup/catchpointCatchup_test.go
@@ -385,7 +385,6 @@ func TestCatchpointLabelGeneration(t *testing.T) {
break
}
currentRound++

}
log.Infof("done building!\n")

@@ -400,3 +399,158 @@ func TestCatchpointLabelGeneration(t *testing.T) {
})
}
}

// TestNodeTxSyncRestart starts a network of two nodes and one relay,
// waits until a catchpoint is created,
// lets the primary node have the majority of the stake,
// stops the primary node so that it misses the next transaction,
// sends a transaction from the second node,
// then restarts the primary node and immediately starts the catchup.
// The transaction is confirmed only when the pool's TxSyncer passes it to the primary node.
func TestNodeTxSyncRestart(t *testing.T) {
partitiontest.PartitionTest(t)
defer fixtures.ShutdownSynchronizedTest(t)

if testing.Short() {
t.Skip()
}
a := require.New(fixtures.SynchronizedTest(t))

consensus := make(config.ConsensusProtocols)
protoVersion := protocol.ConsensusCurrentVersion
catchpointCatchupProtocol := config.Consensus[protoVersion]
catchpointCatchupProtocol.ApprovedUpgrades = map[protocol.ConsensusVersion]uint64{}
// MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback
// ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md
catchpointCatchupProtocol.SeedLookback = 2
catchpointCatchupProtocol.SeedRefreshInterval = 2
catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval
catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback
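// with SeedLookback = 2 and SeedRefreshInterval = 2, MaxBalLookback = 2*2*2 = 8, so CatchpointLookback is 8 as well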
catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true
catchpointCatchupProtocol.StateProofInterval = 0
if runtime.GOOS == "darwin" || runtime.GOARCH == "amd64" {
// amd64/macos platforms are generally quite capable, so accelerate the round times to make the test run faster.
catchpointCatchupProtocol.AgreementFilterTimeoutPeriod0 = 1 * time.Second
catchpointCatchupProtocol.AgreementFilterTimeout = 1 * time.Second
}
consensus[protoVersion] = catchpointCatchupProtocol

var fixture fixtures.RestClientFixture
fixture.SetConsensus(consensus)
fixture.SetupNoStart(t, filepath.Join("nettemplates", "TwoNodes50EachWithRelay.json"))

// Get primary node
primaryNode, err := fixture.GetNodeController("Node1")
a.NoError(err)
// Get secondary node
secondNode, err := fixture.GetNodeController("Node2")
a.NoError(err)
// Get the relay
relayNode, err := fixture.GetNodeController("Relay")
a.NoError(err)

// prepare its configuration file so that it generates a catchpoint every 16 rounds.
cfg, err := config.LoadConfigFromDisk(primaryNode.GetDataDir())
a.NoError(err)
const catchpointInterval = 16
cfg.CatchpointInterval = catchpointInterval
cfg.CatchpointTracking = 2
cfg.MaxAcctLookback = 2
cfg.Archival = false

// Shorten the txn sync interval so the test can run faster
cfg.TxSyncIntervalSeconds = 4

cfg.SaveToDisk(primaryNode.GetDataDir())
cfg.SaveToDisk(secondNode.GetDataDir())

cfg, err = config.LoadConfigFromDisk(relayNode.GetDataDir())
a.NoError(err)
cfg.TxSyncIntervalSeconds = 4
cfg.SaveToDisk(relayNode.GetDataDir())

fixture.Start()
defer fixture.LibGoalFixture.Shutdown()

client1 := fixture.GetLibGoalClientFromNodeController(primaryNode)
client2 := fixture.GetLibGoalClientFromNodeController(secondNode)
wallet1, err := client1.GetUnencryptedWalletHandle()
a.NoError(err)
wallet2, err := client2.GetUnencryptedWalletHandle()
a.NoError(err)
addrs1, err := client1.ListAddresses(wallet1)
a.NoError(err)
addrs2, err := client2.ListAddresses(wallet2)
a.NoError(err)

// let the second node have insufficient stake for proposing a block
tx, err := client2.SendPaymentFromUnencryptedWallet(addrs2[0], addrs1[0], 1000, 4999999999000000, nil)
a.NoError(err)
status, err := client1.Status()
a.NoError(err)
_, err = fixture.WaitForConfirmedTxn(status.LastRound+100, addrs1[0], tx.ID().String())
a.NoError(err)
targetCatchpointRound := status.LastRound

// ensure the catchpoint is created for targetCatchpointRound
timer := time.NewTimer(100 * time.Second)
outer:
for {
status, err = client1.Status()
a.NoError(err)

var round basics.Round
if status.LastCatchpoint != nil && len(*status.LastCatchpoint) > 0 {
round, _, err = ledgercore.ParseCatchpointLabel(*status.LastCatchpoint)
a.NoError(err)
if uint64(round) >= targetCatchpointRound {
break
}
}
select {
case <-timer.C:
a.Failf("timeout waiting a catchpoint", "target: %d, got %d", targetCatchpointRound, round)
break outer
default:
time.Sleep(250 * time.Millisecond)
}
}

// stop the primary node
client1.FullStop()

// let the 2nd client send a transaction
tx, err = client2.SendPaymentFromUnencryptedWallet(addrs2[0], addrs1[0], 1000, 50000, nil)
a.NoError(err)

// now that the primary missed the transaction, start it, and let it catch up
_, err = fixture.StartNode(primaryNode.GetDataDir())
a.NoError(err)
// let the primary node catch up
err = client1.Catchup(*status.LastCatchpoint)
a.NoError(err)

// the transaction should not be confirmed yet
_, err = fixture.WaitForConfirmedTxn(0, addrs2[0], tx.ID().String())
a.Error(err)

// Wait for the catchup
for t := 0; t < 10; t++ {
status1, err := client1.Status()
a.NoError(err)
status2, err := client2.Status()
a.NoError(err)

if status1.LastRound+1 >= status2.LastRound {
// if the primary node is within 1 round of the secondary node, then it has
// caught up
break
}
time.Sleep(catchpointCatchupProtocol.AgreementFilterTimeout)
}

status, err = client2.Status()
a.NoError(err)
_, err = fixture.WaitForConfirmedTxn(status.LastRound+50, addrs2[0], tx.ID().String())
a.NoError(err)
}