Skip to content

itcomusic/amqpx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

65 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RabbitMQ Go Client

build-img pkg-img coverage-img

This is a Go AMQP 0.9.1 client wraps amqp091-go with support generics

  • Support of the encoding messages
    • defaults encoding (json, protobuf, protojson)
    • support of custom marshal/unmarshal functions
  • Middleware for easy integration

Installation

Go version 1.20+

go get github.com/itcomusic/amqpx

Usage

package main

import (
    "context" 
	"fmt"
	
	"github.com/itcomusic/amqpx"
)

func main() {
    conn, _ := amqpx.Connect()
    defer conn.Close()

    // simple publisher
    pub := amqpx.NewPublisher[[]byte](conn, amqpx.Direct, amqpx.UseRoutingKey("routing_key"))
    _ = pub.Publish(amqpx.NewPublishing([]byte("hello")).PersistentMode(), amqpx.SetRoutingKey("override_routing_key"))
	
    // simple consumer 
    _ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Msg))
        return amqpx.Ack
    }))
}

Publisher & consumer struct

Pretty using struct and avoiding boilerplate marhsal/unmarshal. It is strict compared content-type of the message and invalid body is rejected.

    conn, _ := amqpx.Connect(
        amqpx.UseUnmarshaler(amqpxproto.NewUnmarshaler()), // global unmarshalers
        amqpx.UseMarshaler(amqpxproto.NewMarshaler())), // global marshaler
    defer conn.Close()

    type Gopher struct {
        Name string
    }
	
    // override default marshaler
    pub := amqpx.NewPublisher[Gopher](conn, amqpx.Direct, amqpx.SetMarshaler(amqpxjson.Marshaler)) 
    _ = pub.Publish(amqpx.NewPublishing(Gopher{Name: "Rob"}), amqpx.SetRoutingKey("routing_key"))
	
    // override default unmarshaler
    _ = conn.NewConsumer("bar", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[Gopher]) amqpx.Action {
        fmt.Printf("user-id: %s, received message: %s\n", req.Req.UserID, req.Msg.Name)
        return amqpx.Ack
    }), amqpx.SetUnmarshaler(amqpxjson.Unmarshaler), amqpx.SetAutoAckMode())

Consumer rate limiting

The Prefetch count informs the server will deliver that many messages to consumers before acknowledgments are received. The Concurrency option limits numbers of goroutines of consumer, depends on prefetch count and auto-ack mode.

    // prefetch count
    _ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Msg))
        return amqpx.Ack
    }), amqpx.SetPrefetchCount(8))

    // limit goroutines
	_ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Body))
        return amqpx.Ack
    }), amqpx.SetAutoAckMode(), amqpx.SetConcurrency(32))

Declare queue

The declare queue, exchange and binding queue.

    _ = conn.NewConsumer("foo", amqpx.D(func(ctx context.Context, req *amqpx.Delivery[[]byte]) amqpx.Action {
        fmt.Printf("received message: %s\n", string(*req.Msg))
        return amqpx.Ack
    }), amqpx.DeclareQueue(amqpx.QueueDeclare{AutoDelete: true}),
        amqpx.DeclareExchange(amqpx.ExchangeDeclare{Name: "exchange_name", Type: amqpx.Direct}),
        amqpx.BindQueue(amqpx.QueueBind{Exchange: "exchange_name", RoutingKey: []string{"routing_key"}}))

Middleware

Predefined support opentelemetry using interceptor.

    import (
        "github.com/itcomusic/amqpx"
        "github.com/itcomusic/amqpx/amqpxotel"
    )

    // global
    conn, _ := amqpx.Connect(amqpx.UserInterceptor(amqpxotel.NewInterceptor())
    defer conn.Close()

    // can use special interceptor for publisher
    _ = amqpx.NewPublisher[[]byte](conn, amqpx.Direct, amqpx.SetPublishInterceptor(func(next amqpx.PublisherFunc) amqpx.PublisherFunc {
        return func(ctx context.Context, m *amqpx.PublishingRequest) error {
            fmt.Printf("message: %s\n", m.Body)
            return next(m)
        }
    }))

License

MIT License