From 381bc9a95b279ac596d88d5a8264442f054bbfc5 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Thu, 13 Oct 2016 14:27:02 +0800 Subject: [PATCH 01/18] Remove eaddr from zmq transport --- pkg/server/plugin/zmq/rpc.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index c4db9a1f7..af714d269 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -38,7 +38,6 @@ type zmqTransport struct { state skyplugin.TransportState name string iaddr string // the internal addr used by goroutines to make request to plugin - eaddr string // the addr exposed for plugin to connect to with REP. broker *Broker initHandler skyplugin.TransportInitHandler logger *logrus.Entry @@ -305,7 +304,6 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C state: skyplugin.TransportStateUninitialized, name: name, iaddr: internalAddr, - eaddr: externalAddr, broker: broker, logger: logger, config: config, From 4b86067618652b389b8d5f1b7acc8d0e8be8070f Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Thu, 13 Oct 2016 22:48:32 +0800 Subject: [PATCH 02/18] Remove the frontend zmq router and replace with go chan --- pkg/server/plugin/zmq/broker.go | 117 +++++++++++++++------ pkg/server/plugin/zmq/broker_test.go | 152 +++++++++------------------ pkg/server/plugin/zmq/rpc.go | 67 ++++-------- 3 files changed, 159 insertions(+), 177 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 8dbc670e6..de4eb3444 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -50,6 +50,19 @@ const ( Shutdown = "\003" ) +// parcel is used to multiplex the chan with zmq worker +type parcel struct { + respChan chan []byte + frame []byte +} + +func newParcel(frame []byte) *parcel { + return &parcel{ + respChan: make(chan []byte), + frame: frame, + } +} + // Broker implements the Paranoid Pirate queue described in the zguide: // http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern // Related RFC: https://rfc.zeromq.org/spec:6/PPP @@ -58,25 +71,26 @@ const ( // 1. Shutdown signal, which signifies a normal termination of worker to provide // a fast path of worker removal type Broker struct { - name string - // NOTE: goroutines are caller of plugin, so frontend is Go side, - // backend is plugin side - frontend *goczmq.Sock + name string backend *goczmq.Sock bothPoller *goczmq.Poller backendPoller *goczmq.Poller + frontend chan [][]byte + recvChan chan *parcel + addressChan map[string]chan []byte + timeout chan string workers workerQueue freshWorkers chan []byte logger *logrus.Entry + stop chan int } // NewBroker returns a new *Broker. -func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) { - namedLogger := log.WithFields(logrus.Fields{"plugin": name}) - frontend, err := goczmq.NewRouter(frontendAddr) - if err != nil { - panic(err) - } +func NewBroker(name, backendAddr string) (*Broker, error) { + namedLogger := log.WithFields(logrus.Fields{ + "plugin": name, + "eaddr": backendAddr, + }) backend, err := goczmq.NewRouter(backendAddr) if err != nil { @@ -88,34 +102,28 @@ func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) { panic(err) } - bothPoller, err := goczmq.NewPoller(frontend, backend) - if err != nil { - panic(err) - } - return &Broker{ name: name, - frontend: frontend, backend: backend, - bothPoller: bothPoller, backendPoller: backendPoller, + frontend: make(chan [][]byte), + recvChan: make(chan *parcel), + addressChan: map[string]chan []byte{}, + timeout: make(chan string), workers: newWorkerQueue(), freshWorkers: make(chan []byte, 1), logger: namedLogger, + stop: make(chan int), }, nil } // Run kicks start the Broker and listens for requests. It blocks function // execution. func (lb *Broker) Run() { + lb.logger.Infof("Running zmq broker") heartbeatAt := time.Now().Add(HeartbeatInterval) for { - var sock *goczmq.Sock - if lb.workers.Len() == 0 { - sock = lb.backendPoller.Wait(heartbeatIntervalMS) - } else { - sock = lb.bothPoller.Wait(heartbeatIntervalMS) - } + sock := lb.backendPoller.Wait(heartbeatIntervalMS) switch sock { case lb.backend: @@ -137,18 +145,9 @@ func (lb *Broker) Run() { status := string(msg[0]) lb.handleWorkerStatus(&lb.workers, address, status) } else { - lb.frontend.SendMessage(msg) + lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) } - case lb.frontend: - frames, err := lb.frontend.RecvMessage() - if err != nil { - panic(err) - } - - frames = append([][]byte{lb.workers.Next()}, frames...) - lb.backend.SendMessage(frames) - lb.logger.Debugf("zmq/broker: server => plugin: %#x, %s\n", frames[0], frames) case nil: // do nothing default: @@ -169,6 +168,58 @@ func (lb *Broker) Run() { } } +func (lb *Broker) Channler() { + lb.logger.Infof("zmq channler running %p", lb) + for { + select { + case frames := <-lb.frontend: + lb.logger.Debugf("zmq/broker: zmq => channel %#x, %s\n", frames[0], frames) + // Dispacth back to the channel based on the zmq first frame + address := string(frames[0]) + respChan, ok := lb.addressChan[address] + if !ok { + lb.logger.Infof("zmq/broker: chan not found for worker %#x\n", address) + return + } + delete(lb.addressChan, address) + respChan <- frames[2] + case p := <-lb.recvChan: + // Save the chan and dispatch the message to zmq + addr := lb.workers.Next() + frames := append([][]byte{addr}, addr, []byte{}, p.frame) + address := string(addr) + lb.addressChan[address] = p.respChan + lb.backend.SendMessage(frames) + lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) + go lb.setTimeout(address, HeartbeatInterval*HeartbeatLiveness) + case address := <-lb.timeout: + respChan, ok := lb.addressChan[address] + if !ok { + return + } + lb.logger.Infof("zmq/broker: chan time out for worker %#x\n", address) + delete(lb.addressChan, address) + respChan <- []byte{0} + case <-lb.stop: + break + } + } + lb.logger.Infof("zmq channler stopped %p!", lb) +} + +func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { + p := newParcel(in) + lb.recvChan <- p + go func() { + requestChan <- p.respChan + }() +} + +func (lb *Broker) setTimeout(address string, wait time.Duration) { + time.Sleep(wait) + lb.timeout <- address +} + func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, status string) { switch status { case Ready: diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index c31dcbdb2..6e7179a88 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -17,8 +17,6 @@ package zmq import ( - "reflect" - "strings" "testing" "time" @@ -52,94 +50,6 @@ func bytesArray(ss ...string) (bs [][]byte) { return } -func TestBrokerSock(t *testing.T) { - const ( - clientAddr = "inproc://client.test" - workerAddr = "inproc://worker.test" - ) - broker, err := NewBroker("", clientAddr, workerAddr) - if err != nil { - t.Fatalf("Failed to init broker: %v", err) - } - go broker.Run() - - w0 := workerSock(t, "w0", workerAddr) - defer w0.Destroy() - w0.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - c0 := clientSock(t, "c0", clientAddr) - defer c0.Destroy() - c0.SendMessage(bytesArray("simple job")) - - msg, _ := w0.RecvMessage() - expectedMsg := bytesArray(c0.Identity(), "simple job") - if !reflect.DeepEqual(msg, expectedMsg) { - t.Fatalf(`want %v, got %v`, expectedMsg, msg) - } - - w0.SendMessage(expectedMsg) - msg, _ = c0.RecvMessage() - expectedMsg = bytesArray("simple job") - if !reflect.DeepEqual(msg, expectedMsg) { - t.Fatalf("want %v, got %v", expectedMsg, msg) - } - - // multiple workers, multiple clients - w1 := workerSock(t, "w1", workerAddr) - defer w1.Destroy() - w1.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - c1 := clientSock(t, "c1", clientAddr) - defer c1.Destroy() - c2 := clientSock(t, "c2", clientAddr) - defer c2.Destroy() - - c0.SendMessage(bytesArray("job 0")) - c1.SendMessage(bytesArray("job 1")) - c2.SendMessage(bytesArray("job 2")) - - work0, _ := w0.RecvMessage() - if !strings.HasPrefix(string(work0[1]), "job") { - t.Fatalf(`want job *, got %s`, work0[1]) - } - - work1, _ := w1.RecvMessage() - if !strings.HasPrefix(string(work1[1]), "job") { - t.Fatalf(`want job, got %v`, work1[1]) - } - - // let w0 complete its work and receive new job - w0.SendMessage(work0) - work2, _ := w0.RecvMessage() - if !strings.HasPrefix(string(work2[1]), "job") { - t.Fatalf(`want job, got %v`, work2[1]) - } - - // complete all jobs - w0.SendMessage(work2) - w1.SendMessage(work1) - - // clients receive all jobs - resp0, err := c0.RecvMessage() - resp1, err := c1.RecvMessage() - resp2, err := c2.RecvMessage() - - resps := []string{ - string(resp0[0]), - string(resp1[0]), - string(resp2[0]), - } - if !reflect.DeepEqual(resps, []string{ - "job 0", - "job 1", - "job 2", - }) { - t.Fatalf(`want ["job 0", "job 1", "job 2"], got %v`, resps) - } -} - func TestWorker(t *testing.T) { Convey("Test workerQueue", t, func() { address1 := []byte("address1") @@ -206,14 +116,14 @@ func TestWorker(t *testing.T) { } func TestBrokerWorker(t *testing.T) { const ( - clientAddr = "inproc://server.test" workerAddr = "inproc://plugin.test" ) - broker, err := NewBroker("", clientAddr, workerAddr) + broker, err := NewBroker("", workerAddr) if err != nil { t.Fatalf("Failed to init broker: %v", err) } go broker.Run() + go broker.Channler() Convey("Test Broker with worker", t, func() { Convey("receive Ready signal will register the worker", func() { @@ -250,9 +160,45 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("send message from server to plugin", func() { - s := clientSock(t, "server", clientAddr) - defer s.Destroy() + Convey("broker RPC will timeout", func() { + w := workerSock(t, "worker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer w.Destroy() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + + So(broker.workers.Len(), ShouldEqual, 1) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + msg := <-respChan + So(msg, ShouldResemble, []byte{0}) + }) + + Convey("broker RPC recive worker reply", func() { + w := workerSock(t, "worker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer w.Destroy() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + So(broker.workers.Len(), ShouldEqual, 1) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + + msg, _ := w.RecvMessage() + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + + Convey("send message from server to multple plugin", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer w.Destroy() @@ -266,12 +212,18 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 2) - s.SendMessage(bytesArray("from server")) - // Wait the poller to get the message - time.Sleep(HeartbeatInterval) - So(broker.workers.Len(), ShouldEqual, 1) + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + msg, _ := w2.RecvMessage() - So(msg[1], ShouldResemble, []byte("from server")) + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker2") + w2.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker2")) }) }) } diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index af714d269..e412c5cb3 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -17,6 +17,7 @@ package zmq import ( + "bytes" "encoding/json" "fmt" "runtime/debug" @@ -34,16 +35,6 @@ import ( const initRequestTimeout = 2000 -type zmqTransport struct { - state skyplugin.TransportState - name string - iaddr string // the internal addr used by goroutines to make request to plugin - broker *Broker - initHandler skyplugin.TransportInitHandler - logger *logrus.Entry - config skyconfig.Configuration -} - type request struct { Context context.Context Kind string @@ -109,6 +100,15 @@ func (req *request) MarshalJSON() ([]byte, error) { return json.Marshal(¶mReq) } +type zmqTransport struct { + state skyplugin.TransportState + name string + broker *Broker + initHandler skyplugin.TransportInitHandler + logger *logrus.Entry + config skyconfig.Configuration +} + func (p *zmqTransport) State() skyplugin.TransportState { return p.state } @@ -246,40 +246,22 @@ func (p *zmqTransport) ipc(req *request) (out []byte, err error) { return } }() - var ( - in []byte - reqSock *goczmq.Sock - ) - in, err = json.Marshal(req) + in, err := json.Marshal(req) if err != nil { return } - reqSock, err = goczmq.NewReq(p.iaddr) - if err != nil { - return - } - defer func() { - reqSock.Destroy() - }() - if req.Timeout > 0 { - reqSock.SetRcvtimeo(req.Timeout) - } - err = reqSock.SendMessage([][]byte{in}) - if err != nil { - return - } + reqChan := make(chan chan []byte) + p.broker.RPC(reqChan, in) + respChan := <-reqChan + // Broker will sent back a null byte if time out + msg := <-respChan - msg, err := reqSock.RecvMessage() - if err != nil { - return - } - - if len(msg) != 1 { - err = fmt.Errorf("malformed resp msg = %s", msg) + if bytes.Equal(msg, []byte{0}) { + err = fmt.Errorf("RPC time out") } else { - out = msg[0] + out = msg } return @@ -289,12 +271,8 @@ type zmqTransportFactory struct { } func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.Configuration) (transport skyplugin.Transport) { - const internalAddrFmt = `inproc://%s` - - internalAddr := fmt.Sprintf(internalAddrFmt, name) externalAddr := args[0] - - broker, err := NewBroker(name, internalAddr, externalAddr) + broker, err := NewBroker(name, externalAddr) logger := log.WithFields(logrus.Fields{"plugin": name}) if err != nil { logger.Panicf("Failed to init broker for zmq transport: %v", err) @@ -303,16 +281,17 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C p := zmqTransport{ state: skyplugin.TransportStateUninitialized, name: name, - iaddr: internalAddr, broker: broker, logger: logger, config: config, } go func() { - logger.Infof("Running zmq broker:\niaddr = %s\neaddr = %s", internalAddr, externalAddr) broker.Run() }() + go func() { + broker.Channler() + }() return &p } From 64407d6a68263b6fb846f9a29e42e482aa71ab8e Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 14 Oct 2016 18:24:40 +0800 Subject: [PATCH 03/18] Add multiple message and plugin test --- pkg/server/plugin/zmq/broker_test.go | 71 +++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 6e7179a88..90e582993 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -17,6 +17,7 @@ package zmq import ( + "sync" "testing" "time" @@ -201,12 +202,18 @@ func TestBrokerWorker(t *testing.T) { Convey("send message from server to multple plugin", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers w2 := workerSock(t, "worker2", workerAddr) w2.SetRcvtimeo(heartbeatIntervalMS) - defer w2.Destroy() + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() w2.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -225,5 +232,65 @@ func TestBrokerWorker(t *testing.T) { resp := <-respChan So(resp, ShouldResemble, []byte("from worker2")) }) + + Convey("send multiple message from server to multple plugin", func(c C) { + w := workerSock(t, "mworker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + w2 := workerSock(t, "mworker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + + So(broker.workers.Len(), ShouldEqual, 2) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + + req2Chan := make(chan chan []byte) + broker.RPC(req2Chan, []byte(("from server"))) + resp2Chan := <-req2Chan + + go func() { + msg, _ := w.RecvMessage() + c.So(len(msg), ShouldEqual, 3) + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker1") + w.SendMessage(msg) + }() + + go func() { + msg, _ := w2.RecvMessage() + c.So(len(msg), ShouldEqual, 3) + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker2") + w2.SendMessage(msg) + }() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + resp := <-respChan + c.So(resp, ShouldResemble, []byte("from worker2")) + }() + go func() { + defer wg.Done() + resp := <-resp2Chan + c.So(resp, ShouldResemble, []byte("from worker1")) + }() + wg.Wait() + + }) }) } From 017182d3cc67dbca3abdfbf039560163e8d149cc Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 14 Oct 2016 18:54:39 +0800 Subject: [PATCH 04/18] Use string address in go space do the conversation on the zmq boundary --- pkg/server/plugin/zmq/broker.go | 45 ++++++++++++++++------------ pkg/server/plugin/zmq/broker_test.go | 6 ++-- pkg/server/plugin/zmq/rpc.go | 2 +- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index de4eb3444..2d89de564 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -17,7 +17,6 @@ package zmq import ( - "bytes" "errors" "fmt" "time" @@ -80,7 +79,7 @@ type Broker struct { addressChan map[string]chan []byte timeout chan string workers workerQueue - freshWorkers chan []byte + freshWorkers chan string logger *logrus.Entry stop chan int } @@ -111,7 +110,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) { addressChan: map[string]chan []byte{}, timeout: make(chan string), workers: newWorkerQueue(), - freshWorkers: make(chan []byte, 1), + freshWorkers: make(chan string, 1), logger: namedLogger, stop: make(chan int), }, nil @@ -132,7 +131,7 @@ func (lb *Broker) Run() { panic(err) } - address := frames[0] + address := string(frames[0]) msg := frames[1:] tErr := lb.workers.Tick(newWorker(address)) if tErr != nil { @@ -157,7 +156,10 @@ func (lb *Broker) Run() { lb.logger.Debugf("zmq/broker: idle worker count %d\n", lb.workers.Len()) if heartbeatAt.Before(time.Now()) { for _, worker := range lb.workers.pworkers { - msg := [][]byte{worker.address, []byte(Heartbeat)} + msg := [][]byte{ + []byte(worker.address), + []byte(Heartbeat), + } lb.logger.Debugf("zmq/broker: server => plugin Heartbeat: %s\n", worker.address) lb.backend.SendMessage(msg) } @@ -185,9 +187,14 @@ func (lb *Broker) Channler() { respChan <- frames[2] case p := <-lb.recvChan: // Save the chan and dispatch the message to zmq - addr := lb.workers.Next() - frames := append([][]byte{addr}, addr, []byte{}, p.frame) - address := string(addr) + address := lb.workers.Next() + addr := []byte(address) + frames := [][]byte{ + addr, + addr, + []byte{}, + p.frame, + } lb.addressChan[address] = p.respChan lb.backend.SendMessage(frames) lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) @@ -220,7 +227,7 @@ func (lb *Broker) setTimeout(address string, wait time.Duration) { lb.timeout <- address } -func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, status string) { +func (lb *Broker) handleWorkerStatus(workers *workerQueue, address string, status string) { switch status { case Ready: log.Infof("zmq/broker: ready worker = %s", address) @@ -237,11 +244,11 @@ func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, statu } type pworker struct { - address []byte + address string expiry time.Time } -func newWorker(address []byte) pworker { +func newWorker(address string) pworker { return pworker{ address, time.Now().Add(HeartbeatLiveness * HeartbeatInterval), @@ -267,7 +274,7 @@ func (q workerQueue) Len() int { return len(q.pworkers) } -func (q *workerQueue) Next() []byte { +func (q *workerQueue) Next() string { workers := q.pworkers worker := workers[len(workers)-1] q.pworkers = workers[:len(workers)-1] @@ -275,7 +282,7 @@ func (q *workerQueue) Next() []byte { } func (q *workerQueue) Add(worker pworker) { - q.addresses[string(worker.address)] = true + q.addresses[worker.address] = true err := q.Tick(worker) if err == nil { return @@ -283,13 +290,13 @@ func (q *workerQueue) Add(worker pworker) { } func (q *workerQueue) Tick(worker pworker) error { - if _, ok := q.addresses[string(worker.address)]; !ok { + if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } workers := q.pworkers for i, w := range workers { - if bytes.Equal(w.address, worker.address) { + if w.address == worker.address { q.pworkers = append(append(workers[:i], workers[i+1:]...), worker) return nil } @@ -308,17 +315,17 @@ func (q *workerQueue) Purge() { break } q.pworkers = workers[i+1:] - delete(q.addresses, string(w.address)) + delete(q.addresses, w.address) log.Infof("zmq/broker: disconnected worker = %s", w.address) } } -func (q *workerQueue) Remove(address []byte) { - delete(q.addresses, string(address)) +func (q *workerQueue) Remove(address string) { + delete(q.addresses, address) workers := q.pworkers for i, w := range workers { - if bytes.Equal(w.address, address) { + if w.address == address { q.pworkers = append(workers[:i], workers[i+1:]...) break } diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 90e582993..1039b439d 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -53,9 +53,9 @@ func bytesArray(ss ...string) (bs [][]byte) { func TestWorker(t *testing.T) { Convey("Test workerQueue", t, func() { - address1 := []byte("address1") - address2 := []byte("address2") - address3 := []byte("address3") + address1 := "address1" + address2 := "address2" + address3 := "address3" Convey("Add and pick a worker", func() { q := newWorkerQueue() diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index e412c5cb3..630053584 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -135,7 +135,7 @@ func (p *zmqTransport) RequestInit() { continue } - p.logger.Debugf("zmq transport got fresh worker %s", string(address)) + p.logger.Debugf("zmq transport got fresh worker %s", address) // TODO: Only send init to the new address. For now, we let // the broker decide. From 86c19084a032d9bd94817738ae8b6fdd85b83fb3 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 14 Oct 2016 19:01:35 +0800 Subject: [PATCH 05/18] Remove heartbeat logs --- pkg/server/plugin/zmq/broker.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 2d89de564..810ead9fd 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -153,14 +153,12 @@ func (lb *Broker) Run() { panic("zmq/broker: received unknown socket") } - lb.logger.Debugf("zmq/broker: idle worker count %d\n", lb.workers.Len()) if heartbeatAt.Before(time.Now()) { for _, worker := range lb.workers.pworkers { msg := [][]byte{ []byte(worker.address), []byte(Heartbeat), } - lb.logger.Debugf("zmq/broker: server => plugin Heartbeat: %s\n", worker.address) lb.backend.SendMessage(msg) } heartbeatAt = time.Now().Add(HeartbeatInterval) From 303b01ee09dda5d9709795fbea8604097f4851e4 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 17 Oct 2016 11:30:36 +0800 Subject: [PATCH 06/18] Properly break the channeler on exit --- pkg/server/plugin/zmq/broker.go | 15 ++++---- pkg/server/plugin/zmq/broker_test.go | 57 ++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 810ead9fd..9817b3678 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -169,7 +169,8 @@ func (lb *Broker) Run() { } func (lb *Broker) Channler() { - lb.logger.Infof("zmq channler running %p", lb) + lb.logger.Infof("zmq channler running %p\n", lb) +ChannlerLoop: for { select { case frames := <-lb.frontend: @@ -178,8 +179,8 @@ func (lb *Broker) Channler() { address := string(frames[0]) respChan, ok := lb.addressChan[address] if !ok { - lb.logger.Infof("zmq/broker: chan not found for worker %#x\n", address) - return + lb.logger.Infof("zmq/broker: chan not found for worker %s\n", address) + break } delete(lb.addressChan, address) respChan <- frames[2] @@ -200,16 +201,16 @@ func (lb *Broker) Channler() { case address := <-lb.timeout: respChan, ok := lb.addressChan[address] if !ok { - return + break } - lb.logger.Infof("zmq/broker: chan time out for worker %#x\n", address) + lb.logger.Infof("zmq/broker: chan time out for worker %s\n", address) delete(lb.addressChan, address) respChan <- []byte{0} case <-lb.stop: - break + break ChannlerLoop } } - lb.logger.Infof("zmq channler stopped %p!", lb) + lb.logger.Infof("zmq channler stopped %p!\n", lb) } func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 1039b439d..49e7902c1 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -119,7 +119,7 @@ func TestBrokerWorker(t *testing.T) { const ( workerAddr = "inproc://plugin.test" ) - broker, err := NewBroker("", workerAddr) + broker, err := NewBroker("test", workerAddr) if err != nil { t.Fatalf("Failed to init broker: %v", err) } @@ -129,7 +129,10 @@ func TestBrokerWorker(t *testing.T) { Convey("Test Broker with worker", t, func() { Convey("receive Ready signal will register the worker", func() { w := workerSock(t, "ready", workerAddr) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -139,11 +142,17 @@ func TestBrokerWorker(t *testing.T) { Convey("receive multiple Ready signal will register all workers", func() { w1 := workerSock(t, "ready1", workerAddr) - defer w1.Destroy() + defer func() { + w1.SendMessage(bytesArray(Shutdown)) + w1.Destroy() + }() w1.SendMessage(bytesArray(Ready)) <-broker.freshWorkers w2 := workerSock(t, "ready2", workerAddr) - defer w2.Destroy() + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() w2.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -152,19 +161,41 @@ func TestBrokerWorker(t *testing.T) { w2.SendMessage(bytesArray(Shutdown)) }) - Convey("reveice Heartbeat without Reay will not register the worker", func() { + Convey("reveice Heartbeat without Ready will not register the worker", func() { w := workerSock(t, "heartbeat", workerAddr) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Heartbeat)) // Wait the poller to get the message time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 0) }) + Convey("reveice message without Reay will be ignored", func() { + w := workerSock(t, "unregistered", workerAddr) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage([][]byte{ + []byte("unregistered"), + []byte{0}, + []byte("Message to be ignored"), + }) + // Wait the poller to get the message + time.Sleep(HeartbeatInterval) + So(broker.workers.Len(), ShouldEqual, 0) + }) + Convey("broker RPC will timeout", func() { - w := workerSock(t, "worker", workerAddr) + w := workerSock(t, "timeout", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -180,7 +211,10 @@ func TestBrokerWorker(t *testing.T) { Convey("broker RPC recive worker reply", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers So(broker.workers.Len(), ShouldEqual, 1) @@ -200,7 +234,7 @@ func TestBrokerWorker(t *testing.T) { }) Convey("send message from server to multple plugin", func() { - w := workerSock(t, "worker", workerAddr) + w := workerSock(t, "worker1", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer func() { w.SendMessage(bytesArray(Shutdown)) @@ -234,7 +268,7 @@ func TestBrokerWorker(t *testing.T) { }) Convey("send multiple message from server to multple plugin", func(c C) { - w := workerSock(t, "mworker", workerAddr) + w := workerSock(t, "mworker1", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer func() { w.SendMessage(bytesArray(Shutdown)) @@ -293,4 +327,5 @@ func TestBrokerWorker(t *testing.T) { }) }) + broker.stop <- 1 } From f1d1d558f5d61ae4559f01a435506beeec8c2f72 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 17 Oct 2016 19:06:53 +0800 Subject: [PATCH 07/18] Add RPC timeout and retry on no worker ready yet This will properly handle worker crash on init by timeout. And in case of all workers crash, the channeler will not crash. Just all RPC will timeout. Also add mutex lock to make the workerQueue goroutine safe. --- pkg/server/plugin/zmq/broker.go | 32 +++++++++++++++++++++ pkg/server/plugin/zmq/broker_test.go | 42 ++++++++++++++++++++++++++-- pkg/server/plugin/zmq/rpc.go | 7 +++-- 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 9817b3678..5afb954b0 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -19,6 +19,7 @@ package zmq import ( "errors" "fmt" + "sync" "time" "github.com/Sirupsen/logrus" @@ -53,12 +54,14 @@ const ( type parcel struct { respChan chan []byte frame []byte + retry int } func newParcel(frame []byte) *parcel { return &parcel{ respChan: make(chan []byte), frame: frame, + retry: 0, } } @@ -186,7 +189,22 @@ ChannlerLoop: respChan <- frames[2] case p := <-lb.recvChan: // Save the chan and dispatch the message to zmq + // If current no worker ready, will retry after HeartbeatInterval. + // Retry for HeartbeatLiveness tine address := lb.workers.Next() + if address == "" { + if p.retry < HeartbeatLiveness { + p.retry += 1 + lb.logger.Infof("zmq/broker: no worker avaliable, retry %d...\n", p.retry) + go func() { + time.Sleep(HeartbeatInterval) + lb.recvChan <- p + }() + break + } + lb.logger.Infof("zmq/broker: no worker avaliable, timeout.\n") + p.respChan <- []byte{0} + } addr := []byte(address) frames := [][]byte{ addr, @@ -260,12 +278,15 @@ func newWorker(address string) pworker { type workerQueue struct { pworkers []pworker addresses map[string]bool + mu *sync.Mutex } func newWorkerQueue() workerQueue { + mu := new(sync.Mutex) return workerQueue{ []pworker{}, map[string]bool{}, + mu, } } @@ -274,6 +295,11 @@ func (q workerQueue) Len() int { } func (q *workerQueue) Next() string { + q.mu.Lock() + defer q.mu.Unlock() + if q.Len() == 0 { + return "" + } workers := q.pworkers worker := workers[len(workers)-1] q.pworkers = workers[:len(workers)-1] @@ -289,6 +315,8 @@ func (q *workerQueue) Add(worker pworker) { } func (q *workerQueue) Tick(worker pworker) error { + q.mu.Lock() + defer q.mu.Unlock() if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } @@ -306,6 +334,8 @@ func (q *workerQueue) Tick(worker pworker) error { } func (q *workerQueue) Purge() { + q.mu.Lock() + defer q.mu.Unlock() workers := q.pworkers now := time.Now() @@ -320,6 +350,8 @@ func (q *workerQueue) Purge() { } func (q *workerQueue) Remove(address string) { + q.mu.Lock() + defer q.mu.Unlock() delete(q.addresses, address) workers := q.pworkers diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 49e7902c1..8387b85ec 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -126,7 +126,7 @@ func TestBrokerWorker(t *testing.T) { go broker.Run() go broker.Channler() - Convey("Test Broker with worker", t, func() { + Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { w := workerSock(t, "ready", workerAddr) defer func() { @@ -173,7 +173,7 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("reveice message without Reay will be ignored", func() { + Convey("reveice worker message without Reay will be ignored", func() { w := workerSock(t, "unregistered", workerAddr) defer func() { w.SendMessage(bytesArray(Shutdown)) @@ -189,7 +189,7 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("broker RPC will timeout", func() { + Convey("receive RPC will timeout", func() { w := workerSock(t, "timeout", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer func() { @@ -208,6 +208,42 @@ func TestBrokerWorker(t *testing.T) { So(msg, ShouldResemble, []byte{0}) }) + Convey("recive RPC without Ready worker will wait for Heartbeat Liveness time", func() { + reqChan := make(chan chan []byte) + timeout := time.Now().Add(HeartbeatInterval * HeartbeatLiveness) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + resp := <-respChan + So(resp, ShouldResemble, []byte{0}) + So(time.Now(), ShouldHappenAfter, timeout) + }) + + Convey("worker after recive RPC and before timeout will got the message", func() { + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + time.Sleep(HeartbeatInterval) + + w := workerSock(t, "lateworker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + So(broker.workers.Len(), ShouldEqual, 1) + + msg, _ := w.RecvMessage() + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + Convey("broker RPC recive worker reply", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 630053584..3206c42c6 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "runtime/debug" + "time" "github.com/Sirupsen/logrus" @@ -33,14 +34,13 @@ import ( "golang.org/x/net/context" ) -const initRequestTimeout = 2000 +const initRequestTimeout = time.Second * 2 type request struct { Context context.Context Kind string Name string Param interface{} - Timeout int // timeout in millisecond } type hookRequest struct { @@ -154,13 +154,14 @@ func (p *zmqTransport) RunInit() (out []byte, err error) { param := struct { Config skyconfig.Configuration `json:"config"` }{p.config} - req := request{Kind: "init", Param: param, Timeout: initRequestTimeout} + req := request{Kind: "init", Param: param} for { out, err = p.ipc(&req) if err == nil { break } p.logger.WithField("err", err).Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) + time.Sleep(initRequestTimeout) } return } From 79ae1e37d8cf8640ca924430b91d337a759a09e7 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Wed, 19 Oct 2016 13:34:53 +0800 Subject: [PATCH 08/18] Fix goroutine halt on multiple worker land with init --- pkg/server/plugin/zmq/broker.go | 47 +++++++++++++------------ pkg/server/plugin/zmq/broker_test.go | 52 +++++++++++++--------------- pkg/server/plugin/zmq/rpc.go | 27 +++++---------- 3 files changed, 58 insertions(+), 68 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 5afb954b0..e78bfc1ee 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -82,7 +82,6 @@ type Broker struct { addressChan map[string]chan []byte timeout chan string workers workerQueue - freshWorkers chan string logger *logrus.Entry stop chan int } @@ -108,12 +107,11 @@ func NewBroker(name, backendAddr string) (*Broker, error) { name: name, backend: backend, backendPoller: backendPoller, - frontend: make(chan [][]byte), - recvChan: make(chan *parcel), + frontend: make(chan [][]byte, 10), + recvChan: make(chan *parcel, 10), addressChan: map[string]chan []byte{}, timeout: make(chan string), workers: newWorkerQueue(), - freshWorkers: make(chan string, 1), logger: namedLogger, stop: make(chan int), }, nil @@ -122,10 +120,10 @@ func NewBroker(name, backendAddr string) (*Broker, error) { // Run kicks start the Broker and listens for requests. It blocks function // execution. func (lb *Broker) Run() { - lb.logger.Infof("Running zmq broker") heartbeatAt := time.Now().Add(HeartbeatInterval) for { sock := lb.backendPoller.Wait(heartbeatIntervalMS) + lb.workers.Lock() switch sock { case lb.backend: @@ -145,7 +143,7 @@ func (lb *Broker) Run() { } if len(msg) == 1 { status := string(msg[0]) - lb.handleWorkerStatus(&lb.workers, address, status) + lb.handleWorkerStatus(address, status) } else { lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) @@ -168,6 +166,7 @@ func (lb *Broker) Run() { } lb.workers.Purge() + lb.workers.Unlock() } } @@ -196,14 +195,15 @@ ChannlerLoop: if p.retry < HeartbeatLiveness { p.retry += 1 lb.logger.Infof("zmq/broker: no worker avaliable, retry %d...\n", p.retry) - go func() { + go func(p2 *parcel) { time.Sleep(HeartbeatInterval) - lb.recvChan <- p - }() + lb.recvChan <- p2 + }(p) break } lb.logger.Infof("zmq/broker: no worker avaliable, timeout.\n") p.respChan <- []byte{0} + break } addr := []byte(address) frames := [][]byte{ @@ -244,16 +244,15 @@ func (lb *Broker) setTimeout(address string, wait time.Duration) { lb.timeout <- address } -func (lb *Broker) handleWorkerStatus(workers *workerQueue, address string, status string) { +func (lb *Broker) handleWorkerStatus(address string, status string) { switch status { case Ready: log.Infof("zmq/broker: ready worker = %s", address) - workers.Add(newWorker(address)) - lb.freshWorkers <- address + lb.workers.Add(newWorker(address)) case Heartbeat: // no-op case Shutdown: - workers.Remove(address) + lb.workers.Remove(address) log.Infof("zmq/broker: shutdown of worker = %s", address) default: log.Errorf("zmq/broker: invalid status from worker = %s: %s", address, status) @@ -282,22 +281,32 @@ type workerQueue struct { } func newWorkerQueue() workerQueue { - mu := new(sync.Mutex) + mu := &sync.Mutex{} return workerQueue{ []pworker{}, map[string]bool{}, mu, } } +func (q *workerQueue) Lock() { + q.mu.Lock() +} -func (q workerQueue) Len() int { +func (q *workerQueue) Unlock() { + q.mu.Unlock() +} + +func (q *workerQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() return len(q.pworkers) } func (q *workerQueue) Next() string { q.mu.Lock() defer q.mu.Unlock() - if q.Len() == 0 { + cnt := len(q.pworkers) + if cnt == 0 { return "" } workers := q.pworkers @@ -315,8 +324,6 @@ func (q *workerQueue) Add(worker pworker) { } func (q *workerQueue) Tick(worker pworker) error { - q.mu.Lock() - defer q.mu.Unlock() if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } @@ -334,8 +341,6 @@ func (q *workerQueue) Tick(worker pworker) error { } func (q *workerQueue) Purge() { - q.mu.Lock() - defer q.mu.Unlock() workers := q.pworkers now := time.Now() @@ -350,8 +355,6 @@ func (q *workerQueue) Purge() { } func (q *workerQueue) Remove(address string) { - q.mu.Lock() - defer q.mu.Unlock() delete(q.addresses, address) workers := q.pworkers diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 8387b85ec..f8d4a7631 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -44,6 +44,17 @@ func clientSock(t *testing.T, id string, addr string) *goczmq.Sock { return sock } +func recvNonControlFrame(w *goczmq.Sock) [][]byte { + var msg [][]byte + for { + msg, _ = w.RecvMessage() + if len(msg) != 1 { + break + } + } + return msg +} + func bytesArray(ss ...string) (bs [][]byte) { for _, s := range ss { bs = append(bs, []byte(s)) @@ -134,7 +145,7 @@ func TestBrokerWorker(t *testing.T) { w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 1) w.SendMessage(bytesArray(Shutdown)) @@ -147,14 +158,13 @@ func TestBrokerWorker(t *testing.T) { w1.Destroy() }() w1.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "ready2", workerAddr) defer func() { w2.SendMessage(bytesArray(Shutdown)) w2.Destroy() }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 2) w1.SendMessage(bytesArray(Shutdown)) @@ -197,7 +207,7 @@ func TestBrokerWorker(t *testing.T) { w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 1) @@ -225,16 +235,14 @@ func TestBrokerWorker(t *testing.T) { time.Sleep(HeartbeatInterval) w := workerSock(t, "lateworker", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - So(broker.workers.Len(), ShouldEqual, 1) - msg, _ := w.RecvMessage() + msg := recvNonControlFrame(w) So(len(msg), ShouldEqual, 3) So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker") @@ -246,20 +254,18 @@ func TestBrokerWorker(t *testing.T) { Convey("broker RPC recive worker reply", func() { w := workerSock(t, "worker", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - So(broker.workers.Len(), ShouldEqual, 1) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) respChan := <-reqChan - msg, _ := w.RecvMessage() + msg := recvNonControlFrame(w) So(len(msg), ShouldEqual, 3) So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker") @@ -271,29 +277,25 @@ func TestBrokerWorker(t *testing.T) { Convey("send message from server to multple plugin", func() { w := workerSock(t, "worker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "worker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w2.SendMessage(bytesArray(Shutdown)) w2.Destroy() }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - So(broker.workers.Len(), ShouldEqual, 2) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) respChan := <-reqChan - msg, _ := w2.RecvMessage() + msg := recvNonControlFrame(w2) So(len(msg), ShouldEqual, 3) So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker2") @@ -305,23 +307,19 @@ func TestBrokerWorker(t *testing.T) { Convey("send multiple message from server to multple plugin", func(c C) { w := workerSock(t, "mworker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "mworker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w2.SendMessage(bytesArray(Shutdown)) w2.Destroy() }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - So(broker.workers.Len(), ShouldEqual, 2) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) @@ -332,7 +330,7 @@ func TestBrokerWorker(t *testing.T) { resp2Chan := <-req2Chan go func() { - msg, _ := w.RecvMessage() + msg := recvNonControlFrame(w) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker1") @@ -340,7 +338,7 @@ func TestBrokerWorker(t *testing.T) { }() go func() { - msg, _ := w2.RecvMessage() + msg := recvNonControlFrame(w2) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker2") diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 3206c42c6..92db6c2f1 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -125,21 +125,16 @@ func (p *zmqTransport) setState(state skyplugin.TransportState) { } } +// RequestInit is expected to run in separate gorountine and called once to +// set it internal state with coordinate with broker. func (p *zmqTransport) RequestInit() { for { - address := <-p.broker.freshWorkers - - if p.state != skyplugin.TransportStateUninitialized { - // Although the plugin is only initialized once, we need - // to clear the channel buffer so that broker doesn't get stuck + out, err := p.RunInit() + if err != nil { + p.logger.WithField("err", err). + Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) continue } - - p.logger.Debugf("zmq transport got fresh worker %s", address) - - // TODO: Only send init to the new address. For now, we let - // the broker decide. - out, err := p.RunInit() if p.initHandler != nil { handlerError := p.initHandler(out, err) if err != nil || handlerError != nil { @@ -147,6 +142,7 @@ func (p *zmqTransport) RequestInit() { } } p.setState(skyplugin.TransportStateReady) + break } } @@ -155,14 +151,7 @@ func (p *zmqTransport) RunInit() (out []byte, err error) { Config skyconfig.Configuration `json:"config"` }{p.config} req := request{Kind: "init", Param: param} - for { - out, err = p.ipc(&req) - if err == nil { - break - } - p.logger.WithField("err", err).Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) - time.Sleep(initRequestTimeout) - } + out, err = p.ipc(&req) return } From c46cf7e5c1213c5222331a95fb66b8926eb346e9 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Tue, 18 Oct 2016 14:20:05 +0800 Subject: [PATCH 09/18] Correct spelling Channler -> Channeler --- pkg/server/plugin/zmq/broker.go | 6 +++--- pkg/server/plugin/zmq/broker_test.go | 2 +- pkg/server/plugin/zmq/rpc.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index e78bfc1ee..e866aa0dd 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -170,9 +170,9 @@ func (lb *Broker) Run() { } } -func (lb *Broker) Channler() { +func (lb *Broker) Channeler() { lb.logger.Infof("zmq channler running %p\n", lb) -ChannlerLoop: +ChannelerLoop: for { select { case frames := <-lb.frontend: @@ -225,7 +225,7 @@ ChannlerLoop: delete(lb.addressChan, address) respChan <- []byte{0} case <-lb.stop: - break ChannlerLoop + break ChannelerLoop } } lb.logger.Infof("zmq channler stopped %p!\n", lb) diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index f8d4a7631..766be049c 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -135,7 +135,7 @@ func TestBrokerWorker(t *testing.T) { t.Fatalf("Failed to init broker: %v", err) } go broker.Run() - go broker.Channler() + go broker.Channeler() Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 92db6c2f1..75f63af77 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -280,7 +280,7 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C broker.Run() }() go func() { - broker.Channler() + broker.Channeler() }() return &p From e4d5cec2572b8180728bf4174d3b657580c40a14 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Tue, 18 Oct 2016 14:52:22 +0800 Subject: [PATCH 10/18] Comment on workerQueue behaviour --- pkg/server/plugin/zmq/broker.go | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index e866aa0dd..7c0c7796f 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -65,9 +65,9 @@ func newParcel(frame []byte) *parcel { } } -// Broker implements the Paranoid Pirate queue described in the zguide: -// http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern +// Broker implements the Paranoid Pirate queue described as follow: // Related RFC: https://rfc.zeromq.org/spec:6/PPP +// refs: http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern // with the addition of: // // 1. Shutdown signal, which signifies a normal termination of worker to provide @@ -117,8 +117,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) { }, nil } -// Run kicks start the Broker and listens for requests. It blocks function -// execution. +// Run the Broker and listens for zmq requests. func (lb *Broker) Run() { heartbeatAt := time.Now().Add(HeartbeatInterval) for { @@ -170,6 +169,8 @@ func (lb *Broker) Run() { } } +// Channeler accept message from RPC and dispatch to zmq if avalible. +// It retry and timeout the request if the zmq worker is not yet avalible. func (lb *Broker) Channeler() { lb.logger.Infof("zmq channler running %p\n", lb) ChannelerLoop: @@ -272,8 +273,17 @@ func newWorker(address string) pworker { } // workerQueue is a last tick fist out queue. -// A worker need to register itself using Add before it can tick. -// Ticking of an non-registered worker will be no-ops. +// +// Worker is expect to register itself on ready. Tick itself when it is +// avalible. The most recently Tick worker will got the job. +// A worker do not Tick itself within the expiry will regards as disconnected +// and requires to Add itself again to become avaliable. +// +// workerQueue is not goroutine safe. To use it safely across goroutine. +// Please use the Lock/Unlock interace before manupliate the queue item via +// methods like Add/Tick/Purge. +// Comsuming method Next is the only method will acquire the mutex lock +// by itself. type workerQueue struct { pworkers []pworker addresses map[string]bool @@ -302,6 +312,9 @@ func (q *workerQueue) Len() int { return len(q.pworkers) } +// Next will pop the next avaliable worker, and the worker will not avalible +// until it Tick back to the workerQueue again. +// This worker comsuing method will acquire mutex lock. func (q *workerQueue) Next() string { q.mu.Lock() defer q.mu.Unlock() @@ -315,6 +328,8 @@ func (q *workerQueue) Next() string { return worker.address } +// Add will register the worker as live worker and call Tick to make itself to +// the next avaliable worker. func (q *workerQueue) Add(worker pworker) { q.addresses[worker.address] = true err := q.Tick(worker) @@ -323,6 +338,8 @@ func (q *workerQueue) Add(worker pworker) { } } +// Tick will make the worker to be the next avalible worker. Ticking an un +// registered worker will be no-op. func (q *workerQueue) Tick(worker pworker) error { if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) @@ -340,6 +357,8 @@ func (q *workerQueue) Tick(worker pworker) error { return nil } +// Purge will unregister the worker that is not heathly. i.e. haven't Tick for +// a while. func (q *workerQueue) Purge() { workers := q.pworkers @@ -354,6 +373,8 @@ func (q *workerQueue) Purge() { } } +// Remove will unregister the worker with specified address regardless of its +// expiry. Intented for clean shutdown and fast removal of worker. func (q *workerQueue) Remove(address string) { delete(q.addresses, address) workers := q.pworkers From 81bc660fcc88baa41a038b86201c6d04eae31244 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Wed, 19 Oct 2016 18:51:30 +0800 Subject: [PATCH 11/18] Make the broker channler and Run goroutine safe --- pkg/server/plugin/zmq/broker.go | 81 ++++++++++++--------- pkg/server/plugin/zmq/broker_test.go | 104 +++++++++++++++------------ 2 files changed, 106 insertions(+), 79 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 7c0c7796f..eec293982 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -73,17 +73,15 @@ func newParcel(frame []byte) *parcel { // 1. Shutdown signal, which signifies a normal termination of worker to provide // a fast path of worker removal type Broker struct { - name string - backend *goczmq.Sock - bothPoller *goczmq.Poller - backendPoller *goczmq.Poller - frontend chan [][]byte - recvChan chan *parcel - addressChan map[string]chan []byte - timeout chan string - workers workerQueue - logger *logrus.Entry - stop chan int + name string + backendAddr string + frontend chan [][]byte + recvChan chan *parcel + addressChan map[string]chan []byte + timeout chan string + workers workerQueue + logger *logrus.Entry + stop chan int } // NewBroker returns a new *Broker. @@ -93,40 +91,46 @@ func NewBroker(name, backendAddr string) (*Broker, error) { "eaddr": backendAddr, }) - backend, err := goczmq.NewRouter(backendAddr) + return &Broker{ + name: name, + backendAddr: backendAddr, + frontend: make(chan [][]byte, 10), + recvChan: make(chan *parcel, 10), + addressChan: map[string]chan []byte{}, + timeout: make(chan string), + workers: newWorkerQueue(), + logger: namedLogger, + stop: make(chan int), + }, nil +} + +// Run the Broker and listens for zmq requests. +func (lb *Broker) Run() { + backend, err := goczmq.NewRouter(lb.backendAddr) if err != nil { panic(err) } + defer backend.Destroy() - backendPoller, err := goczmq.NewPoller(backend) + pull, err := goczmq.NewPull(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) if err != nil { panic(err) } + defer pull.Destroy() - return &Broker{ - name: name, - backend: backend, - backendPoller: backendPoller, - frontend: make(chan [][]byte, 10), - recvChan: make(chan *parcel, 10), - addressChan: map[string]chan []byte{}, - timeout: make(chan string), - workers: newWorkerQueue(), - logger: namedLogger, - stop: make(chan int), - }, nil -} + backendPoller, err := goczmq.NewPoller(backend, pull) + if err != nil { + panic(err) + } -// Run the Broker and listens for zmq requests. -func (lb *Broker) Run() { heartbeatAt := time.Now().Add(HeartbeatInterval) for { - sock := lb.backendPoller.Wait(heartbeatIntervalMS) + sock := backendPoller.Wait(heartbeatIntervalMS) lb.workers.Lock() switch sock { - case lb.backend: - frames, err := lb.backend.RecvMessage() + case backend: + frames, err := backend.RecvMessage() if err != nil { panic(err) } @@ -147,6 +151,12 @@ func (lb *Broker) Run() { lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) } + case pull: + frames, err := pull.RecvMessage() + if err != nil { + panic(err) + } + backend.SendMessage(frames) case nil: // do nothing default: @@ -159,7 +169,7 @@ func (lb *Broker) Run() { []byte(worker.address), []byte(Heartbeat), } - lb.backend.SendMessage(msg) + backend.SendMessage(msg) } heartbeatAt = time.Now().Add(HeartbeatInterval) } @@ -173,6 +183,11 @@ func (lb *Broker) Run() { // It retry and timeout the request if the zmq worker is not yet avalible. func (lb *Broker) Channeler() { lb.logger.Infof("zmq channler running %p\n", lb) + push, err := goczmq.NewPush(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) + if err != nil { + panic(err) + } + defer push.Destroy() ChannelerLoop: for { select { @@ -214,7 +229,7 @@ ChannelerLoop: p.frame, } lb.addressChan[address] = p.respChan - lb.backend.SendMessage(frames) + push.SendMessage(frames) lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) go lb.setTimeout(address, HeartbeatInterval*HeartbeatLiveness) case address := <-lb.timeout: diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 766be049c..a7078c5ee 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -275,61 +275,58 @@ func TestBrokerWorker(t *testing.T) { So(resp, ShouldResemble, []byte("from worker")) }) - Convey("send message from server to multple plugin", func() { - w := workerSock(t, "worker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w.SendMessage(bytesArray(Shutdown)) - w.Destroy() + Convey("send message from server to multiple plugin", func(c C) { + go func() { + w := workerSock(t, "worker1", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w) + if len(msg) == 3 { + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + } }() - w.SendMessage(bytesArray(Ready)) - w2 := workerSock(t, "worker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w2.SendMessage(bytesArray(Shutdown)) - w2.Destroy() + + go func() { + w2 := workerSock(t, "worker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w2) + if len(msg) == 3 { + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w2.SendMessage(msg) + } }() - w2.SendMessage(bytesArray(Ready)) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) respChan := <-reqChan - - msg := recvNonControlFrame(w2) - So(len(msg), ShouldEqual, 3) - So(msg[2], ShouldResemble, []byte("from server")) - msg[2] = []byte("from worker2") - w2.SendMessage(msg) - resp := <-respChan - So(resp, ShouldResemble, []byte("from worker2")) + So(resp, ShouldResemble, []byte("from worker")) }) Convey("send multiple message from server to multple plugin", func(c C) { - w := workerSock(t, "mworker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w.SendMessage(bytesArray(Shutdown)) - w.Destroy() - }() - w.SendMessage(bytesArray(Ready)) - w2 := workerSock(t, "mworker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w2.SendMessage(bytesArray(Shutdown)) - w2.Destroy() - }() - w2.SendMessage(bytesArray(Ready)) - - reqChan := make(chan chan []byte) - broker.RPC(reqChan, []byte(("from server"))) - respChan := <-reqChan - - req2Chan := make(chan chan []byte) - broker.RPC(req2Chan, []byte(("from server"))) - resp2Chan := <-req2Chan - go func() { + w := workerSock(t, "mworker1", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) @@ -338,6 +335,14 @@ func TestBrokerWorker(t *testing.T) { }() go func() { + w2 := workerSock(t, "mworker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w2) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) @@ -349,17 +354,24 @@ func TestBrokerWorker(t *testing.T) { wg.Add(2) go func() { defer wg.Done() + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan resp := <-respChan - c.So(resp, ShouldResemble, []byte("from worker2")) + c.So(resp, ShouldNotBeEmpty) }() go func() { defer wg.Done() + req2Chan := make(chan chan []byte) + broker.RPC(req2Chan, []byte(("from server"))) + resp2Chan := <-req2Chan resp := <-resp2Chan - c.So(resp, ShouldResemble, []byte("from worker1")) + c.So(resp, ShouldNotBeEmpty) }() wg.Wait() }) }) broker.stop <- 1 + } From 9f467f835c95542bafd9ebe1b1ff78f724992de3 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Wed, 19 Oct 2016 19:10:51 +0800 Subject: [PATCH 12/18] Kick start the go routine within NewBroker --- pkg/server/plugin/zmq/broker.go | 8 ++++++-- pkg/server/plugin/zmq/broker_test.go | 2 -- pkg/server/plugin/zmq/rpc.go | 7 ------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index eec293982..fe6c2eaeb 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -91,7 +91,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) { "eaddr": backendAddr, }) - return &Broker{ + broker := &Broker{ name: name, backendAddr: backendAddr, frontend: make(chan [][]byte, 10), @@ -101,7 +101,11 @@ func NewBroker(name, backendAddr string) (*Broker, error) { workers: newWorkerQueue(), logger: namedLogger, stop: make(chan int), - }, nil + } + + go broker.Run() + go broker.Channeler() + return broker, nil } // Run the Broker and listens for zmq requests. diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index a7078c5ee..bebdd36b3 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -134,8 +134,6 @@ func TestBrokerWorker(t *testing.T) { if err != nil { t.Fatalf("Failed to init broker: %v", err) } - go broker.Run() - go broker.Channeler() Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 75f63af77..8c35ec2fd 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -276,13 +276,6 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C config: config, } - go func() { - broker.Run() - }() - go func() { - broker.Channeler() - }() - return &p } From f01bfb6abcf1506a1ac16c481ca982b9fce4bacb Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Thu, 20 Oct 2016 16:37:55 +0800 Subject: [PATCH 13/18] Ignore zmq.Run in Complexity check --- .gogocyclo | 1 + 1 file changed, 1 insertion(+) diff --git a/.gogocyclo b/.gogocyclo index f66a6909e..e5bf15599 100644 --- a/.gogocyclo +++ b/.gogocyclo @@ -4,3 +4,4 @@ ignores = `pq`.(*recordScanner).Scan ignores = `pq`.(*database).remoteColumnTypes ignores = `skydb`.(Predicate).Validate ignores = `skyconv`.ParseLiteral +ignores = `zmq`.(*Broker).Run From 11df43d6d0d8b17526ad83931ba001bda5000d0a Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 21 Oct 2016 13:26:09 +0800 Subject: [PATCH 14/18] Change the RPC error to plugin Since it will throw to the end-user. Providing meaningful exception with user term is better. --- pkg/server/plugin/zmq/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 8c35ec2fd..61776f1a0 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -249,7 +249,7 @@ func (p *zmqTransport) ipc(req *request) (out []byte, err error) { msg := <-respChan if bytes.Equal(msg, []byte{0}) { - err = fmt.Errorf("RPC time out") + err = fmt.Errorf("Plugin time out") } else { out = msg } From 334e97fd1550f7efb197103407c44f3983806a85 Mon Sep 17 00:00:00 2001 From: Kwok-kuen Cheung Date: Mon, 24 Oct 2016 11:08:36 +0800 Subject: [PATCH 15/18] Fix typos --- pkg/server/plugin/zmq/broker.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index fe6c2eaeb..1c3b6d1c9 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -65,7 +65,7 @@ func newParcel(frame []byte) *parcel { } } -// Broker implements the Paranoid Pirate queue described as follow: +// Broker implements the Paranoid Pirate queue described as follows: // Related RFC: https://rfc.zeromq.org/spec:6/PPP // refs: http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern // with the addition of: @@ -183,10 +183,10 @@ func (lb *Broker) Run() { } } -// Channeler accept message from RPC and dispatch to zmq if avalible. -// It retry and timeout the request if the zmq worker is not yet avalible. +// Channeler accept message from RPC and dispatch to zmq if available. +// It retry and timeout the request if the zmq worker is not yet available. func (lb *Broker) Channeler() { - lb.logger.Infof("zmq channler running %p\n", lb) + lb.logger.Infof("zmq channeler running %p\n", lb) push, err := goczmq.NewPush(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) if err != nil { panic(err) @@ -209,19 +209,19 @@ ChannelerLoop: case p := <-lb.recvChan: // Save the chan and dispatch the message to zmq // If current no worker ready, will retry after HeartbeatInterval. - // Retry for HeartbeatLiveness tine + // Retry for HeartbeatLiveness times address := lb.workers.Next() if address == "" { if p.retry < HeartbeatLiveness { p.retry += 1 - lb.logger.Infof("zmq/broker: no worker avaliable, retry %d...\n", p.retry) + lb.logger.Infof("zmq/broker: no worker available, retry %d...\n", p.retry) go func(p2 *parcel) { time.Sleep(HeartbeatInterval) lb.recvChan <- p2 }(p) break } - lb.logger.Infof("zmq/broker: no worker avaliable, timeout.\n") + lb.logger.Infof("zmq/broker: no worker available, timeout.\n") p.respChan <- []byte{0} break } @@ -241,14 +241,14 @@ ChannelerLoop: if !ok { break } - lb.logger.Infof("zmq/broker: chan time out for worker %s\n", address) + lb.logger.Infof("zmq/broker: chan timeout for worker %s\n", address) delete(lb.addressChan, address) respChan <- []byte{0} case <-lb.stop: break ChannelerLoop } } - lb.logger.Infof("zmq channler stopped %p!\n", lb) + lb.logger.Infof("zmq channeler stopped %p!\n", lb) } func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { @@ -294,8 +294,8 @@ func newWorker(address string) pworker { // workerQueue is a last tick fist out queue. // // Worker is expect to register itself on ready. Tick itself when it is -// avalible. The most recently Tick worker will got the job. -// A worker do not Tick itself within the expiry will regards as disconnected +// available. The most recently Tick worker will got the job. +// A worker do not Tick itself within the expiry will regard as disconnected // and requires to Add itself again to become avaliable. // // workerQueue is not goroutine safe. To use it safely across goroutine. @@ -348,7 +348,7 @@ func (q *workerQueue) Next() string { } // Add will register the worker as live worker and call Tick to make itself to -// the next avaliable worker. +// the next available worker. func (q *workerQueue) Add(worker pworker) { q.addresses[worker.address] = true err := q.Tick(worker) @@ -357,7 +357,7 @@ func (q *workerQueue) Add(worker pworker) { } } -// Tick will make the worker to be the next avalible worker. Ticking an un +// Tick will make the worker to be the next available worker. Ticking an un- // registered worker will be no-op. func (q *workerQueue) Tick(worker pworker) error { if _, ok := q.addresses[worker.address]; !ok { @@ -393,7 +393,7 @@ func (q *workerQueue) Purge() { } // Remove will unregister the worker with specified address regardless of its -// expiry. Intented for clean shutdown and fast removal of worker. +// expiry. Intended for clean shutdown and fast removal of worker. func (q *workerQueue) Remove(address string) { delete(q.addresses, address) workers := q.pworkers From bf373225834abb5bf4ef5427ee31426d12ea973d Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 24 Oct 2016 16:07:32 +0800 Subject: [PATCH 16/18] Use defer and return in ChannelerLoop --- pkg/server/plugin/zmq/broker.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 1c3b6d1c9..fd14ad090 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -187,12 +187,12 @@ func (lb *Broker) Run() { // It retry and timeout the request if the zmq worker is not yet available. func (lb *Broker) Channeler() { lb.logger.Infof("zmq channeler running %p\n", lb) + defer lb.logger.Infof("zmq channeler stopped %p!\n", lb) push, err := goczmq.NewPush(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) if err != nil { panic(err) } defer push.Destroy() -ChannelerLoop: for { select { case frames := <-lb.frontend: @@ -245,10 +245,9 @@ ChannelerLoop: delete(lb.addressChan, address) respChan <- []byte{0} case <-lb.stop: - break ChannelerLoop + return } } - lb.logger.Infof("zmq channeler stopped %p!\n", lb) } func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { From c2eb4f36b8ba2357ee28b24880d757e6bb26abcc Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 24 Oct 2016 16:51:20 +0800 Subject: [PATCH 17/18] Add comments and TODO --- pkg/server/plugin/zmq/broker.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index fd14ad090..088be64f5 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -72,16 +72,27 @@ func newParcel(frame []byte) *parcel { // // 1. Shutdown signal, which signifies a normal termination of worker to provide // a fast path of worker removal +// TODO: channeler can be separated into a separate struct, communiate with +// broker using frontend chan, workers and pull/push sock address. type Broker struct { - name string + // name is assume to be unique and used to construct the zmq address + name string + // backendAddr is the address to communicate with plugin backendAddr string - frontend chan [][]byte - recvChan chan *parcel + // frontend chan is receive zmq messgae and handle at Channeler + frontend chan [][]byte + // recvChan receive RPC request and dispatch to zmq Run Loop using + // push/pull zsock + recvChan chan *parcel + // addressChan is use zmq worker addess as key to route the message to + // correct go chan addressChan map[string]chan []byte - timeout chan string - workers workerQueue - logger *logrus.Entry - stop chan int + // for RPC timeout, used by Channeler + timeout chan string + workers workerQueue + logger *logrus.Entry + // for stoping the channeler + stop chan int } // NewBroker returns a new *Broker. @@ -300,7 +311,7 @@ func newWorker(address string) pworker { // workerQueue is not goroutine safe. To use it safely across goroutine. // Please use the Lock/Unlock interace before manupliate the queue item via // methods like Add/Tick/Purge. -// Comsuming method Next is the only method will acquire the mutex lock +// Consuming the queue using Next is the only method will acquire the mutex lock // by itself. type workerQueue struct { pworkers []pworker @@ -332,7 +343,7 @@ func (q *workerQueue) Len() int { // Next will pop the next avaliable worker, and the worker will not avalible // until it Tick back to the workerQueue again. -// This worker comsuing method will acquire mutex lock. +// This method for consuming the queue will acquire mutex lock. func (q *workerQueue) Next() string { q.mu.Lock() defer q.mu.Unlock() From 11c08e8793467e9845ab5535552fa956bada6648 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 24 Oct 2016 16:54:26 +0800 Subject: [PATCH 18/18] Use go test -cpu1,4 in runnning test Run zmq test with GOMAXPROCS=4 and pq test with GOMAXPROCS=1 --- .travis.yml | 4 ++-- Makefile | 2 +- pkg/server/plugin/zmq/broker_test.go | 4 ++++ pkg/server/skydb/pq/pq_test.go | 4 ++++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index dc2903283..9b51e8884 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go -go: 1.6 +go: 1.6.3 addons: postgresql: "9.5" @@ -49,7 +49,7 @@ script: - go generate ./pkg/... - git status | grep "_string.go$"; test $? -eq 1 - gocyclo -over 15 pkg | gogocyclo - - go test --tags zmq -cover -timeout 1m ./pkg/... + - WITH_ZMQ=1 make test before_deploy: - if [[ -z "$TRAVIS_TAG" ]]; make all; fi diff --git a/Makefile b/Makefile index 2b1987b36..5c9fc1e88 100644 --- a/Makefile +++ b/Makefile @@ -48,7 +48,7 @@ before-test: test: # Run `go install` to compile packages to speed up test process $(DOCKER_COMPOSE_RUN) go install $(GO_BUILD_ARGS) - $(DOCKER_COMPOSE_RUN) go test $(GO_BUILD_ARGS) -cover -timeout $(GO_TEST_TIMEOUT) ./pkg/... + $(DOCKER_COMPOSE_RUN) go test $(GO_BUILD_ARGS) -cover -timeout $(GO_TEST_TIMEOUT) -cpu 1,4 ./pkg/... .PHONY: after-docker-test after-docker-test: diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index bebdd36b3..9992a623b 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -17,6 +17,7 @@ package zmq import ( + "runtime" "sync" "testing" "time" @@ -127,6 +128,9 @@ func TestWorker(t *testing.T) { }) } func TestBrokerWorker(t *testing.T) { + if runtime.GOMAXPROCS(0) == 1 { + t.Skip("skipping zmq test in GOMAXPROCS=1") + } const ( workerAddr = "inproc://plugin.test" ) diff --git a/pkg/server/skydb/pq/pq_test.go b/pkg/server/skydb/pq/pq_test.go index 8aeb4288a..ff644ff09 100644 --- a/pkg/server/skydb/pq/pq_test.go +++ b/pkg/server/skydb/pq/pq_test.go @@ -19,6 +19,7 @@ import ( "fmt" "math/rand" "os" + "runtime" "testing" "github.com/jmoiron/sqlx" @@ -45,6 +46,9 @@ func testAppName() string { } func getTestConn(t *testing.T) *conn { + if runtime.GOMAXPROCS(0) > 1 { + t.Skip("skipping zmq test in GOMAXPROCS>1") + } defaultTo := func(envvar string, value string) { if os.Getenv(envvar) == "" { os.Setenv(envvar, value)