Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: specify port binding for docker #592

Merged
merged 15 commits into from
Aug 22, 2024
20 changes: 8 additions & 12 deletions kafkaclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestClient_Ping(t *testing.T) {
kafkaContainer, err := dockerKafka.Setup(pool, t)
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{})
require.NoError(t, err)

Expand Down Expand Up @@ -63,11 +63,7 @@ func TestProducerBatchConsumerGroup(t *testing.T) {
dockerKafka.WithBrokers(3))
require.NoError(t, err)

addresses := make([]string, 0, len(kafkaContainer.Ports))
for i := 0; i < len(kafkaContainer.Ports); i++ {
addresses = append(addresses, fmt.Sprintf("localhost:%s", kafkaContainer.Ports[i]))
}
c, err := New("tcp", addresses, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
c, err := New("tcp", kafkaContainer.Brokers, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

var (
Expand Down Expand Up @@ -201,7 +197,7 @@ func TestConsumer_Partition(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down Expand Up @@ -354,7 +350,7 @@ func TestWithSASL(t *testing.T) {
kafkaContainer, err := dockerKafka.Setup(pool, t, containerOptions...)
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{
ClientID: "some-client",
DialTimeout: 10 * time.Second,
Expand Down Expand Up @@ -432,7 +428,7 @@ func TestWithSASLBadCredentials(t *testing.T) {
kafkaContainer, err := dockerKafka.Setup(pool, t, containerOptions...)
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{
ClientID: "some-client",
DialTimeout: 10 * time.Second,
Expand Down Expand Up @@ -465,7 +461,7 @@ func TestProducer_Timeout(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down Expand Up @@ -534,7 +530,7 @@ func TestIsProducerErrTemporary(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down Expand Up @@ -707,7 +703,7 @@ func TestConsumerACK(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
kafkaClient, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion kafkaclient/compression_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func BenchmarkCompression(b *testing.B) {
kafkaContainer, err := kafka.Setup(pool, b, kafka.WithCustomAdvertisedListener(proxyHost))
require.NoError(b, err)

return "localhost:" + kafkaContainer.Ports[0]
return kafkaContainer.Brokers[0]
}

setupProxy := func(b *testing.B, kafkaAddr string, c Compression, bs int, bt time.Duration) (
Expand Down
2 changes: 1 addition & 1 deletion stats/internal/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestZipkinIntegration(t *testing.T) {
var (
om Manager
ctx = context.Background()
zipkinURL = "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinURL = zipkinContainer.URL + "/api/v2/spans"
)
tp, _, err := om.Setup(ctx, res,
WithTracerProvider(zipkinURL,
Expand Down
2 changes: 1 addition & 1 deletion stats/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ func TestZipkin(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinURL := zipkinContainer.URL + "/api/v2/spans"

conf := config.New()
conf.Set("INSTANCE_ID", t.Name())
Expand Down
10 changes: 5 additions & 5 deletions stats/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func TestSpanFromContext(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinTracesURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/traces?serviceName=" + t.Name()
zipkinURL := zipkinContainer.URL + "/api/v2/spans"
zipkinTracesURL := zipkinContainer.URL + "/api/v2/traces?serviceName=" + t.Name()

c := config.New()
c.Set("INSTANCE_ID", t.Name())
Expand Down Expand Up @@ -98,8 +98,8 @@ func TestAsyncTracePropagation(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinTracesURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/traces?serviceName=" + t.Name()
zipkinURL := zipkinContainer.URL + "/api/v2/spans"
zipkinTracesURL := zipkinContainer.URL + "/api/v2/traces?serviceName=" + t.Name()

c := config.New()
c.Set("INSTANCE_ID", t.Name())
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestZipkinDownIsNotBlocking(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinURL := zipkinContainer.URL + "/api/v2/spans"

c := config.New()
c.Set("INSTANCE_ID", t.Name())
Expand Down
20 changes: 20 additions & 0 deletions testhelper/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"regexp"
"strconv"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
)

// GetHostPort returns the desired port mapping
Expand All @@ -26,3 +31,18 @@ func GetHostPort(t testing.TB, port string, container *docker.Container) int {
func ToInternalDockerHost(url string) string {
return regexp.MustCompile(`(localhost|127\.0\.0\.1)`).ReplaceAllString(url, "host.docker.internal")
}

func CreateNetwork(pool *dockertest.Pool, cln resource.Cleaner, prefix string) (*docker.Network, error) {
network, err := pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: prefix + "_test_" + time.Now().Format("YY-MM-DD-") + rand.String(6)})
if err != nil {
return nil, err
}

cln.Cleanup(func() {
if err := pool.Client.RemoveNetwork(network.ID); err != nil {
cln.Logf("Error while removing Docker network: %v", err)
}
})

return network, nil
}
23 changes: 7 additions & 16 deletions testhelper/docker/resource/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@ package etcd
import (
"context"
"fmt"
"strconv"
"time"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
etcd "go.etcd.io/etcd/client/v3"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal"
)

type Resource struct {
Client *etcd.Client
Hosts []string
// HostsInNetwork is the list of ETCD hosts accessible from the provided Docker network (if any).
HostsInNetwork []string
Port int
}

type config struct {
Expand Down Expand Up @@ -50,29 +49,22 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour
Env: []string{
"ALLOW_NONE_AUTHENTICATION=yes",
},
})
if err != nil {
return nil, fmt.Errorf("could not create container: %v", err)
}
PortBindings: internal.IPv4PortBindings([]string{"2379"}),
}, internal.DefaultHostConfig)
cln.Cleanup(func() {
if err := pool.Purge(container); err != nil {
cln.Log(fmt.Errorf("could not purge ETCD resource: %v", err))
}
})
if err != nil {
return nil, fmt.Errorf("could not create container: %v", err)
}

var (
etcdClient *etcd.Client
etcdHosts []string
etcdPort int

etcdPortStr = container.GetPort("2379/tcp")
)
etcdPort, err = strconv.Atoi(etcdPortStr)
if err != nil {
return nil, fmt.Errorf("could not convert port %q to int: %v", etcdPortStr, err)
}

etcdHosts = []string{"http://localhost:" + etcdPortStr}
etcdHosts = []string{"http://" + container.GetBoundIP("2379/tcp") + ":" + container.GetPort("2379/tcp")}

etcdClient, err = etcd.New(etcd.Config{
Endpoints: etcdHosts,
Expand Down Expand Up @@ -102,6 +94,5 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour
Client: etcdClient,
Hosts: etcdHosts,
HostsInNetwork: hostsInNetwork,
Port: etcdPort,
}, nil
}
27 changes: 27 additions & 0 deletions testhelper/docker/resource/internal/ports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package internal

import (
"github.com/ory/dockertest/v3/docker"
)

const BindHostIP = "127.0.0.1"

// IPv4PortBindings returns the port bindings for the given exposed ports forcing ipv4 address.
func IPv4PortBindings(exposedPorts []string) map[docker.Port][]docker.PortBinding {
portBindings := make(map[docker.Port][]docker.PortBinding)

for _, exposedPort := range exposedPorts {
portBindings[docker.Port(exposedPort)+"/tcp"] = []docker.PortBinding{
{
HostIP: BindHostIP,
HostPort: "0",
},
}
}

return portBindings
}

func DefaultHostConfig(hc *docker.HostConfig) {
hc.PublishAllPorts = false
}
Loading
Loading