Skip to content

Commit

Permalink
feat(README): update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
dmksnnk committed Oct 26, 2023
1 parent aacf92d commit 36ce635
Showing 1 changed file with 62 additions and 53 deletions.
115 changes: 62 additions & 53 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,34 @@

GoRabbit provides additional capabilities for official RabbitMQ Go client [amqp091-go](https://github.com/rabbitmq/amqp091-go).

## Connection re-dialing

Add re-dialing capabilities when connection got closed,
you can reliably open new channel:
Quick start example:

```go
conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := conn.Channel() // use as regular connection, but it will re-dial if connection is closed
ch.Publish(...)
consumer, _ := gorabbit.NewConsumer("amqp://localhost:5672", "my-queue")
handleMessage := func(ctx context.Context, message []byte) error {
log.Println(message)
return nil
}
consumer.Start(context.Background(), consume.ByOne(handleMessage, false))
```

See full example at [examples/connection](./examples/connection/main.go).
This will create consumer which will connect to RabbitMQ on localhost,
start consuming messages one-by-one from "my-queue" and pass them to `handleMessage` function.

## Channel re-opening
It will re-dial if connection got broken and re-open channel if it will be closed.

Add channel re-opening capabilities on the top of a plain channel,
so you can get reliably consume or publish, even is something bad happened over the network.
You can use prepared consumers and publishers, or just re-dialing of connection and re-opening of a channel.

```go
conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := channel.New(conn) // add re-opening capabilities
// get constant flow of deliveries, channel will be re-opened if something goes south
deliveries := ch.Consume("example-queue", "", false, false, false, false, nil)
for d := range deliveries {
// process deliveries
}
// or publish reliably
ch.PublishWithContext(ctx, "example-exchange", "", false, false, amqp.Publishing{})
```
See full example at [examples/channel](./examples/channel/main.go).

## Consumers
## Consumer

GoRabbit provides [consumer](./consumer.go),
GoRabbit provides [consumer](./consumer/consumer.go),
which simplifies creation of RabbitMQ consumer.
It aims to provide sane defaults, but at the same time is fully configurable:

```go
conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := channel.New(conn)
consumer := gorabbit.NewConsumer(channel, "example-queue")
consumer.Start(context.Background(), gorabbit.ProcessorFunc(func(ctx context.Context, deliveries <-chan amqp.Delivery) error {
cons, _ := gorabbit.NewConsumer("amqp://localhost:5672", "example-queue")
cons.Start(context.Background(), consumer.ProcessorFunc(func(ctx context.Context, deliveries <-chan amqp.Delivery) error {
for d := range deliveries {
// process deliveries
}
Expand All @@ -57,24 +43,24 @@ consumer.Start(context.Background(), gorabbit.ProcessorFunc(func(ctx context.Con
```
See full example at [examples/consumer](./examples/consumer/main.go).

To simplify it even more, GoRabbit is providing [process](./process) building blocks:
To simplify it even more, GoRabbit is providing [consume](./consume) building blocks:
[ByOne](#one) and [InBatches](#batch). All you need is to write a function
for handling received messages:
if it returns error, processor will automatically NACK message, ACK on success.
if function returns error, it will automatically NACK message, or ACK on success.

### One

[One consumer](process/one.go) reads messages one-by-one and passes them to `Transaction`.
[One consume](process/one.go) reads messages one-by-one and passes them to `Transaction`.
Will ACK them on success, NACK on error.

Example of usage is in [examples/one](examples/one/main.go).

### One Middlewares

Easily plug-in any middlewares and pass them to `process.ByOne`:
Easily plug-in any middlewares and pass them to `consume.ByOne`:

```go
process.ByOne(myHandler, true, middleware.NewErrorLogging(zerolog.New(os.Stderr)))
consume.ByOne(tx, false, one.NewDeliveryLogging(zerolog.New(os.Stdout)))
```

Or implement your own with simple API:
Expand All @@ -83,16 +69,16 @@ Or implement your own with simple API:
type Middleware func(DeliveryHandler) DeliveryHandler
```

Check out more examples at [process/middleware/one.go](./process/middleware/one.go).
Check out more examples at [consume/middleware/one.go](./consume/middleware/one.go).

### Batch

If you want to process messages in batches, [Batch processor](process/batch.go) is here for you.
If you want to process messages in batches, [Batch consume](consume/batch.go) is here for you.

Batch processor reads a batch of messages from a broker and passes them to `BatchTransaction`.
Your code expected to return one-to-one errors for each passed message (`len(messages) == len(errors)`).

Consumer will ACK each of messages on success, NACK on error.
Batch processor will ACK each of messages on success, NACK on error.

Example of usage is in [examples/batch](examples/batch/main.go).

Expand All @@ -101,7 +87,7 @@ Example of usage is in [examples/batch](examples/batch/main.go).
Plug in any middlewares and pass them to the Batch consumer, implementing simple API:

```go
process.InBatches(100, time.Second, tx, false, middleware.NewBatchErrorLogging(zerolog.New(os.Stderr)))
consume.InBatches(100, time.Second, tx, false, batch.NewDeliveryLogging(zerolog.New(os.Stdout)))
```

Or implement your own with simple API:
Expand All @@ -110,27 +96,50 @@ Or implement your own with simple API:
type BatchMiddleware func(BatchDeliveryHandler) BatchDeliveryHandler
```

Check out more examples at [process/middleware/batch.go](./process/middleware/batch.go).
Check out more examples at [consume/middleware/batch.go](consume/middleware/batch.go).

# Publishing
# Publisher

Publishing is done using channel [`channel.New`](#channel-re-opening) the same way as in regular RabbitMQ client.
But GoRabbit also has helpers to simplify adding middlewares to your publisher.
Use `publish.Wrap` do add as much middlewares as you want:
GoRabbit provides [publisher](./publisher/publisher.go),
which simplifies creation of RabbitMQ publisher.
It aims to provide sane defaults, but at the same time is fully configurable:

```go
p := publish.Wrap(ch, publish.WithHeaders(amqp.Table{"x-example-header": "example-value"}))
p.PublishWithContext(
context.Background(),
"example-exchange",
"example-key",
false,
false,
amqp.Publishing{Body: []byte("hello world!")},
)
pub, _ := gorabbit.NewPublisher("amqp://localhost:5672", "example-exchange")
pub.Publish(context.Background(), "example-key", []byte("hello world!"))
```

See full example at [examples/publish](./examples/publish/main.go).
See full example at [examples/publisher](examples/publisher/main.go).

## Connection re-dialing

Add re-dialing capabilities when connection got closed:

```go
conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := conn.Channel() // use as regular connection, but it will re-dial if connection is closed
ch.Publish(...)
```

See full example at [examples/connection](./examples/connection/main.go).

## Channel re-opening

Add channel re-opening capabilities on the top of a plain channel,
so you can reliably consume or publish, even is something bad happened over the network:

```go
conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := conn.Channel(conn) // open channel with re-opening capabilities
// get constant flow of deliveries, channel will be re-opened if something goes south
for delivery := range ch.Consume("example-queue", "", false, false, false, false, nil) {
// process deliveries
}
// or publisher reliably
ch.PublishWithContext(ctx, "example-exchange", "", false, false, amqp.Publishing{})
```
See full example at [examples/channel](./examples/channel/main.go).


# Testing

Expand Down

0 comments on commit 36ce635

Please sign in to comment.