From a983ddc29c614b94c001474203b61725f356d95d Mon Sep 17 00:00:00 2001 From: "Christoph Engelbert (noctarius)" Date: Wed, 5 Jul 2023 09:27:43 +0200 Subject: [PATCH] Fixed returning which was nil *errors.Error, not nil --- .../context/replicationconnection.go | 2 +- internal/testing/containers/localstack.go | 1 + .../tests/integration/aws_sqs_sink_test.go | 169 ++++++++++++++++++ 3 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 internal/testing/containers/localstack.go create mode 100644 internal/tests/integration/aws_sqs_sink_test.go diff --git a/internal/replication/context/replicationconnection.go b/internal/replication/context/replicationconnection.go index 3842bee..c927fd6 100644 --- a/internal/replication/context/replicationconnection.go +++ b/internal/replication/context/replicationconnection.go @@ -160,7 +160,7 @@ func (rc *ReplicationConnection) CreateReplicationSlot() (slotName, snapshotName } rc.replicationSlotCreated = true - return slot.SlotName, slot.SnapshotName, true, errors.Wrap(err, 0) + return slot.SlotName, slot.SnapshotName, true, nil } func (rc *ReplicationConnection) DropReplicationSlot() error { diff --git a/internal/testing/containers/localstack.go b/internal/testing/containers/localstack.go new file mode 100644 index 0000000..0c09d3e --- /dev/null +++ b/internal/testing/containers/localstack.go @@ -0,0 +1 @@ +package containers diff --git a/internal/tests/integration/aws_sqs_sink_test.go b/internal/tests/integration/aws_sqs_sink_test.go new file mode 100644 index 0000000..f893733 --- /dev/null +++ b/internal/tests/integration/aws_sqs_sink_test.go @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-errors/errors" + "github.com/go-redis/redis" + "github.com/jackc/pgx/v5/pgtype" + "github.com/noctarius/timescaledb-event-streamer/internal/supporting" + "github.com/noctarius/timescaledb-event-streamer/internal/supporting/logging" + "github.com/noctarius/timescaledb-event-streamer/internal/sysconfig" + inttest "github.com/noctarius/timescaledb-event-streamer/internal/testing" + "github.com/noctarius/timescaledb-event-streamer/internal/testing/containers" + "github.com/noctarius/timescaledb-event-streamer/internal/testing/testrunner" + spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config" + "github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "github.com/testcontainers/testcontainers-go" + "testing" + "time" +) + +type RedisIntegrationTestSuite struct { + testrunner.TestRunner +} + +func TestRedisIntegrationTestSuite(t *testing.T) { + suite.Run(t, new(RedisIntegrationTestSuite)) +} + +func (rits *RedisIntegrationTestSuite) Test_Redis_Sink() { + topicPrefix := supporting.RandomTextString(10) + + redisLogger, err := logging.NewLogger("Test_Redis_Sink") + if err != nil { + rits.T().Error(err) + } + + var address string + var container testcontainers.Container + + rits.RunTest( + func(ctx testrunner.Context) error { + client := redis.NewClient(&redis.Options{ + Addr: address, + }) + + subjectName := fmt.Sprintf( + "%s.%s.%s", topicPrefix, + testrunner.GetAttribute[string](ctx, "schemaName"), + testrunner.GetAttribute[string](ctx, "tableName"), + ) + + groupName := supporting.RandomTextString(10) + consumerName := supporting.RandomTextString(10) + + if err := client.XGroupCreateMkStream(subjectName, groupName, "0").Err(); err != nil { + return err + } + + collected := make(chan bool, 1) + envelopes := make([]inttest.Envelope, 0) + go func() { + for { + results, err := client.XReadGroup(&redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{subjectName, ">"}, + Count: 1, + Block: 0, + NoAck: false, + }).Result() + if err != nil { + redisLogger.Errorf("failed reading from redis: %+v", err) + collected <- true + return + } + + for _, message := range results[0].Messages { + envelope := inttest.Envelope{} + if err := json.Unmarshal([]byte(message.Values["envelope"].(string)), &envelope); err != nil { + rits.T().Error(err) + } + + redisLogger.Debugf("EVENT: %+v", envelope) + envelopes = append(envelopes, envelope) + if len(envelopes) >= 10 { + collected <- true + return + } + + client.XAck(subjectName, groupName, message.ID) + } + } + }() + + if _, err := ctx.Exec(context.Background(), + fmt.Sprintf( + "INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:09:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)", + testrunner.GetAttribute[string](ctx, "tableName"), + ), + ); err != nil { + return err + } + + <-collected + + for i, envelope := range envelopes { + assert.Equal(rits.T(), i+1, int(envelope.Payload.After["val"].(float64))) + } + return nil + }, + + testrunner.WithSetup(func(setupContext testrunner.SetupContext) error { + sn, tn, err := setupContext.CreateHypertable("ts", time.Hour*24, + systemcatalog.NewColumn("ts", pgtype.TimestamptzOID, "timestamptz", false, nil), + systemcatalog.NewColumn("val", pgtype.Int4OID, "integer", false, nil), + ) + if err != nil { + return err + } + testrunner.Attribute(setupContext, "schemaName", sn) + testrunner.Attribute(setupContext, "tableName", tn) + + rC, rA, err := containers.SetupRedisContainer() + if err != nil { + return errors.Wrap(err, 0) + } + address = rA + container = rC + + setupContext.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) { + config.Topic.Prefix = topicPrefix + config.Sink.Type = spiconfig.Redis + config.Sink.Redis = spiconfig.RedisConfig{ + Address: address, + } + }) + + return nil + }), + + testrunner.WithTearDown(func(ctx testrunner.Context) error { + if container != nil { + container.Terminate(context.Background()) + } + return nil + }), + ) +}