Small, user-friendly library for consuming PGQ.
$ go get github.com/furdarius/pgqgo
$ dep ensure -add github.com/furdarius/pgqgo
- Implement Processor
- Register Consumer
- Start Consuming
Usage example:
// LogProcessor is used to output the received batch ID, its size and data.
// Implements pgqgo.BatchProcessor.
type LogProcessor struct{}
// Process logs the batch ID together with the number of events it holds and
// then prints every event of the batch to stdout, one after another.
// It always returns a nil retry list and a nil error.
func (p *LogProcessor) Process(ctx context.Context, batchID int, events []pgqgo.Event) ([]pgqgo.RetryEvent, error) {
	count := len(events)
	log.Printf("batch received: batch_id = %d, size = %d\n", batchID, count)

	for i := 0; i < count; i++ {
		fmt.Println(events[i])
	}

	return nil, nil
}
// main wires the example together: it opens a database handle, registers the
// consumer on the queue and then starts consuming with a LogProcessor.
func main() {
	// Swap in a cancellable context (for example, one tied to OS signals)
	// to be able to stop the consumer from outside.
	ctx := context.Background()

	// NOTE: sql.Open needs the "postgres" driver to be registered, which is
	// normally done by a blank import of a driver package.
	db, err := sql.Open("postgres", "postgres://postgres:postgres@127.0.0.1:5432/postgres?sslmode=disable")
	if err != nil {
		log.Fatalf("failed to open database: %v", err)
	}

	// Create consumer with batch processor.
	processor := &LogProcessor{}
	consumer := pgqgo.NewConsumer(db, processor, "consumer_name")

	queue := "queue_name"

	// pgqgo.ErrAlreadyExists is tolerated here, so an earlier registration
	// of the same consumer does not abort the program.
	err = consumer.Register(ctx, queue)
	if err != nil && err != pgqgo.ErrAlreadyExists {
		log.Fatalf("failed to register pgq consumer: %v", err)
	}

	// context.Canceled is treated as a normal shutdown, not as a failure.
	err = consumer.Start(ctx, queue)
	if err != nil && err != context.Canceled {
		log.Fatalf("failed to start pgq consumer: %v", err)
	}
}