Skip to content

Commit

Permalink
Fixed returning which was nil *errors.Error, not nil
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Jul 5, 2023
1 parent c813dde commit a983ddc
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/replication/context/replicationconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (rc *ReplicationConnection) CreateReplicationSlot() (slotName, snapshotName
}

rc.replicationSlotCreated = true
return slot.SlotName, slot.SnapshotName, true, errors.Wrap(err, 0)
return slot.SlotName, slot.SnapshotName, true, nil
}

func (rc *ReplicationConnection) DropReplicationSlot() error {
Expand Down
1 change: 1 addition & 0 deletions internal/testing/containers/localstack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package containers
169 changes: 169 additions & 0 deletions internal/tests/integration/aws_sqs_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package integration

import (
"context"
"encoding/json"
"fmt"
"github.com/go-errors/errors"
"github.com/go-redis/redis"
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting/logging"
"github.com/noctarius/timescaledb-event-streamer/internal/sysconfig"
inttest "github.com/noctarius/timescaledb-event-streamer/internal/testing"
"github.com/noctarius/timescaledb-event-streamer/internal/testing/containers"
"github.com/noctarius/timescaledb-event-streamer/internal/testing/testrunner"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"testing"
"time"
)

type RedisIntegrationTestSuite struct {
testrunner.TestRunner
}

func TestRedisIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(RedisIntegrationTestSuite))
}

func (rits *RedisIntegrationTestSuite) Test_Redis_Sink() {
topicPrefix := supporting.RandomTextString(10)

redisLogger, err := logging.NewLogger("Test_Redis_Sink")
if err != nil {
rits.T().Error(err)
}

var address string
var container testcontainers.Container

rits.RunTest(
func(ctx testrunner.Context) error {
client := redis.NewClient(&redis.Options{
Addr: address,
})

subjectName := fmt.Sprintf(
"%s.%s.%s", topicPrefix,
testrunner.GetAttribute[string](ctx, "schemaName"),
testrunner.GetAttribute[string](ctx, "tableName"),
)

groupName := supporting.RandomTextString(10)
consumerName := supporting.RandomTextString(10)

if err := client.XGroupCreateMkStream(subjectName, groupName, "0").Err(); err != nil {
return err
}

collected := make(chan bool, 1)
envelopes := make([]inttest.Envelope, 0)
go func() {
for {
results, err := client.XReadGroup(&redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{subjectName, ">"},
Count: 1,
Block: 0,
NoAck: false,
}).Result()
if err != nil {
redisLogger.Errorf("failed reading from redis: %+v", err)
collected <- true
return
}

for _, message := range results[0].Messages {
envelope := inttest.Envelope{}
if err := json.Unmarshal([]byte(message.Values["envelope"].(string)), &envelope); err != nil {
rits.T().Error(err)
}

redisLogger.Debugf("EVENT: %+v", envelope)
envelopes = append(envelopes, envelope)
if len(envelopes) >= 10 {
collected <- true
return
}

client.XAck(subjectName, groupName, message.ID)
}
}
}()

if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 00:09:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
testrunner.GetAttribute[string](ctx, "tableName"),
),
); err != nil {
return err
}

<-collected

for i, envelope := range envelopes {
assert.Equal(rits.T(), i+1, int(envelope.Payload.After["val"].(float64)))
}
return nil
},

testrunner.WithSetup(func(setupContext testrunner.SetupContext) error {
sn, tn, err := setupContext.CreateHypertable("ts", time.Hour*24,
systemcatalog.NewColumn("ts", pgtype.TimestamptzOID, "timestamptz", false, nil),
systemcatalog.NewColumn("val", pgtype.Int4OID, "integer", false, nil),
)
if err != nil {
return err
}
testrunner.Attribute(setupContext, "schemaName", sn)
testrunner.Attribute(setupContext, "tableName", tn)

rC, rA, err := containers.SetupRedisContainer()
if err != nil {
return errors.Wrap(err, 0)
}
address = rA
container = rC

setupContext.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
config.Topic.Prefix = topicPrefix
config.Sink.Type = spiconfig.Redis
config.Sink.Redis = spiconfig.RedisConfig{
Address: address,
}
})

return nil
}),

testrunner.WithTearDown(func(ctx testrunner.Context) error {
if container != nil {
container.Terminate(context.Background())
}
return nil
}),
)
}

0 comments on commit a983ddc

Please sign in to comment.