Skip to content

Commit

Permalink
Fixed unclean shutdown bug after the connection got terminated from the remote side or some other catastrophic WAL stream event happened.
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Jul 7, 2023
1 parent 6eb1869 commit 1123dbd
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
10 changes: 10 additions & 0 deletions internal/replication/replicationchannel/replicationhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/noctarius/timescaledb-event-streamer/spi/eventhandlers"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"runtime"
"sync/atomic"
"time"
)

Expand All @@ -37,6 +38,7 @@ type replicationHandler struct {
clientXLogPos pglogrepl.LSN
relations map[uint32]*pgtypes.RelationMessage
shutdownAwaiter *supporting.ShutdownAwaiter
loopDead atomic.Bool
lastTransactionId *uint32
logger *logging.Logger
}
Expand All @@ -52,12 +54,16 @@ func newReplicationHandler(replicationContext *repcontext.ReplicationContext) (*
relations: make(map[uint32]*pgtypes.RelationMessage),
shutdownAwaiter: supporting.NewShutdownAwaiter(),
logger: logger,
loopDead: atomic.Bool{},
}, nil
}

// stopReplicationHandler signals the replication loop to shut down and, unless
// the loop has already been marked dead, returns the result of awaiting its
// completion.
//
// startReplicationHandler stores true into loopDead before returning on a
// receive or XLog-handling error. In that case nil is returned right after
// signalling, without calling AwaitDone (presumably because nothing is left
// that would report completion — confirm against ShutdownAwaiter).
func (rh *replicationHandler) stopReplicationHandler() error {
	rh.logger.Println("Starting to shutdown")
	rh.shutdownAwaiter.SignalShutdown()

	// Only wait for the loop when it has not already terminated on its own.
	if loopAlive := !rh.loopDead.Load(); loopAlive {
		return rh.shutdownAwaiter.AwaitDone()
	}
	return nil
}

Expand Down Expand Up @@ -88,6 +94,8 @@ func (rh *replicationHandler) startReplicationHandler(

rawMsg, err := replicationConnection.ReceiveMessage(nextStandbyMessageDeadline)
if err != nil {
runtime.UnlockOSThread()
rh.loopDead.Store(true)
return errors.Wrap(err, 0)
}

Expand Down Expand Up @@ -142,6 +150,8 @@ func (rh *replicationHandler) startReplicationHandler(
}

if err := rh.handleXLogData(extendedXld); err != nil {
runtime.UnlockOSThread()
rh.loopDead.Store(true)
return errors.Wrap(err, 0)
}
rh.replicationContext.AcknowledgeReceived(extendedXld)
Expand Down
118 changes: 118 additions & 0 deletions internal/tests/integration/streamer_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,121 @@ func (irts *IntegrationRestartTestSuite) Test_Restart_Streamer() {
}),
)
}

// Test_Restart_Streamer_After_Backend_Kill inserts a single row, terminates the
// PostgreSQL backend that currently serves the test's replication slot, and
// then verifies that events keep flowing in order after the replicator has
// been paused and resumed.
//
// The sink only collects READ and CREATE events. The waiter is signalled when
// the sink holds 1 event (the initial insert) and again when it holds 21
// events (the initial insert plus the 20 rows produced by GENERATE_SERIES).
// Finally the "val" column of every collected event is expected to equal its
// 1-based position in the sink (1..21).
func (irts *IntegrationRestartTestSuite) Test_Restart_Streamer_After_Backend_Kill() {
	waiter := supporting.NewWaiterWithTimeout(time.Second * 60)
	testSink := inttest.NewEventCollectorSink(
		inttest.WithFilter(
			func(_ time.Time, _ string, envelope inttest.Envelope) bool {
				return envelope.Payload.Op == schema.OP_READ || envelope.Payload.Op == schema.OP_CREATE
			},
		),
		inttest.WithPostHook(func(sink *inttest.EventCollectorSink) {
			// First milestone: the single row inserted before the backend kill.
			if sink.NumOfEvents() == 1 {
				waiter.Signal()
			}
			// Second milestone: the first row plus the 20 generated rows.
			if sink.NumOfEvents() == 21 {
				waiter.Signal()
			}
		}),
	)

	// A random slot name lets the test look up exactly its own slot in
	// pg_replication_slots below.
	replicationSlotName := supporting.RandomTextString(20)

	irts.RunTest(
		func(context testrunner.Context) error {
			if _, err := context.Exec(stdctx.Background(),
				fmt.Sprintf(
					"INSERT INTO \"%s\" (ts, val) VALUES ('2023-02-25 00:00:00', 1)",
					testrunner.GetAttribute[string](context, "tableName"),
				),
			); err != nil {
				return err
			}

			// Kill the backend process attached to the replication slot to
			// simulate the connection being terminated from the remote side.
			// NOTE(review): the first return value of Query is discarded; if it
			// is a rows handle that must be closed, this may hold a connection
			// open — confirm against the testrunner API.
			if err := context.PrivilegedContext(func(context testrunner.PrivilegedContext) error {
				_, err := context.Query(
					stdctx.Background(),
					"SELECT pg_terminate_backend(rs.active_pid) FROM pg_catalog.pg_replication_slots rs WHERE rs.slot_name = $1",
					replicationSlotName,
				)
				return err
			}); err != nil {
				return err
			}

			// Wait for the first milestone (1 event collected).
			// NOTE(review): that event may already have arrived before the
			// backend was killed — verify this exercises the intended path.
			if err := waiter.Await(); err != nil {
				return err
			}

			if err := context.PauseReplicator(); err != nil {
				return err
			}
			waiter.Reset()

			// 20 rows, one per minute, with val = 2..21 so that together with
			// the first row the values form the sequence 1..21.
			if _, err := context.Exec(stdctx.Background(),
				fmt.Sprintf(
					"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) + 1 AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:19:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
					testrunner.GetAttribute[string](context, "tableName"),
				),
			); err != nil {
				return err
			}

			if err := context.ResumeReplicator(); err != nil {
				return err
			}

			// Wait for the second milestone (21 events collected).
			if err := waiter.Await(); err != nil {
				return err
			}

			for i, event := range testSink.Events() {
				expected := i + 1

				// Use the comma-ok form so a missing or unexpectedly typed
				// "val" is reported as a test failure instead of panicking.
				rawVal := event.Envelope.Payload.After["val"]
				floatVal, ok := rawVal.(float64)
				if !ok {
					irts.T().Errorf("event %d: unexpected type %T for column \"val\"", i, rawVal)
					return nil
				}

				val := int(floatVal)
				if expected != val {
					irts.T().Errorf("event order inconsistent %d != %d", expected, val)
					return nil
				}
			}

			return nil
		},

		testrunner.WithSetup(func(context testrunner.SetupContext) error {
			_, tn, err := context.CreateHypertable("ts", time.Hour*24,
				systemcatalog.NewColumn("ts", pgtype.TimestamptzOID, "timestamptz", false, nil),
				systemcatalog.NewColumn("val", pgtype.Int4OID, "integer", false, nil),
			)
			if err != nil {
				return err
			}
			testrunner.Attribute(context, "tableName", tn)

			// File used as the state storage path; removed again in tear down.
			tempFile, err := inttest.CreateTempFile("restart-replicator")
			if err != nil {
				return err
			}
			testrunner.Attribute(context, "tempFile", tempFile)

			context.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
			context.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
				// Slot and publication are created but not auto-dropped.
				config.Config.PostgreSQL.ReplicationSlot.Name = replicationSlotName
				config.Config.PostgreSQL.ReplicationSlot.Create = supporting.AddrOf(true)
				config.Config.PostgreSQL.ReplicationSlot.AutoDrop = supporting.AddrOf(false)
				config.Config.PostgreSQL.Publication.Name = supporting.RandomTextString(10)
				config.Config.PostgreSQL.Publication.Create = supporting.AddrOf(true)
				config.Config.PostgreSQL.Publication.AutoDrop = supporting.AddrOf(false)
				config.Config.StateStorage.Type = spiconfig.FileStorage
				config.Config.StateStorage.FileStorage.Path = tempFile
			})
			return nil
		}),

		testrunner.WithTearDown(func(context testrunner.Context) error {
			tempFile := testrunner.GetAttribute[string](context, "tempFile")
			// Best-effort cleanup: a failure to remove the temporary state file
			// must not fail the test, so the error is deliberately ignored.
			_ = os.Remove(tempFile)
			return nil
		}),
	)
}

0 comments on commit 1123dbd

Please sign in to comment.