diff --git a/internal/replication/context/replicationconnection.go b/internal/replication/context/replicationconnection.go index e06b758..10a78a1 100644 --- a/internal/replication/context/replicationconnection.go +++ b/internal/replication/context/replicationconnection.go @@ -95,10 +95,10 @@ func (rc *ReplicationConnection) SendStatusUpdate() error { return nil } -func (rc *ReplicationConnection) StartReplication(pluginArguments []string) error { +func (rc *ReplicationConnection) StartReplication(pluginArguments []string) (pgtypes.LSN, error) { restartLSN, err := rc.locateRestartLSN(rc.replicationContext.sideChannel.readReplicationSlot) if err != nil { - return errors.Wrap(err, 0) + return 0, errors.Wrap(err, 0) } // Configure initial LSN in case there isn't anything immediate to handle @@ -112,17 +112,17 @@ func (rc *ReplicationConnection) StartReplication(pluginArguments []string) erro }, ); err != nil { if err := rc.reconnect(); err != nil { - return errors.Wrap(err, 0) + return 0, errors.Wrap(err, 0) } - return pglogrepl.StartReplication(context.Background(), rc.conn, + return restartLSN, pglogrepl.StartReplication(context.Background(), rc.conn, rc.replicationContext.ReplicationSlotName(), pglogrepl.LSN(restartLSN), pglogrepl.StartReplicationOptions{ PluginArgs: pluginArguments, }, ) } - return nil + return restartLSN, nil } func (rc *ReplicationConnection) StopReplication() error { diff --git a/internal/replication/logicalreplicationresolver/replicationresolver.go b/internal/replication/logicalreplicationresolver/replicationresolver.go index 49eb9bd..de9a5e3 100644 --- a/internal/replication/logicalreplicationresolver/replicationresolver.go +++ b/internal/replication/logicalreplicationresolver/replicationresolver.go @@ -145,8 +145,8 @@ func (l *logicalReplicationResolver) OnBeginEvent(xld pgtypes.XLogData, msg *pgt return nil } -func (l *logicalReplicationResolver) OnCommitEvent(xld pgtypes.XLogData, msg *pgtypes.CommitMessage) error { - l.replicationContext.SetLastCommitLSN(pgtypes.LSN(xld.WALStart + pglogrepl.LSN(len(xld.WALData)))) +func (l *logicalReplicationResolver) OnCommitEvent(_ pgtypes.XLogData, msg *pgtypes.CommitMessage) error { + l.replicationContext.SetLastCommitLSN(pgtypes.LSN(msg.TransactionEndLSN)) return nil } diff --git a/internal/replication/logicalreplicationresolver/transactiontracker.go b/internal/replication/logicalreplicationresolver/transactiontracker.go index dacdcfe..7e514ab 100644 --- a/internal/replication/logicalreplicationresolver/transactiontracker.go +++ b/internal/replication/logicalreplicationresolver/transactiontracker.go @@ -107,6 +107,14 @@ func (tt *transactionTracker) OnBeginEvent(xld pgtypes.XLogData, msg *pgtypes.Be } func (tt *transactionTracker) OnCommitEvent(xld pgtypes.XLogData, msg *pgtypes.CommitMessage) error { + // There isn't a running transaction, which can happen when we + // got restarted and the last processed LSN was inside a running + // transaction. In this case we skip all earlier logrepl messages + // and keep going from where we left off. + if tt.currentTransaction == nil { + return tt.resolver.OnCommitEvent(xld, msg) + } + currentTransaction := tt.currentTransaction tt.currentTransaction = nil diff --git a/internal/replication/replicationchannel/replicationchannel.go b/internal/replication/replicationchannel/replicationchannel.go index 1fe30f0..1552bee 100644 --- a/internal/replication/replicationchannel/replicationchannel.go +++ b/internal/replication/replicationchannel/replicationchannel.go @@ -144,7 +144,8 @@ func (rc *ReplicationChannel) StartReplicationChannel( return nil } - if err := replicationConnection.StartReplication(pluginArguments); err != nil { + restartLSN, err := replicationConnection.StartReplication(pluginArguments) + if err != nil { if rc.shutdownRequested.Load() { // If we tried to start replication before the shutdown was initiated, // we totally expect that to fail, and we can just ignore the error @@ -154,7 +155,7 @@ func (rc *ReplicationChannel) StartReplicationChannel( } go func() { - err := replicationHandler.startReplicationHandler(replicationConnection) + err := replicationHandler.startReplicationHandler(replicationConnection, restartLSN) if err != nil { rc.logger.Fatalf("Issue handling WAL stream: %s", err) } diff --git a/internal/replication/replicationchannel/replicationhandler.go b/internal/replication/replicationchannel/replicationhandler.go index fe12eb4..31c62fd 100644 --- a/internal/replication/replicationchannel/replicationhandler.go +++ b/internal/replication/replicationchannel/replicationhandler.go @@ -61,7 +61,10 @@ func (rh *replicationHandler) stopReplicationHandler() error { return rh.shutdownAwaiter.AwaitDone() } -func (rh *replicationHandler) startReplicationHandler(replicationConnection *repcontext.ReplicationConnection) error { +func (rh *replicationHandler) startReplicationHandler( + replicationConnection *repcontext.ReplicationConnection, restartLSN pgtypes.LSN, +) error { + standbyMessageTimeout := time.Second * 10 nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) @@ -132,7 +135,13 @@ func (rh *replicationHandler) startReplicationHandler(replicationConnection *rep Xid: xid, } - if err := rh.handleXLogData(extendedXld); err != nil { + // Skip all entries that were already replicated before the streamer was shut down + msgType := pglogrepl.MessageType(xld.WALData[0]) + if msgType != pglogrepl.MessageTypeRelation && restartLSN > pgtypes.LSN(xld.WALStart) { + continue + } + + if err := rh.handleXLogData(extendedXld, restartLSN); err != nil { return errors.Wrap(err, 0) } rh.replicationContext.AcknowledgeReceived(extendedXld) @@ -140,7 +149,7 @@ func (rh *replicationHandler) startReplicationHandler(replicationConnection *rep } } -func (rh *replicationHandler) handleXLogData(xld pgtypes.XLogData) error { +func (rh *replicationHandler) handleXLogData(xld pgtypes.XLogData, restartLSN pgtypes.LSN) error { msg, err := pgdecoding.ParseXlogData(xld.WALData, rh.lastTransactionId) if err != nil { return fmt.Errorf("parsing logical replication message: %s", err) diff --git a/internal/tests/integration/streamer_restart_test.go b/internal/tests/integration/streamer_restart_test.go index 7bcdaaf..e79bd49 100644 --- a/internal/tests/integration/streamer_restart_test.go +++ b/internal/tests/integration/streamer_restart_test.go @@ -42,8 +42,8 @@ func TestIntegrationRestartTestSuite(t *testing.T) { suite.Run(t, new(IntegrationRestartTestSuite)) } -func (irts *IntegrationRestartTestSuite) Test_() { - waiter := supporting.NewWaiterWithTimeout(time.Second * 30) +func (irts *IntegrationRestartTestSuite) Test_Restart_Streamer() { + waiter := supporting.NewWaiterWithTimeout(time.Second * 600) testSink := inttest.NewEventCollectorSink( inttest.WithFilter( func(_ time.Time, _ string, envelope inttest.Envelope) bool {