From 7a1054db2defa0d71eb76e86bc89dae4bac42dce Mon Sep 17 00:00:00 2001 From: "Christoph Engelbert (noctarius)" Date: Sun, 9 Jul 2023 14:00:07 +0200 Subject: [PATCH] Added test to make sure PG13 / PG14 empty transactions (only COMMIT / BEGIN) are handled correctly --- internal/tests/integration/streamer_test.go | 113 ++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/internal/tests/integration/streamer_test.go b/internal/tests/integration/streamer_test.go index 775ffde..eddd853 100644 --- a/internal/tests/integration/streamer_test.go +++ b/internal/tests/integration/streamer_test.go @@ -20,12 +20,14 @@ package integration import ( stdctx "context" "fmt" + "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5/pgtype" "github.com/noctarius/timescaledb-event-streamer/internal/supporting" "github.com/noctarius/timescaledb-event-streamer/internal/supporting/logging" "github.com/noctarius/timescaledb-event-streamer/internal/sysconfig" inttest "github.com/noctarius/timescaledb-event-streamer/internal/testing" "github.com/noctarius/timescaledb-event-streamer/internal/testing/testrunner" + "github.com/noctarius/timescaledb-event-streamer/internal/version" "github.com/noctarius/timescaledb-event-streamer/spi/schema" "github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog" "github.com/stretchr/testify/suite" @@ -1280,3 +1282,114 @@ func (its *IntegrationTestSuite) Ignore_TestRollbackEvents() { }), ) } + +func (its *IntegrationTestSuite) Test_Acknowledge_To_PG_With_Only_Begin_Commit() { + waiter := supporting.NewWaiterWithTimeout(time.Second * 60) + testSink := inttest.NewEventCollectorSink( + inttest.WithFilter( + func(_ time.Time, _ string, envelope inttest.Envelope) bool { + return envelope.Payload.Op == schema.OP_READ || envelope.Payload.Op == schema.OP_CREATE + }, + ), + inttest.WithPostHook(func(sink *inttest.EventCollectorSink) { + if sink.NumOfEvents() == 1 { + waiter.Signal() + } + }), + ) + + replicationSlotName := supporting.RandomTextString(20) + its.RunTest( + func(context testrunner.Context) error { + pgVersion := context.PostgresqlVersion() + if pgVersion >= version.PG_15_VERSION { + fmt.Printf("Skipped test, because of PostgreSQL version <15.0 (%s)", pgVersion.String()) + return nil + } + + tableName := testrunner.GetAttribute[string](context, "tableName") + + var lsn1 pglogrepl.LSN + if err := context.QueryRow(stdctx.Background(), + "SELECT confirmed_flush_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name=$1", + replicationSlotName, + ).Scan(&lsn1); err != nil { + return err + } + + for i := 0; i < 100; i++ { + if _, err := context.Exec(stdctx.Background(), "INSERT INTO tsdb.foo VALUES($1)", i); err != nil { + return err + } + time.Sleep(time.Millisecond * 5) + } + + fmt.Print("Waiting for status update to server\n") + time.Sleep(time.Second * 6) + + var lsn2 pglogrepl.LSN + if err := context.QueryRow(stdctx.Background(), + "SELECT confirmed_flush_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name=$1", + replicationSlotName, + ).Scan(&lsn2); err != nil { + return err + } + + if _, err := context.Exec(stdctx.Background(), + fmt.Sprintf( + "INSERT INTO \"%s\" VALUES('2023-03-25 00:00:01', 654)", + tableName, + ), + ); err != nil { + return err + } + + if err := waiter.Await(); err != nil { + return err + } + + if lsn2 <= lsn1 { + its.T().Errorf( + "LSN2 must be larger than LSN1 - LSN1: %s, LSN2: %s", + lsn1.String(), lsn2.String(), + ) + } + + return nil + }, + + testrunner.WithSetup(func(context testrunner.SetupContext) error { + _, tn, err := context.CreateHypertable("ts", time.Hour*6, + systemcatalog.NewIndexColumn( + "ts", pgtype.TimestamptzOID, "timestamptz", false, true, + supporting.AddrOf(1), nil, false, supporting.AddrOf("primary"), + systemcatalog.ASC, systemcatalog.NULLS_LAST, false, false, nil, nil, + ), + systemcatalog.NewColumn("val", pgtype.Int4OID, "integer", false, nil), + ) + if err != nil { + return err + } + testrunner.Attribute(context, "tableName", tn) + + if _, err := context.Exec(stdctx.Background(), "CREATE TABLE tsdb.foo (val int)"); err != nil { + return err + } + + if _, err := context.Exec(stdctx.Background(), + fmt.Sprintf( + "INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 23:59:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)", + tn, + ), + ); err != nil { + return err + } + + context.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator) + context.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) { + config.PostgreSQL.ReplicationSlot.Name = replicationSlotName + }) + return nil + }), + ) +}