Skip to content

Commit

Permalink
Added test to make sure PG13 / PG14 empty transactions (only COMMIT /…
Browse files Browse the repository at this point in the history
… BEGIN) are handled correctly
  • Loading branch information
noctarius committed Jul 9, 2023
1 parent 3e7ec11 commit 7a1054d
Showing 1 changed file with 113 additions and 0 deletions.
113 changes: 113 additions & 0 deletions internal/tests/integration/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package integration
import (
stdctx "context"
"fmt"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting/logging"
"github.com/noctarius/timescaledb-event-streamer/internal/sysconfig"
inttest "github.com/noctarius/timescaledb-event-streamer/internal/testing"
"github.com/noctarius/timescaledb-event-streamer/internal/testing/testrunner"
"github.com/noctarius/timescaledb-event-streamer/internal/version"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -1280,3 +1282,114 @@ func (its *IntegrationTestSuite) Ignore_TestRollbackEvents() {
}),
)
}

func (its *IntegrationTestSuite) Test_Acknowledge_To_PG_With_Only_Begin_Commit() {
waiter := supporting.NewWaiterWithTimeout(time.Second * 60)
testSink := inttest.NewEventCollectorSink(
inttest.WithFilter(
func(_ time.Time, _ string, envelope inttest.Envelope) bool {
return envelope.Payload.Op == schema.OP_READ || envelope.Payload.Op == schema.OP_CREATE
},
),
inttest.WithPostHook(func(sink *inttest.EventCollectorSink) {
if sink.NumOfEvents() == 1 {
waiter.Signal()
}
}),
)

replicationSlotName := supporting.RandomTextString(20)
its.RunTest(
func(context testrunner.Context) error {
pgVersion := context.PostgresqlVersion()
if pgVersion >= version.PG_15_VERSION {
fmt.Printf("Skipped test, because of PostgreSQL version <15.0 (%s)", pgVersion.String())
return nil
}

tableName := testrunner.GetAttribute[string](context, "tableName")

var lsn1 pglogrepl.LSN
if err := context.QueryRow(stdctx.Background(),
"SELECT confirmed_flush_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name=$1",
replicationSlotName,
).Scan(&lsn1); err != nil {
return err
}

for i := 0; i < 100; i++ {
if _, err := context.Exec(stdctx.Background(), "INSERT INTO tsdb.foo VALUES($1)", i); err != nil {
return err
}
time.Sleep(time.Millisecond * 5)
}

fmt.Print("Waiting for status update to server\n")
time.Sleep(time.Second * 6)

var lsn2 pglogrepl.LSN
if err := context.QueryRow(stdctx.Background(),
"SELECT confirmed_flush_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name=$1",
replicationSlotName,
).Scan(&lsn2); err != nil {
return err
}

if _, err := context.Exec(stdctx.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" VALUES('2023-03-25 00:00:01', 654)",
tableName,
),
); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}

if lsn2 <= lsn1 {
its.T().Errorf(
"LSN2 must be larger than LSN1 - LSN1: %s, LSN2: %s",
lsn1.String(), lsn2.String(),
)
}

return nil
},

testrunner.WithSetup(func(context testrunner.SetupContext) error {
_, tn, err := context.CreateHypertable("ts", time.Hour*6,
systemcatalog.NewIndexColumn(
"ts", pgtype.TimestamptzOID, "timestamptz", false, true,
supporting.AddrOf(1), nil, false, supporting.AddrOf("primary"),
systemcatalog.ASC, systemcatalog.NULLS_LAST, false, false, nil, nil,
),
systemcatalog.NewColumn("val", pgtype.Int4OID, "integer", false, nil),
)
if err != nil {
return err
}
testrunner.Attribute(context, "tableName", tn)

if _, err := context.Exec(stdctx.Background(), "CREATE TABLE tsdb.foo (val int)"); err != nil {
return err
}

if _, err := context.Exec(stdctx.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 23:59:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
tn,
),
); err != nil {
return err
}

context.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
context.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
config.PostgreSQL.ReplicationSlot.Name = replicationSlotName
})
return nil
}),
)
}

0 comments on commit 7a1054d

Please sign in to comment.