diff --git a/e2e/images/nsq/Dockerfile b/e2e/images/nsq/Dockerfile new file mode 100644 index 0000000..de63d9d --- /dev/null +++ b/e2e/images/nsq/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.22 +WORKDIR /cmd + +COPY go.mod go.sum ./ +RUN go mod download + +COPY *.go ./ + +RUN CGO_ENABLED=0 GOOS=linux go build -o cmd . + +ENTRYPOINT ["./cmd"] diff --git a/e2e/images/nsq/go.mod b/e2e/images/nsq/go.mod new file mode 100644 index 0000000..59074c8 --- /dev/null +++ b/e2e/images/nsq/go.mod @@ -0,0 +1,8 @@ +module github.com/kedacore/test-tools/e2e/images/nsq + +go 1.22.0 + +require ( + github.com/golang/snappy v0.0.1 // indirect + github.com/nsqio/go-nsq v1.1.0 // indirect +) diff --git a/e2e/images/nsq/go.sum b/e2e/images/nsq/go.sum new file mode 100644 index 0000000..457d1d9 --- /dev/null +++ b/e2e/images/nsq/go.sum @@ -0,0 +1,4 @@ +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= diff --git a/e2e/images/nsq/main.go b/e2e/images/nsq/main.go new file mode 100644 index 0000000..3d33ab9 --- /dev/null +++ b/e2e/images/nsq/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/nsqio/go-nsq" +) + +type Handler struct{} + +func (h *Handler) HandleMessage(m *nsq.Message) error { + log.Printf("Received message: %s", m.Body) + return nil +} + +func nsqConsumer(config *nsq.Config, nsqlookupdHTTPAddress, topic, channel string) error { + consumer, err := nsq.NewConsumer(topic, channel, config) + if err != nil { + return err + } + + consumer.AddHandler(&Handler{}) + + err = consumer.ConnectToNSQLookupd(nsqlookupdHTTPAddress) + if err != nil { + return err + } + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + consumer.Stop() + + return nil +} + +func nsqProducer(config *nsq.Config, nsqdTCPAddress string, topic string, messageCount int) error { + producer, err := nsq.NewProducer(nsqdTCPAddress, config) + if err != nil { + return err + } + + responseChan := make(chan *nsq.ProducerTransaction, messageCount) + for i := 0; i < messageCount; i++ { + err := producer.PublishAsync(topic, []byte(fmt.Sprintf("%d", i)), responseChan) + if err != nil { + return err + } + } + + for i := 0; i < messageCount; i++ { + trans := <-responseChan + if trans.Error != nil { + return trans.Error + } + } + + producer.Stop() + + return nil +} + +func main() { + mode := flag.String("mode", "", "consumer or producer") + topic := flag.String("topic", "", "topic name") + channel := flag.String("channel", "", "channel name") + nsqlookupdHTTPAddress := flag.String("nsqlookupd-http-address", "", "nsqlookupd HTTP address") + messageCount := flag.Int("message-count", 1, "number of messages to send") + nsqdTCPAddress := flag.String("nsqd-tcp-address", "", "nsqd TCP address") + flag.Parse() + + config := nsq.NewConfig() + + switch *mode { + case "consumer": + log.Println("Consumer mode") + if *topic == "" || *channel == "" || *nsqlookupdHTTPAddress == "" { + log.Fatalf("topic, channel, and nsqlookupd-http-address are required\n") + } + if err := nsqConsumer(config, *nsqlookupdHTTPAddress, *topic, *channel); err != nil { + log.Fatalf("read from nsq failed: %w\n", err) + } + case "producer": + log.Println("Producer mode") + if *topic == "" || *nsqdTCPAddress == "" { + log.Fatalf("topic and nsqd-tcp-address are required\n") + } + if err := nsqProducer(config, *nsqdTCPAddress, *topic, *messageCount); err != nil { + log.Fatalf("write to nsq failed: %w\n", err) + } + default: + log.Fatalf("unknown mode: %s\n", *mode) + } +}