Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Commit

Permalink
Add Broker to worker testing
Browse files Browse the repository at this point in the history
  • Loading branch information
rickmak committed Oct 13, 2016
1 parent d965bcb commit f95e596
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 10 deletions.
17 changes: 9 additions & 8 deletions pkg/server/plugin/zmq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Broker struct {
backend *goczmq.Sock
bothPoller *goczmq.Poller
backendPoller *goczmq.Poller
workers workerQueue
freshWorkers chan []byte
logger *logrus.Entry
}
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) {
backend: backend,
bothPoller: bothPoller,
backendPoller: backendPoller,
workers: newWorkerQueue(),
freshWorkers: make(chan []byte, 1),
logger: namedLogger,
}, nil
Expand All @@ -106,11 +108,10 @@ func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) {
// Run kicks start the Broker and listens for requests. It blocks function
// execution.
func (lb *Broker) Run() {
workers := newWorkerQueue()
heartbeatAt := time.Now().Add(HeartbeatInterval)
for {
var sock *goczmq.Sock
if workers.Len() == 0 {
if lb.workers.Len() == 0 {
sock = lb.backendPoller.Wait(heartbeatIntervalMS)
} else {
sock = lb.bothPoller.Wait(heartbeatIntervalMS)
Expand All @@ -125,7 +126,7 @@ func (lb *Broker) Run() {

address := frames[0]
msg := frames[1:]
tErr := workers.Tick(newWorker(address))
tErr := lb.workers.Tick(newWorker(address))
if tErr != nil {
status := string(msg[0])
if status != Ready {
Expand All @@ -134,7 +135,7 @@ func (lb *Broker) Run() {
}
if len(msg) == 1 {
status := string(msg[0])
lb.handleWorkerStatus(&workers, address, status)
lb.handleWorkerStatus(&lb.workers, address, status)
} else {
lb.frontend.SendMessage(msg)
lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg)
Expand All @@ -145,7 +146,7 @@ func (lb *Broker) Run() {
panic(err)
}

frames = append([][]byte{workers.Next()}, frames...)
frames = append([][]byte{lb.workers.Next()}, frames...)
lb.backend.SendMessage(frames)
lb.logger.Debugf("zmq/broker: server => plugin: %#x, %s\n", frames[0], frames)
case nil:
Expand All @@ -154,17 +155,17 @@ func (lb *Broker) Run() {
panic("zmq/broker: received unknown socket")
}

lb.logger.Debugf("zmq/broker: idle worker count %d\n", workers.Len())
lb.logger.Debugf("zmq/broker: idle worker count %d\n", lb.workers.Len())
if heartbeatAt.Before(time.Now()) {
for _, worker := range workers.pworkers {
for _, worker := range lb.workers.pworkers {
msg := [][]byte{worker.address, []byte(Heartbeat)}
lb.logger.Debugf("zmq/broker: server => plugin Heartbeat: %s\n", worker.address)
lb.backend.SendMessage(msg)
}
heartbeatAt = time.Now().Add(HeartbeatInterval)
}

workers.Purge()
lb.workers.Purge()
}
}

Expand Down
75 changes: 73 additions & 2 deletions pkg/server/plugin/zmq/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func bytesArray(ss ...string) (bs [][]byte) {
return
}

func TestBrokerEndToEnd(t *testing.T) {
func TestBrokerSock(t *testing.T) {
const (
clientAddr = "inproc://client.test"
workerAddr = "inproc://worker.test"
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestWorker(t *testing.T) {
q.Add(newWorker(address2))
So(q.Len(), ShouldEqual, 2)

// With the worker to time out
// Wait the worker to time out
time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval)
q.Add(newWorker(address3))
So(q.Len(), ShouldEqual, 3)
Expand All @@ -204,3 +204,74 @@ func TestWorker(t *testing.T) {
})
})
}
func TestBrokerWorker(t *testing.T) {
const (
clientAddr = "inproc://server.test"
workerAddr = "inproc://plugin.test"
)
broker, err := NewBroker("", clientAddr, workerAddr)
if err != nil {
t.Fatalf("Failed to init broker: %v", err)
}
go broker.Run()

Convey("Test Broker with worker", t, func() {
Convey("receive Ready signal will register the worker", func() {
w := workerSock(t, "ready", workerAddr)
defer w.Destroy()
w.SendMessage(bytesArray(Ready))
<-broker.freshWorkers

So(broker.workers.Len(), ShouldEqual, 1)
w.SendMessage(bytesArray(Shutdown))
})

Convey("receive multiple Ready signal will register all workers", func() {
w1 := workerSock(t, "ready1", workerAddr)
defer w1.Destroy()
w1.SendMessage(bytesArray(Ready))
<-broker.freshWorkers
w2 := workerSock(t, "ready2", workerAddr)
defer w2.Destroy()
w2.SendMessage(bytesArray(Ready))
<-broker.freshWorkers

So(broker.workers.Len(), ShouldEqual, 2)
w1.SendMessage(bytesArray(Shutdown))
w2.SendMessage(bytesArray(Shutdown))
})

Convey("reveice Heartbeat without Reay will not register the worker", func() {
w := workerSock(t, "heartbeat", workerAddr)
defer w.Destroy()
w.SendMessage(bytesArray(Heartbeat))
// Wait the poller to get the message
time.Sleep(HeartbeatInterval)
So(broker.workers.Len(), ShouldEqual, 0)
})

Convey("send message from server to plugin", func() {
s := clientSock(t, "server", clientAddr)
defer s.Destroy()
w := workerSock(t, "worker", workerAddr)
w.SetRcvtimeo(heartbeatIntervalMS)
defer w.Destroy()
w.SendMessage(bytesArray(Ready))
<-broker.freshWorkers
w2 := workerSock(t, "worker2", workerAddr)
w2.SetRcvtimeo(heartbeatIntervalMS)
defer w2.Destroy()
w2.SendMessage(bytesArray(Ready))
<-broker.freshWorkers

So(broker.workers.Len(), ShouldEqual, 2)

s.SendMessage(bytesArray("from server"))
// Wait the poller to get the message
time.Sleep(HeartbeatInterval)
So(broker.workers.Len(), ShouldEqual, 1)
msg, _ := w2.RecvMessage()
So(msg[1], ShouldResemble, []byte("from server"))
})
})
}

0 comments on commit f95e596

Please sign in to comment.