Skip to content

Commit

Permalink
feat(examples): update examples
Browse files Browse the repository at this point in the history
  • Loading branch information
dmksnnk committed Oct 26, 2023
1 parent 05acba2 commit aacf92d
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 97 deletions.
24 changes: 8 additions & 16 deletions examples/batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,32 @@ package main
import (
"context"
"log"
"os"
"time"

"github.com/heureka/gorabbit"
"github.com/heureka/gorabbit/channel"
"github.com/heureka/gorabbit/connection"
"github.com/heureka/gorabbit/process"
"github.com/heureka/gorabbit/consume"
"github.com/heureka/gorabbit/consume/batch"
"github.com/rs/zerolog"
)

func main() {
// connection with re-dialing capabilities.
conn, err := connection.Dial("amqp://localhost:5672")
consumer, err := gorabbit.NewConsumer("amqp://localhost:5672", "my-queue")
if err != nil {
log.Panic(err)
}

// channel with re-connection capabilities.
ch, err := channel.New(conn)
if err != nil {
log.Panic(err)
}

consumer := gorabbit.NewConsumer(ch, "my-queue")

// transaction function for processing batch of messages
tx := func(ctx context.Context, msgs [][]byte) []error {
for _, msg := range msgs {
log.Println(string(msg))
}

return nil
// must return errors one-to-one for each processed message, no errors in this case
return make([]error, len(msgs))
}

// process 100 messages or 1 second of messages at once, whichever comes first
err = consumer.Start(context.Background(), process.InBatches(100, time.Second, tx, false))
err = consumer.Start(context.Background(), consume.InBatches(100, time.Second, tx, false, batch.NewDeliveryLogging(zerolog.New(os.Stdout))))
if err != nil {
log.Panic(err)
}
Expand Down
8 changes: 4 additions & 4 deletions examples/channel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func main() {
bo.MaxElapsedTime = 10 * time.Second

// create new channel with re-creation capabilities.
ch, err := channel.New(
conn,
ch, err := conn.Channel(
// set up different backoff strategy.
channel.WithBackoff(bo),
// set QOS to pre-fetch 100 messages, it will be applied every time channel is created.
Expand All @@ -41,7 +40,7 @@ func main() {
if err != nil {
log.Panic(err)
}

// get notification when channel will be re-opened.
reopenNotif := ch.NotifyReopen(make(chan error)) // reopenNotif will be closed on graceful shutdown
go func() {
for err := range reopenNotif {
Expand All @@ -53,7 +52,8 @@ func main() {
log.Println("successfully re-opened channel", err)
}
}()

// get notification when consuming has started,
// notification will be sent on every channel re-opening and resuming of consuming.
consumeNotif := ch.NotifyConsume(make(chan error)) // consumeNotif will be closed on graceful shutdown
go func() {
for err := range consumeNotif {
Expand Down
4 changes: 2 additions & 2 deletions examples/connection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
}),
// configure connection as you wish on each successful dial.
connection.WithDialledCallback(func(conn *amqp.Connection) {
// add listener for clos notification
// for example, add listener for close notification
closeNotif := conn.NotifyClose(make(chan *amqp.Error))
go func() {
for err := range closeNotif {
Expand All @@ -45,5 +45,5 @@ func main() {
log.Panic(err)
}

conn.Channel() // crate new channel, do whatever you do with connection.
conn.Channel() // crate new channel or do whatever you do with connection.
}
34 changes: 11 additions & 23 deletions examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,22 @@ import (
"time"

"github.com/heureka/gorabbit"
"github.com/heureka/gorabbit/channel"
"github.com/heureka/gorabbit/connection"
"github.com/heureka/gorabbit/consumer"
amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
// connection with re-dialing capabilities.
conn, err := connection.Dial("amqp://localhost:5672")
if err != nil {
log.Panic(err)
}

// channel with re-connection capabilities.
ch, err := channel.New(conn)
if err != nil {
log.Panic(err)
}

consumer := gorabbit.NewConsumer(
ch,
cons, err := gorabbit.NewConsumer(
"amqp://localhost:5672",
"example-queue",
gorabbit.WithConsumerTag("example-consumer"), // set up custom consumer tag
gorabbit.WithConsumeArgs(amqp.Table{"example": "tag"}), // add additional consumer tags
gorabbit.WithConsumeAutoAck(), // automatically ACK all messages
gorabbit.WithConsumeExclusive(), // tell the server to ensure that this is the sole consumer from this queue
gorabbit.WithConsumeNoWait(), // tell the server to immediately begin deliveries
consumer.WithConsumerTag("example-consumer"), // set up custom consumer tag
consumer.WithArgs(amqp.Table{"example": "tag"}), // add additional consumer tags
consumer.WithAutoAck(), // automatically ACK all messages
consumer.WithExclusive(), // tell the server to ensure that this is the sole consumer from this queue
consumer.WithNoWait(), // tell the server to immediately begin deliveries
)
err = consumer.Start(context.Background(), gorabbit.ProcessFunc(func(ctx context.Context, deliveries <-chan amqp.Delivery) error {
// see also examples/one and examples/batch for examples of prepared ProcessFunc.
err = cons.Start(context.Background(), consumer.ProcessFunc(func(ctx context.Context, deliveries <-chan amqp.Delivery) error {
for d := range deliveries {
log.Println(d.Body)
}
Expand All @@ -44,7 +32,7 @@ func main() {
}

time.Sleep(10 * time.Second) // consumer for 10 seconds
if err := consumer.Stop(); err != nil {
if err := cons.Stop(); err != nil {
log.Panic(err)
}
}
20 changes: 6 additions & 14 deletions examples/one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,27 @@ package one
import (
"context"
"log"
"os"
"time"

"github.com/heureka/gorabbit"
"github.com/heureka/gorabbit/channel"
"github.com/heureka/gorabbit/connection"
"github.com/heureka/gorabbit/process"
"github.com/heureka/gorabbit/consume"
"github.com/heureka/gorabbit/consume/one"
"github.com/rs/zerolog"
)

func main() {
// connection with re-dialing capabilities.
conn, err := connection.Dial("amqp://localhost:5672")
consumer, err := gorabbit.NewConsumer("amqp://localhost:5672", "my-queue")
if err != nil {
log.Panic(err)
}

// channel with re-connection capabilities.
ch, err := channel.New(conn)
if err != nil {
log.Panic(err)
}

consumer := gorabbit.NewConsumer(ch, "my-queue")
// transaction function for processing single message
tx := func(ctx context.Context, msg []byte) error {
log.Println(string(msg))
return nil
}

err = consumer.Start(context.Background(), process.ByOne(tx, false))
err = consumer.Start(context.Background(), consume.ByOne(tx, false, one.NewDeliveryLogging(zerolog.New(os.Stdout))))
if err != nil {
log.Panic(err)
}
Expand Down
38 changes: 0 additions & 38 deletions examples/publish/main.go

This file was deleted.

53 changes: 53 additions & 0 deletions examples/publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"context"
"log"
"time"

"github.com/heureka/gorabbit"
"github.com/heureka/gorabbit/publish"
"github.com/heureka/gorabbit/publisher"
amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
pub, err := gorabbit.NewPublisher(
"amqp://localhost:5672",
"example-exchange",
publisher.WithConstHeaders(amqp.Table{"x-example-header": "example-value"}),
publisher.WithTransientDeliveryMode(),
publisher.WithImmediate(),
publisher.WithMandatory(),
publisher.WithExpiration(time.Minute),
)
if err != nil {
log.Panic(err)
}
// publish
err = pub.Publish(context.Background(), "example-key", []byte("hello world!"))
if err != nil {
log.Panic(err)
}

// override config only for this publishing
err = pub.Publish(
context.Background(),
"example-key",
[]byte("hello again!"),
// add headers
publish.WithHeaders(amqp.Table{"x-other-header": "only for thing publishing"}),
// set another expiration
publish.WithExpiration(time.Hour),
// custom overriding
func(channel publish.Channel) publish.Channel {
return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
// set another exchange only for this publishing
return channel.PublishWithContext(ctx, "other-exchange", key, mandatory, immediate, msg)
})
},
)
if err != nil {
log.Panic(err)
}
}

0 comments on commit aacf92d

Please sign in to comment.