Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add nsq e2e image #175

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions e2e/images/nsq/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.22
WORKDIR /cmd

COPY go.mod go.sum ./
RUN go mod download

COPY *.go ./

RUN CGO_ENABLED=0 GOOS=linux go build -o cmd .

ENTRYPOINT ["./cmd"]
8 changes: 8 additions & 0 deletions e2e/images/nsq/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/kedacore/test-tools/e2e/images/nsq

go 1.22.0

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/nsqio/go-nsq v1.1.0 // indirect
)
4 changes: 4 additions & 0 deletions e2e/images/nsq/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
100 changes: 100 additions & 0 deletions e2e/images/nsq/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/nsqio/go-nsq"
)

type Handler struct{}

func (h *Handler) HandleMessage(m *nsq.Message) error {
log.Printf("Received message: %s", m.Body)
return nil
}

func nsqConsumer(config *nsq.Config, nsqlookupdHTTPAddress, topic, channel string) error {
consumer, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
return err
}

consumer.AddHandler(&Handler{})

err = consumer.ConnectToNSQLookupd(nsqlookupdHTTPAddress)
if err != nil {
return err
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

consumer.Stop()

return nil
}

func nsqProducer(config *nsq.Config, nsqdTCPAddress string, topic string, messageCount int) error {
producer, err := nsq.NewProducer(nsqdTCPAddress, config)
if err != nil {
return err
}

responseChan := make(chan *nsq.ProducerTransaction, messageCount)
for i := 0; i < messageCount; i++ {
err := producer.PublishAsync(topic, []byte(fmt.Sprintf("%d", i)), responseChan)
if err != nil {
return err
}
}

for i := 0; i < messageCount; i++ {
trans := <-responseChan
if trans.Error != nil {
return trans.Error
}
}

producer.Stop()

return nil
}

func main() {
mode := flag.String("mode", "", "consumer or producer")
topic := flag.String("topic", "", "topic name")
channel := flag.String("channel", "", "channel name")
nsqlookupdHTTPAddress := flag.String("nsqlookupd-http-address", "", "nsqlookupd HTTP address")
messageCount := flag.Int("message-count", 1, "number of messages to send")
nsqdTCPAddress := flag.String("nsqd-tcp-address", "", "nsqd TCP address")
flag.Parse()

config := nsq.NewConfig()

switch *mode {
case "consumer":
log.Println("Consumer mode")
if *topic == "" || *channel == "" || *nsqlookupdHTTPAddress == "" {
log.Fatalf("topic, channel, and nsqlookupd-http-address are required\n")
}
if err := nsqConsumer(config, *nsqlookupdHTTPAddress, *topic, *channel); err != nil {
log.Fatalf("read from nsq failed: %w\n", err)
}
case "producer":
log.Println("Producer mode")
if *topic == "" || *nsqdTCPAddress == "" {
log.Fatalf("topic and nsqd-tcp-address are required\n")
}
if err := nsqProducer(config, *nsqdTCPAddress, *topic, *messageCount); err != nil {
log.Fatalf("write to nsq failed: %w\n", err)
}
default:
log.Fatalf("unknown mode: %s\n", *mode)
}
}
Loading