go-rigger-amqp 将github.com/streadway/amqp封装成了actor模式/接口
go get github.con/saintEvol/go-rigger-amqp
graph TD
app(app) --> topSup
topSup --> connectionManagingServer
topSup --> connectionSup
connectionSup --> connectionServer
topSup --> channelSup
channelSup --> channelServer
- 连接MQ
connection, err := riggerAmqp.Connect(riggerAmqp.ConnectConfig{
Tag: "test-consume",
Host: "localhost",
Port: 5672,
User: "admin",
Password: "admin",
})
- 打开信道
// 可以多次调用以打开多个信道 channel, err := connection.OpenChannel()
- 声明交换机
err = channel.ExchangeDeclare(&riggerAmqp.ExchangeDeclareConfig{ Name: "rigger-amqp-test", Kind: riggerAmqp.ExchangeKindDirect, // "direct"类型的交换机 IsPassive: false, Durable: true, AutoDelete: false, Internal: false, NoWait: false, Args: nil, })
- 声明队列
-
channel.QueueDeclare(&riggerAmqp.QueueDeclareConfig{ Name: "rigger-amqp-test", Durable: true, IsPassive: false, AutoDelete: false, Exclusive: false, NoWait: false, Args: nil, })
- 绑定队列
err = channel.QueueBind(&riggerAmqp.QueueBindConfig {
Name: "rigger-amqp-test",
Key: "",
Exchange: "rigger-amqp-test",
NoWait: false,
Args: nil,
})
- 发送消息:
err = channel.Publish(&riggerAmqp.Publish{ Exchange: "rigger-amqp-test", Key: "", Mandatory: false, Immediate: false, Message: &amqp.Publishing{ Headers: nil, ContentType: "", ContentEncoding: "", DeliveryMode: 0, Priority: 0, CorrelationId: "", ReplyTo: "", Expiration: "", MessageId: "", Timestamp: time.Time{}, Type: "", UserId: "", AppId: "", Body: []byte("test msg"), }, })
- 消费消息:
channel.Consume(riggerAmqp.ConsumeConfig{ Pid: consumerPid, Queue: "rigger-amqp-test", Consumer: "rigger-consumer-test", AutoAck: false, Exclusive: false, NoLocal: false, NoWait: false, Args: nil, })