Skip to content
This repository has been archived by the owner on Mar 29, 2022. It is now read-only.

Fix race conditions in AMQPServer, TopicExchange & Binding #28

Merged
merged 1 commit into from
Aug 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion amqptest/server/exchange.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package server

import "fmt"
import (
"fmt"
"sync"
)

type Exchange interface {
route(route string, d *Delivery) error
Expand All @@ -11,24 +14,32 @@ type Exchange interface {
type TopicExchange struct {
name string
bindings map[string]*Queue
mu *sync.RWMutex
}

func NewTopicExchange(name string) *TopicExchange {
return &TopicExchange{
name: name,
bindings: make(map[string]*Queue),
mu: &sync.RWMutex{},
}
}

func (t *TopicExchange) addBinding(route string, q *Queue) {
t.mu.Lock()
defer t.mu.Unlock()
t.bindings[route] = q
}

func (t *TopicExchange) delBinding(route string) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.bindings, route)
}

func (t *TopicExchange) route(route string, d *Delivery) error {
t.mu.RLock()
defer t.mu.RUnlock()
for bname, q := range t.bindings {
if topicMatch(bname, route) {
q.data <- d
Expand All @@ -43,16 +54,20 @@ func (t *TopicExchange) route(route string, d *Delivery) error {
type DirectExchange struct {
name string
bindings map[string]*Queue
mu *sync.RWMutex
}

func NewDirectExchange(name string) *DirectExchange {
return &DirectExchange{
name: name,
bindings: make(map[string]*Queue),
mu: &sync.RWMutex{},
}
}

func (d *DirectExchange) addBinding(route string, q *Queue) {
d.mu.Lock()
defer d.mu.Unlock()
if d.bindings == nil {
d.bindings = make(map[string]*Queue)
}
Expand All @@ -61,10 +76,14 @@ func (d *DirectExchange) addBinding(route string, q *Queue) {
}

func (d *DirectExchange) delBinding(route string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.bindings, route)
}

func (d *DirectExchange) route(route string, delivery *Delivery) error {
d.mu.RLock()
defer d.mu.RUnlock()
if q, ok := d.bindings[route]; ok {
q.data <- delivery
return nil
Expand Down
6 changes: 4 additions & 2 deletions amqptest/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type AMQPServer struct {
notifyChans map[string]*utils.ErrBroadcast
channels map[string][]*Channel
vhost *VHost
muChannels *sync.RWMutex
}

// NewServer returns a new fake amqp server
Expand All @@ -41,13 +42,14 @@ func newServer(amqpuri string) *AMQPServer {
notifyChans: make(map[string]*utils.ErrBroadcast),
channels: make(map[string][]*Channel),
vhost: NewVHost("/"),
muChannels: &sync.RWMutex{},
}
}

// CreateChannel returns a new fresh channel
func (s *AMQPServer) CreateChannel(connID string, conn wabbit.Conn) (wabbit.Channel, error) {
mu.Lock()
defer mu.Unlock()
s.muChannels.Lock()
defer s.muChannels.Unlock()

if _, ok := s.channels[connID]; !ok {
s.channels[connID] = make([]*Channel, 0, MaxChannels)
Expand Down
12 changes: 5 additions & 7 deletions amqptest/server/vhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ func NewVHost(name string) *VHost {

func (v *VHost) createDefaultExchanges() {
exchs := make(map[string]Exchange)
exchs["amq.topic"] = &TopicExchange{}
exchs["amq.direct"] = &DirectExchange{}
exchs["topic"] = &TopicExchange{}
exchs["direct"] = &DirectExchange{}
exchs[""] = &DirectExchange{
name: "amq.direct",
}
exchs["amq.topic"] = NewTopicExchange("amq.topic")
exchs["amq.direct"] = NewDirectExchange("amq.direct")
exchs["topic"] = NewTopicExchange("topic")
exchs["direct"] = NewDirectExchange("direct")
exchs[""] = NewDirectExchange("amq.direct")

v.exchanges = exchs
}
Expand Down