Skip to content

Commit

Permalink
Use a docker-compose file to run the tests
Browse files Browse the repository at this point in the history
Also create the topics & other deps inside the test itself.
  • Loading branch information
KJ Tsanaktsidis committed Apr 16, 2020
1 parent 955b7df commit 0650324
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 0 deletions.
183 changes: 183 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
version: '3.7'
services:
zookeeper-1:
image: 'zookeeper:3.6.0'
restart: always
depends_on: ['toxiproxy']
environment:
ZOO_MY_ID: '1'
ZOO_SERVERS: 'server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181 server.4=zookeeper-4:2888:3888;2181 server.5=zookeeper-5:2888:3888;2181'
ZOO_INIT_LIMIT: '10'
ZOO_SYNC_LIMIT: '5'
ZOO_MAX_CLIENT_CONNS: '0'
zookeeper-2:
image: 'zookeeper:3.6.0'
restart: always
depends_on: ['toxiproxy']
environment:
ZOO_MY_ID: '2'
ZOO_SERVERS: 'server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181 server.4=zookeeper-4:2888:3888;2181 server.5=zookeeper-5:2888:3888;2181'
ZOO_INIT_LIMIT: '10'
ZOO_SYNC_LIMIT: '5'
ZOO_MAX_CLIENT_CONNS: '0'
zookeeper-3:
image: 'zookeeper:3.6.0'
restart: always
depends_on: ['toxiproxy']
environment:
ZOO_MY_ID: '3'
ZOO_SERVERS: 'server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181 server.4=zookeeper-4:2888:3888;2181 server.5=zookeeper-5:2888:3888;2181'
ZOO_INIT_LIMIT: '10'
ZOO_SYNC_LIMIT: '5'
ZOO_MAX_CLIENT_CONNS: '0'
zookeeper-4:
image: 'zookeeper:3.6.0'
restart: always
depends_on: ['toxiproxy']
environment:
ZOO_MY_ID: '4'
ZOO_SERVERS: 'server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181 server.4=zookeeper-4:2888:3888;2181 server.5=zookeeper-5:2888:3888;2181'
ZOO_INIT_LIMIT: '10'
ZOO_SYNC_LIMIT: '5'
ZOO_MAX_CLIENT_CONNS: '0'
zookeeper-5:
image: 'zookeeper:3.6.0'
restart: always
depends_on: ['toxiproxy']
environment:
ZOO_MY_ID: '5'
ZOO_SERVERS: 'server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181 server.4=zookeeper-4:2888:3888;2181 server.5=zookeeper-5:2888:3888;2181'
ZOO_INIT_LIMIT: '10'
ZOO_SYNC_LIMIT: '5'
ZOO_MAX_CLIENT_CONNS: '0'
kafka-1:
image: 'bitnami/kafka:2.4.1'
restart: always
depends_on: ['toxiproxy', 'zookeeper-1', 'zookeeper-2', 'zookeeper-3', 'zookeeper-4', 'zookeeper-5']
network_mode: 'service:toxiproxy'
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: 'toxiproxy:21801'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_CFG_BROKER_ID: '29091'
KAFKA_CFG_BROKER_RACK: '1'
KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9091'
KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29091'
KAFKA_CFG_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_CFG_RESERVED_BROKER_MAX_ID: '100000'
KAFKA_CFG_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-2:
image: 'bitnami/kafka:2.4.1'
restart: always
depends_on: ['toxiproxy', 'zookeeper-1', 'zookeeper-2', 'zookeeper-3', 'zookeeper-4', 'zookeeper-5']
network_mode: 'service:toxiproxy'
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: 'toxiproxy:21802'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_CFG_BROKER_ID: '29092'
KAFKA_CFG_BROKER_RACK: '2'
KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092'
KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29092'
KAFKA_CFG_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_CFG_RESERVED_BROKER_MAX_ID: '100000'
KAFKA_CFG_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-3:
image: 'bitnami/kafka:2.4.1'
restart: always
depends_on: ['toxiproxy', 'zookeeper-1', 'zookeeper-2', 'zookeeper-3', 'zookeeper-4', 'zookeeper-5']
network_mode: 'service:toxiproxy'
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: 'toxiproxy:21803'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_CFG_BROKER_ID: '29093'
KAFKA_CFG_BROKER_RACK: '3'
KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9093'
KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29093'
KAFKA_CFG_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_CFG_RESERVED_BROKER_MAX_ID: '100000'
KAFKA_CFG_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-4:
image: 'bitnami/kafka:2.4.1'
restart: always
depends_on: ['toxiproxy', 'zookeeper-1', 'zookeeper-2', 'zookeeper-3', 'zookeeper-4', 'zookeeper-5']
network_mode: 'service:toxiproxy'
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: 'toxiproxy:21804'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_CFG_BROKER_ID: '29094'
KAFKA_CFG_BROKER_RACK: '4'
KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9094'
KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29094'
KAFKA_CFG_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_CFG_RESERVED_BROKER_MAX_ID: '100000'
KAFKA_CFG_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-5:
image: 'bitnami/kafka:2.4.1'
restart: always
depends_on: ['toxiproxy', 'zookeeper-1', 'zookeeper-2', 'zookeeper-3', 'zookeeper-4', 'zookeeper-5']
network_mode: 'service:toxiproxy'
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: 'toxiproxy:21805'
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_CFG_BROKER_ID: '29095'
KAFKA_CFG_BROKER_RACK: '5'
KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9095'
KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29095'
KAFKA_CFG_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_CFG_RESERVED_BROKER_MAX_ID: '100000'
KAFKA_CFG_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'false'
toxiproxy:
image: 'shopify/toxiproxy:2.1.4'
restart: always
entrypoint: ['/bin/sh', '-c']
ports:
- '21801:21801'
- '21802:21802'
- '21803:21803'
- '21804:21804'
- '21805:21805'
- '29091:29091'
- '29092:29092'
- '29093:29093'
- '29094:29094'
- '29095:29095'
- '8474:8474'
command:
- |
{
while ! nc -w 1 localhost 8474 </dev/null; do echo "Waiting"; sleep 1; done
echo "toxiproxy up"
/go/bin/toxiproxy-cli create zk1 -l 0.0.0.0:21801 -u zookeeper-1:2181
/go/bin/toxiproxy-cli create zk2 -l 0.0.0.0:21802 -u zookeeper-2:2181
/go/bin/toxiproxy-cli create zk3 -l 0.0.0.0:21803 -u zookeeper-3:2181
/go/bin/toxiproxy-cli create zk4 -l 0.0.0.0:21804 -u zookeeper-4:2181
/go/bin/toxiproxy-cli create zk5 -l 0.0.0.0:21805 -u zookeeper-5:2181
/go/bin/toxiproxy-cli create kafka1 -l 0.0.0.0:29091 -u localhost:9091
/go/bin/toxiproxy-cli create kafka2 -l 0.0.0.0:29092 -u localhost:9092
/go/bin/toxiproxy-cli create kafka3 -l 0.0.0.0:29093 -u localhost:9093
/go/bin/toxiproxy-cli create kafka4 -l 0.0.0.0:29094 -u localhost:9094
/go/bin/toxiproxy-cli create kafka5 -l 0.0.0.0:29095 -u localhost:9095
} &
exec /go/bin/toxiproxy -host=0.0.0.0 -port=8474
116 changes: 116 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package sarama

import (
"fmt"
"log"
"math/rand"
"net"
"os"
"os/exec"
"strconv"
"strings"
"testing"
Expand All @@ -27,6 +29,7 @@ var (
Proxies map[string]*toxiproxy.Proxy
)


func init() {
if os.Getenv("DEBUG") == "true" {
Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
Expand Down Expand Up @@ -58,6 +61,10 @@ func init() {
}

kafkaRequired = os.Getenv("CI") != ""

if kafkaAvailable {
createTestTopics()
}
}

func checkKafkaAvailability(t testing.TB) {
Expand Down Expand Up @@ -144,3 +151,112 @@ func parseKafkaVersion(version string) kafkaVersion {

return result
}

func createTestTopics() {
Logger.Println("Creating topics")
config := NewConfig()
config.Metadata.Retry.Max = 3
config.Metadata.Retry.Backoff = 3 * time.Second
var err error
config.Version, err = ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
if err != nil {
panic(fmt.Sprintf("Could not parse kafka version %s: %+v", os.Getenv("KAFKA_VERSION"), err))
}
client, err := NewClient(kafkaBrokers, config)
if err != nil {
panic("failed to connect to kafka")
}
defer client.Close()
broker, err := client.Controller()
if err != nil {
panic(fmt.Sprintf("no controller available: %+v", err))
}

// Delete the uncommitted-topic-test-4 topic (which is used by
/// https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/blob/master/src/main/java/CustomProducer/Main.java
// in TestReadOnlyAndAllCommittedMessages), so we can re-create it empty, and run the test.k
deleteRes, err := broker.DeleteTopics(&DeleteTopicsRequest{
Topics: []string{"uncomitted-topic-test-4", "test.1", "test.4", "test.64"},
Timeout: 3 * time.Second,
})
if err != nil {
panic(fmt.Sprintf("Could not delete the uncomitted 4 topic: %+v", err))
}
for _, topicErr := range deleteRes.TopicErrorCodes {
if topicErr != ErrUnknownTopicOrPartition && topicErr != ErrInvalidTopic && topicErr != ErrNoError {
panic(fmt.Sprintf("failed to delete topic: %+v", topicErr))
}
}

// We need to wait a while for the deletes to process
topicsOk := false
mdloop:
for i := 0; i < 20; i++{
time.Sleep(1 * time.Second)
md, err := broker.GetMetadata(&MetadataRequest{
Topics: []string{"uncomitted-topic-test-4", "test.1", "test.4", "test.64"},
})
if err != nil {
panic(fmt.Sprintf("failed to call GetMetadata: %+v", err))
}
for _, topicMd := range md.Topics {
if topicMd.Err != ErrUnknownTopicOrPartition && topicMd.Err != ErrInvalidTopic && topicMd.Err != ErrNoError {
// Need to try again
continue mdloop
}
}
topicsOk = true
break
}
if !topicsOk {
panic(fmt.Sprintf("timout waiting for topics to be OK"))
}

createRes, err := broker.CreateTopics(&CreateTopicsRequest{
TopicDetails: map[string]*TopicDetail{
"test.1": {
NumPartitions: 1,
ReplicationFactor: 3,
},
"test.4": {
NumPartitions: 4,
ReplicationFactor: 3,
},
"test.64": {
NumPartitions: 64,
ReplicationFactor: 3,
},
"uncommitted-topic-test-4": {
NumPartitions: 1,
ReplicationFactor: 3,
},
},
Timeout: 3 * time.Second,
})
if err != nil {
panic(fmt.Sprintf("could not create topics: %+v", err))
}
for topic, topicErr := range createRes.TopicErrors {
if topicErr.Err != ErrTopicAlreadyExists && topicErr.Err != ErrNoError {
panic(fmt.Sprintf("failed to create topic %s: %+v", topic, topicErr))
}
}

// Now fill the topic with the java blob.
c := exec.Command("wget", "-nc", "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar")
c.Stderr = os.Stderr
c.Stdout = os.Stdout
err = c.Run()
if err != nil {
panic(fmt.Sprintf("failed to download jre blob: %+v", err))
}
c = exec.Command("java", "-jar", "simplest-uncommitted-msg-0.1-jar-with-dependencies.jar", "-b", kafkaBrokers[0], "-c", "4")
c.Stderr = os.Stderr
c.Stdout = os.Stdout
err = c.Run()
if err != nil {
panic(fmt.Sprintf("failed to run java consumer: %+v", err))
}

Logger.Println("Created topics OK.")
}

0 comments on commit 0650324

Please sign in to comment.