Skip to content

Commit

Permalink
Fixed restart unit tests which needed to handle the situation where a…
Browse files Browse the repository at this point in the history
…n existing transaction was partly replayed
  • Loading branch information
noctarius committed Jul 6, 2023
1 parent 8ad7737 commit 3ef2940
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 14 deletions.
10 changes: 5 additions & 5 deletions internal/replication/context/replicationconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ func (rc *ReplicationConnection) SendStatusUpdate() error {
return nil
}

func (rc *ReplicationConnection) StartReplication(pluginArguments []string) error {
func (rc *ReplicationConnection) StartReplication(pluginArguments []string) (pgtypes.LSN, error) {
restartLSN, err := rc.locateRestartLSN(rc.replicationContext.sideChannel.readReplicationSlot)
if err != nil {
return errors.Wrap(err, 0)
return 0, errors.Wrap(err, 0)
}

// Configure initial LSN in case there isn't anything immediate to handle
Expand All @@ -112,17 +112,17 @@ func (rc *ReplicationConnection) StartReplication(pluginArguments []string) erro
},
); err != nil {
if err := rc.reconnect(); err != nil {
return errors.Wrap(err, 0)
return 0, errors.Wrap(err, 0)
}

return pglogrepl.StartReplication(context.Background(), rc.conn,
return restartLSN, pglogrepl.StartReplication(context.Background(), rc.conn,
rc.replicationContext.ReplicationSlotName(), pglogrepl.LSN(restartLSN),
pglogrepl.StartReplicationOptions{
PluginArgs: pluginArguments,
},
)
}
return nil
return restartLSN, nil
}

func (rc *ReplicationConnection) StopReplication() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (l *logicalReplicationResolver) OnBeginEvent(xld pgtypes.XLogData, msg *pgt
return nil
}

func (l *logicalReplicationResolver) OnCommitEvent(xld pgtypes.XLogData, msg *pgtypes.CommitMessage) error {
l.replicationContext.SetLastCommitLSN(pgtypes.LSN(xld.WALStart + pglogrepl.LSN(len(xld.WALData))))
func (l *logicalReplicationResolver) OnCommitEvent(_ pgtypes.XLogData, msg *pgtypes.CommitMessage) error {
l.replicationContext.SetLastCommitLSN(pgtypes.LSN(msg.TransactionEndLSN))
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ func (tt *transactionTracker) OnBeginEvent(xld pgtypes.XLogData, msg *pgtypes.Be
}

func (tt *transactionTracker) OnCommitEvent(xld pgtypes.XLogData, msg *pgtypes.CommitMessage) error {
// There isn't a running transaction, which can happen when we
// got restarted and the last processed LSN was inside a running
// transaction. In this case we skip all earlier logrepl messages
// and keep going from where we left off.
if tt.currentTransaction == nil {
return tt.resolver.OnCommitEvent(xld, msg)
}

currentTransaction := tt.currentTransaction
tt.currentTransaction = nil

Expand Down
5 changes: 3 additions & 2 deletions internal/replication/replicationchannel/replicationchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func (rc *ReplicationChannel) StartReplicationChannel(
return nil
}

if err := replicationConnection.StartReplication(pluginArguments); err != nil {
restartLSN, err := replicationConnection.StartReplication(pluginArguments)
if err != nil {
if rc.shutdownRequested.Load() {
// If we tried to start replication before the shutdown was initiated,
// we totally expect that to fail, and we can just ignore the error
Expand All @@ -154,7 +155,7 @@ func (rc *ReplicationChannel) StartReplicationChannel(
}

go func() {
err := replicationHandler.startReplicationHandler(replicationConnection)
err := replicationHandler.startReplicationHandler(replicationConnection, restartLSN)
if err != nil {
rc.logger.Fatalf("Issue handling WAL stream: %s", err)
}
Expand Down
15 changes: 12 additions & 3 deletions internal/replication/replicationchannel/replicationhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func (rh *replicationHandler) stopReplicationHandler() error {
return rh.shutdownAwaiter.AwaitDone()
}

func (rh *replicationHandler) startReplicationHandler(replicationConnection *repcontext.ReplicationConnection) error {
func (rh *replicationHandler) startReplicationHandler(
replicationConnection *repcontext.ReplicationConnection, restartLSN pgtypes.LSN,
) error {

standbyMessageTimeout := time.Second * 10
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

Expand Down Expand Up @@ -132,15 +135,21 @@ func (rh *replicationHandler) startReplicationHandler(replicationConnection *rep
Xid: xid,
}

if err := rh.handleXLogData(extendedXld); err != nil {
// Skip all entries that were already replicated before the streamer was shut down
msgType := pglogrepl.MessageType(xld.WALData[0])
if msgType != pglogrepl.MessageTypeRelation && restartLSN > pgtypes.LSN(xld.WALStart) {
continue
}

if err := rh.handleXLogData(extendedXld, restartLSN); err != nil {
return errors.Wrap(err, 0)
}
rh.replicationContext.AcknowledgeReceived(extendedXld)
}
}
}

func (rh *replicationHandler) handleXLogData(xld pgtypes.XLogData) error {
func (rh *replicationHandler) handleXLogData(xld pgtypes.XLogData, restartLSN pgtypes.LSN) error {
msg, err := pgdecoding.ParseXlogData(xld.WALData, rh.lastTransactionId)
if err != nil {
return fmt.Errorf("parsing logical replication message: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/tests/integration/streamer_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func TestIntegrationRestartTestSuite(t *testing.T) {
suite.Run(t, new(IntegrationRestartTestSuite))
}

func (irts *IntegrationRestartTestSuite) Test_() {
waiter := supporting.NewWaiterWithTimeout(time.Second * 30)
func (irts *IntegrationRestartTestSuite) Test_Restart_Streamer() {
waiter := supporting.NewWaiterWithTimeout(time.Second * 600)
testSink := inttest.NewEventCollectorSink(
inttest.WithFilter(
func(_ time.Time, _ string, envelope inttest.Envelope) bool {
Expand Down

0 comments on commit 3ef2940

Please sign in to comment.