Skip to content

Commit

Permalink
Added a test to prove the behavior of continuous aggregates on a norm…
Browse files Browse the repository at this point in the history
…al scheduled run
  • Loading branch information
noctarius committed Jul 6, 2023
1 parent 8cae860 commit d06f840
Showing 1 changed file with 110 additions and 0 deletions.
110 changes: 110 additions & 0 deletions internal/tests/integration/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting/logging"
"github.com/noctarius/timescaledb-event-streamer/internal/sysconfig"
inttest "github.com/noctarius/timescaledb-event-streamer/internal/testing"
"github.com/noctarius/timescaledb-event-streamer/internal/testing/testrunner"
Expand Down Expand Up @@ -1087,6 +1088,115 @@ func (its *IntegrationTestSuite) TestContinuousAggregateCreateEvents() {
)
}

func (its *IntegrationTestSuite) TestContinuousAggregate_Scheduled_Refresh_CreateEvents() {
waiter := supporting.NewWaiterWithTimeout(time.Second * 30)
testSink := inttest.NewEventCollectorSink(
inttest.WithFilter(
func(_ time.Time, _ string, envelope inttest.Envelope) bool {
return envelope.Payload.Op == schema.OP_CREATE
},
),
inttest.WithPostHook(func(sink *inttest.EventCollectorSink) {
if sink.NumOfEvents()%20 == 0 {
waiter.Signal()
}
}),
)

its.RunTest(
func(context testrunner.Context) error {
logger, err := logging.NewLogger("CAGG_REFRESH_TEST")
if err != nil {
return err
}

if _, err := context.Exec(stdctx.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:19:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
testrunner.GetAttribute[string](context, "tableName"),
),
); err != nil {
return err
}

waiter.Reset()
logger.Warnln("Scheduling continuous aggregate refresh")
if err := context.PrivilegedContext(func(ctx testrunner.PrivilegedContext) error {
_, err := ctx.Exec(stdctx.Background(), `
SELECT alter_job(j.id, next_start => now() + interval '5 seconds')
FROM _timescaledb_config.bgw_job j
LEFT JOIN _timescaledb_catalog.hypertable h ON h.id = j.hypertable_id
LEFT JOIN _timescaledb_catalog.continuous_agg c ON c.mat_hypertable_id = j.hypertable_id
WHERE c.user_view_name = $1`,
testrunner.GetAttribute[string](context, "aggregateName"),
)
return err
}); err != nil {
return err
}

if err := waiter.Await(); err != nil {
return err
}

for i := 0; i < 20; i++ {
expected := i + 1
event := testSink.Events()[i]
val := int(event.Envelope.Payload.After["val"].(float64))
if expected != val {
its.T().Errorf("event order inconsistent %d != %d", expected, val)
return nil
}
if event.Envelope.Payload.Op != schema.OP_CREATE {
its.T().Errorf("event should be of type 'c' but was %s", event.Envelope.Payload.Op)
return nil
}
}

return nil
},

testrunner.WithSetup(func(context testrunner.SetupContext) error {
_, tn, err := context.CreateHypertable("ts", time.Hour*24,
systemcatalog.NewColumn("ts", pgtype.TimestamptzOID, "timestamptz", false, nil),
systemcatalog.NewColumn("val", pgtype.Int4OID, "integer", false, nil),
)
if err != nil {
return err
}
testrunner.Attribute(context, "tableName", tn)

aggregateName := supporting.RandomTextString(10)
testrunner.Attribute(context, "aggregateName", aggregateName)

if _, err := context.Exec(stdctx.Background(),
fmt.Sprintf(
"CREATE MATERIALIZED VIEW %s WITH (timescaledb.continuous) AS SELECT time_bucket('1 min', t.ts) bucket, max(val) val FROM %s t GROUP BY 1 WITH NO DATA",
aggregateName, tn,
),
); err != nil {
return err
}
if _, err := context.Exec(stdctx.Background(),
fmt.Sprintf(
"SELECT add_continuous_aggregate_policy('%s', NULL, NULL, schedule_interval => interval '1 day')",
aggregateName,
),
); err != nil {
return err
}

context.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
context.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
config.TimescaleDB.Hypertables.Includes = []string{
systemcatalog.MakeRelationKey(inttest.DatabaseSchema, aggregateName),
}
})
return nil
}),
)
}

func (its *IntegrationTestSuite) Ignore_TestRollbackEvents() {
waiter := supporting.NewWaiterWithTimeout(time.Second * 20)
testSink := inttest.NewEventCollectorSink(
Expand Down

0 comments on commit d06f840

Please sign in to comment.