Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added unit test for implicit decompressions #85

Merged
merged 6 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type logicalReplicationResolver struct {
typeManager pgtypes.TypeManager
logger *logging.Logger

decompressionEventTaskOperation func(task.Task) error

relations *containers.RelationCache[*pgtypes.RelationMessage]
chunkIdLookup *containers.RelationCache[int32]
eventQueues map[string]*containers.Queue[snapshotCallback]
Expand Down Expand Up @@ -78,13 +80,24 @@ func newLogicalReplicationResolver(
genHypertableMessageEvent := spiconfig.GetOrDefault(config, spiconfig.PropertyHypertableEventsMessage, true)
genPostgresqlMessageEvent := spiconfig.GetOrDefault(config, spiconfig.PropertyPostgresqlEventsMessage, true)

// There's a difference in the order of event enqueuing pre-, and post TimescaleDB 2.12
// due to the introduction of decompression markers. Therefore, pre 2.12 we need to
// schedule the decompression event to the end of the operation queue, while with 2.12+
// we need to execute it immediately (if decompression markers are enabled that is).
decompressionEventTaskOperation := taskManager.EnqueueTask
if replicationContext.IsDecompressionMarkingEnabled() {
decompressionEventTaskOperation = taskManager.RunTask
}

return &logicalReplicationResolver{
replicationContext: replicationContext,
systemCatalog: systemCatalog,
taskManager: taskManager,
typeManager: typeManager,
logger: logger,

decompressionEventTaskOperation: decompressionEventTaskOperation,

relations: containers.NewRelationCache[*pgtypes.RelationMessage](),
chunkIdLookup: containers.NewRelationCache[int32](),
eventQueues: make(map[string]*containers.Queue[snapshotCallback]),
Expand Down Expand Up @@ -673,7 +686,7 @@ func (l *logicalReplicationResolver) onChunkDecompressionEvent(
return nil
}

if err := l.taskManager.EnqueueTask(func(notificator task.Notificator) {
if err := l.decompressionEventTaskOperation(func(notificator task.Notificator) {
notificator.NotifyCompressionReplicationEventHandler(
func(handler eventhandlers.CompressionReplicationEventHandler) error {
return handler.OnChunkDecompressedEvent(xld, uncompressedHypertable, chunk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package logicalreplicationresolver

import (
"bytes"
"github.com/jackc/pglogrepl"
"github.com/noctarius/timescaledb-event-streamer/internal/containers"
"github.com/noctarius/timescaledb-event-streamer/internal/logging"
Expand Down Expand Up @@ -64,7 +65,7 @@ func newTransactionTracker(
relations: containers.NewRelationCache[*pgtypes.RelationMessage](),
logger: logger,
resolver: resolver,
supportsDecompressionMarkers: replicationContext.IsTSDB212GE(),
supportsDecompressionMarkers: replicationContext.IsDecompressionMarkingEnabled(),
}

tt.activeTransaction = &transaction{
Expand Down Expand Up @@ -378,16 +379,22 @@ func (tt *transactionTracker) OnMessageEvent(
xld pgtypes.XLogData, msg *pgtypes.LogicalReplicationMessage,
) error {

// Clone content to release the original array which seems to be a slice of the
// original byte array from pgx
msg.Content = bytes.Clone(msg.Content)

// If the message is transactional we need to store it into the currently collected
// transaction, otherwise we can run it straight away.
if msg.IsTransactional() {
if msg.Prefix == decompressionMarkerStartId {
tt.activeTransaction.ongoingDecompression = true
return nil
} else if msg.Prefix == decompressionMarkerEndId &&
(tt.activeTransaction.active && tt.activeTransaction.ongoingDecompression) {
tt.activeTransaction.ongoingDecompression = false
return nil
if tt.supportsDecompressionMarkers {
if msg.Prefix == decompressionMarkerStartId {
tt.activeTransaction.ongoingDecompression = true
return nil
} else if msg.Prefix == decompressionMarkerEndId &&
(tt.activeTransaction.active && tt.activeTransaction.ongoingDecompression) {
tt.activeTransaction.ongoingDecompression = false
return nil
}
}

// If we don't want to generate the message events later one, we'll discard it
Expand Down Expand Up @@ -428,6 +435,7 @@ func (tt *transactionTracker) startTransaction(
tt.activeTransaction = &transaction{
transactionTracker: tt,
maxSize: tt.activeTransaction.maxSize,
active: true,
xid: xid,
finalLSN: finalLSN,
commitTime: commitTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,7 @@ func (t testReplicationContext) ReadReplicationSlot(

return t.readReplicationSlot(slotName)
}

func (t testReplicationContext) IsDecompressionMarkingEnabled() bool {
return false
}
21 changes: 17 additions & 4 deletions internal/replication/replicationcontext/replicationcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ type replicationContext struct {
sideChannel sidechannel.SideChannel
stateStorageManager statestorage.Manager

snapshotInitialMode spiconfig.InitialSnapshotMode
replicationSlotName string
replicationSlotCreate bool
replicationSlotAutoDrop bool
snapshotInitialMode spiconfig.InitialSnapshotMode
replicationSlotName string
replicationSlotCreate bool
replicationSlotAutoDrop bool
decompressionMarkingEnabled bool

timeline int32
systemId string
Expand Down Expand Up @@ -128,6 +129,14 @@ func NewReplicationContext(
}
replicationContext.walLevel = walLevel

if replicationContext.IsTSDB212GE() {
enabled, err := sideChannel.GetReplicationMarkersEnabled()
if err != nil {
return nil, err
}
replicationContext.decompressionMarkingEnabled = enabled
}

return replicationContext, nil
}

Expand Down Expand Up @@ -335,6 +344,10 @@ func (rc *replicationContext) IsLogicalReplicationEnabled() bool {
return rc.walLevel == "logical"
}

func (rc *replicationContext) IsDecompressionMarkingEnabled() bool {
return rc.decompressionMarkingEnabled
}

// ----> SideChannel functions

func (rc *replicationContext) ExistsReplicationSlot(
Expand Down
2 changes: 2 additions & 0 deletions internal/sidechannel/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const queryPostgreSqlVersion = `SHOW SERVER_VERSION`

const queryConfiguredWalLevel = `SHOW WAL_LEVEL`

const queryReplicationMarkersEnabled = `SHOW timescaledb.enable_decompression_logrep_markers`

const queryTemplateReadPostgreSqlTypes = `
SELECT DISTINCT ON (t.typname) sp.nspname, t.typname, t.typinput='array_in'::REGPROC,
t.typinput='record_in'::REGPROC,
Expand Down
14 changes: 14 additions & 0 deletions internal/sidechannel/sidechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ func (sc *sideChannel) GetWalLevel() (walLevel string, err error) {
return
}

func (sc *sideChannel) GetReplicationMarkersEnabled() (enabled bool, err error) {
var value string
err = sc.newSession(time.Second*10, func(session *session) error {
return session.queryRow(queryReplicationMarkersEnabled).Scan(&value)
})
if err != nil {
if strings.Contains(err.Error(), "unrecognized configuration parameter") {
return false, nil
}
err = errors.Wrap(err, 0)
}
return strings.ToLower(value) == "on", nil
}

func (sc *sideChannel) GetPostgresVersion() (pgVersion version.PostgresVersion, err error) {
if err = sc.newSession(time.Second*10, func(session *session) error {
var v string
Expand Down
1 change: 1 addition & 0 deletions spi/replicationcontext/replicationcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ReplicationContext interface {
IsMinimumTimescaleVersion() bool
IsTSDB212GE() bool
IsLogicalReplicationEnabled() bool
IsDecompressionMarkingEnabled() bool

ExistsReplicationSlot(
slotName string,
Expand Down
1 change: 1 addition & 0 deletions spi/sidechannel/sidechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SideChannel interface {
databaseName, systemId string, timeline int32, err error,
)
GetWalLevel() (walLevel string, err error)
GetReplicationMarkersEnabled() (enabled bool, err error)
GetPostgresVersion() (pgVersion version.PostgresVersion, err error)
GetTimescaleDBVersion() (tsdbVersion version.TimescaleVersion, found bool, err error)
ReadHypertables(
Expand Down
160 changes: 159 additions & 1 deletion tests/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (its *IntegrationTestSuite) Test_Hypertable_Decompression_Events() {
}
}

// Final event must be a truncate event
// Final event must be a compression event
event := testSink.Events()[10]
if event.Envelope.Payload.Op != schema.OP_TIMESCALE {
its.T().Errorf("event should be of type '$' but was %s", event.Envelope.Payload.Op)
Expand Down Expand Up @@ -872,6 +872,164 @@ func (its *IntegrationTestSuite) Test_Hypertable_Decompression_Events() {
)
}

func (its *IntegrationTestSuite) Test_Hypertable_Implicit_Decompression_Events_In_Transaction_With_Insert() {
waiter := waiting.NewWaiterWithTimeout(time.Second * 20)
testSink := testsupport.NewEventCollectorSink(
testsupport.WithFilter(
func(_ time.Time, _ string, envelope testsupport.Envelope) bool {
return envelope.Payload.Op == schema.OP_CREATE || envelope.Payload.Op == schema.OP_TIMESCALE
},
),
testsupport.WithPostHook(func(sink *testsupport.EventCollectorSink, envelope testsupport.Envelope) {
if sink.NumOfEvents() == 10 {
waiter.Signal()
}
if sink.NumOfEvents() == 21 {
waiter.Signal()
}

if envelope.Payload.Op == schema.OP_TIMESCALE && envelope.Payload.TsdbOp == schema.OP_COMPRESSION {
waiter.Signal()
}
}),
)

its.RunTest(
func(ctx testrunner.Context) error {
pgVersion := ctx.PostgresqlVersion()
if pgVersion < version.PG_14_VERSION {
fmt.Printf("Skipped test, because of PostgreSQL version <14.0 (%s)", pgVersion)
return nil
}

tsdbVersion := ctx.TimescaleVersion()
if tsdbVersion < version.TSDB_212_VERSION {
fmt.Printf("Skipped test, because of TimescaleDB version <2.12 (%s)", tsdbVersion)
return nil
}

if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:09:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}
waiter.Reset()

if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"ALTER TABLE \"%s\" SET (timescaledb.compress)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}
if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"SELECT compress_chunk((t.chunk_schema || '.' || t.chunk_name)::regclass, true) FROM (SELECT * FROM timescaledb_information.chunks WHERE hypertable_name = '%s') t",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}
waiter.Reset()

tx, err := ctx.Begin(context.Background())
if err != nil {
return err
}

if _, err := tx.Exec(context.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) + 10 AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:09:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

if err := tx.Commit(context.Background()); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}

// Initial 10 events have to be of type read (same transaction as the chunk creation)
for i := 0; i < 10; i++ {
expected := i + 1
event := testSink.Events()[i]
val := int(event.Envelope.Payload.After["val"].(float64))
if expected != val {
its.T().Errorf("event order inconsistent %d != %d", expected, val)
return nil
}
if event.Envelope.Payload.Op != schema.OP_CREATE {
its.T().Errorf("event should be of type 'r' but was %s", event.Envelope.Payload.Op)
return nil
}
}

// 11th event must be a compression event
event := testSink.Events()[10]
if event.Envelope.Payload.Op != schema.OP_TIMESCALE {
its.T().Errorf("event should be of type '$' but was %s", event.Envelope.Payload.Op)
return nil
}
if event.Envelope.Payload.TsdbOp != schema.OP_COMPRESSION {
its.T().Errorf("event should be of timescaledb type 'c' but was %s", event.Envelope.Payload.TsdbOp)
return nil
}

// Remaining 10 events have to be of type inserts, but not from the decompression
for i := 0; i < 10; i++ {
expected := i + 11
event := testSink.Events()[i+11]
val := int(event.Envelope.Payload.After["val"].(float64))
if expected != val {
its.T().Errorf("event order inconsistent %d != %d", expected, val)
return nil
}
if event.Envelope.Payload.Op != schema.OP_CREATE {
its.T().Errorf("event should be of type 'r' but was %s", event.Envelope.Payload.Op)
return nil
}
}

return nil
},

testrunner.WithSetup(func(ctx testrunner.SetupContext) error {
_, tn, err := ctx.CreateHypertable("ts", time.Hour*24,
testsupport.NewColumn("ts", "timestamptz", false, true, nil),
testsupport.NewColumn("val", "integer", false, true, nil),
)
if err != nil {
return err
}
testrunner.Attribute(ctx, "tableName", tn)

ctx.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
ctx.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
config.TimescaleDB.Events.Compression = lo.ToPtr(true)
config.TimescaleDB.Events.Decompression = lo.ToPtr(true)
})
return nil
}),
)
}

func (its *IntegrationTestSuite) Test_Hypertable_Compression_Decompression_SingleTransaction_Events() {
waiter := waiting.NewWaiterWithTimeout(time.Second * 20)
testSink := testsupport.NewEventCollectorSink(
Expand Down
6 changes: 5 additions & 1 deletion testsupport/containers/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ func SetupTimescaleContainer() (testcontainers.Container, *ConfigProvider, error
containerRequest := testcontainers.ContainerRequest{
Image: imageName,
ExposedPorts: []string{"5432/tcp"},
Cmd: []string{"-c", "fsync=off", "-c", "wal_level=logical"},
Cmd: []string{
"-c", "fsync=off",
"-c", "wal_level=logical",
"-c", "timescaledb.enable_decompression_logrep_markers=true",
},
WaitingFor: wait.ForAll(
wait.ForLog("PostgreSQL init process complete; ready for start up."),
wait.ForListeningPort("5432/tcp"),
Expand Down
Loading