diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 8dbc670e6..fe6c2eaeb 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -17,9 +17,9 @@ package zmq import ( - "bytes" "errors" "fmt" + "sync" "time" "github.com/Sirupsen/logrus" @@ -50,81 +50,96 @@ const ( Shutdown = "\003" ) -// Broker implements the Paranoid Pirate queue described in the zguide: -// http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern +// parcel is used to multiplex the chan with zmq worker +type parcel struct { + respChan chan []byte + frame []byte + retry int +} + +func newParcel(frame []byte) *parcel { + return &parcel{ + respChan: make(chan []byte), + frame: frame, + retry: 0, + } +} + +// Broker implements the Paranoid Pirate queue described as follow: // Related RFC: https://rfc.zeromq.org/spec:6/PPP +// refs: http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern // with the addition of: // // 1. Shutdown signal, which signifies a normal termination of worker to provide // a fast path of worker removal type Broker struct { - name string - // NOTE: goroutines are caller of plugin, so frontend is Go side, - // backend is plugin side - frontend *goczmq.Sock - backend *goczmq.Sock - bothPoller *goczmq.Poller - backendPoller *goczmq.Poller - workers workerQueue - freshWorkers chan []byte - logger *logrus.Entry + name string + backendAddr string + frontend chan [][]byte + recvChan chan *parcel + addressChan map[string]chan []byte + timeout chan string + workers workerQueue + logger *logrus.Entry + stop chan int } // NewBroker returns a new *Broker. 
-func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) { - namedLogger := log.WithFields(logrus.Fields{"plugin": name}) - frontend, err := goczmq.NewRouter(frontendAddr) - if err != nil { - panic(err) +func NewBroker(name, backendAddr string) (*Broker, error) { + namedLogger := log.WithFields(logrus.Fields{ + "plugin": name, + "eaddr": backendAddr, + }) + + broker := &Broker{ + name: name, + backendAddr: backendAddr, + frontend: make(chan [][]byte, 10), + recvChan: make(chan *parcel, 10), + addressChan: map[string]chan []byte{}, + timeout: make(chan string), + workers: newWorkerQueue(), + logger: namedLogger, + stop: make(chan int), } - backend, err := goczmq.NewRouter(backendAddr) + go broker.Run() + go broker.Channeler() + return broker, nil +} + +// Run the Broker and listens for zmq requests. +func (lb *Broker) Run() { + backend, err := goczmq.NewRouter(lb.backendAddr) if err != nil { panic(err) } + defer backend.Destroy() - backendPoller, err := goczmq.NewPoller(backend) + pull, err := goczmq.NewPull(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) if err != nil { panic(err) } + defer pull.Destroy() - bothPoller, err := goczmq.NewPoller(frontend, backend) + backendPoller, err := goczmq.NewPoller(backend, pull) if err != nil { panic(err) } - return &Broker{ - name: name, - frontend: frontend, - backend: backend, - bothPoller: bothPoller, - backendPoller: backendPoller, - workers: newWorkerQueue(), - freshWorkers: make(chan []byte, 1), - logger: namedLogger, - }, nil -} - -// Run kicks start the Broker and listens for requests. It blocks function -// execution. 
-func (lb *Broker) Run() { heartbeatAt := time.Now().Add(HeartbeatInterval) for { - var sock *goczmq.Sock - if lb.workers.Len() == 0 { - sock = lb.backendPoller.Wait(heartbeatIntervalMS) - } else { - sock = lb.bothPoller.Wait(heartbeatIntervalMS) - } + sock := backendPoller.Wait(heartbeatIntervalMS) + lb.workers.Lock() switch sock { - case lb.backend: - frames, err := lb.backend.RecvMessage() + case backend: + frames, err := backend.RecvMessage() if err != nil { panic(err) } - address := frames[0] + address := string(frames[0]) msg := frames[1:] tErr := lb.workers.Tick(newWorker(address)) if tErr != nil { @@ -135,50 +150,129 @@ func (lb *Broker) Run() { } if len(msg) == 1 { status := string(msg[0]) - lb.handleWorkerStatus(&lb.workers, address, status) + lb.handleWorkerStatus(address, status) } else { - lb.frontend.SendMessage(msg) + lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) } - case lb.frontend: - frames, err := lb.frontend.RecvMessage() + case pull: + frames, err := pull.RecvMessage() if err != nil { panic(err) } - - frames = append([][]byte{lb.workers.Next()}, frames...) 
- lb.backend.SendMessage(frames) - lb.logger.Debugf("zmq/broker: server => plugin: %#x, %s\n", frames[0], frames) + backend.SendMessage(frames) case nil: // do nothing default: panic("zmq/broker: received unknown socket") } - lb.logger.Debugf("zmq/broker: idle worker count %d\n", lb.workers.Len()) if heartbeatAt.Before(time.Now()) { for _, worker := range lb.workers.pworkers { - msg := [][]byte{worker.address, []byte(Heartbeat)} - lb.logger.Debugf("zmq/broker: server => plugin Heartbeat: %s\n", worker.address) - lb.backend.SendMessage(msg) + msg := [][]byte{ + []byte(worker.address), + []byte(Heartbeat), + } + backend.SendMessage(msg) } heartbeatAt = time.Now().Add(HeartbeatInterval) } lb.workers.Purge() + lb.workers.Unlock() } } -func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, status string) { +// Channeler accepts messages from RPC and dispatches them to zmq if available. +// It retries and times out the request if the zmq worker is not yet available. +func (lb *Broker) Channeler() { + lb.logger.Infof("zmq channler running %p\n", lb) + push, err := goczmq.NewPush(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) + if err != nil { + panic(err) + } + defer push.Destroy() +ChannelerLoop: + for { + select { + case frames := <-lb.frontend: + lb.logger.Debugf("zmq/broker: zmq => channel %#x, %s\n", frames[0], frames) + // Dispatch back to the channel based on the first zmq frame + address := string(frames[0]) + respChan, ok := lb.addressChan[address] + if !ok { + lb.logger.Infof("zmq/broker: chan not found for worker %s\n", address) + break + } + delete(lb.addressChan, address) + respChan <- frames[2] + case p := <-lb.recvChan: + // Save the chan and dispatch the message to zmq. + // If no worker is currently ready, will retry after HeartbeatInterval. 
+ // Retry up to HeartbeatLiveness times + address := lb.workers.Next() + if address == "" { + if p.retry < HeartbeatLiveness { + p.retry += 1 + lb.logger.Infof("zmq/broker: no worker avaliable, retry %d...\n", p.retry) + go func(p2 *parcel) { + time.Sleep(HeartbeatInterval) + lb.recvChan <- p2 + }(p) + break + } + lb.logger.Infof("zmq/broker: no worker avaliable, timeout.\n") + p.respChan <- []byte{0} + break + } + addr := []byte(address) + frames := [][]byte{ + addr, + addr, + []byte{}, + p.frame, + } + lb.addressChan[address] = p.respChan + push.SendMessage(frames) + lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) + go lb.setTimeout(address, HeartbeatInterval*HeartbeatLiveness) + case address := <-lb.timeout: + respChan, ok := lb.addressChan[address] + if !ok { + break + } + lb.logger.Infof("zmq/broker: chan time out for worker %s\n", address) + delete(lb.addressChan, address) + respChan <- []byte{0} + case <-lb.stop: + break ChannelerLoop + } + } + lb.logger.Infof("zmq channler stopped %p!\n", lb) +} + +func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { + p := newParcel(in) + lb.recvChan <- p + go func() { + requestChan <- p.respChan + }() +} + +func (lb *Broker) setTimeout(address string, wait time.Duration) { + time.Sleep(wait) + lb.timeout <- address +} + +func (lb *Broker) handleWorkerStatus(address string, status string) { switch status { case Ready: log.Infof("zmq/broker: ready worker = %s", address) - workers.Add(newWorker(address)) - lb.freshWorkers <- address + lb.workers.Add(newWorker(address)) case Heartbeat: // no-op case Shutdown: - workers.Remove(address) + lb.workers.Remove(address) log.Infof("zmq/broker: shutdown of worker = %s", address) default: log.Errorf("zmq/broker: invalid status from worker = %s: %s", address, status) @@ -186,11 +280,11 @@ func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, statu } type pworker struct { - address []byte + address string expiry time.Time } -func 
newWorker(address []byte) pworker { +func newWorker(address string) pworker { return pworker{ address, time.Now().Add(HeartbeatLiveness * HeartbeatInterval), @@ -198,47 +292,81 @@ func newWorker(address []byte) pworker { } // workerQueue is a last tick fist out queue. -// A worker need to register itself using Add before it can tick. -// Ticking of an non-registered worker will be no-ops. +// +// A worker is expected to register itself on ready, and Tick itself when it is +// available. The most recently Ticked worker will get the job. +// A worker that does not Tick itself within the expiry is regarded as disconnected +// and is required to Add itself again to become available. +// +// workerQueue is not goroutine safe. To use it safely across goroutines, +// please use the Lock/Unlock interface before manipulating the queue items via +// methods like Add/Tick/Purge. +// The consuming method Next (and the read-only Len) will acquire the mutex lock +// by itself. type workerQueue struct { pworkers []pworker addresses map[string]bool + mu *sync.Mutex } func newWorkerQueue() workerQueue { + mu := &sync.Mutex{} return workerQueue{ []pworker{}, map[string]bool{}, + mu, } } +func (q *workerQueue) Lock() { + q.mu.Lock() +} + +func (q *workerQueue) Unlock() { + q.mu.Unlock() +} -func (q workerQueue) Len() int { +func (q *workerQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() return len(q.pworkers) } -func (q *workerQueue) Next() []byte { +// Next will pop the next available worker, and the worker will not be available +// until it Ticks back to the workerQueue again. +// This worker-consuming method will acquire the mutex lock. +func (q *workerQueue) Next() string { + q.mu.Lock() + defer q.mu.Unlock() + cnt := len(q.pworkers) + if cnt == 0 { + return "" + } workers := q.pworkers worker := workers[len(workers)-1] q.pworkers = workers[:len(workers)-1] return worker.address } +// Add will register the worker as a live worker and call Tick to make it +// the next available worker. 
func (q *workerQueue) Add(worker pworker) { - q.addresses[string(worker.address)] = true + q.addresses[worker.address] = true err := q.Tick(worker) if err == nil { return } } +// Tick will make the worker the next available worker. Ticking an +// unregistered worker returns an error and leaves the queue unchanged. func (q *workerQueue) Tick(worker pworker) error { - if _, ok := q.addresses[string(worker.address)]; !ok { + if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } workers := q.pworkers for i, w := range workers { - if bytes.Equal(w.address, worker.address) { + if w.address == worker.address { q.pworkers = append(append(workers[:i], workers[i+1:]...), worker) return nil } @@ -248,6 +376,8 @@ func (q *workerQueue) Tick(worker pworker) error { return nil } +// Purge will unregister the workers that are not healthy, i.e. haven't Ticked for +// a while. func (q *workerQueue) Purge() { workers := q.pworkers @@ -257,17 +387,19 @@ func (q *workerQueue) Purge() { break } q.pworkers = workers[i+1:] - delete(q.addresses, string(w.address)) + delete(q.addresses, w.address) log.Infof("zmq/broker: disconnected worker = %s", w.address) } } -func (q *workerQueue) Remove(address []byte) { - delete(q.addresses, string(address)) +// Remove will unregister the worker with the specified address regardless of its +// expiry. Intended for clean shutdown and fast removal of a worker. +func (q *workerQueue) Remove(address string) { + delete(q.addresses, address) workers := q.pworkers for i, w := range workers { - if bytes.Equal(w.address, address) { + if w.address == address { q.pworkers = append(workers[:i], workers[i+1:]...) 
break } diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index c31dcbdb2..bebdd36b3 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -17,8 +17,7 @@ package zmq import ( - "reflect" - "strings" + "sync" "testing" "time" @@ -45,6 +44,17 @@ func clientSock(t *testing.T, id string, addr string) *goczmq.Sock { return sock } +func recvNonControlFrame(w *goczmq.Sock) [][]byte { + var msg [][]byte + for { + msg, _ = w.RecvMessage() + if len(msg) != 1 { + break + } + } + return msg +} + func bytesArray(ss ...string) (bs [][]byte) { for _, s := range ss { bs = append(bs, []byte(s)) @@ -52,99 +62,11 @@ func bytesArray(ss ...string) (bs [][]byte) { return } -func TestBrokerSock(t *testing.T) { - const ( - clientAddr = "inproc://client.test" - workerAddr = "inproc://worker.test" - ) - broker, err := NewBroker("", clientAddr, workerAddr) - if err != nil { - t.Fatalf("Failed to init broker: %v", err) - } - go broker.Run() - - w0 := workerSock(t, "w0", workerAddr) - defer w0.Destroy() - w0.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - c0 := clientSock(t, "c0", clientAddr) - defer c0.Destroy() - c0.SendMessage(bytesArray("simple job")) - - msg, _ := w0.RecvMessage() - expectedMsg := bytesArray(c0.Identity(), "simple job") - if !reflect.DeepEqual(msg, expectedMsg) { - t.Fatalf(`want %v, got %v`, expectedMsg, msg) - } - - w0.SendMessage(expectedMsg) - msg, _ = c0.RecvMessage() - expectedMsg = bytesArray("simple job") - if !reflect.DeepEqual(msg, expectedMsg) { - t.Fatalf("want %v, got %v", expectedMsg, msg) - } - - // multiple workers, multiple clients - w1 := workerSock(t, "w1", workerAddr) - defer w1.Destroy() - w1.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - c1 := clientSock(t, "c1", clientAddr) - defer c1.Destroy() - c2 := clientSock(t, "c2", clientAddr) - defer c2.Destroy() - - c0.SendMessage(bytesArray("job 0")) - c1.SendMessage(bytesArray("job 1")) - 
c2.SendMessage(bytesArray("job 2")) - - work0, _ := w0.RecvMessage() - if !strings.HasPrefix(string(work0[1]), "job") { - t.Fatalf(`want job *, got %s`, work0[1]) - } - - work1, _ := w1.RecvMessage() - if !strings.HasPrefix(string(work1[1]), "job") { - t.Fatalf(`want job, got %v`, work1[1]) - } - - // let w0 complete its work and receive new job - w0.SendMessage(work0) - work2, _ := w0.RecvMessage() - if !strings.HasPrefix(string(work2[1]), "job") { - t.Fatalf(`want job, got %v`, work2[1]) - } - - // complete all jobs - w0.SendMessage(work2) - w1.SendMessage(work1) - - // clients receive all jobs - resp0, err := c0.RecvMessage() - resp1, err := c1.RecvMessage() - resp2, err := c2.RecvMessage() - - resps := []string{ - string(resp0[0]), - string(resp1[0]), - string(resp2[0]), - } - if !reflect.DeepEqual(resps, []string{ - "job 0", - "job 1", - "job 2", - }) { - t.Fatalf(`want ["job 0", "job 1", "job 2"], got %v`, resps) - } -} - func TestWorker(t *testing.T) { Convey("Test workerQueue", t, func() { - address1 := []byte("address1") - address2 := []byte("address2") - address3 := []byte("address3") + address1 := "address1" + address2 := "address2" + address3 := "address3" Convey("Add and pick a worker", func() { q := newWorkerQueue() @@ -206,21 +128,22 @@ func TestWorker(t *testing.T) { } func TestBrokerWorker(t *testing.T) { const ( - clientAddr = "inproc://server.test" workerAddr = "inproc://plugin.test" ) - broker, err := NewBroker("", clientAddr, workerAddr) + broker, err := NewBroker("test", workerAddr) if err != nil { t.Fatalf("Failed to init broker: %v", err) } - go broker.Run() - Convey("Test Broker with worker", t, func() { + Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { w := workerSock(t, "ready", workerAddr) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) 
So(broker.workers.Len(), ShouldEqual, 1) w.SendMessage(bytesArray(Shutdown)) @@ -228,50 +151,225 @@ func TestBrokerWorker(t *testing.T) { Convey("receive multiple Ready signal will register all workers", func() { w1 := workerSock(t, "ready1", workerAddr) - defer w1.Destroy() + defer func() { + w1.SendMessage(bytesArray(Shutdown)) + w1.Destroy() + }() w1.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "ready2", workerAddr) - defer w2.Destroy() + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 2) w1.SendMessage(bytesArray(Shutdown)) w2.SendMessage(bytesArray(Shutdown)) }) - Convey("reveice Heartbeat without Reay will not register the worker", func() { + Convey("reveice Heartbeat without Ready will not register the worker", func() { w := workerSock(t, "heartbeat", workerAddr) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Heartbeat)) // Wait the poller to get the message time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("send message from server to plugin", func() { - s := clientSock(t, "server", clientAddr) - defer s.Destroy() - w := workerSock(t, "worker", workerAddr) + Convey("reveice worker message without Reay will be ignored", func() { + w := workerSock(t, "unregistered", workerAddr) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage([][]byte{ + []byte("unregistered"), + []byte{0}, + []byte("Message to be ignored"), + }) + // Wait the poller to get the message + time.Sleep(HeartbeatInterval) + So(broker.workers.Len(), ShouldEqual, 0) + }) + + Convey("receive RPC will timeout", func() { + w := workerSock(t, "timeout", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + 
w.Destroy() + }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - w2 := workerSock(t, "worker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS) - defer w2.Destroy() - w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) - So(broker.workers.Len(), ShouldEqual, 2) + So(broker.workers.Len(), ShouldEqual, 1) - s.SendMessage(bytesArray("from server")) - // Wait the poller to get the message + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + msg := <-respChan + So(msg, ShouldResemble, []byte{0}) + }) + + Convey("recive RPC without Ready worker will wait for Heartbeat Liveness time", func() { + reqChan := make(chan chan []byte) + timeout := time.Now().Add(HeartbeatInterval * HeartbeatLiveness) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + resp := <-respChan + So(resp, ShouldResemble, []byte{0}) + So(time.Now(), ShouldHappenAfter, timeout) + }) + + Convey("worker after recive RPC and before timeout will got the message", func() { + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan time.Sleep(HeartbeatInterval) - So(broker.workers.Len(), ShouldEqual, 1) - msg, _ := w2.RecvMessage() - So(msg[1], ShouldResemble, []byte("from server")) + + w := workerSock(t, "lateworker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + + msg := recvNonControlFrame(w) + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + + Convey("broker RPC recive worker reply", func() { + w := workerSock(t, "worker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + 
w.SendMessage(bytesArray(Ready)) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + + msg := recvNonControlFrame(w) + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + + Convey("send message from server to multiple plugin", func(c C) { + go func() { + w := workerSock(t, "worker1", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w) + if len(msg) == 3 { + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + } + }() + + go func() { + w2 := workerSock(t, "worker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w2) + if len(msg) == 3 { + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w2.SendMessage(msg) + } + }() + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + + Convey("send multiple message from server to multple plugin", func(c C) { + go func() { + w := workerSock(t, "mworker1", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + + msg := recvNonControlFrame(w) + c.So(len(msg), ShouldEqual, 3) + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker1") + w.SendMessage(msg) + 
}() + + go func() { + w2 := workerSock(t, "mworker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + + msg := recvNonControlFrame(w2) + c.So(len(msg), ShouldEqual, 3) + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker2") + w2.SendMessage(msg) + }() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + resp := <-respChan + c.So(resp, ShouldNotBeEmpty) + }() + go func() { + defer wg.Done() + req2Chan := make(chan chan []byte) + broker.RPC(req2Chan, []byte(("from server"))) + resp2Chan := <-req2Chan + resp := <-resp2Chan + c.So(resp, ShouldNotBeEmpty) + }() + wg.Wait() + }) }) + broker.stop <- 1 + } diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index c4db9a1f7..8c35ec2fd 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -17,9 +17,11 @@ package zmq import ( + "bytes" "encoding/json" "fmt" "runtime/debug" + "time" "github.com/Sirupsen/logrus" @@ -32,25 +34,13 @@ import ( "golang.org/x/net/context" ) -const initRequestTimeout = 2000 - -type zmqTransport struct { - state skyplugin.TransportState - name string - iaddr string // the internal addr used by goroutines to make request to plugin - eaddr string // the addr exposed for plugin to connect to with REP. 
- broker *Broker - initHandler skyplugin.TransportInitHandler - logger *logrus.Entry - config skyconfig.Configuration -} +const initRequestTimeout = time.Second * 2 type request struct { Context context.Context Kind string Name string Param interface{} - Timeout int // timeout in millisecond } type hookRequest struct { @@ -110,6 +100,15 @@ func (req *request) MarshalJSON() ([]byte, error) { return json.Marshal(&paramReq) } +type zmqTransport struct { + state skyplugin.TransportState + name string + broker *Broker + initHandler skyplugin.TransportInitHandler + logger *logrus.Entry + config skyconfig.Configuration +} + func (p *zmqTransport) State() skyplugin.TransportState { return p.state } @@ -126,21 +125,16 @@ func (p *zmqTransport) setState(state skyplugin.TransportState) { } } +// RequestInit is expected to run in a separate goroutine and be called once to +// set its internal state in coordination with the broker. func (p *zmqTransport) RequestInit() { for { - address := <-p.broker.freshWorkers - - if p.state != skyplugin.TransportStateUninitialized { - // Although the plugin is only initialized once, we need - // to clear the channel buffer so that broker doesn't get stuck + out, err := p.RunInit() + if err != nil { + p.logger.WithField("err", err). + Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) continue } - - p.logger.Debugf("zmq transport got fresh worker %s", string(address)) - - // TODO: Only send init to the new address. For now, we let - // the broker decide. 
- out, err := p.RunInit() if p.initHandler != nil { handlerError := p.initHandler(out, err) if err != nil || handlerError != nil { @@ -148,6 +142,7 @@ func (p *zmqTransport) RequestInit() { } } p.setState(skyplugin.TransportStateReady) + break } } @@ -155,14 +150,8 @@ func (p *zmqTransport) RunInit() (out []byte, err error) { param := struct { Config skyconfig.Configuration `json:"config"` }{p.config} - req := request{Kind: "init", Param: param, Timeout: initRequestTimeout} - for { - out, err = p.ipc(&req) - if err == nil { - break - } - p.logger.WithField("err", err).Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) - } + req := request{Kind: "init", Param: param} + out, err = p.ipc(&req) return } @@ -247,40 +236,22 @@ func (p *zmqTransport) ipc(req *request) (out []byte, err error) { return } }() - var ( - in []byte - reqSock *goczmq.Sock - ) - in, err = json.Marshal(req) + in, err := json.Marshal(req) if err != nil { return } - reqSock, err = goczmq.NewReq(p.iaddr) - if err != nil { - return - } - defer func() { - reqSock.Destroy() - }() - if req.Timeout > 0 { - reqSock.SetRcvtimeo(req.Timeout) - } - err = reqSock.SendMessage([][]byte{in}) - if err != nil { - return - } - - msg, err := reqSock.RecvMessage() - if err != nil { - return - } + reqChan := make(chan chan []byte) + p.broker.RPC(reqChan, in) + respChan := <-reqChan + // Broker will sent back a null byte if time out + msg := <-respChan - if len(msg) != 1 { - err = fmt.Errorf("malformed resp msg = %s", msg) + if bytes.Equal(msg, []byte{0}) { + err = fmt.Errorf("RPC time out") } else { - out = msg[0] + out = msg } return @@ -290,12 +261,8 @@ type zmqTransportFactory struct { } func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.Configuration) (transport skyplugin.Transport) { - const internalAddrFmt = `inproc://%s` - - internalAddr := fmt.Sprintf(internalAddrFmt, name) externalAddr := args[0] - - broker, err := NewBroker(name, internalAddr, 
externalAddr) + broker, err := NewBroker(name, externalAddr) logger := log.WithFields(logrus.Fields{"plugin": name}) if err != nil { logger.Panicf("Failed to init broker for zmq transport: %v", err) @@ -304,18 +271,11 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C p := zmqTransport{ state: skyplugin.TransportStateUninitialized, name: name, - iaddr: internalAddr, - eaddr: externalAddr, broker: broker, logger: logger, config: config, } - go func() { - logger.Infof("Running zmq broker:\niaddr = %s\neaddr = %s", internalAddr, externalAddr) - broker.Run() - }() - return &p }