Skip to content

Commit

Permalink
Added unit test for implicit decompressions
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Sep 30, 2023
1 parent c626ac3 commit 05da944
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func (l *logicalReplicationResolver) onChunkDecompressionEvent(
return nil
}

if err := l.taskManager.EnqueueTask(func(notificator task.Notificator) {
if err := l.taskManager.RunTask(func(notificator task.Notificator) {
notificator.NotifyCompressionReplicationEventHandler(
func(handler eventhandlers.CompressionReplicationEventHandler) error {
return handler.OnChunkDecompressedEvent(xld, uncompressedHypertable, chunk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,15 @@ func (tt *transactionTracker) OnMessageEvent(
// If the message is transactional we need to store it into the currently collected
// transaction, otherwise we can run it straight away.
if msg.IsTransactional() {
if msg.Prefix == decompressionMarkerStartId {
tt.activeTransaction.ongoingDecompression = true
return nil
} else if msg.Prefix == decompressionMarkerEndId &&
(tt.activeTransaction.active && tt.activeTransaction.ongoingDecompression) {
tt.activeTransaction.ongoingDecompression = false
return nil
if tt.supportsDecompressionMarkers {
if msg.Prefix == decompressionMarkerStartId {
tt.activeTransaction.ongoingDecompression = true
return nil
} else if msg.Prefix == decompressionMarkerEndId &&
(tt.activeTransaction.active && tt.activeTransaction.ongoingDecompression) {
tt.activeTransaction.ongoingDecompression = false
return nil
}
}

// If we don't want to generate the message events later one, we'll discard it
Expand Down Expand Up @@ -428,6 +430,7 @@ func (tt *transactionTracker) startTransaction(
tt.activeTransaction = &transaction{
transactionTracker: tt,
maxSize: tt.activeTransaction.maxSize,
active: true,
xid: xid,
finalLSN: finalLSN,
commitTime: commitTime,
Expand Down
154 changes: 153 additions & 1 deletion tests/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (its *IntegrationTestSuite) Test_Hypertable_Decompression_Events() {
}
}

// Final event must be a truncate event
// Final event must be a compression event
event := testSink.Events()[10]
if event.Envelope.Payload.Op != schema.OP_TIMESCALE {
its.T().Errorf("event should be of type '$' but was %s", event.Envelope.Payload.Op)
Expand Down Expand Up @@ -872,6 +872,158 @@ func (its *IntegrationTestSuite) Test_Hypertable_Decompression_Events() {
)
}

func (its *IntegrationTestSuite) Test_Hypertable_Implicit_Decompression_Events_In_Transaction_With_Insert() {
waiter := waiting.NewWaiterWithTimeout(time.Second * 20)
testSink := testsupport.NewEventCollectorSink(
testsupport.WithFilter(
func(_ time.Time, _ string, envelope testsupport.Envelope) bool {
return envelope.Payload.Op == schema.OP_CREATE || envelope.Payload.Op == schema.OP_TIMESCALE
},
),
testsupport.WithPostHook(func(sink *testsupport.EventCollectorSink, envelope testsupport.Envelope) {
if sink.NumOfEvents() == 10 {
waiter.Signal()
}
if sink.NumOfEvents() == 21 {
waiter.Signal()
}

if envelope.Payload.Op == schema.OP_TIMESCALE && envelope.Payload.TsdbOp == schema.OP_COMPRESSION {
waiter.Signal()
}
}),
)

its.RunTest(
func(ctx testrunner.Context) error {
if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:09:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}
waiter.Reset()

if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"ALTER TABLE \"%s\" SET (timescaledb.compress)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}
if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"SELECT compress_chunk((t.chunk_schema || '.' || t.chunk_name)::regclass, true) FROM (SELECT * FROM timescaledb_information.chunks WHERE hypertable_name = '%s') t",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}
waiter.Reset()

tx, err := ctx.Begin(context.Background())
if err != nil {
return err
}

if _, err := tx.Exec(context.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) + 10 AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:09:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

if err := tx.Commit(context.Background()); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}

// Initial 10 events have to be of type read (same transaction as the chunk creation)
for i := 0; i < 10; i++ {
expected := i + 1
event := testSink.Events()[i]
val := int(event.Envelope.Payload.After["val"].(float64))
if expected != val {
its.T().Errorf("event order inconsistent %d != %d", expected, val)
return nil
}
if event.Envelope.Payload.Op != schema.OP_CREATE {
its.T().Errorf("event should be of type 'r' but was %s", event.Envelope.Payload.Op)
return nil
}
}

// 11th event must be a compression event
event := testSink.Events()[10]
if event.Envelope.Payload.Op != schema.OP_TIMESCALE {
its.T().Errorf("event should be of type '$' but was %s", event.Envelope.Payload.Op)
return nil
}
if event.Envelope.Payload.TsdbOp != schema.OP_COMPRESSION {
its.T().Errorf("event should be of timescaledb type 'c' but was %s", event.Envelope.Payload.TsdbOp)
return nil
}

// Remaining 10 events have to be of type inserts, but not from the decompression
for i := 0; i < 10; i++ {
expected := i + 11
event := testSink.Events()[i+11]
val := int(event.Envelope.Payload.After["val"].(float64))
if expected != val {
its.T().Errorf("event order inconsistent %d != %d", expected, val)
return nil
}
if event.Envelope.Payload.Op != schema.OP_CREATE {
its.T().Errorf("event should be of type 'r' but was %s", event.Envelope.Payload.Op)
return nil
}
}

return nil
},

testrunner.WithSetup(func(ctx testrunner.SetupContext) error {
_, tn, err := ctx.CreateHypertable("ts", time.Hour*24,
testsupport.NewColumn("ts", "timestamptz", false, true, nil),
testsupport.NewColumn("val", "integer", false, true, nil),
)
if err != nil {
return err
}
testrunner.Attribute(ctx, "tableName", tn)

var v string
if err := ctx.QueryRow(context.Background(), "SHOW timescaledb.enable_decompression_logrep_markers").Scan(&v); err != nil {
return err
}
fmt.Println(v)

ctx.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
ctx.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
config.TimescaleDB.Events.Compression = lo.ToPtr(true)
config.TimescaleDB.Events.Decompression = lo.ToPtr(true)
})
return nil
}),
)
}

func (its *IntegrationTestSuite) Test_Hypertable_Compression_Decompression_SingleTransaction_Events() {
waiter := waiting.NewWaiterWithTimeout(time.Second * 20)
testSink := testsupport.NewEventCollectorSink(
Expand Down
6 changes: 5 additions & 1 deletion testsupport/containers/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ func SetupTimescaleContainer() (testcontainers.Container, *ConfigProvider, error
containerRequest := testcontainers.ContainerRequest{
Image: imageName,
ExposedPorts: []string{"5432/tcp"},
Cmd: []string{"-c", "fsync=off", "-c", "wal_level=logical"},
Cmd: []string{
"-c", "fsync=off",
"-c", "wal_level=logical",
"-c", "timescaledb.enable_decompression_logrep_markers=true",
},
WaitingFor: wait.ForAll(
wait.ForLog("PostgreSQL init process complete; ready for start up."),
wait.ForListeningPort("5432/tcp"),
Expand Down
51 changes: 41 additions & 10 deletions testsupport/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,28 @@ func CreateHypertable(
return "", "", err
}

columnDefinitions := make([]string, len(columns))
for i, column := range columns {
columnDefinitions[i] = toDefinition(column)
compositePrimaryKey := lo.Filter(columns, func(column Column, _ int) bool {
return column.primaryKey
})

columnDefinitions := lo.Map(columns, func(column Column, _ int) string {
return toDefinition(column, len(compositePrimaryKey) > 1)
})

primaryKeyConstraint := ""
if len(compositePrimaryKey) > 1 {
primaryKeyConstraint = fmt.Sprintf(
", PRIMARY KEY (%s)",
strings.Join(
lo.Map(compositePrimaryKey, func(column Column, _ int) string {
return column.Name()
}), ",",
),
)
}

query := fmt.Sprintf("CREATE TABLE \"%s\".\"%s\" (%s)", DatabaseSchema,
tableName, strings.Join(columnDefinitions, ", "))
query := fmt.Sprintf("CREATE TABLE \"%s\".\"%s\" (%s%s)", DatabaseSchema,
tableName, strings.Join(columnDefinitions, ", "), primaryKeyConstraint)

if _, err := tx.Exec(context.Background(), query); err != nil {
tx.Rollback(context.Background())
Expand Down Expand Up @@ -92,13 +107,29 @@ func CreateVanillaTable(
return "", "", err
}

compositePrimaryKey := lo.Filter(columns, func(column Column, _ int) bool {
return column.primaryKey
})

columnDefinitions := make([]string, len(columns))
for i, column := range columns {
columnDefinitions[i] = toDefinition(column)
columnDefinitions[i] = toDefinition(column, len(compositePrimaryKey) > 1)
}

primaryKeyConstraint := ""
if len(compositePrimaryKey) > 1 {
primaryKeyConstraint = fmt.Sprintf(
", PRIMARY KEY (%s)",
strings.Join(
lo.Map(compositePrimaryKey, func(column Column, _ int) string {
return column.Name()
}), ",",
),
)
}

query := fmt.Sprintf("CREATE TABLE \"%s\".\"%s\" (%s)", DatabaseSchema,
tableName, strings.Join(columnDefinitions, ", "))
query := fmt.Sprintf("CREATE TABLE \"%s\".\"%s\" (%s%s)", DatabaseSchema,
tableName, strings.Join(columnDefinitions, ", "), primaryKeyConstraint)

if _, err := tx.Exec(context.Background(), query); err != nil {
tx.Rollback(context.Background())
Expand All @@ -121,7 +152,7 @@ func randomTableName() string {
}

func toDefinition(
column Column,
column Column, compositePrimaryKey bool,
) string {

builder := strings.Builder{}
Expand All @@ -134,7 +165,7 @@ func toDefinition(
if column.DefaultValue() != nil {
builder.WriteString(fmt.Sprintf(" DEFAULT '%s'", *column.DefaultValue()))
}
if column.IsPrimaryKey() {
if column.IsPrimaryKey() && !compositePrimaryKey {
builder.WriteString(" PRIMARY KEY")
}
return builder.String()
Expand Down

0 comments on commit 05da944

Please sign in to comment.