Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io.Writer and io.Reader pubsub. Also, CLI tool for watermill #53

Merged
merged 41 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
744a71f
A simple publisher from io.Writer
maclav3 Feb 14, 2019
0cf9bde
A very sketchy implementation of sub/pub in io.Writer/Reader
maclav3 Feb 14, 2019
72331d7
Fix infinite resend after nack
maclav3 Feb 14, 2019
aadc13f
Remove the io tests
maclav3 Feb 18, 2019
0c2fff2
Fix message metadata keeping the old reference on copy
maclav3 Feb 18, 2019
b879b42
Fix some stuff with io pub/sub and add lossless (un)marshaler
maclav3 Feb 18, 2019
6c4c7e5
Remove file that was unnecessarily commited
maclav3 Feb 18, 2019
6e94567
Basic cobra config for watermill cli app
maclav3 Feb 18, 2019
e28f60f
Add go mod for the cobra conf
maclav3 Feb 18, 2019
bb3d6c8
Parse the variables for kafka/produce/...
maclav3 Feb 18, 2019
b4892c8
First working iteration of producer
maclav3 Feb 19, 2019
151eb3d
More reliable closing of consumer
maclav3 Feb 19, 2019
56b5ec4
Add googlecloud provider; collides with kafka for now
maclav3 Feb 19, 2019
a34bfc5
Fix accidental commit
maclav3 Feb 19, 2019
d212a01
Fix googlecloud, cleanup
maclav3 Feb 19, 2019
2eb49c4
Create temporary subscription for gcloud and remove it on consumer stop
maclav3 Feb 19, 2019
ef4cd4f
Add AMQP
maclav3 Feb 20, 2019
8f54a04
Fix complicated flags for gc and kafka
maclav3 Feb 20, 2019
6c75130
Merge branch 'master' into io-writer-reader
maclav3 Feb 21, 2019
d5a89d0
Add readme and makefile for the cli tool
maclav3 Feb 21, 2019
793200f
Add google cloud pubsub rm message
maclav3 Feb 21, 2019
99cfb6d
Subscription add command
maclav3 Feb 21, 2019
a683a56
Fix the labels, viper is bugged
maclav3 Feb 21, 2019
281e931
Add better config file support; fixes
maclav3 Feb 21, 2019
fe66eec
Moar fixes
maclav3 Feb 21, 2019
0ec53e9
Fix unit test
maclav3 Feb 21, 2019
14714b5
Fix passing lock by value
maclav3 Feb 21, 2019
03167dc
Move dev/cli to tools/watermill-cli
maclav3 Feb 21, 2019
8be8363
Fix unreliable io sub closing
maclav3 Feb 21, 2019
c897ddd
Merge branch 'master' into io-writer-reader
maclav3 Feb 22, 2019
acf7a1b
Enable the logger cli whenever debug or trace is enabled
maclav3 Feb 22, 2019
6ed2808
Fix subscriber based on incomplete run of pubsub tests
maclav3 Feb 22, 2019
b3ae9f2
Implement suggestions from CR
maclav3 Feb 22, 2019
cef2cc9
Add link to docs in doc.go
maclav3 Feb 22, 2019
8c42b70
Remove the validate example config, it is not needed
maclav3 Feb 22, 2019
ee84c1a
Merge branch 'master' into io-writer-reader
maclav3 Feb 27, 2019
e7f4aaa
Go mod tidy
maclav3 Feb 27, 2019
b137949
Rename `watermill-cli` to `mill`
maclav3 Feb 27, 2019
3043c20
Remove duplicated
maclav3 Feb 27, 2019
e211e08
Replace `go install` with `go get -u`
maclav3 Feb 27, 2019
a566ae0
Rephrase add/rm sub
maclav3 Feb 27, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev/cli/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
watermill-cli:
go build -o watermill-cli main.go
62 changes: 62 additions & 0 deletions dev/cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# CLI Wrapper for Watermill

This is a CLI wrapper for Watermill. It has two basic functionalities: producing and consuming messages on the following
Pub/Subs:

1. Kafka
2. Google Cloud Pub/Sub
3. RabbitMQ

## Consume mode

In consume mode, the tool subscribes to a topic/queue/subscription (delete as appropriate) and prints the messages
in a simplified format to the standard output:

```
[yyyy-mm-dd hh:mm:ss.ssssssss] topic: message payload
```

Other outputs, for example ones that preserve UUIDs or metadata, are easily attainable by modification
of the marshaling function of the `io.Publisher` of the `consumeCmd`.

## Produce mode

In produce mode, subsequent lines of data from the standard input are transformed into messages outgoing to the requested
provider's topic/exchange.

The message's payload is set to the line from stdin, the UUID is auto-generated and the metadata is empty.

Similarly, the contents of the message could be parsed differently from stdin, by modification
of the unmarshaling function of the `io.Subscriber` of the `produceCmd`.

## Usage

The basic syntax of the tool is:

```bash
watermill-cli <provider> <command>
```

with the appropriate flags regulating the specific behaviour of each command.

`command` is usually one of `produce` or `consume`, but some providers may handle additional commands
that are specific for them.

The flags are context-specific, so the best way to find out about them is to use the `-h` flag and study
which flags are available/required for the specific context and act accordingly.

## Additional functionalities

### Google Cloud Pub/Sub

#### Adding/Removing subscriptions

The CLI tools allows for creating/removing subscriptions for Google Cloud Pub/Sub.

```bash
watermill-cli googlecloud subscription add -t <topic> <subscription_id>

watermill-cli googlecloud subscription rm <subscription_id>
```

Additional flags are available for addSubscription to regulate the subscription's settings.
218 changes: 218 additions & 0 deletions dev/cli/cmd/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package cmd

import (
"github.com/ThreeDotsLabs/watermill/message/infrastructure/amqp"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

// amqpCmd is a mid-level command for working with the AMQP Pub/Sub provider.
var amqpCmd = &cobra.Command{
Use: "amqp",
Short: "Commands for the AMQP Pub/Sub provider",
Long: `Consume or produce messages from the AMQP Pub/Sub provider.

For the configuration of consuming/producing of the messages, check the help of the relevant command.`,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
err := rootCmd.PersistentPreRunE(cmd, args)
if err != nil {
return err
}

logger.Debug("Using AMQP Pub/Sub", nil)

if cmd.Use == "consume" {
// amqp is special
topic = viper.GetString("amqp.consume.queue")
consumer, err = amqp.NewSubscriber(amqpConsumerConfig(), logger)
if err != nil {
return err
}
}

if cmd.Use == "produce" {
// amqp is special
topic = viper.GetString("amqp.produce.exchange")
producer, err = amqp.NewPublisher(amqpProducerConfig(), logger)
if err != nil {
return err
}
}

return nil
},
}

func amqpConsumerConfig() amqp.Config {
uri := viper.GetString("amqp.uri")
queue := viper.GetString("amqp.consume.queue")
exchangeName := viper.GetString("amqp.consume.exchange")
exchangeType := viper.GetString("amqp.produce.exchangeType")
durable := viper.GetBool("amqp.durable")

return amqp.Config{
Connection: amqp.ConnectionConfig{
AmqpURI: uri,
},
Marshaler: amqp.DefaultMarshaler{},
Queue: amqp.QueueConfig{
GenerateName: func(topic string) string {
return queue
},
Durable: durable,
},
Consume: amqp.ConsumeConfig{
Qos: amqp.QosConfig{
PrefetchCount: 1,
},
},

Exchange: amqp.ExchangeConfig{
GenerateName: func(topic string) string {
return exchangeName
},
Type: exchangeType,
Durable: durable,
},
}
}

func amqpProducerConfig() amqp.Config {
uri := viper.GetString("amqp.uri")
exchangeName := viper.GetString("amqp.produce.exchange")
exchangeType := viper.GetString("amqp.produce.exchangeType")
routingKey := viper.GetString("amqp.produce.routingKey")
durable := viper.GetBool("amqp.durable")

return amqp.Config{
Connection: amqp.ConnectionConfig{
AmqpURI: uri,
},
Marshaler: amqp.DefaultMarshaler{},
Exchange: amqp.ExchangeConfig{
GenerateName: func(topic string) string {
return exchangeName
},
Type: exchangeType,
Durable: durable,
},
Publish: amqp.PublishConfig{
GenerateRoutingKey: func(topic string) string {
return routingKey
},
},
}
}

func init() {
// Here you will define your flags and configuration settings.
rootCmd.AddCommand(amqpCmd)
configureAmqpCmd()
consumeCmd := addConsumeCmd(amqpCmd, false)
configureConsumeCmd(consumeCmd)
produceCmd := addProduceCmd(amqpCmd, false)
configureProduceCmd(produceCmd)
}

func configureAmqpCmd() {
amqpCmd.PersistentFlags().StringP(
"uri",
"u",
"",
"The URI to the AMQP instance (required)",
)
if err := amqpCmd.MarkPersistentFlagRequired("uri"); err != nil {
panic(err)
}
if err := viper.BindPFlag("amqp.uri", amqpCmd.PersistentFlags().Lookup("uri")); err != nil {
panic(err)
}

amqpCmd.PersistentFlags().Bool(
"durable",
true,
"If true, the queues and exchanges created automatically (if any) will be durable",
)
if err := viper.BindPFlag("amqp.durable", amqpCmd.PersistentFlags().Lookup("durable")); err != nil {
panic(err)
}
}

func configureConsumeCmd(consumeCmd *cobra.Command) {
consumeCmd.PersistentFlags().StringP(
"queue",
"q",
"",
"The name of the AMQP queue to consume messages from (required)",
)
if err := consumeCmd.MarkPersistentFlagRequired("queue"); err != nil {
panic(err)
}
if err := viper.BindPFlag("amqp.consume.queue", consumeCmd.PersistentFlags().Lookup("queue")); err != nil {
panic(err)
}

consumeCmd.PersistentFlags().StringP(
"exchange",
"x",
"",
"If non-empty, an exchange with this name is created if it didn't exist. Then, the queue is bound to this exchange.",
)
if err := viper.BindPFlag("amqp.consume.exchange", consumeCmd.PersistentFlags().Lookup("exchange")); err != nil {
panic(err)
}

consumeCmd.PersistentFlags().String(
"exchangeType",
"fanout",
"If exchange needs to be created, it will be created with this type. The common types are 'direct', 'fanout', 'topic' and 'headers'.",
)
if err := consumeCmd.MarkPersistentFlagRequired("exchange"); err != nil {
panic(err)
}

if err := viper.BindPFlag("amqp.produce.exchangeType", consumeCmd.PersistentFlags().Lookup("exchangeType")); err != nil {
panic(err)
}
}

func configureProduceCmd(produceCmd *cobra.Command) {
produceCmd.PersistentFlags().StringP(
"exchange",
"x",
"",
"The name of the AMQP exchange to produce messages to (required)",
)
if err := produceCmd.MarkPersistentFlagRequired("exchange"); err != nil {
panic(err)
}
if err := viper.BindPFlag("amqp.produce.exchange", produceCmd.PersistentFlags().Lookup("exchange")); err != nil {
panic(err)
}

produceCmd.PersistentFlags().String(
"exchangeType",
"fanout",
"If the exchange did not exist, it will be created with this type. The common types are 'direct', 'fanout', 'topic' and 'headers'.",
)
if err := produceCmd.MarkPersistentFlagRequired("exchange"); err != nil {
panic(err)
}

if err := viper.BindPFlag("amqp.produce.exchangeType", produceCmd.PersistentFlags().Lookup("exchangeType")); err != nil {
panic(err)
}

produceCmd.PersistentFlags().StringP(
"routingKey",
"r",
"",
"The routing key to use when publishing the message.",
)
if err := produceCmd.MarkPersistentFlagRequired("routingKey"); err != nil {
panic(err)
}
if err := viper.BindPFlag("amqp.produce.routingKey", produceCmd.PersistentFlags().Lookup("routingKey")); err != nil {
panic(err)
}
}
82 changes: 82 additions & 0 deletions dev/cli/cmd/consume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package cmd

import (
"os"
"time"

"github.com/ThreeDotsLabs/watermill/message/infrastructure/io"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/pkg/errors"
"github.com/spf13/viper"

"github.com/ThreeDotsLabs/watermill/message"

"github.com/spf13/cobra"
)

// consumer is initialized by parent command to the pub/sub provider of choice.
var consumer message.Subscriber

func addConsumeCmd(parent *cobra.Command, addTopicFlag bool) *cobra.Command {
parentName := parent.Use
cmd := &cobra.Command{
Use: "consume",
Short: "Consume messages from a pub/sub and print them to stdout",
Long: `Consume messages from the pub/sub of your choice and print them on the standard output.

For the configuration of particular pub/sub providers, see the help for the provider commands.`,
RunE: func(cmd *cobra.Command, args []string) error {
if topic == "" {
topic = viper.GetString(parentName + ".consume.topic")
}
router, err := message.NewRouter(
message.RouterConfig{
CloseTimeout: 5 * time.Second,
},
logger,
)
if err != nil {
return errors.Wrap(err, "could not create router")
}

router.AddMiddleware(middleware.InstantAck)
router.AddPlugin(plugin.SignalsHandler)

out, err := io.NewPublisher(os.Stdout, io.PublisherConfig{
MarshalFunc: io.PrettyPayloadMarshalFunc,
})
if err != nil {
return errors.Wrap(err, "could not create console producer")
}

router.AddHandler(
"dump_to_stdout",
topic,
consumer,
"",
out,
func(msg *message.Message) ([]*message.Message, error) {
// just forward the message to stdout
return message.Messages{msg}, nil
},
)

return router.Run()
},
}

if addTopicFlag {
cmd.Flags().StringP("topic", "t", "", "The topic to consume messages from (required)")
err := cmd.MarkFlagRequired("topic")
if err != nil {
panic(err)
}
if err = viper.BindPFlag(parentName+".consume.topic", cmd.Flags().Lookup("topic")); err != nil {
panic(err)
}
}

parent.AddCommand(cmd)
return cmd
}
Loading