diff --git a/kafkaclient/client_test.go b/kafkaclient/client_test.go index 5d5fe8ff..7186d6be 100644 --- a/kafkaclient/client_test.go +++ b/kafkaclient/client_test.go @@ -35,7 +35,7 @@ func TestClient_Ping(t *testing.T) { kafkaContainer, err := dockerKafka.Setup(pool, t) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{}) require.NoError(t, err) @@ -63,11 +63,7 @@ func TestProducerBatchConsumerGroup(t *testing.T) { dockerKafka.WithBrokers(3)) require.NoError(t, err) - addresses := make([]string, 0, len(kafkaContainer.Ports)) - for i := 0; i < len(kafkaContainer.Ports); i++ { - addresses = append(addresses, fmt.Sprintf("localhost:%s", kafkaContainer.Ports[i])) - } - c, err := New("tcp", addresses, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) + c, err := New("tcp", kafkaContainer.Brokers, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) require.NoError(t, err) var ( @@ -201,7 +197,7 @@ func TestConsumer_Partition(t *testing.T) { dockerKafka.WithBrokers(1)) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) require.NoError(t, err) @@ -354,7 +350,7 @@ func TestWithSASL(t *testing.T) { kafkaContainer, err := dockerKafka.Setup(pool, t, containerOptions...) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ ClientID: "some-client", DialTimeout: 10 * time.Second, @@ -432,7 +428,7 @@ func TestWithSASLBadCredentials(t *testing.T) { kafkaContainer, err := dockerKafka.Setup(pool, t, containerOptions...) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ ClientID: "some-client", DialTimeout: 10 * time.Second, @@ -465,7 +461,7 @@ func TestProducer_Timeout(t *testing.T) { dockerKafka.WithBrokers(1)) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) require.NoError(t, err) @@ -534,7 +530,7 @@ func TestIsProducerErrTemporary(t *testing.T) { dockerKafka.WithBrokers(1)) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) require.NoError(t, err) @@ -707,7 +703,7 @@ func TestConsumerACK(t *testing.T) { dockerKafka.WithBrokers(1)) require.NoError(t, err) - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0]) + kafkaHost := kafkaContainer.Brokers[0] kafkaClient, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) require.NoError(t, err) diff --git a/kafkaclient/compression_benchmark_test.go b/kafkaclient/compression_benchmark_test.go index 15daf9e4..720e7d93 100644 --- a/kafkaclient/compression_benchmark_test.go +++ b/kafkaclient/compression_benchmark_test.go @@ -34,7 +34,7 @@ func BenchmarkCompression(b *testing.B) { kafkaContainer, err := kafka.Setup(pool, b, kafka.WithCustomAdvertisedListener(proxyHost)) require.NoError(b, err) - return "localhost:" + kafkaContainer.Ports[0] + return kafkaContainer.Brokers[0] } setupProxy := func(b *testing.B, kafkaAddr string, c Compression, bs int, bt time.Duration) ( diff --git a/stats/internal/otel/traces_test.go b/stats/internal/otel/traces_test.go index ca3b5b20..896582cb 100644 --- a/stats/internal/otel/traces_test.go +++ b/stats/internal/otel/traces_test.go @@ -85,7 +85,7 @@ func TestZipkinIntegration(t *testing.T) { var ( om Manager ctx = context.Background() - zipkinURL = "http://localhost:" + zipkinContainer.Port + "/api/v2/spans" + zipkinURL = zipkinContainer.URL + "/api/v2/spans" ) tp, _, err := om.Setup(ctx, res, WithTracerProvider(zipkinURL, diff --git a/stats/otel_test.go b/stats/otel_test.go index 24044d6c..211b2174 100644 --- a/stats/otel_test.go +++ b/stats/otel_test.go @@ -886,7 +886,7 @@ func TestZipkin(t *testing.T) { zipkinContainer, err := zipkin.Setup(pool, t) require.NoError(t, err) - zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans" + zipkinURL := zipkinContainer.URL + "/api/v2/spans" conf := config.New() conf.Set("INSTANCE_ID", t.Name()) diff --git a/stats/traces_test.go b/stats/traces_test.go index 977671f4..c5ac8801 100644 --- a/stats/traces_test.go +++ b/stats/traces_test.go @@ -27,8 +27,8 @@ func TestSpanFromContext(t *testing.T) { zipkinContainer, err := zipkin.Setup(pool, t) require.NoError(t, err) - zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans" - zipkinTracesURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/traces?serviceName=" + t.Name() + zipkinURL := zipkinContainer.URL + "/api/v2/spans" + zipkinTracesURL := zipkinContainer.URL + "/api/v2/traces?serviceName=" + t.Name() c := config.New() c.Set("INSTANCE_ID", t.Name()) @@ -98,8 +98,8 @@ func TestAsyncTracePropagation(t *testing.T) { zipkinContainer, err := zipkin.Setup(pool, t) require.NoError(t, err) - zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans" - zipkinTracesURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/traces?serviceName=" + t.Name() + zipkinURL := zipkinContainer.URL + "/api/v2/spans" + zipkinTracesURL := zipkinContainer.URL + "/api/v2/traces?serviceName=" + t.Name() c := config.New() c.Set("INSTANCE_ID", t.Name()) @@ -166,7 +166,7 @@ func TestZipkinDownIsNotBlocking(t *testing.T) { zipkinContainer, err := zipkin.Setup(pool, t) require.NoError(t, err) - zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans" + zipkinURL := zipkinContainer.URL + "/api/v2/spans" c := config.New() c.Set("INSTANCE_ID", t.Name()) diff --git a/testhelper/docker/docker.go b/testhelper/docker/docker.go index 556a035e..99587a77 100644 --- a/testhelper/docker/docker.go +++ b/testhelper/docker/docker.go @@ -4,9 +4,14 @@ import ( "regexp" "strconv" "testing" + "time" + "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/rand" ) // GetHostPort returns the desired port mapping @@ -26,3 +31,18 @@ func GetHostPort(t testing.TB, port string, container *docker.Container) int { func ToInternalDockerHost(url string) string { return regexp.MustCompile(`(localhost|127\.0\.0\.1)`).ReplaceAllString(url, "host.docker.internal") } + +func CreateNetwork(pool *dockertest.Pool, cln resource.Cleaner, prefix string) (*docker.Network, error) { + network, err := pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: prefix + "_test_" + time.Now().Format("YY-MM-DD-") + rand.String(6)}) + if err != nil { + return nil, err + } + + cln.Cleanup(func() { + if err := pool.Client.RemoveNetwork(network.ID); err != nil { + cln.Logf("Error while removing Docker network: %v", err) + } + }) + + return network, nil +} diff --git a/testhelper/docker/resource/etcd/etcd.go b/testhelper/docker/resource/etcd/etcd.go index 31735205..f84dea3b 100644 --- a/testhelper/docker/resource/etcd/etcd.go +++ b/testhelper/docker/resource/etcd/etcd.go @@ -3,7 +3,6 @@ package etcd import ( "context" "fmt" - "strconv" "time" "github.com/ory/dockertest/v3" @@ -11,6 +10,7 @@ import ( etcd "go.etcd.io/etcd/client/v3" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) type Resource struct { @@ -18,7 +18,6 @@ type Resource struct { Hosts []string // HostsInNetwork is the list of ETCD hosts accessible from the provided Docker network (if any). HostsInNetwork []string - Port int } type config struct { @@ -50,29 +49,22 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour Env: []string{ "ALLOW_NONE_AUTHENTICATION=yes", }, - }) - if err != nil { - return nil, fmt.Errorf("could not create container: %v", err) - } + PortBindings: internal.IPv4PortBindings([]string{"2379"}), + }, internal.DefaultHostConfig) cln.Cleanup(func() { if err := pool.Purge(container); err != nil { cln.Log(fmt.Errorf("could not purge ETCD resource: %v", err)) } }) + if err != nil { + return nil, fmt.Errorf("could not create container: %v", err) + } var ( etcdClient *etcd.Client etcdHosts []string - etcdPort int - - etcdPortStr = container.GetPort("2379/tcp") ) - etcdPort, err = strconv.Atoi(etcdPortStr) - if err != nil { - return nil, fmt.Errorf("could not convert port %q to int: %v", etcdPortStr, err) - } - - etcdHosts = []string{"http://localhost:" + etcdPortStr} + etcdHosts = []string{"http://" + container.GetBoundIP("2379/tcp") + ":" + container.GetPort("2379/tcp")} etcdClient, err = etcd.New(etcd.Config{ Endpoints: etcdHosts, @@ -102,6 +94,5 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour Client: etcdClient, Hosts: etcdHosts, HostsInNetwork: hostsInNetwork, - Port: etcdPort, }, nil } diff --git a/testhelper/docker/resource/internal/ports.go b/testhelper/docker/resource/internal/ports.go new file mode 100644 index 00000000..d876bfc6 --- /dev/null +++ b/testhelper/docker/resource/internal/ports.go @@ -0,0 +1,27 @@ +package internal + +import ( + "github.com/ory/dockertest/v3/docker" +) + +const BindHostIP = "127.0.0.1" + +// IPv4PortBindings returns the port bindings for the given exposed ports forcing ipv4 address. +func IPv4PortBindings(exposedPorts []string) map[docker.Port][]docker.PortBinding { + portBindings := make(map[docker.Port][]docker.PortBinding) + + for _, exposedPort := range exposedPorts { + portBindings[docker.Port(exposedPort)+"/tcp"] = []docker.PortBinding{ + { + HostIP: BindHostIP, + HostPort: "0", + }, + } + } + + return portBindings +} + +func DefaultHostConfig(hc *docker.HostConfig) { + hc.PublishAllPorts = false +} diff --git a/testhelper/docker/resource/kafka/kafka.go b/testhelper/docker/resource/kafka/kafka.go index a8472608..f578fccb 100644 --- a/testhelper/docker/resource/kafka/kafka.go +++ b/testhelper/docker/resource/kafka/kafka.go @@ -3,7 +3,9 @@ package kafka import ( _ "encoding/json" "fmt" + "net" "strconv" + "time" _ "github.com/lib/pq" "github.com/ory/dockertest/v3" @@ -11,7 +13,9 @@ import ( "golang.org/x/sync/errgroup" kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) type scramHashGenerator uint8 @@ -118,7 +122,7 @@ func WithSchemaRegistry() Option { } type Resource struct { - Ports []string + Brokers []string SchemaRegistryURL string pool *dockertest.Pool @@ -147,15 +151,10 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour network := c.network if c.network == nil { var err error - network, err = pool.Client.CreateNetwork(dc.CreateNetworkOptions{Name: "kafka_network"}) + network, err = docker.CreateNetwork(pool, cln, "kafka_network_") if err != nil { - return nil, fmt.Errorf("could not create docker network: %w", err) + return nil, err } - cln.Cleanup(func() { - if err := pool.Client.RemoveNetwork(network.ID); err != nil { - cln.Log(fmt.Errorf("could not remove kafka network: %w", err)) - } - }) } zookeeperPortInt, err := kithelper.GetFreePort() @@ -172,15 +171,15 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour "2181/tcp": {{HostIP: "zookeeper", HostPort: zookeeperPort}}, }, Env: []string{"ALLOW_ANONYMOUS_LOGIN=yes"}, - }) - if err != nil { - return nil, err - } + }, internal.DefaultHostConfig) cln.Cleanup(func() { if err := pool.Purge(zookeeperContainer); err != nil { cln.Log("Could not purge resource", err) } }) + if err != nil { + return nil, err + } cln.Log("Zookeeper localhost port", zookeeperContainer.GetPort("2181/tcp")) @@ -201,35 +200,29 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour Tag: "7.5-debian-11", NetworkID: network.ID, Hostname: "schemaregistry", - ExposedPorts: []string{"8081"}, + ExposedPorts: []string{"8081/tcp"}, + PortBindings: internal.IPv4PortBindings([]string{"8081"}), Env: []string{ "SCHEMA_REGISTRY_DEBUG=true", "SCHEMA_REGISTRY_KAFKA_BROKERS=" + bootstrapServers[:len(bootstrapServers)-1], "SCHEMA_REGISTRY_ADVERTISED_HOSTNAME=schemaregistry", "SCHEMA_REGISTRY_CLIENT_AUTHENTICATION=NONE", }, - }) - if err != nil { - return nil, err - } + }, internal.DefaultHostConfig) cln.Cleanup(func() { if err := pool.Purge(src); err != nil { cln.Log("Could not purge resource", err) } }) - var srPort int - for p, bindings := range src.Container.NetworkSettings.Ports { - if p.Port() == "8081" { - srPort, err = strconv.Atoi(bindings[0].HostPort) - if err != nil { - panic(fmt.Errorf("cannot convert port to int: %w", err)) - } - break - } + if err != nil { + return nil, err + } + if src.GetPort("8081/tcp") == "" { + return nil, fmt.Errorf("could not find schema registry port") } envVariables = append(envVariables, "KAFKA_SCHEMA_REGISTRY_URL=schemaregistry:8081") - schemaRegistryURL = fmt.Sprintf("http://localhost:%d", srPort) + schemaRegistryURL = fmt.Sprintf("http://%s:%s", src.GetBoundIP("8081/tcp"), src.GetPort("8081/tcp")) cln.Log("Schema Registry on", schemaRegistryURL) } @@ -337,35 +330,55 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour )) } containers[i], err = pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "bitnami/kafka", - Tag: "3.6.0", - NetworkID: network.ID, - Hostname: hostname, + Repository: "bitnami/kafka", + Tag: "3.6.0", + NetworkID: network.ID, + Hostname: hostname, + ExposedPorts: []string{kafkaClientPort + "/tcp"}, PortBindings: map[dc.Port][]dc.PortBinding{ - kafkaClientPort + "/tcp": {{HostIP: "localhost", HostPort: localhostPort}}, + kafkaClientPort + "/tcp": {{ + HostIP: internal.BindHostIP, + HostPort: strconv.Itoa(localhostPortInt), + }}, }, Mounts: mounts, Env: nodeEnvVars, - }) - if err != nil { - return nil, err - } + }, internal.DefaultHostConfig) cln.Cleanup(func() { if err := pool.Purge(containers[i]); err != nil { cln.Log(fmt.Errorf("could not purge Kafka resource: %w", err)) } }) + if err != nil { + return nil, err + } } res := &Resource{ - Ports: make([]string, 0, len(containers)), + Brokers: make([]string, 0, len(containers)), SchemaRegistryURL: schemaRegistryURL, pool: pool, containers: containers, } for i := 0; i < len(containers); i++ { - res.Ports = append(res.Ports, containers[i].GetPort(kafkaClientPort+"/tcp")) + if containers[i].GetBoundIP(kafkaClientPort+"/tcp") == "" { + return nil, fmt.Errorf("could not find kafka broker port") + } + res.Brokers = append(res.Brokers, containers[i].GetBoundIP(kafkaClientPort+"/tcp")+":"+containers[i].GetPort(kafkaClientPort+"/tcp")) + } + err = pool.Retry(func() error { + conn, err := net.DialTimeout("tcp", res.Brokers[0], time.Second) + if err != nil { + return err + } + + return conn.Close() + }) + if err != nil { + return nil, fmt.Errorf("could not connect to kafka: %w", err) } + cln.Logf("Kafka brokers on %v", res.Brokers) + return res, nil } diff --git a/testhelper/docker/resource/kafka/kafka_test.go b/testhelper/docker/resource/kafka/kafka_test.go index e5f4a0e0..0bccc4f5 100644 --- a/testhelper/docker/resource/kafka/kafka_test.go +++ b/testhelper/docker/resource/kafka/kafka_test.go @@ -19,7 +19,6 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro" "github.com/linkedin/goavro/v2" "github.com/ory/dockertest/v3" - dc "github.com/ory/dockertest/v3/docker" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" @@ -27,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/sshserver" "github.com/rudderlabs/rudder-go-kit/testhelper/keygen" ) @@ -47,11 +47,7 @@ func TestResource(t *testing.T) { var ( ctx = context.Background() topic = "my-topic" - brokers = []string{ - "localhost:" + res.Ports[0], - "localhost:" + res.Ports[1], - "localhost:" + res.Ports[2], - } + brokers = res.Brokers ) w := &kafka.Writer{ @@ -121,7 +117,7 @@ func TestWithSASL(t *testing.T) { require.NoError(t, err) w := kafka.Writer{ - Addr: kafka.TCP("localhost:" + container.Ports[0]), + Addr: kafka.TCP(container.Brokers...), Balancer: &kafka.Hash{}, Transport: &kafka.Transport{ SASL: mechanism, @@ -158,7 +154,7 @@ func TestAvroSchemaRegistry(t *testing.T) { require.NoError(t, err) c, err := confluent.NewConsumer(&confluent.ConfigMap{ - "bootstrap.servers": fmt.Sprintf("localhost:%s", container.Ports[0]), + "bootstrap.servers": container.Brokers[0], "group.id": "group-1", "session.timeout.ms": 6000, "auto.offset.reset": "earliest", @@ -221,7 +217,7 @@ func TestAvroSchemaRegistry(t *testing.T) { avroMessage := serializeAvroMessage(t, schemaID2, userSchema2, rawMessage) w := &kafka.Writer{ - Addr: kafka.TCP("localhost:" + container.Ports[0]), + Addr: kafka.TCP(container.Brokers...), Balancer: &kafka.LeastBytes{}, AllowAutoTopicCreation: true, } @@ -249,14 +245,8 @@ func TestSSH(t *testing.T) { pool, err := dockertest.NewPool("") require.NoError(t, err) - // Start shared Docker network - network, err := pool.Client.CreateNetwork(dc.CreateNetworkOptions{Name: "kafka_network"}) + network, err := docker.CreateNetwork(pool, t, "kafka_ssh_network_") require.NoError(t, err) - t.Cleanup(func() { - if err := pool.Client.RemoveNetwork(network.ID); err != nil { - t.Logf("Error while removing Docker network: %v", err) - } - }) // Start Kafka cluster with ZooKeeper and three brokers _, err = Setup(pool, t, diff --git a/testhelper/docker/resource/minio/minio.go b/testhelper/docker/resource/minio/minio.go index ffcbb920..8f996e5f 100644 --- a/testhelper/docker/resource/minio/minio.go +++ b/testhelper/docker/resource/minio/minio.go @@ -17,6 +17,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/httputil" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) type Resource struct { @@ -59,7 +60,8 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R fmt.Sprintf("MINIO_SITE_REGION=%s", region), "MINIO_API_SELECT_PARQUET=on", }, c.Options...), - }) + PortBindings: internal.IPv4PortBindings([]string{"9000"}), + }, internal.DefaultHostConfig) if err != nil { return nil, fmt.Errorf("could not start resource: %s", err) } diff --git a/testhelper/docker/resource/mysql/mysql.go b/testhelper/docker/resource/mysql/mysql.go index 2039a516..c01ef24d 100644 --- a/testhelper/docker/resource/mysql/mysql.go +++ b/testhelper/docker/resource/mysql/mysql.go @@ -9,6 +9,7 @@ import ( dc "github.com/ory/dockertest/v3/docker" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) const ( @@ -42,9 +43,11 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R "MYSQL_ROOT_PASSWORD=" + defaultPassword, "MYSQL_DATABASE=" + defaultDB, }, + ExposedPorts: []string{"3306/tcp"}, + PortBindings: internal.IPv4PortBindings([]string{"3306"}), }, func(hc *dc.HostConfig) { hc.ShmSize = c.ShmSize - }) + }, internal.DefaultHostConfig) if err != nil { return nil, fmt.Errorf("running container: %w", err) } @@ -55,8 +58,11 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R } }) - dbDSN := fmt.Sprintf("%s:%s@tcp(127.0.0.1:%s)/%s?tls=false", - defaultUser, defaultPassword, mysqlContainer.GetPort("3306/tcp"), defaultDB, + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?tls=false", + defaultUser, defaultPassword, + mysqlContainer.GetBoundIP("3306/tcp"), + mysqlContainer.GetPort("3306/tcp"), + defaultDB, ) // exponential backoff-retry, because the application in the container might not be ready to accept connections yet err = pool.Retry(func() (err error) { @@ -82,7 +88,7 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R Database: defaultDB, User: defaultUser, Password: defaultPassword, - Host: "localhost", + Host: mysqlContainer.GetBoundIP("3306/tcp"), Port: mysqlContainer.GetPort("3306/tcp"), }, nil } diff --git a/testhelper/docker/resource/postgres/postgres.go b/testhelper/docker/resource/postgres/postgres.go index c47d9d1f..6f211b52 100644 --- a/testhelper/docker/resource/postgres/postgres.go +++ b/testhelper/docker/resource/postgres/postgres.go @@ -13,6 +13,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) const ( @@ -57,12 +58,13 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R "POSTGRES_DB=" + postgresDefaultDB, "POSTGRES_USER=" + postgresDefaultUser, }, - Cmd: cmd, + Cmd: cmd, + PortBindings: internal.IPv4PortBindings([]string{"5432"}), }, func(hc *dc.HostConfig) { hc.ShmSize = c.ShmSize hc.OOMKillDisable = c.OOMKillDisable hc.Memory = c.Memory - }) + }, internal.DefaultHostConfig) if err != nil { return nil, err } @@ -94,8 +96,11 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R }) dbDSN := fmt.Sprintf( - "postgres://%s:%s@localhost:%s/%s?sslmode=disable", - postgresDefaultUser, postgresDefaultPassword, postgresContainer.GetPort("5432/tcp"), postgresDefaultDB, + "postgres://%s:%s@%s:%s/%s?sslmode=disable", + postgresDefaultUser, postgresDefaultPassword, + postgresContainer.GetBoundIP("5432/tcp"), + postgresContainer.GetPort("5432/tcp"), + postgresDefaultDB, ) // exponential backoff-retry, because the application in the container might not be ready to accept connections yet @@ -141,7 +146,7 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*R Database: postgresDefaultDB, User: postgresDefaultUser, Password: postgresDefaultPassword, - Host: "localhost", + Host: postgresContainer.GetBoundIP("5432/tcp"), Port: postgresContainer.GetPort("5432/tcp"), ContainerName: postgresContainer.Container.Name, ContainerID: postgresContainer.Container.ID, diff --git a/testhelper/docker/resource/pulsar/pulsar.go b/testhelper/docker/resource/pulsar/pulsar.go index 84719623..920eaa52 100644 --- a/testhelper/docker/resource/pulsar/pulsar.go +++ b/testhelper/docker/resource/pulsar/pulsar.go @@ -7,6 +7,7 @@ import ( "github.com/ory/dockertest/v3" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) type Resource struct { @@ -18,7 +19,7 @@ type Resource struct { func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...Option) (*Resource, error) { c := &config{ - tag: "3.2.2", + tag: "3.2.4", } for _, opt := range opts { opt(c) @@ -32,10 +33,11 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...Option) (*Resource Repository: "apachepulsar/pulsar", Tag: c.tag, Env: []string{}, - ExposedPorts: []string{"6650", "8080"}, + ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + PortBindings: internal.IPv4PortBindings([]string{"6650", "8080"}), Cmd: []string{"bin/pulsar", "standalone"}, NetworkID: networkID, - }) + }, internal.DefaultHostConfig) if err != nil { return nil, err } @@ -46,8 +48,8 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...Option) (*Resource } }) - url := fmt.Sprintf("pulsar://localhost:%s", container.GetPort("6650/tcp")) - adminURL := fmt.Sprintf("http://localhost:%s", container.GetPort("8080/tcp")) + url := fmt.Sprintf("pulsar://%s:%s", container.GetBoundIP("6650/tcp"), container.GetPort("6650/tcp")) + adminURL := fmt.Sprintf("http://%s:%s", container.GetBoundIP("8080/tcp"), container.GetPort("8080/tcp")) if err := pool.Retry(func() (err error) { var w bytes.Buffer diff --git a/testhelper/docker/resource/redis/redis.go b/testhelper/docker/resource/redis/redis.go index 15532abb..329a7991 100644 --- a/testhelper/docker/resource/redis/redis.go +++ b/testhelper/docker/resource/redis/redis.go @@ -10,8 +10,11 @@ import ( "github.com/ory/dockertest/v3" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) +const redisPort = "6379" + // WithTag is used to specify a custom tag that is used when pulling the Redis image from the container registry func WithTag(tag string) Option { return func(c *redisConfig) { @@ -62,28 +65,30 @@ func Setup(ctx context.Context, pool *dockertest.Pool, d resource.Cleaner, opts opt(&conf) } runOptions := &dockertest.RunOptions{ - Repository: conf.repository, - Tag: conf.tag, - Env: conf.envs, - Cmd: []string{"redis-server"}, + Repository: conf.repository, + Tag: conf.tag, + Env: conf.envs, + Cmd: []string{"redis-server"}, + ExposedPorts: []string{redisPort + "/tcp"}, + PortBindings: internal.IPv4PortBindings([]string{redisPort}), } if len(conf.cmdArgs) > 0 { runOptions.Cmd = append(runOptions.Cmd, conf.cmdArgs...) } // pulls a redis image, creates a container based on it and runs it - container, err := pool.RunWithOptions(runOptions) - if err != nil { - return nil, err - } + container, err := pool.RunWithOptions(runOptions, internal.DefaultHostConfig) d.Cleanup(func() { if err := pool.Purge(container); err != nil { d.Log("Could not purge resource:", err) } }) + if err != nil { + return nil, fmt.Errorf("run redis container: %w", err) + } // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - addr := fmt.Sprintf("localhost:%s", container.GetPort("6379/tcp")) + addr := fmt.Sprintf("%s:%s", container.GetBoundIP(redisPort+"/tcp"), container.GetPort(redisPort+"/tcp")) err = pool.Retry(func() error { redisClient := redis.NewClient(&redis.Options{ Addr: addr, diff --git a/testhelper/docker/resource/scylla/scylla.go b/testhelper/docker/resource/scylla/scylla.go index b22f8f16..50c872d5 100644 --- a/testhelper/docker/resource/scylla/scylla.go +++ b/testhelper/docker/resource/scylla/scylla.go @@ -6,8 +6,11 @@ import ( "github.com/gocql/gocql" "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) type Resource struct { @@ -27,7 +30,12 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...Option) (*Resource Repository: "scylladb/scylla", Tag: c.tag, Env: []string{}, - ExposedPorts: []string{"9042"}, + ExposedPorts: []string{"9042/tcp"}, + PortBindings: internal.IPv4PortBindings([]string{"9042"}), + Cmd: []string{"--smp 1"}, + }, internal.DefaultHostConfig, func(hc *docker.HostConfig) { + hc.CPUCount = 1 + hc.Memory = 128 * bytesize.MB }) if err != nil { return nil, err @@ -39,7 +47,7 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...Option) (*Resource } }) - url := fmt.Sprintf("localhost:%s", container.GetPort("9042/tcp")) + url := fmt.Sprintf("%s:%s", container.GetBoundIP("9042/tcp"), container.GetPort("9042/tcp")) if err := pool.Retry(func() (err error) { var w bytes.Buffer diff --git a/testhelper/docker/resource/sshserver/sshserver.go b/testhelper/docker/resource/sshserver/sshserver.go index 587aef0d..60e1d914 100644 --- a/testhelper/docker/resource/sshserver/sshserver.go +++ b/testhelper/docker/resource/sshserver/sshserver.go @@ -3,14 +3,15 @@ package sshserver import ( "bytes" "fmt" + "strconv" "strings" "time" "github.com/ory/dockertest/v3" dc "github.com/ory/dockertest/v3/docker" - kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) const exposedPort = "2222" @@ -46,6 +47,7 @@ func WithDockerNetwork(network *dc.Network) Option { } type Resource struct { + Host string Port int container *dockertest.Resource @@ -71,11 +73,6 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour }) } - port, err := kithelper.GetFreePort() - if err != nil { - return nil, err - } - var ( mounts []string envVars = []string{ @@ -101,22 +98,25 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour Tag: "9.3_p2-r1-ls145", NetworkID: network.ID, Hostname: "sshserver", + ExposedPorts: []string{ + exposedPort + "/tcp", + }, PortBindings: map[dc.Port][]dc.PortBinding{ exposedPort + "/tcp": { - {HostIP: "sshserver", HostPort: fmt.Sprintf("%d", port)}, + {HostIP: "127.0.0.1", HostPort: "0"}, }, }, Env: envVars, Mounts: mounts, - }) - if err != nil { - return nil, err - } + }, internal.DefaultHostConfig) cln.Cleanup(func() { if err := pool.Purge(container); err != nil { cln.Log("Could not purge resource", err) } }) + if err != nil { + return nil, err + } var ( buf *bytes.Buffer @@ -153,9 +153,14 @@ loop: return nil, fmt.Errorf("ssh server not health within timeout") } } + p, err := strconv.Atoi(container.GetPort(exposedPort + "/tcp")) + if err != nil { + return nil, fmt.Errorf("could not convert port %q to int: %w", container.GetPort(exposedPort+"/tcp"), err) + } return &Resource{ - Port: port, + Host: container.GetBoundIP(exposedPort + "/tcp"), + Port: p, container: container, }, nil } diff --git a/testhelper/docker/resource/sshserver/sshserver_test.go b/testhelper/docker/resource/sshserver/sshserver_test.go index 489d07a9..ed17bf83 100644 --- a/testhelper/docker/resource/sshserver/sshserver_test.go +++ b/testhelper/docker/resource/sshserver/sshserver_test.go @@ -65,7 +65,7 @@ func TestKeys(t *testing.T) { require.NoError(t, err) c, err := goph.NewConn(&goph.Config{ - Addr: "localhost", + Addr: res.Host, Port: uint(res.Port), User: "linuxserver.io", Auth: auth, diff --git a/testhelper/docker/resource/transformer/transformer.go b/testhelper/docker/resource/transformer/transformer.go index 66f1dd84..510f6834 100644 --- a/testhelper/docker/resource/transformer/transformer.go +++ b/testhelper/docker/resource/transformer/transformer.go @@ -15,11 +15,13 @@ import ( "github.com/rudderlabs/rudder-go-kit/httputil" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) +const transformerPort = "9090/tcp" + type Resource struct { TransformerURL string - Port string } type config struct { @@ -112,13 +114,14 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(conf *config) if err != nil { return nil, fmt.Errorf("failed to pull image: %w", err) } + transformerContainer, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: conf.repository, Tag: conf.tag, - ExposedPorts: conf.exposedPorts, + PortBindings: internal.IPv4PortBindings(conf.exposedPorts), Env: conf.envs, ExtraHosts: conf.extraHosts, - }) + }, internal.DefaultHostConfig) if err != nil { return nil, err } @@ -130,8 +133,7 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(conf *config) }) transformerResource := &Resource{ - TransformerURL: fmt.Sprintf("http://localhost:%s", transformerContainer.GetPort("9090/tcp")), - Port: transformerContainer.GetPort("9090/tcp"), + TransformerURL: fmt.Sprintf("http://%s:%s", transformerContainer.GetBoundIP(transformerPort), transformerContainer.GetPort(transformerPort)), } err = pool.Retry(func() (err error) { diff --git a/testhelper/docker/resource/types.go b/testhelper/docker/resource/types.go index cf6c96e4..7de35a4c 100644 --- a/testhelper/docker/resource/types.go +++ b/testhelper/docker/resource/types.go @@ -2,6 +2,7 @@ package resource type Logger interface { Log(...interface{}) + Logf(string, ...interface{}) } type FailIndicator interface { diff --git a/testhelper/docker/resource/zipkin/zipkin.go b/testhelper/docker/resource/zipkin/zipkin.go index ffb1cfa0..8c711e77 100644 --- a/testhelper/docker/resource/zipkin/zipkin.go +++ b/testhelper/docker/resource/zipkin/zipkin.go @@ -3,21 +3,19 @@ package zipkin import ( "fmt" "net/http" - "strconv" "sync" "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" "github.com/rudderlabs/rudder-go-kit/httputil" - "github.com/rudderlabs/rudder-go-kit/testhelper" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" + "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal" ) -const port = "9411" +const zipkinPort = "9411" type Resource struct { - Port string + URL string pool *dockertest.Pool resource *dockertest.Resource @@ -43,18 +41,11 @@ func (z *Resource) Purge() error { } func Setup(pool *dockertest.Pool, d resource.Cleaner) (*Resource, error) { - freePort, err := testhelper.GetFreePort() - if err != nil { - return nil, fmt.Errorf("failed to get free port: %w", err) - } - zipkin, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: "openzipkin/zipkin", - ExposedPorts: []string{port}, - PortBindings: map[docker.Port][]docker.PortBinding{ - port + "/tcp": {{HostPort: strconv.Itoa(freePort)}}, - }, - }) + ExposedPorts: []string{zipkinPort + "/tcp"}, + PortBindings: internal.IPv4PortBindings([]string{zipkinPort}), + }, internal.DefaultHostConfig) if err != nil { return nil, fmt.Errorf("failed to start zipkin: %w", err) } @@ -62,16 +53,20 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner) (*Resource, error) { res := &Resource{ pool: pool, resource: zipkin, - Port: zipkin.GetPort(port + "/tcp"), + URL: fmt.Sprintf("http://%s:%s", zipkin.GetBoundIP(zipkinPort+"/tcp"), zipkin.GetPort(zipkinPort+"/tcp")), } + + if zipkin.GetBoundIP(zipkinPort+"/tcp") == "" { + return nil, fmt.Errorf("failed to get zipkin bound ip") + } + d.Cleanup(func() { if err := res.Purge(); err != nil { d.Log("Could not purge zipkin resource:", err) } }) - zipkinHealthURL := "http://localhost:" + strconv.Itoa(freePort) + "/health" - healthReq, err := http.NewRequest(http.MethodGet, zipkinHealthURL, nil) + healthReq, err := http.NewRequest(http.MethodGet, res.URL+"/health", nil) if err != nil { return nil, fmt.Errorf("failed to create zipkin health request: %w", err) } diff --git a/throttling/util_test.go b/throttling/util_test.go index 7a0b8b11..675646a6 100644 --- a/throttling/util_test.go +++ b/throttling/util_test.go @@ -13,6 +13,7 @@ import ( type tester interface { Helper() Log(...interface{}) + Logf(string, ...any) Errorf(format string, args ...interface{}) Fatalf(format string, args ...any) Failed() bool