Skip to content

Commit

Permalink
test: using arm64 compatible images if necessary (#2670)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Nov 9, 2022
1 parent 073a035 commit e2402b6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 46 deletions.
35 changes: 13 additions & 22 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"net/http"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -215,10 +214,6 @@ func TestMainFlow(t *testing.T) {
})

t.Run("kafka", func(t *testing.T) {
if runtime.GOARCH == "arm64" && !overrideArm64Check {
t.Skip("arm64 is not supported yet")
}

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Port)

// Create new consumer
Expand Down Expand Up @@ -389,20 +384,18 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {
require.NoError(t, err)

containersGroup, containersCtx := errgroup.WithContext(context.TODO())
if runtime.GOARCH != "arm64" || overrideArm64Check {
containersGroup.Go(func() (err error) {
kafkaContainer, err = destination.SetupKafka(pool, t,
destination.WithLogger(&testLogger{logger.NewLogger().Child("kafka")}),
destination.WithBrokers(1),
)
if err != nil {
return err
}
kafkaCtx, kafkaCancel := context.WithTimeout(containersCtx, 3*time.Minute)
defer kafkaCancel()
return waitForKafka(kafkaCtx, t, kafkaContainer.Port)
})
}
containersGroup.Go(func() (err error) {
kafkaContainer, err = destination.SetupKafka(pool, t,
destination.WithLogger(&testLogger{logger.NewLogger().Child("kafka")}),
destination.WithBrokers(1),
)
if err != nil {
return err
}
kafkaCtx, kafkaCancel := context.WithTimeout(containersCtx, 3*time.Minute)
defer kafkaCancel()
return waitForKafka(kafkaCtx, t, kafkaContainer.Port)
})
containersGroup.Go(func() (err error) {
redisContainer, err = destination.SetupRedis(pool, t)
return err
Expand Down Expand Up @@ -462,9 +455,7 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {
"minioEndpoint": minioContainer.Endpoint,
"minioBucketName": minioContainer.BucketName,
}
if runtime.GOARCH != "arm64" || overrideArm64Check {
mapWorkspaceConfig["kafkaPort"] = kafkaContainer.Port
}
mapWorkspaceConfig["kafkaPort"] = kafkaContainer.Port
workspaceConfigPath := workspaceConfig.CreateTempFile(t,
"testdata/workspaceConfigTemplate.json",
mapWorkspaceConfig,
Expand Down
22 changes: 11 additions & 11 deletions integration_test/multi_tentant_test/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {

// Pushing valid configuration via ETCD
etcdReqKey := getETCDWorkspacesReqKey(releaseName, serverInstanceID, appType)
_, err = etcdContainer.Client.Put(ctx, etcdReqKey, `{"workspaces":"`+workspaceID+`","ack_key":"test-ack/1"}`)
_, err = etcdContainer.Client.Put(ctx, etcdReqKey, `{"workspaces":"`+workspaceID+`","ack_key":"test-ack-1/1"}`)
require.NoError(t, err)

// Checking now that the configuration has been processed and the server can start
Expand All @@ -233,7 +233,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
)

select {
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/1"):
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack-1/1", clientv3.WithRev(1)):
v, err := unmarshalWorkspaceAckValue(t, &ack)
require.NoError(t, err)
require.Equal(t, "RELOADED", v.Status)
Expand All @@ -245,7 +245,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
grpc.WithBlock(), // block until the underlying connection is up
},
})
t.Fatalf("Timeout waiting for test-ack/1 (etcd status error: %v)", err)
t.Fatalf("Timeout waiting for test-ack-1/1 (etcd status error: %v)", err)
}

cleanupGwJobs := func() {
Expand Down Expand Up @@ -299,7 +299,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
t.Log("Triggering degraded mode")

select {
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/", clientv3.WithPrefix()):
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/", clientv3.WithPrefix(), clientv3.WithRev(1)):
require.Len(t, ack.Events, 1)
require.Equal(t, "test-ack/normal", string(ack.Events[0].Kv.Key))
require.Equal(t, `{"status":"NORMAL"}`, string(ack.Events[0].Kv.Value))
Expand All @@ -321,17 +321,17 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
serverModeReqKey := getETCDServerModeReqKey(releaseName, serverInstanceID)
t.Logf("Server mode ETCD key: %s", serverModeReqKey)

_, err := etcdContainer.Client.Put(ctx, serverModeReqKey, `{"mode":"DEGRADED","ack_key":"test-ack/2"}`)
_, err := etcdContainer.Client.Put(ctx, serverModeReqKey, `{"mode":"DEGRADED","ack_key":"test-ack-2/2"}`)
require.NoError(t, err)
t.Log("Triggering degraded mode")

select {
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/", clientv3.WithPrefix()):
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack-2/", clientv3.WithPrefix(), clientv3.WithRev(1)):
require.Len(t, ack.Events, 1)
require.Equal(t, "test-ack/2", string(ack.Events[0].Kv.Key))
require.Equal(t, "test-ack-2/2", string(ack.Events[0].Kv.Key))
require.Equal(t, `{"status":"DEGRADED"}`, string(ack.Events[0].Kv.Value))
case <-time.After(60 * time.Second):
t.Fatal("Timeout waiting for server-mode test-ack")
t.Fatal("Timeout waiting for server-mode test-ack-2")
}

sendEventsToGateway(t, httpPort, writeKey)
Expand All @@ -350,17 +350,17 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
// workspaces it is serving.
t.Run("empty workspaces are accepted", func(t *testing.T) {
_, err := etcdContainer.Client.Put(ctx,
etcdReqKey, `{"workspaces":"","ack_key":"test-ack/3"}`,
etcdReqKey, `{"workspaces":"","ack_key":"test-ack-3/3"}`,
)
require.NoError(t, err)
select {
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/3"):
case ack := <-etcdContainer.Client.Watch(ctx, "test-ack-3/3", clientv3.WithRev(1)):
v, err := unmarshalWorkspaceAckValue(t, &ack)
require.NoError(t, err)
require.Equal(t, "RELOADED", v.Status)
require.Equal(t, "", v.Error)
case <-time.After(60 * time.Second):
t.Fatal("Timeout waiting for test-ack/3")
t.Fatal("Timeout waiting for test-ack-3/3")
}
})
}
Expand Down
5 changes: 0 additions & 5 deletions services/streammanager/kafka/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand All @@ -32,10 +31,6 @@ func TestMain(m *testing.M) {
if os.Getenv("OVERRIDE_ARM64_CHECK") == "1" {
overrideArm64Check = true
}
if runtime.GOARCH == "arm64" && !overrideArm64Check {
fmt.Println("arm64 is not supported yet")
os.Exit(0)
}
os.Exit(m.Run())
}

Expand Down
5 changes: 0 additions & 5 deletions services/streammanager/kafka/kafkamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"os"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -119,10 +118,6 @@ func TestNewProducer(t *testing.T) {
})

t.Run("ok", func(t *testing.T) {
if runtime.GOARCH == "arm64" && !overrideArm64Check {
t.Skip("arm64 is not supported yet")
}

kafkaStats.creationTime = getMockedTimer(t, gomock.NewController(t))

pool, err := dockertest.NewPool("")
Expand Down
13 changes: 11 additions & 2 deletions testhelper/destination/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package destination
import (
_ "encoding/json"
"fmt"
"runtime"
"strconv"

_ "github.com/lib/pq"
Expand Down Expand Up @@ -141,9 +142,13 @@ func SetupKafka(pool *dockertest.Pool, cln cleaner, opts ...Option) (*KafkaResou
if err != nil {
return nil, err
}
zkImage := "bitnami/zookeeper"
if runtime.GOARCH == "arm64" {
zkImage = "zcube/bitnami-compat-zookeeper"
}
zookeeperPort := fmt.Sprintf("%s/tcp", strconv.Itoa(zookeeperPortInt))
zookeeperContainer, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "bitnami/zookeeper",
Repository: zkImage,
Tag: "latest",
NetworkID: network.ID,
Hostname: "zookeeper",
Expand Down Expand Up @@ -255,8 +260,12 @@ func SetupKafka(pool *dockertest.Pool, cln cleaner, opts ...Option) (*KafkaResou

nodeID := fmt.Sprintf("%d", i+1)
hostname := "kafka" + nodeID
kImage := "bitnami/kafka"
if runtime.GOARCH == "arm64" {
kImage = "zcube/bitnami-compat-kafka"
}
containers[i], err = pool.RunWithOptions(&dockertest.RunOptions{
Repository: "bitnami/kafka",
Repository: kImage,
Tag: "latest",
NetworkID: network.ID,
Hostname: hostname,
Expand Down
7 changes: 6 additions & 1 deletion testhelper/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package etcd

import (
"fmt"
"runtime"
"strconv"

"github.com/ory/dockertest/v3"
Expand All @@ -21,7 +22,11 @@ type Resource struct {
}

func Setup(pool *dockertest.Pool, cln cleaner) (*Resource, error) {
container, err := pool.Run("bitnami/etcd", "3", []string{
etcdImage := "bitnami/etcd"
if runtime.GOARCH == "arm64" {
etcdImage = "zcube/bitnami-compat-etcd"
}
container, err := pool.Run(etcdImage, "3.5", []string{
"ALLOW_NONE_AUTHENTICATION=yes",
})
if err != nil {
Expand Down

0 comments on commit e2402b6

Please sign in to comment.