From 5e561e4c26f2e3e86cdd2fce8d150c0e5f3c3dd7 Mon Sep 17 00:00:00 2001 From: sakulali Date: Wed, 8 Nov 2023 00:28:22 +0800 Subject: [PATCH] [receiver/kafkametrics] Using unique container networks and container names and attempt to fix flaky tests (#28903) **Description:** Using unique container networks and container names and attempt to fix flaky tests **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26293 **Testing:** **Preparation:** DIR = receiver/kafkametricsreceiver CMD = go test -v -count=1 -race -timeout 360s -parallel 4 -tags=integration,"" -run=Integration ./... **Tests:** 1. If we manually modify the code(as shown below) and use invalid kafka broker, such as `localhost:invalid-port`, the same error as shown in the issue may occur. ``` // receiver/kafkametricsreceiver/integration_test.go scraperinttest.WithCustomConfig( func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) { rCfg := cfg.(*Config) rCfg.CollectionInterval = 5 * time.Second rCfg.Brokers = []string{"localhost:invalid-port"} rCfg.Scrapers = []string{"brokers", "consumers", "topics"} }), ``` 2. If we execute the test commands **sequentially** , it seems that the execution results are all correct. ``` # all result are correct for i in {1..100}; do echo "Run $i"; ./${CMD} ; done ``` 3. If we execute the commands in **parallel** end with **`&`**, sometimes the error shown in the issue may occur. ``` # sometimes result occur error for i in {1..20}; do echo "Run $i"; ./${CMD} &; done ``` **Inference:** I have found that duplicate container networks and container names can cause container creation to fail or result in successfully created containers with unavailable ports, which may lead to issues similar to the one shown. **Additional information:** Since Kafka's startup relies on ZooKeeper (which waits for the default `zookeeper.connection.timeout.ms=18000`), if Kafka starts first and ZooKeeper fails to start properly after the timeout duration, it will cause the Kafka container to fail to start correctly. I found the issue https://github.com/testcontainers/testcontainers-go/issues/1791 wants to support that. **Documentation:** --------- Signed-off-by: sakulali --- receiver/kafkametricsreceiver/go.mod | 2 +- .../kafkametricsreceiver/integration_test.go | 20 +++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/receiver/kafkametricsreceiver/go.mod b/receiver/kafkametricsreceiver/go.mod index 50c3075b5dfe..6da6e7864dfd 100644 --- a/receiver/kafkametricsreceiver/go.mod +++ b/receiver/kafkametricsreceiver/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/IBM/sarama v1.41.3 github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.4.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.88.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.88.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.88.0 @@ -44,7 +45,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/uuid v1.4.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect diff --git a/receiver/kafkametricsreceiver/integration_test.go b/receiver/kafkametricsreceiver/integration_test.go index d946e72a135e..dcaa40511588 100644 --- a/receiver/kafkametricsreceiver/integration_test.go +++ b/receiver/kafkametricsreceiver/integration_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "go.opentelemetry.io/collector/component" @@ -20,13 +21,16 @@ import ( ) const ( - networkName = "kafka-network" kafkaPort = "9092" zookeeperPort = "2181" - zookeeperHost = "zookeeper" ) func TestIntegration(t *testing.T) { + uid := fmt.Sprintf("-%s", uuid.NewString()) + networkName := "kafka-network" + uid + zkContainerName := "zookeeper" + uid + kafkaContainerName := "kafka" + uid + scraperinttest.NewIntegrationTest( NewFactory(), scraperinttest.WithNetworkRequest( @@ -37,10 +41,10 @@ func TestIntegration(t *testing.T) { ), scraperinttest.WithContainerRequest( testcontainers.ContainerRequest{ - Name: "zookeeper", + Name: zkContainerName, Image: "ubuntu/zookeeper:3.1-22.04_beta", Networks: []string{networkName}, - Hostname: zookeeperHost, + Hostname: zkContainerName, ExposedPorts: []string{zookeeperPort}, WaitingFor: wait.ForAll( wait.ForListeningPort(zookeeperPort).WithStartupTimeout(2 * time.Minute), @@ -48,12 +52,12 @@ func TestIntegration(t *testing.T) { }), scraperinttest.WithContainerRequest( testcontainers.ContainerRequest{ - Name: "kafka", + Name: kafkaContainerName, Image: "ubuntu/kafka:3.1-22.04_beta", Networks: []string{networkName}, ExposedPorts: []string{kafkaPort}, Env: map[string]string{ - "ZOOKEEPER_HOST": zookeeperHost, + "ZOOKEEPER_HOST": zkContainerName, "ZOOKEEPER_PORT": zookeeperPort, }, WaitingFor: wait.ForAll( @@ -65,8 +69,8 @@ func TestIntegration(t *testing.T) { rCfg := cfg.(*Config) rCfg.CollectionInterval = 5 * time.Second rCfg.Brokers = []string{fmt.Sprintf("%s:%s", - ci.HostForNamedContainer(t, "kafka"), - ci.MappedPortForNamedContainer(t, "kafka", kafkaPort))} + ci.HostForNamedContainer(t, kafkaContainerName), + ci.MappedPortForNamedContainer(t, kafkaContainerName, kafkaPort))} rCfg.Scrapers = []string{"brokers", "consumers", "topics"} }), // scraperinttest.WriteExpected(), // TODO remove