Skip to content

Commit

Permalink
🎉 release
Browse files Browse the repository at this point in the history
  • Loading branch information
adone committed Aug 29, 2017
1 parent e5ed02a commit 089bb69
Show file tree
Hide file tree
Showing 15 changed files with 926 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*.iml
.idea
*.coverprofile
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
VERSION=v$(shell cat VERSION)

test:
ginkgo -r -cover -race -progress -keepGoing -randomizeAllSpecs -slowSpecThreshold 5 -trace

release:
git tag $(VERSION) --message "release $(VERSION) ($(shell date '+%Y-%m-%d'))"
git push $(VERSION)
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# AMQP

High-level wrapper [amqp](github.com/streadway/amqp):

* Consumer/Producer, with [events](github.com/adone/go.events) integration
* Listener/Publisher with fault-tollerance mechanics
* autocreating queues, exchanges & bindings from YAML/JSON configuration
1 change: 1 addition & 0 deletions VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.0.0
13 changes: 13 additions & 0 deletions amqp_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package amqp_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestAmqp(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Amqp Suite")
}
52 changes: 52 additions & 0 deletions configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package amqp

type Binding struct {
Exchange string `yaml:"exchange"`
Key string `yaml:"key"`
}

type Exchange struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Durable bool `yaml:"durable"`
Bindings []Binding `yaml:"bindings"`
}

type Queue struct {
Name string `yaml:"name"`
Key string `yaml:"key"`
Durable bool `yaml:"durable"`
Deletable bool `yaml:"deletable"`
Bindings []Binding `yaml:"bindings"`
Arguments Arguments `yaml:"arguments"`
}

type Arguments struct {
MessageTTL int64 `yaml:"x-message-ttl"`
DeadLetterExchange string `yaml:"x-dead-letter-exchange"`
}

func (a Arguments) ToMap() map[string]interface{} {
result := make(map[string]interface{})
if a.MessageTTL > 0 {
result["x-message-ttl"] = a.MessageTTL
}
if a.DeadLetterExchange != "" {
result["x-dead-letter-exchange"] = a.DeadLetterExchange
}
return result
}

type Configuration struct {
URL string `yaml:"url"`
Node string `yaml:"node"`
Exchanges []Exchange `yaml:"exchanges"`
Queues []Queue `yaml:"queues"`
}

func (config *Configuration) GetNodeName() string {
if config.Node == "" {
return "node"
}
return config.Node
}
95 changes: 95 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package amqp

import (
"github.com/pkg/errors"
"github.com/streadway/amqp"
"gopkg.in/adone/go.events.v2"
)

// Consumer consume messages from message broker
type Consumer struct {
*events.Emitter

MaxMessages int
Queue string

tag string
channel *amqp.Channel
error chan error
done chan struct{}
}

// Start consuming messages
func (consumer *Consumer) Start() error {
consumer.done = make(chan struct{})
consumer.error = make(chan error)

defer close(consumer.error)

if err := consumer.channel.Qos(consumer.MaxMessages, 0, false); err != nil {
return err
}

deliveries, err := consumer.channel.Consume(
consumer.Queue, // name
consumer.tag, // consumerTag
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)

if err != nil {
return err
}

go consumer.listenChannel()
go consumer.listenDeliveries(deliveries)

return <-consumer.error
}

// Stop consumimg messages
func (consumer *Consumer) Stop() error {
select {
case <-consumer.done:
return nil
default:
// manual call to channel.Cancel does not fire NotifyCancel, it just close deliveries chan
if err := consumer.channel.Cancel(consumer.tag, false); err != nil {
return errors.Wrap(err, "channel cancel failed")
}

return nil
}
}

func (consumer *Consumer) Close() error {
return consumer.channel.Close()
}

func (consumer *Consumer) listenDeliveries(deliveries <-chan amqp.Delivery) {
defer close(consumer.done)

for delivery := range deliveries {
consumer.Fire(events.New(ConsumerData, events.WithContext(events.Map{
"key": delivery.RoutingKey,
"data": Message{delivery},
"queue": consumer.Queue,
})))
}
}

func (consumer *Consumer) listenChannel() {
select {
case reason := <-consumer.channel.NotifyCancel(make(chan string)):
consumer.Fire(events.New(ConsumerCanceled, events.WithContext(events.Map{"consumer": consumer})))
consumer.error <- errors.Errorf("channel canceled: %s", reason)
case err := <-consumer.channel.NotifyClose(make(chan *amqp.Error)):
consumer.Fire(events.New(ConsumerClosed, events.WithContext(events.Map{"consumer": consumer, "error": err})))
consumer.error <- err
case <-consumer.done:
consumer.error <- nil
}
}
Loading

0 comments on commit 089bb69

Please sign in to comment.