Skip to content

Commit

Permalink
Added Nats streaming http connector (#37)
Browse files Browse the repository at this point in the history
Co-authored-by: GT <gtalekar@infracloud.io>
  • Loading branch information
girishg4t and gtalekar committed Oct 27, 2020
1 parent 684eadf commit 431fcbd
Show file tree
Hide file tree
Showing 10 changed files with 528 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The deployment that Keda scales can be a custom program you write to do somethin
|[Kafka HTTP Connector](./kafka-http-connector/README.md)| Consumes messages from Kafka topics and posts the message to an HTTP endpoint.|
|[RabbitMQ HTTP Connector](./rabbitmq-http-connector/README.md)|Reads message from RabbitMQ and posts to a HTTP endpoint. Currently only the AMQP protocol is supported for consuming RabbitMQ messages.|
|[AWS Kinesis Stream HTTP Connector](./aws-kinesis-http-connector/README.md)|Reads message from Amazon Kinesis Data Streams and posts to a HTTP endpoint.|
|[Nats Streaming HTTP Connector](./nats-streaming-http-connector/README.md)|Subscribes to a Nats streaming queue with subject and queue group to read the messages and posts to a HTTP endpoint.|

# Contributing

Expand Down
21 changes: 21 additions & 0 deletions nats-streaming-http-connector/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM golang:1.13-alpine as builder

RUN apk add bash ca-certificates git gcc g++ libc-dev


RUN mkdir /app
WORKDIR /app
COPY go.mod .
COPY go.sum .

RUN go mod download

# Copy source code to image
COPY . .

RUN go build -a -o /go/bin/main
FROM alpine:3.12 as base
RUN apk add --update ca-certificates
COPY --from=builder /go/bin/main /

ENTRYPOINT ["/main"]
24 changes: 24 additions & 0 deletions nats-streaming-http-connector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# NATS Streaming KEDA Connector

NATs streaming KEDA connector image can be used in the Kubernetes deployment as scaleTargetRef in scaledObject of [NATs scaler](https://keda.sh/docs/1.5/scalers/nats-streaming/).

The job of the connector is to read messages from the subject, call an HTTP endpoint with the body of the message, and write response or error in the respective subject. Following enviornment variables are used by connector image as configuration to connect and authenticate with NATs server which should be defined in the Kubernetes deployment manifest.

- `TOPIC`: Subject from which messages are read
- `HTTP_ENDPOINT`: http endpoint to post request
- `ERROR_TOPIC`: Subject to write errors on failure.
- `RESPONSE_TOPIC`: Subject to write responses on success response.
- `SOURCE_NAME`: Optional. Name of the Source. Default is "KEDAConnector"
- `MAX_RETRIES`: Maximum number of times an http endpoint will be retried upon failure.
- `CONTENT_TYPE`: Content type used while creating post request
- `NATS_SERVER`: NATs connection string, like `nats://127.0.0.1:4222`
- `CLUSTER_ID`: StanClusterID to form a connection to the NATS Streaming subsystem
- `CLIENT_ID`: Which is used by the server to uniquely identify, and restrict, a given client.
- `DURABLE_NAME`: Durable name to resume message consumption from where it previously stopped



## Resources
* To setup and run nats streaming server, reference https://github.com/nats-io/nats-streaming-server
* For running the connecter with fission e.g.
```fission mqt create --name natstest --function helloworld --mqtype stan --topic hello --resptopic response --mqtkind keda --errortopic error --maxretries 3 --metadata subject=hello --metadata queueGroup=grp1 --metadata durableName=due --metadata natsServerMonitoringEndpoint=nats-monitor.default.svc.cluster.local:8222 --metadata clusterId=test-cluster --metadata clientId=stan-sub --metadata natsServer=nats://nats-monitor:4222```
14 changes: 14 additions & 0 deletions nats-streaming-http-connector/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/keda-connectors/nats-streaming-http-connector

go 1.13

require (
github.com/fission/keda-connectors/common v0.0.0-20201006080550-8116bda4db7a
github.com/golang/protobuf v1.4.2 // indirect
github.com/nats-io/nats-server/v2 v2.1.8 // indirect
github.com/nats-io/nats-streaming-server v0.18.0 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/stan.go v0.7.0
go.uber.org/zap v1.16.0
google.golang.org/protobuf v1.25.0 // indirect
)
188 changes: 188 additions & 0 deletions nats-streaming-http-connector/go.sum

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions nats-streaming-http-connector/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package main

import (
"io/ioutil"
"log"
"net/http"
"os"

"github.com/fission/keda-connectors/common"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"go.uber.org/zap"
)

type natsConnector struct {
host string
connectordata common.ConnectorMetadata
stanConnection stan.Conn
logger *zap.Logger
}

func (conn natsConnector) consumeMessage() {

headers := http.Header{
"Topic": {conn.connectordata.Topic},
"RespTopic": {conn.connectordata.ResponseTopic},
"ErrorTopic": {conn.connectordata.ErrorTopic},
"Content-Type": {conn.connectordata.ContentType},
"Source-Name": {conn.connectordata.SourceName},
}
forever := make(chan bool)
_, err := conn.stanConnection.QueueSubscribe(os.Getenv("TOPIC"), os.Getenv("QUEUE_GROUP"), func(m *stan.Msg) {
msg := string(m.Data)
conn.logger.Info(msg)
resp, err := common.HandleHTTPRequest(msg, headers, conn.connectordata, conn.logger)
if err != nil {
conn.logger.Info(err.Error())
conn.errorHandler(err)
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
conn.logger.Info(err.Error())
conn.errorHandler(err)
} else {
if success := conn.responseHandler(body); success {
conn.logger.Info("Done processing message",
zap.String("messsage", string(body)))
}
}
}
}, stan.DurableName(os.Getenv("DURABLE_NAME")), stan.DeliverAllAvailable())

if err != nil {
conn.logger.Fatal("error occurred while consuming message", zap.Error(err))
}

conn.logger.Info("NATs consumer up and running!...")
<-forever
}

func (conn natsConnector) errorHandler(err error) {
publishErr := conn.stanConnection.Publish(conn.connectordata.ErrorTopic, []byte(err.Error()))

if publishErr != nil {
conn.logger.Error("failed to publish message to error topic",
zap.Error(publishErr),
zap.String("source", conn.connectordata.SourceName),
zap.String("message", publishErr.Error()),
zap.String("topic", conn.connectordata.ErrorTopic))
}
}

func (conn natsConnector) responseHandler(response []byte) bool {

if len(conn.connectordata.ResponseTopic) > 0 {

publishErr := conn.stanConnection.Publish(conn.connectordata.ResponseTopic, response)

if publishErr != nil {
conn.logger.Error("failed to publish response body from http request to topic",
zap.Error(publishErr),
zap.String("topic", conn.connectordata.ResponseTopic),
zap.String("source", conn.connectordata.SourceName),
zap.String("http endpoint", conn.connectordata.HTTPEndpoint),
)
return false
}
}
return true
}

func main() {
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("can't initialize zap logger: %v", err)
}
defer logger.Sync()

connectordata, err := common.ParseConnectorMetadata()

host := os.Getenv("NATS_SERVER")

if host == "" {
logger.Fatal("received empty host field")
}

nc, err := nats.Connect(host)

if err != nil {
logger.Fatal("failed to establish connection with NATS", zap.Error(err))
}

sc, err := stan.Connect(os.Getenv("CLUSTER_ID"), os.Getenv("CLIENT_ID"), stan.NatsConn(nc))
if err != nil {
log.Fatal(err)
}
defer nc.Close()

conn := natsConnector{
host: host,
stanConnection: sc,
connectordata: connectordata,
logger: logger,
}
conn.consumeMessage()
}
21 changes: 21 additions & 0 deletions nats-streaming-http-connector/test/producer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM golang:1.12-alpine as builder

RUN apk add bash ca-certificates git gcc g++ libc-dev


RUN mkdir /app
WORKDIR /app
COPY go.mod .
COPY go.sum .

RUN go mod download

# Copy source code to image
COPY . .

RUN go build -a -o /go/bin/main
FROM alpine:3.12 as base
RUN apk add --update ca-certificates
COPY --from=builder /go/bin/main /

ENTRYPOINT ["/main"]
11 changes: 11 additions & 0 deletions nats-streaming-http-connector/test/producer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/keda-connectors/nats-streaming-http-connector/test/producer

go 1.13

require (
github.com/golang/protobuf v1.4.2 // indirect
github.com/nats-io/nats-server/v2 v2.1.8 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/stan.go v0.7.0
google.golang.org/protobuf v1.25.0 // indirect
)
92 changes: 92 additions & 0 deletions nats-streaming-http-connector/test/producer/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.8 h1:d5GoJA6W7vQkmt99Nfdeie3pEFFUEjIwt1YZp50DkIQ=
github.com/nats-io/nats-server/v2 v2.1.8/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE=
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.7.0 h1:sMVHD9RkxPOl6PJfDVBQd+gbxWkApeYl6GrH+10msO4=
github.com/nats-io/stan.go v0.7.0/go.mod h1:Ci6mUIpGQTjl++MqK2XzkWI/0vF+Bl72uScx7ejSYmU=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
29 changes: 29 additions & 0 deletions nats-streaming-http-connector/test/producer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"log"
"strconv"

"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
)

func main() {
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
sc, err := stan.Connect("test-cluster", "stan-sub", stan.NatsConn(nc))
if err != nil {
log.Fatal(err)
}
for i := 100; i < 102; i++ {
sc.Publish("hello", []byte("Test"+strconv.Itoa(i)))
}

// sc.QueueSubscribe("response", "grp1", func(m *stan.Msg) {
// log.Printf("[Received] %+v", m)
// }, stan.DurableName("due"), stan.DeliverAllAvailable())

select {}
}

0 comments on commit 431fcbd

Please sign in to comment.