From b2829c8746f9407c15a4adaca89eab6f6c4db882 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Thu, 13 Oct 2016 14:27:02 +0800 Subject: [PATCH 01/12] Remove eaddr from zmq transport --- pkg/server/plugin/zmq/rpc.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index c4db9a1f7..af714d269 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -38,7 +38,6 @@ type zmqTransport struct { state skyplugin.TransportState name string iaddr string // the internal addr used by goroutines to make request to plugin - eaddr string // the addr exposed for plugin to connect to with REP. broker *Broker initHandler skyplugin.TransportInitHandler logger *logrus.Entry @@ -305,7 +304,6 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C state: skyplugin.TransportStateUninitialized, name: name, iaddr: internalAddr, - eaddr: externalAddr, broker: broker, logger: logger, config: config, From 6e0f637716dfd154dd03890aa33a6042dc623eb8 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Thu, 13 Oct 2016 22:48:32 +0800 Subject: [PATCH 02/12] Remove the frontend zmq router and replace with go chan --- pkg/server/plugin/zmq/broker.go | 117 +++++++++++++++------ pkg/server/plugin/zmq/broker_test.go | 152 +++++++++------------------ pkg/server/plugin/zmq/rpc.go | 67 ++++-------- 3 files changed, 159 insertions(+), 177 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 8dbc670e6..de4eb3444 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -50,6 +50,19 @@ const ( Shutdown = "\003" ) +// parcel is used to multiplex the chan with zmq worker +type parcel struct { + respChan chan []byte + frame []byte +} + +func newParcel(frame []byte) *parcel { + return &parcel{ + respChan: make(chan []byte), + frame: frame, + } +} + // Broker implements the Paranoid Pirate queue described in the zguide: // http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern // Related RFC: https://rfc.zeromq.org/spec:6/PPP @@ -58,25 +71,26 @@ const ( // 1. Shutdown signal, which signifies a normal termination of worker to provide // a fast path of worker removal type Broker struct { - name string - // NOTE: goroutines are caller of plugin, so frontend is Go side, - // backend is plugin side - frontend *goczmq.Sock + name string backend *goczmq.Sock bothPoller *goczmq.Poller backendPoller *goczmq.Poller + frontend chan [][]byte + recvChan chan *parcel + addressChan map[string]chan []byte + timeout chan string workers workerQueue freshWorkers chan []byte logger *logrus.Entry + stop chan int } // NewBroker returns a new *Broker. -func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) { - namedLogger := log.WithFields(logrus.Fields{"plugin": name}) - frontend, err := goczmq.NewRouter(frontendAddr) - if err != nil { - panic(err) - } +func NewBroker(name, backendAddr string) (*Broker, error) { + namedLogger := log.WithFields(logrus.Fields{ + "plugin": name, + "eaddr": backendAddr, + }) backend, err := goczmq.NewRouter(backendAddr) if err != nil { @@ -88,34 +102,28 @@ func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) { panic(err) } - bothPoller, err := goczmq.NewPoller(frontend, backend) - if err != nil { - panic(err) - } - return &Broker{ name: name, - frontend: frontend, backend: backend, - bothPoller: bothPoller, backendPoller: backendPoller, + frontend: make(chan [][]byte), + recvChan: make(chan *parcel), + addressChan: map[string]chan []byte{}, + timeout: make(chan string), workers: newWorkerQueue(), freshWorkers: make(chan []byte, 1), logger: namedLogger, + stop: make(chan int), }, nil } // Run kicks start the Broker and listens for requests. It blocks function // execution. func (lb *Broker) Run() { + lb.logger.Infof("Running zmq broker") heartbeatAt := time.Now().Add(HeartbeatInterval) for { - var sock *goczmq.Sock - if lb.workers.Len() == 0 { - sock = lb.backendPoller.Wait(heartbeatIntervalMS) - } else { - sock = lb.bothPoller.Wait(heartbeatIntervalMS) - } + sock := lb.backendPoller.Wait(heartbeatIntervalMS) switch sock { case lb.backend: @@ -137,18 +145,9 @@ func (lb *Broker) Run() { status := string(msg[0]) lb.handleWorkerStatus(&lb.workers, address, status) } else { - lb.frontend.SendMessage(msg) + lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) } - case lb.frontend: - frames, err := lb.frontend.RecvMessage() - if err != nil { - panic(err) - } - - frames = append([][]byte{lb.workers.Next()}, frames...) - lb.backend.SendMessage(frames) - lb.logger.Debugf("zmq/broker: server => plugin: %#x, %s\n", frames[0], frames) case nil: // do nothing default: @@ -169,6 +168,58 @@ func (lb *Broker) Run() { } } +func (lb *Broker) Channler() { + lb.logger.Infof("zmq channler running %p", lb) + for { + select { + case frames := <-lb.frontend: + lb.logger.Debugf("zmq/broker: zmq => channel %#x, %s\n", frames[0], frames) + // Dispacth back to the channel based on the zmq first frame + address := string(frames[0]) + respChan, ok := lb.addressChan[address] + if !ok { + lb.logger.Infof("zmq/broker: chan not found for worker %#x\n", address) + return + } + delete(lb.addressChan, address) + respChan <- frames[2] + case p := <-lb.recvChan: + // Save the chan and dispatch the message to zmq + addr := lb.workers.Next() + frames := append([][]byte{addr}, addr, []byte{}, p.frame) + address := string(addr) + lb.addressChan[address] = p.respChan + lb.backend.SendMessage(frames) + lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) + go lb.setTimeout(address, HeartbeatInterval*HeartbeatLiveness) + case address := <-lb.timeout: + respChan, ok := lb.addressChan[address] + if !ok { + return + } + lb.logger.Infof("zmq/broker: chan time out for worker %#x\n", address) + delete(lb.addressChan, address) + respChan <- []byte{0} + case <-lb.stop: + break + } + } + lb.logger.Infof("zmq channler stopped %p!", lb) +} + +func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { + p := newParcel(in) + lb.recvChan <- p + go func() { + requestChan <- p.respChan + }() +} + +func (lb *Broker) setTimeout(address string, wait time.Duration) { + time.Sleep(wait) + lb.timeout <- address +} + func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, status string) { switch status { case Ready: diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index c31dcbdb2..6e7179a88 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -17,8 +17,6 @@ package zmq import ( - "reflect" - "strings" "testing" "time" @@ -52,94 +50,6 @@ func bytesArray(ss ...string) (bs [][]byte) { return } -func TestBrokerSock(t *testing.T) { - const ( - clientAddr = "inproc://client.test" - workerAddr = "inproc://worker.test" - ) - broker, err := NewBroker("", clientAddr, workerAddr) - if err != nil { - t.Fatalf("Failed to init broker: %v", err) - } - go broker.Run() - - w0 := workerSock(t, "w0", workerAddr) - defer w0.Destroy() - w0.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - c0 := clientSock(t, "c0", clientAddr) - defer c0.Destroy() - c0.SendMessage(bytesArray("simple job")) - - msg, _ := w0.RecvMessage() - expectedMsg := bytesArray(c0.Identity(), "simple job") - if !reflect.DeepEqual(msg, expectedMsg) { - t.Fatalf(`want %v, got %v`, expectedMsg, msg) - } - - w0.SendMessage(expectedMsg) - msg, _ = c0.RecvMessage() - expectedMsg = bytesArray("simple job") - if !reflect.DeepEqual(msg, expectedMsg) { - t.Fatalf("want %v, got %v", expectedMsg, msg) - } - - // multiple workers, multiple clients - w1 := workerSock(t, "w1", workerAddr) - defer w1.Destroy() - w1.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - c1 := clientSock(t, "c1", clientAddr) - defer c1.Destroy() - c2 := clientSock(t, "c2", clientAddr) - defer c2.Destroy() - - c0.SendMessage(bytesArray("job 0")) - c1.SendMessage(bytesArray("job 1")) - c2.SendMessage(bytesArray("job 2")) - - work0, _ := w0.RecvMessage() - if !strings.HasPrefix(string(work0[1]), "job") { - t.Fatalf(`want job *, got %s`, work0[1]) - } - - work1, _ := w1.RecvMessage() - if !strings.HasPrefix(string(work1[1]), "job") { - t.Fatalf(`want job, got %v`, work1[1]) - } - - // let w0 complete its work and receive new job - w0.SendMessage(work0) - work2, _ := w0.RecvMessage() - if !strings.HasPrefix(string(work2[1]), "job") { - t.Fatalf(`want job, got %v`, work2[1]) - } - - // complete all jobs - w0.SendMessage(work2) - w1.SendMessage(work1) - - // clients receive all jobs - resp0, err := c0.RecvMessage() - resp1, err := c1.RecvMessage() - resp2, err := c2.RecvMessage() - - resps := []string{ - string(resp0[0]), - string(resp1[0]), - string(resp2[0]), - } - if !reflect.DeepEqual(resps, []string{ - "job 0", - "job 1", - "job 2", - }) { - t.Fatalf(`want ["job 0", "job 1", "job 2"], got %v`, resps) - } -} - func TestWorker(t *testing.T) { Convey("Test workerQueue", t, func() { address1 := []byte("address1") @@ -206,14 +116,14 @@ func TestWorker(t *testing.T) { } func TestBrokerWorker(t *testing.T) { const ( - clientAddr = "inproc://server.test" workerAddr = "inproc://plugin.test" ) - broker, err := NewBroker("", clientAddr, workerAddr) + broker, err := NewBroker("", workerAddr) if err != nil { t.Fatalf("Failed to init broker: %v", err) } go broker.Run() + go broker.Channler() Convey("Test Broker with worker", t, func() { Convey("receive Ready signal will register the worker", func() { @@ -250,9 +160,45 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("send message from server to plugin", func() { - s := clientSock(t, "server", clientAddr) - defer s.Destroy() + Convey("broker RPC will timeout", func() { + w := workerSock(t, "worker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer w.Destroy() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + + So(broker.workers.Len(), ShouldEqual, 1) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + msg := <-respChan + So(msg, ShouldResemble, []byte{0}) + }) + + Convey("broker RPC recive worker reply", func() { + w := workerSock(t, "worker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer w.Destroy() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + So(broker.workers.Len(), ShouldEqual, 1) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + + msg, _ := w.RecvMessage() + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + + Convey("send message from server to multple plugin", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer w.Destroy() @@ -266,12 +212,18 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 2) - s.SendMessage(bytesArray("from server")) - // Wait the poller to get the message - time.Sleep(HeartbeatInterval) - So(broker.workers.Len(), ShouldEqual, 1) + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + msg, _ := w2.RecvMessage() - So(msg[1], ShouldResemble, []byte("from server")) + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker2") + w2.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker2")) }) }) } diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index af714d269..e412c5cb3 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -17,6 +17,7 @@ package zmq import ( + "bytes" "encoding/json" "fmt" "runtime/debug" @@ -34,16 +35,6 @@ import ( const initRequestTimeout = 2000 -type zmqTransport struct { - state skyplugin.TransportState - name string - iaddr string // the internal addr used by goroutines to make request to plugin - broker *Broker - initHandler skyplugin.TransportInitHandler - logger *logrus.Entry - config skyconfig.Configuration -} - type request struct { Context context.Context Kind string @@ -109,6 +100,15 @@ func (req *request) MarshalJSON() ([]byte, error) { return json.Marshal(¶mReq) } +type zmqTransport struct { + state skyplugin.TransportState + name string + broker *Broker + initHandler skyplugin.TransportInitHandler + logger *logrus.Entry + config skyconfig.Configuration +} + func (p *zmqTransport) State() skyplugin.TransportState { return p.state } @@ -246,40 +246,22 @@ func (p *zmqTransport) ipc(req *request) (out []byte, err error) { return } }() - var ( - in []byte - reqSock *goczmq.Sock - ) - in, err = json.Marshal(req) + in, err := json.Marshal(req) if err != nil { return } - reqSock, err = goczmq.NewReq(p.iaddr) - if err != nil { - return - } - defer func() { - reqSock.Destroy() - }() - if req.Timeout > 0 { - reqSock.SetRcvtimeo(req.Timeout) - } - err = reqSock.SendMessage([][]byte{in}) - if err != nil { - return - } + reqChan := make(chan chan []byte) + p.broker.RPC(reqChan, in) + respChan := <-reqChan + // Broker will sent back a null byte if time out + msg := <-respChan - msg, err := reqSock.RecvMessage() - if err != nil { - return - } - - if len(msg) != 1 { - err = fmt.Errorf("malformed resp msg = %s", msg) + if bytes.Equal(msg, []byte{0}) { + err = fmt.Errorf("RPC time out") } else { - out = msg[0] + out = msg } return @@ -289,12 +271,8 @@ type zmqTransportFactory struct { } func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.Configuration) (transport skyplugin.Transport) { - const internalAddrFmt = `inproc://%s` - - internalAddr := fmt.Sprintf(internalAddrFmt, name) externalAddr := args[0] - - broker, err := NewBroker(name, internalAddr, externalAddr) + broker, err := NewBroker(name, externalAddr) logger := log.WithFields(logrus.Fields{"plugin": name}) if err != nil { logger.Panicf("Failed to init broker for zmq transport: %v", err) @@ -303,16 +281,17 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C p := zmqTransport{ state: skyplugin.TransportStateUninitialized, name: name, - iaddr: internalAddr, broker: broker, logger: logger, config: config, } go func() { - logger.Infof("Running zmq broker:\niaddr = %s\neaddr = %s", internalAddr, externalAddr) broker.Run() }() + go func() { + broker.Channler() + }() return &p } From 233c67632b1326cbab4ace0ba74a765bd36b580f Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 14 Oct 2016 18:24:40 +0800 Subject: [PATCH 03/12] Add mulitple messge and plugin test --- pkg/server/plugin/zmq/broker_test.go | 71 +++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 6e7179a88..90e582993 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -17,6 +17,7 @@ package zmq import ( + "sync" "testing" "time" @@ -201,12 +202,18 @@ func TestBrokerWorker(t *testing.T) { Convey("send message from server to multple plugin", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers w2 := workerSock(t, "worker2", workerAddr) w2.SetRcvtimeo(heartbeatIntervalMS) - defer w2.Destroy() + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() w2.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -225,5 +232,65 @@ func TestBrokerWorker(t *testing.T) { resp := <-respChan So(resp, ShouldResemble, []byte("from worker2")) }) + + Convey("send multiple message from server to multple plugin", func(c C) { + w := workerSock(t, "mworker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + w2 := workerSock(t, "mworker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + + So(broker.workers.Len(), ShouldEqual, 2) + + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + + req2Chan := make(chan chan []byte) + broker.RPC(req2Chan, []byte(("from server"))) + resp2Chan := <-req2Chan + + go func() { + msg, _ := w.RecvMessage() + c.So(len(msg), ShouldEqual, 3) + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker1") + w.SendMessage(msg) + }() + + go func() { + msg, _ := w2.RecvMessage() + c.So(len(msg), ShouldEqual, 3) + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker2") + w2.SendMessage(msg) + }() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + resp := <-respChan + c.So(resp, ShouldResemble, []byte("from worker2")) + }() + go func() { + defer wg.Done() + resp := <-resp2Chan + c.So(resp, ShouldResemble, []byte("from worker1")) + }() + wg.Wait() + + }) }) } From cff6a786c235be8f3fa32c644d020dd880d9f210 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 14 Oct 2016 18:54:39 +0800 Subject: [PATCH 04/12] Use string address in go space do the conversation on the zmq boundary --- pkg/server/plugin/zmq/broker.go | 45 ++++++++++++++++------------ pkg/server/plugin/zmq/broker_test.go | 6 ++-- pkg/server/plugin/zmq/rpc.go | 2 +- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index de4eb3444..2d89de564 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -17,7 +17,6 @@ package zmq import ( - "bytes" "errors" "fmt" "time" @@ -80,7 +79,7 @@ type Broker struct { addressChan map[string]chan []byte timeout chan string workers workerQueue - freshWorkers chan []byte + freshWorkers chan string logger *logrus.Entry stop chan int } @@ -111,7 +110,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) { addressChan: map[string]chan []byte{}, timeout: make(chan string), workers: newWorkerQueue(), - freshWorkers: make(chan []byte, 1), + freshWorkers: make(chan string, 1), logger: namedLogger, stop: make(chan int), }, nil @@ -132,7 +131,7 @@ func (lb *Broker) Run() { panic(err) } - address := frames[0] + address := string(frames[0]) msg := frames[1:] tErr := lb.workers.Tick(newWorker(address)) if tErr != nil { @@ -157,7 +156,10 @@ func (lb *Broker) Run() { lb.logger.Debugf("zmq/broker: idle worker count %d\n", lb.workers.Len()) if heartbeatAt.Before(time.Now()) { for _, worker := range lb.workers.pworkers { - msg := [][]byte{worker.address, []byte(Heartbeat)} + msg := [][]byte{ + []byte(worker.address), + []byte(Heartbeat), + } lb.logger.Debugf("zmq/broker: server => plugin Heartbeat: %s\n", worker.address) lb.backend.SendMessage(msg) } @@ -185,9 +187,14 @@ func (lb *Broker) Channler() { respChan <- frames[2] case p := <-lb.recvChan: // Save the chan and dispatch the message to zmq - addr := lb.workers.Next() - frames := append([][]byte{addr}, addr, []byte{}, p.frame) - address := string(addr) + address := lb.workers.Next() + addr := []byte(address) + frames := [][]byte{ + addr, + addr, + []byte{}, + p.frame, + } lb.addressChan[address] = p.respChan lb.backend.SendMessage(frames) lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) @@ -220,7 +227,7 @@ func (lb *Broker) setTimeout(address string, wait time.Duration) { lb.timeout <- address } -func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, status string) { +func (lb *Broker) handleWorkerStatus(workers *workerQueue, address string, status string) { switch status { case Ready: log.Infof("zmq/broker: ready worker = %s", address) @@ -237,11 +244,11 @@ func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, statu } type pworker struct { - address []byte + address string expiry time.Time } -func newWorker(address []byte) pworker { +func newWorker(address string) pworker { return pworker{ address, time.Now().Add(HeartbeatLiveness * HeartbeatInterval), @@ -267,7 +274,7 @@ func (q workerQueue) Len() int { return len(q.pworkers) } -func (q *workerQueue) Next() []byte { +func (q *workerQueue) Next() string { workers := q.pworkers worker := workers[len(workers)-1] q.pworkers = workers[:len(workers)-1] @@ -275,7 +282,7 @@ func (q *workerQueue) Next() []byte { } func (q *workerQueue) Add(worker pworker) { - q.addresses[string(worker.address)] = true + q.addresses[worker.address] = true err := q.Tick(worker) if err == nil { return @@ -283,13 +290,13 @@ func (q *workerQueue) Add(worker pworker) { } func (q *workerQueue) Tick(worker pworker) error { - if _, ok := q.addresses[string(worker.address)]; !ok { + if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } workers := q.pworkers for i, w := range workers { - if bytes.Equal(w.address, worker.address) { + if w.address == worker.address { q.pworkers = append(append(workers[:i], workers[i+1:]...), worker) return nil } @@ -308,17 +315,17 @@ func (q *workerQueue) Purge() { break } q.pworkers = workers[i+1:] - delete(q.addresses, string(w.address)) + delete(q.addresses, w.address) log.Infof("zmq/broker: disconnected worker = %s", w.address) } } -func (q *workerQueue) Remove(address []byte) { - delete(q.addresses, string(address)) +func (q *workerQueue) Remove(address string) { + delete(q.addresses, address) workers := q.pworkers for i, w := range workers { - if bytes.Equal(w.address, address) { + if w.address == address { q.pworkers = append(workers[:i], workers[i+1:]...) break } diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 90e582993..1039b439d 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -53,9 +53,9 @@ func bytesArray(ss ...string) (bs [][]byte) { func TestWorker(t *testing.T) { Convey("Test workerQueue", t, func() { - address1 := []byte("address1") - address2 := []byte("address2") - address3 := []byte("address3") + address1 := "address1" + address2 := "address2" + address3 := "address3" Convey("Add and pick a worker", func() { q := newWorkerQueue() diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index e412c5cb3..630053584 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -135,7 +135,7 @@ func (p *zmqTransport) RequestInit() { continue } - p.logger.Debugf("zmq transport got fresh worker %s", string(address)) + p.logger.Debugf("zmq transport got fresh worker %s", address) // TODO: Only send init to the new address. For now, we let // the broker decide. From ea6e19a284aece9e14346f68b614fdc3dd588a95 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Fri, 14 Oct 2016 19:01:35 +0800 Subject: [PATCH 05/12] Remove heartbeat logs --- pkg/server/plugin/zmq/broker.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 2d89de564..810ead9fd 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -153,14 +153,12 @@ func (lb *Broker) Run() { panic("zmq/broker: received unknown socket") } - lb.logger.Debugf("zmq/broker: idle worker count %d\n", lb.workers.Len()) if heartbeatAt.Before(time.Now()) { for _, worker := range lb.workers.pworkers { msg := [][]byte{ []byte(worker.address), []byte(Heartbeat), } - lb.logger.Debugf("zmq/broker: server => plugin Heartbeat: %s\n", worker.address) lb.backend.SendMessage(msg) } heartbeatAt = time.Now().Add(HeartbeatInterval) From b4c4dc8b1c4009fea53155ac9079f88c968171e2 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 17 Oct 2016 11:30:36 +0800 Subject: [PATCH 06/12] Properly break the channler on exit --- pkg/server/plugin/zmq/broker.go | 15 ++++---- pkg/server/plugin/zmq/broker_test.go | 57 ++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 810ead9fd..9817b3678 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -169,7 +169,8 @@ func (lb *Broker) Run() { } func (lb *Broker) Channler() { - lb.logger.Infof("zmq channler running %p", lb) + lb.logger.Infof("zmq channler running %p\n", lb) +ChannlerLoop: for { select { case frames := <-lb.frontend: @@ -178,8 +179,8 @@ func (lb *Broker) Channler() { address := string(frames[0]) respChan, ok := lb.addressChan[address] if !ok { - lb.logger.Infof("zmq/broker: chan not found for worker %#x\n", address) - return + lb.logger.Infof("zmq/broker: chan not found for worker %s\n", address) + break } delete(lb.addressChan, address) respChan <- frames[2] @@ -200,16 +201,16 @@ func (lb *Broker) Channler() { case address := <-lb.timeout: respChan, ok := lb.addressChan[address] if !ok { - return + break } - lb.logger.Infof("zmq/broker: chan time out for worker %#x\n", address) + lb.logger.Infof("zmq/broker: chan time out for worker %s\n", address) delete(lb.addressChan, address) respChan <- []byte{0} case <-lb.stop: - break + break ChannlerLoop } } - lb.logger.Infof("zmq channler stopped %p!", lb) + lb.logger.Infof("zmq channler stopped %p!\n", lb) } func (lb *Broker) RPC(requestChan chan chan []byte, in []byte) { diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 1039b439d..49e7902c1 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -119,7 +119,7 @@ func TestBrokerWorker(t *testing.T) { const ( workerAddr = "inproc://plugin.test" ) - broker, err := NewBroker("", workerAddr) + broker, err := NewBroker("test", workerAddr) if err != nil { t.Fatalf("Failed to init broker: %v", err) } @@ -129,7 +129,10 @@ func TestBrokerWorker(t *testing.T) { Convey("Test Broker with worker", t, func() { Convey("receive Ready signal will register the worker", func() { w := workerSock(t, "ready", workerAddr) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -139,11 +142,17 @@ func TestBrokerWorker(t *testing.T) { Convey("receive multiple Ready signal will register all workers", func() { w1 := workerSock(t, "ready1", workerAddr) - defer w1.Destroy() + defer func() { + w1.SendMessage(bytesArray(Shutdown)) + w1.Destroy() + }() w1.SendMessage(bytesArray(Ready)) <-broker.freshWorkers w2 := workerSock(t, "ready2", workerAddr) - defer w2.Destroy() + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() w2.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -152,19 +161,41 @@ func TestBrokerWorker(t *testing.T) { w2.SendMessage(bytesArray(Shutdown)) }) - Convey("reveice Heartbeat without Reay will not register the worker", func() { + Convey("reveice Heartbeat without Ready will not register the worker", func() { w := workerSock(t, "heartbeat", workerAddr) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Heartbeat)) // Wait the poller to get the message time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 0) }) + Convey("reveice message without Reay will be ignored", func() { + w := workerSock(t, "unregistered", workerAddr) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage([][]byte{ + []byte("unregistered"), + []byte{0}, + []byte("Message to be ignored"), + }) + // Wait the poller to get the message + time.Sleep(HeartbeatInterval) + So(broker.workers.Len(), ShouldEqual, 0) + }) + Convey("broker RPC will timeout", func() { - w := workerSock(t, "worker", workerAddr) + w := workerSock(t, "timeout", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers @@ -180,7 +211,10 @@ func TestBrokerWorker(t *testing.T) { Convey("broker RPC recive worker reply", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) - defer w.Destroy() + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() w.SendMessage(bytesArray(Ready)) <-broker.freshWorkers So(broker.workers.Len(), ShouldEqual, 1) @@ -200,7 +234,7 @@ func TestBrokerWorker(t *testing.T) { }) Convey("send message from server to multple plugin", func() { - w := workerSock(t, "worker", workerAddr) + w := workerSock(t, "worker1", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer func() { w.SendMessage(bytesArray(Shutdown)) @@ -234,7 +268,7 @@ func TestBrokerWorker(t *testing.T) { }) Convey("send multiple message from server to multple plugin", func(c C) { - w := workerSock(t, "mworker", workerAddr) + w := workerSock(t, "mworker1", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer func() { w.SendMessage(bytesArray(Shutdown)) @@ -293,4 +327,5 @@ func TestBrokerWorker(t *testing.T) { }) }) + broker.stop <- 1 } From 986b64fc8f42e70a64040075f1a48ef10a822718 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Mon, 17 Oct 2016 19:06:53 +0800 Subject: [PATCH 07/12] Add RPC timeout and retry on no worker ready yet This will properly handle worker crash on init by timeout. And in case of all workers crash, the channeler will not crash. Just all RPC will timeout. Also add mutex lock to make the workerQueue goroutine safe. --- pkg/server/plugin/zmq/broker.go | 32 +++++++++++++++++++++ pkg/server/plugin/zmq/broker_test.go | 42 ++++++++++++++++++++++++++-- pkg/server/plugin/zmq/rpc.go | 7 +++-- 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 9817b3678..5afb954b0 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -19,6 +19,7 @@ package zmq import ( "errors" "fmt" + "sync" "time" "github.com/Sirupsen/logrus" @@ -53,12 +54,14 @@ const ( type parcel struct { respChan chan []byte frame []byte + retry int } func newParcel(frame []byte) *parcel { return &parcel{ respChan: make(chan []byte), frame: frame, + retry: 0, } } @@ -186,7 +189,22 @@ ChannlerLoop: respChan <- frames[2] case p := <-lb.recvChan: // Save the chan and dispatch the message to zmq + // If current no worker ready, will retry after HeartbeatInterval. + // Retry for HeartbeatLiveness tine address := lb.workers.Next() + if address == "" { + if p.retry < HeartbeatLiveness { + p.retry += 1 + lb.logger.Infof("zmq/broker: no worker avaliable, retry %d...\n", p.retry) + go func() { + time.Sleep(HeartbeatInterval) + lb.recvChan <- p + }() + break + } + lb.logger.Infof("zmq/broker: no worker avaliable, timeout.\n") + p.respChan <- []byte{0} + } addr := []byte(address) frames := [][]byte{ addr, @@ -260,12 +278,15 @@ func newWorker(address string) pworker { type workerQueue struct { pworkers []pworker addresses map[string]bool + mu *sync.Mutex } func newWorkerQueue() workerQueue { + mu := new(sync.Mutex) return workerQueue{ []pworker{}, map[string]bool{}, + mu, } } @@ -274,6 +295,11 @@ func (q workerQueue) Len() int { } func (q *workerQueue) Next() string { + q.mu.Lock() + defer q.mu.Unlock() + if q.Len() == 0 { + return "" + } workers := q.pworkers worker := workers[len(workers)-1] q.pworkers = workers[:len(workers)-1] @@ -289,6 +315,8 @@ func (q *workerQueue) Add(worker pworker) { } func (q *workerQueue) Tick(worker pworker) error { + q.mu.Lock() + defer q.mu.Unlock() if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } @@ -306,6 +334,8 @@ func (q *workerQueue) Tick(worker pworker) error { } func (q *workerQueue) Purge() { + q.mu.Lock() + defer q.mu.Unlock() workers := q.pworkers now := time.Now() @@ -320,6 +350,8 @@ func (q *workerQueue) Purge() { } func (q *workerQueue) Remove(address string) { + q.mu.Lock() + defer q.mu.Unlock() delete(q.addresses, address) workers := q.pworkers diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 49e7902c1..8387b85ec 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -126,7 +126,7 @@ func TestBrokerWorker(t *testing.T) { go broker.Run() go broker.Channler() - Convey("Test Broker with worker", t, func() { + Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { w := workerSock(t, "ready", workerAddr) defer func() { @@ -173,7 +173,7 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("reveice message without Reay will be ignored", func() { + Convey("reveice worker message without Reay will be ignored", func() { w := workerSock(t, "unregistered", workerAddr) defer func() { w.SendMessage(bytesArray(Shutdown)) @@ -189,7 +189,7 @@ func TestBrokerWorker(t *testing.T) { So(broker.workers.Len(), ShouldEqual, 0) }) - Convey("broker RPC will timeout", func() { + Convey("receive RPC will timeout", func() { w := workerSock(t, "timeout", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) defer func() { @@ -208,6 +208,42 @@ func TestBrokerWorker(t *testing.T) { So(msg, ShouldResemble, []byte{0}) }) + Convey("recive RPC without Ready worker will wait for Heartbeat Liveness time", func() { + reqChan := make(chan chan []byte) + timeout := time.Now().Add(HeartbeatInterval * HeartbeatLiveness) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + resp := <-respChan + So(resp, ShouldResemble, []byte{0}) + So(time.Now(), ShouldHappenAfter, timeout) + }) + + Convey("worker after recive RPC and before timeout will got the message", func() { + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan + time.Sleep(HeartbeatInterval) + + w := workerSock(t, "lateworker", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + <-broker.freshWorkers + So(broker.workers.Len(), ShouldEqual, 1) + + msg, _ := w.RecvMessage() + So(len(msg), ShouldEqual, 3) + So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + + resp := <-respChan + So(resp, ShouldResemble, []byte("from worker")) + }) + Convey("broker RPC recive worker reply", func() { w := workerSock(t, "worker", workerAddr) w.SetRcvtimeo(heartbeatIntervalMS) diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 630053584..3206c42c6 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "runtime/debug" + "time" "github.com/Sirupsen/logrus" @@ -33,14 +34,13 @@ import ( "golang.org/x/net/context" ) -const initRequestTimeout = 2000 +const initRequestTimeout = time.Second * 2 type request struct { Context context.Context Kind string Name string Param interface{} - Timeout int // timeout in millisecond } type hookRequest struct { @@ -154,13 +154,14 @@ func (p *zmqTransport) RunInit() (out []byte, err error) { param := struct { Config skyconfig.Configuration `json:"config"` }{p.config} - req := request{Kind: "init", Param: param, Timeout: initRequestTimeout} + req := request{Kind: "init", Param: param} for { out, err = p.ipc(&req) if err == nil { break } p.logger.WithField("err", err).Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) + time.Sleep(initRequestTimeout) } return } From 637b2ef437e616cec61db53d018c943b6a6a09c0 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Wed, 19 Oct 2016 13:34:53 +0800 Subject: [PATCH 08/12] Fix goroutine halt on multiple worker land with init --- pkg/server/plugin/zmq/broker.go | 47 +++++++++++++------------ pkg/server/plugin/zmq/broker_test.go | 52 +++++++++++++--------------- pkg/server/plugin/zmq/rpc.go | 27 +++++---------- 3 files changed, 58 insertions(+), 68 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 5afb954b0..e78bfc1ee 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -82,7 +82,6 @@ type Broker struct { addressChan map[string]chan []byte timeout chan string workers workerQueue - freshWorkers chan string logger *logrus.Entry stop chan int } @@ -108,12 +107,11 @@ func NewBroker(name, backendAddr string) (*Broker, error) { name: name, backend: backend, backendPoller: backendPoller, - frontend: make(chan [][]byte), - recvChan: make(chan *parcel), + frontend: make(chan [][]byte, 10), + recvChan: make(chan *parcel, 10), addressChan: map[string]chan []byte{}, timeout: make(chan string), workers: newWorkerQueue(), - freshWorkers: make(chan string, 1), logger: namedLogger, stop: make(chan int), }, nil @@ -122,10 +120,10 @@ func NewBroker(name, backendAddr string) (*Broker, error) { // Run kicks start the Broker and listens for requests. It blocks function // execution. func (lb *Broker) Run() { - lb.logger.Infof("Running zmq broker") heartbeatAt := time.Now().Add(HeartbeatInterval) for { sock := lb.backendPoller.Wait(heartbeatIntervalMS) + lb.workers.Lock() switch sock { case lb.backend: @@ -145,7 +143,7 @@ func (lb *Broker) Run() { } if len(msg) == 1 { status := string(msg[0]) - lb.handleWorkerStatus(&lb.workers, address, status) + lb.handleWorkerStatus(address, status) } else { lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) @@ -168,6 +166,7 @@ func (lb *Broker) Run() { } lb.workers.Purge() + lb.workers.Unlock() } } @@ -196,14 +195,15 @@ ChannlerLoop: if p.retry < HeartbeatLiveness { p.retry += 1 lb.logger.Infof("zmq/broker: no worker avaliable, retry %d...\n", p.retry) - go func() { + go func(p2 *parcel) { time.Sleep(HeartbeatInterval) - lb.recvChan <- p - }() + lb.recvChan <- p2 + }(p) break } lb.logger.Infof("zmq/broker: no worker avaliable, timeout.\n") p.respChan <- []byte{0} + break } addr := []byte(address) frames := [][]byte{ @@ -244,16 +244,15 @@ func (lb *Broker) setTimeout(address string, wait time.Duration) { lb.timeout <- address } -func (lb *Broker) handleWorkerStatus(workers *workerQueue, address string, status string) { +func (lb *Broker) handleWorkerStatus(address string, status string) { switch status { case Ready: log.Infof("zmq/broker: ready worker = %s", address) - workers.Add(newWorker(address)) - lb.freshWorkers <- address + lb.workers.Add(newWorker(address)) case Heartbeat: // no-op case Shutdown: - workers.Remove(address) + lb.workers.Remove(address) log.Infof("zmq/broker: shutdown of worker = %s", address) default: log.Errorf("zmq/broker: invalid status from worker = %s: %s", address, status) @@ -282,22 +281,32 @@ type workerQueue struct { } func newWorkerQueue() workerQueue { - mu := new(sync.Mutex) + mu := &sync.Mutex{} return workerQueue{ []pworker{}, map[string]bool{}, mu, } } +func (q *workerQueue) Lock() { + q.mu.Lock() +} -func (q workerQueue) Len() int { +func (q *workerQueue) Unlock() { + q.mu.Unlock() +} + +func (q *workerQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() return len(q.pworkers) } func (q *workerQueue) Next() string { q.mu.Lock() defer q.mu.Unlock() - if q.Len() == 0 { + cnt := len(q.pworkers) + if cnt == 0 { return "" } workers := q.pworkers @@ -315,8 +324,6 @@ func (q *workerQueue) Add(worker pworker) { } func (q *workerQueue) Tick(worker pworker) error { - q.mu.Lock() - defer q.mu.Unlock() if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) } @@ -334,8 +341,6 @@ func (q *workerQueue) Tick(worker pworker) error { } func (q *workerQueue) Purge() { - q.mu.Lock() - defer q.mu.Unlock() workers := q.pworkers now := time.Now() @@ -350,8 +355,6 @@ func (q *workerQueue) Purge() { } func (q *workerQueue) Remove(address string) { - q.mu.Lock() - defer q.mu.Unlock() delete(q.addresses, address) workers := q.pworkers diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 8387b85ec..f8d4a7631 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -44,6 +44,17 @@ func clientSock(t *testing.T, id string, addr string) *goczmq.Sock { return sock } +func recvNonControlFrame(w *goczmq.Sock) [][]byte { + var msg [][]byte + for { + msg, _ = w.RecvMessage() + if len(msg) != 1 { + break + } + } + return msg +} + func bytesArray(ss ...string) (bs [][]byte) { for _, s := range ss { bs = append(bs, []byte(s)) @@ -134,7 +145,7 @@ func TestBrokerWorker(t *testing.T) { w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 1) w.SendMessage(bytesArray(Shutdown)) @@ -147,14 +158,13 @@ func TestBrokerWorker(t *testing.T) { w1.Destroy() }() w1.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "ready2", workerAddr) defer func() { w2.SendMessage(bytesArray(Shutdown)) w2.Destroy() }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 2) w1.SendMessage(bytesArray(Shutdown)) @@ -197,7 +207,7 @@ func TestBrokerWorker(t *testing.T) { w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers + time.Sleep(HeartbeatInterval) So(broker.workers.Len(), ShouldEqual, 1) @@ -225,16 +235,14 @@ func TestBrokerWorker(t *testing.T) { time.Sleep(HeartbeatInterval) w := workerSock(t, "lateworker", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - So(broker.workers.Len(), ShouldEqual, 1) - msg, _ := w.RecvMessage() + msg := recvNonControlFrame(w) So(len(msg), ShouldEqual, 3) So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker") @@ -246,20 +254,18 @@ func TestBrokerWorker(t *testing.T) { Convey("broker RPC recive worker reply", func() { w := workerSock(t, "worker", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - So(broker.workers.Len(), ShouldEqual, 1) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) respChan := <-reqChan - msg, _ := w.RecvMessage() + msg := recvNonControlFrame(w) So(len(msg), ShouldEqual, 3) So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker") @@ -271,29 +277,25 @@ func TestBrokerWorker(t *testing.T) { Convey("send message from server to multple plugin", func() { w := workerSock(t, "worker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "worker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w2.SendMessage(bytesArray(Shutdown)) w2.Destroy() }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - So(broker.workers.Len(), ShouldEqual, 2) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) respChan := <-reqChan - msg, _ := w2.RecvMessage() + msg := recvNonControlFrame(w2) So(len(msg), ShouldEqual, 3) So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker2") @@ -305,23 +307,19 @@ func TestBrokerWorker(t *testing.T) { Convey("send multiple message from server to multple plugin", func(c C) { w := workerSock(t, "mworker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS) + w.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w.SendMessage(bytesArray(Shutdown)) w.Destroy() }() w.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers w2 := workerSock(t, "mworker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) defer func() { w2.SendMessage(bytesArray(Shutdown)) w2.Destroy() }() w2.SendMessage(bytesArray(Ready)) - <-broker.freshWorkers - - So(broker.workers.Len(), ShouldEqual, 2) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) @@ -332,7 +330,7 @@ func TestBrokerWorker(t *testing.T) { resp2Chan := <-req2Chan go func() { - msg, _ := w.RecvMessage() + msg := recvNonControlFrame(w) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker1") @@ -340,7 +338,7 @@ func TestBrokerWorker(t *testing.T) { }() go func() { - msg, _ := w2.RecvMessage() + msg := recvNonControlFrame(w2) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) msg[2] = []byte("from worker2") diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 3206c42c6..92db6c2f1 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -125,21 +125,16 @@ func (p *zmqTransport) setState(state skyplugin.TransportState) { } } +// RequestInit is expected to run in separate gorountine and called once to +// set it internal state with coordinate with broker. func (p *zmqTransport) RequestInit() { for { - address := <-p.broker.freshWorkers - - if p.state != skyplugin.TransportStateUninitialized { - // Although the plugin is only initialized once, we need - // to clear the channel buffer so that broker doesn't get stuck + out, err := p.RunInit() + if err != nil { + p.logger.WithField("err", err). + Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) continue } - - p.logger.Debugf("zmq transport got fresh worker %s", address) - - // TODO: Only send init to the new address. For now, we let - // the broker decide. - out, err := p.RunInit() if p.initHandler != nil { handlerError := p.initHandler(out, err) if err != nil || handlerError != nil { @@ -147,6 +142,7 @@ func (p *zmqTransport) RequestInit() { } } p.setState(skyplugin.TransportStateReady) + break } } @@ -155,14 +151,7 @@ func (p *zmqTransport) RunInit() (out []byte, err error) { Config skyconfig.Configuration `json:"config"` }{p.config} req := request{Kind: "init", Param: param} - for { - out, err = p.ipc(&req) - if err == nil { - break - } - p.logger.WithField("err", err).Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) - time.Sleep(initRequestTimeout) - } + out, err = p.ipc(&req) return } From 740bffaec0a0158734b30d33527d676a4ade6490 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Tue, 18 Oct 2016 14:20:05 +0800 Subject: [PATCH 09/12] Correct spelling Channler -> Channeler --- pkg/server/plugin/zmq/broker.go | 6 +++--- pkg/server/plugin/zmq/broker_test.go | 2 +- pkg/server/plugin/zmq/rpc.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index e78bfc1ee..e866aa0dd 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -170,9 +170,9 @@ func (lb *Broker) Run() { } } -func (lb *Broker) Channler() { +func (lb *Broker) Channeler() { lb.logger.Infof("zmq channler running %p\n", lb) -ChannlerLoop: +ChannelerLoop: for { select { case frames := <-lb.frontend: @@ -225,7 +225,7 @@ ChannlerLoop: delete(lb.addressChan, address) respChan <- []byte{0} case <-lb.stop: - break ChannlerLoop + break ChannelerLoop } } lb.logger.Infof("zmq channler stopped %p!\n", lb) diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index f8d4a7631..766be049c 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -135,7 +135,7 @@ func TestBrokerWorker(t *testing.T) { t.Fatalf("Failed to init broker: %v", err) } go broker.Run() - go broker.Channler() + go broker.Channeler() Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 92db6c2f1..75f63af77 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -280,7 +280,7 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C broker.Run() }() go func() { - broker.Channler() + broker.Channeler() }() return &p From 71ea4723fe4dda5695fa21263c0b6626149cae66 Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Tue, 18 Oct 2016 14:52:22 +0800 Subject: [PATCH 10/12] Comment on workerQueue behaviour --- pkg/server/plugin/zmq/broker.go | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index e866aa0dd..7c0c7796f 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -65,9 +65,9 @@ func newParcel(frame []byte) *parcel { } } -// Broker implements the Paranoid Pirate queue described in the zguide: -// http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern +// Broker implements the Paranoid Pirate queue described as follow: // Related RFC: https://rfc.zeromq.org/spec:6/PPP +// refs: http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern // with the addition of: // // 1. Shutdown signal, which signifies a normal termination of worker to provide @@ -117,8 +117,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) { }, nil } -// Run kicks start the Broker and listens for requests. It blocks function -// execution. +// Run the Broker and listens for zmq requests. func (lb *Broker) Run() { heartbeatAt := time.Now().Add(HeartbeatInterval) for { @@ -170,6 +169,8 @@ func (lb *Broker) Run() { } } +// Channeler accept message from RPC and dispatch to zmq if avalible. +// It retry and timeout the request if the zmq worker is not yet avalible. func (lb *Broker) Channeler() { lb.logger.Infof("zmq channler running %p\n", lb) ChannelerLoop: @@ -272,8 +273,17 @@ func newWorker(address string) pworker { } // workerQueue is a last tick fist out queue. -// A worker need to register itself using Add before it can tick. -// Ticking of an non-registered worker will be no-ops. +// +// Worker is expect to register itself on ready. Tick itself when it is +// avalible. The most recently Tick worker will got the job. +// A worker do not Tick itself within the expiry will regards as disconnected +// and requires to Add itself again to become avaliable. +// +// workerQueue is not goroutine safe. To use it safely across goroutine. +// Please use the Lock/Unlock interace before manupliate the queue item via +// methods like Add/Tick/Purge. +// Comsuming method Next is the only method will acquire the mutex lock +// by itself. type workerQueue struct { pworkers []pworker addresses map[string]bool @@ -302,6 +312,9 @@ func (q *workerQueue) Len() int { return len(q.pworkers) } +// Next will pop the next avaliable worker, and the worker will not avalible +// until it Tick back to the workerQueue again. +// This worker comsuing method will acquire mutex lock. func (q *workerQueue) Next() string { q.mu.Lock() defer q.mu.Unlock() @@ -315,6 +328,8 @@ func (q *workerQueue) Next() string { return worker.address } +// Add will register the worker as live worker and call Tick to make itself to +// the next avaliable worker. func (q *workerQueue) Add(worker pworker) { q.addresses[worker.address] = true err := q.Tick(worker) @@ -323,6 +338,8 @@ func (q *workerQueue) Add(worker pworker) { } } +// Tick will make the worker to be the next avalible worker. Ticking an un +// registered worker will be no-op. func (q *workerQueue) Tick(worker pworker) error { if _, ok := q.addresses[worker.address]; !ok { return errors.New(fmt.Sprintf("zmq/broker: Ticking non-registered worker = %s", worker.address)) @@ -340,6 +357,8 @@ func (q *workerQueue) Tick(worker pworker) error { return nil } +// Purge will unregister the worker that is not heathly. i.e. haven't Tick for +// a while. func (q *workerQueue) Purge() { workers := q.pworkers @@ -354,6 +373,8 @@ func (q *workerQueue) Purge() { } } +// Remove will unregister the worker with specified address regardless of its +// expiry. Intented for clean shutdown and fast removal of worker. func (q *workerQueue) Remove(address string) { delete(q.addresses, address) workers := q.pworkers From 60ca9f14acb1458a9933b967399da52b2c758acf Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Wed, 19 Oct 2016 18:51:30 +0800 Subject: [PATCH 11/12] Make the broker channler and Run goroutine safe --- pkg/server/plugin/zmq/broker.go | 81 ++++++++++++--------- pkg/server/plugin/zmq/broker_test.go | 104 +++++++++++++++------------ 2 files changed, 106 insertions(+), 79 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index 7c0c7796f..eec293982 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -73,17 +73,15 @@ func newParcel(frame []byte) *parcel { // 1. Shutdown signal, which signifies a normal termination of worker to provide // a fast path of worker removal type Broker struct { - name string - backend *goczmq.Sock - bothPoller *goczmq.Poller - backendPoller *goczmq.Poller - frontend chan [][]byte - recvChan chan *parcel - addressChan map[string]chan []byte - timeout chan string - workers workerQueue - logger *logrus.Entry - stop chan int + name string + backendAddr string + frontend chan [][]byte + recvChan chan *parcel + addressChan map[string]chan []byte + timeout chan string + workers workerQueue + logger *logrus.Entry + stop chan int } // NewBroker returns a new *Broker. @@ -93,40 +91,46 @@ func NewBroker(name, backendAddr string) (*Broker, error) { "eaddr": backendAddr, }) - backend, err := goczmq.NewRouter(backendAddr) + return &Broker{ + name: name, + backendAddr: backendAddr, + frontend: make(chan [][]byte, 10), + recvChan: make(chan *parcel, 10), + addressChan: map[string]chan []byte{}, + timeout: make(chan string), + workers: newWorkerQueue(), + logger: namedLogger, + stop: make(chan int), + }, nil +} + +// Run the Broker and listens for zmq requests. +func (lb *Broker) Run() { + backend, err := goczmq.NewRouter(lb.backendAddr) if err != nil { panic(err) } + defer backend.Destroy() - backendPoller, err := goczmq.NewPoller(backend) + pull, err := goczmq.NewPull(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) if err != nil { panic(err) } + defer pull.Destroy() - return &Broker{ - name: name, - backend: backend, - backendPoller: backendPoller, - frontend: make(chan [][]byte, 10), - recvChan: make(chan *parcel, 10), - addressChan: map[string]chan []byte{}, - timeout: make(chan string), - workers: newWorkerQueue(), - logger: namedLogger, - stop: make(chan int), - }, nil -} + backendPoller, err := goczmq.NewPoller(backend, pull) + if err != nil { + panic(err) + } -// Run the Broker and listens for zmq requests. -func (lb *Broker) Run() { heartbeatAt := time.Now().Add(HeartbeatInterval) for { - sock := lb.backendPoller.Wait(heartbeatIntervalMS) + sock := backendPoller.Wait(heartbeatIntervalMS) lb.workers.Lock() switch sock { - case lb.backend: - frames, err := lb.backend.RecvMessage() + case backend: + frames, err := backend.RecvMessage() if err != nil { panic(err) } @@ -147,6 +151,12 @@ func (lb *Broker) Run() { lb.frontend <- msg lb.logger.Debugf("zmq/broker: plugin => server: %#x, %s\n", msg[0], msg) } + case pull: + frames, err := pull.RecvMessage() + if err != nil { + panic(err) + } + backend.SendMessage(frames) case nil: // do nothing default: @@ -159,7 +169,7 @@ func (lb *Broker) Run() { []byte(worker.address), []byte(Heartbeat), } - lb.backend.SendMessage(msg) + backend.SendMessage(msg) } heartbeatAt = time.Now().Add(HeartbeatInterval) } @@ -173,6 +183,11 @@ func (lb *Broker) Run() { // It retry and timeout the request if the zmq worker is not yet avalible. func (lb *Broker) Channeler() { lb.logger.Infof("zmq channler running %p\n", lb) + push, err := goczmq.NewPush(fmt.Sprintf("inproc://chanpipeline%d", lb.name)) + if err != nil { + panic(err) + } + defer push.Destroy() ChannelerLoop: for { select { @@ -214,7 +229,7 @@ ChannelerLoop: p.frame, } lb.addressChan[address] = p.respChan - lb.backend.SendMessage(frames) + push.SendMessage(frames) lb.logger.Debugf("zmq/broker: channel => zmq: %#x, %s\n", addr, frames) go lb.setTimeout(address, HeartbeatInterval*HeartbeatLiveness) case address := <-lb.timeout: diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index 766be049c..a7078c5ee 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -275,61 +275,58 @@ func TestBrokerWorker(t *testing.T) { So(resp, ShouldResemble, []byte("from worker")) }) - Convey("send message from server to multple plugin", func() { - w := workerSock(t, "worker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w.SendMessage(bytesArray(Shutdown)) - w.Destroy() + Convey("send message from server to multiple plugin", func(c C) { + go func() { + w := workerSock(t, "worker1", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w) + if len(msg) == 3 { + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w.SendMessage(msg) + } }() - w.SendMessage(bytesArray(Ready)) - w2 := workerSock(t, "worker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w2.SendMessage(bytesArray(Shutdown)) - w2.Destroy() + + go func() { + w2 := workerSock(t, "worker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + time.Sleep((HeartbeatLiveness + 1) * HeartbeatInterval) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w2) + if len(msg) == 3 { + c.So(msg[2], ShouldResemble, []byte("from server")) + msg[2] = []byte("from worker") + w2.SendMessage(msg) + } }() - w2.SendMessage(bytesArray(Ready)) reqChan := make(chan chan []byte) broker.RPC(reqChan, []byte(("from server"))) respChan := <-reqChan - - msg := recvNonControlFrame(w2) - So(len(msg), ShouldEqual, 3) - So(msg[2], ShouldResemble, []byte("from server")) - msg[2] = []byte("from worker2") - w2.SendMessage(msg) - resp := <-respChan - So(resp, ShouldResemble, []byte("from worker2")) + So(resp, ShouldResemble, []byte("from worker")) }) Convey("send multiple message from server to multple plugin", func(c C) { - w := workerSock(t, "mworker1", workerAddr) - w.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w.SendMessage(bytesArray(Shutdown)) - w.Destroy() - }() - w.SendMessage(bytesArray(Ready)) - w2 := workerSock(t, "mworker2", workerAddr) - w2.SetRcvtimeo(heartbeatIntervalMS * 2) - defer func() { - w2.SendMessage(bytesArray(Shutdown)) - w2.Destroy() - }() - w2.SendMessage(bytesArray(Ready)) - - reqChan := make(chan chan []byte) - broker.RPC(reqChan, []byte(("from server"))) - respChan := <-reqChan - - req2Chan := make(chan chan []byte) - broker.RPC(req2Chan, []byte(("from server"))) - resp2Chan := <-req2Chan - go func() { + w := workerSock(t, "mworker1", workerAddr) + w.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w.SendMessage(bytesArray(Shutdown)) + w.Destroy() + }() + w.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) @@ -338,6 +335,14 @@ func TestBrokerWorker(t *testing.T) { }() go func() { + w2 := workerSock(t, "mworker2", workerAddr) + w2.SetRcvtimeo(heartbeatIntervalMS * 2) + defer func() { + w2.SendMessage(bytesArray(Shutdown)) + w2.Destroy() + }() + w2.SendMessage(bytesArray(Ready)) + msg := recvNonControlFrame(w2) c.So(len(msg), ShouldEqual, 3) c.So(msg[2], ShouldResemble, []byte("from server")) @@ -349,17 +354,24 @@ func TestBrokerWorker(t *testing.T) { wg.Add(2) go func() { defer wg.Done() + reqChan := make(chan chan []byte) + broker.RPC(reqChan, []byte(("from server"))) + respChan := <-reqChan resp := <-respChan - c.So(resp, ShouldResemble, []byte("from worker2")) + c.So(resp, ShouldNotBeEmpty) }() go func() { defer wg.Done() + req2Chan := make(chan chan []byte) + broker.RPC(req2Chan, []byte(("from server"))) + resp2Chan := <-req2Chan resp := <-resp2Chan - c.So(resp, ShouldResemble, []byte("from worker1")) + c.So(resp, ShouldNotBeEmpty) }() wg.Wait() }) }) broker.stop <- 1 + } From 2c9eda0e9b508107b67d2cb9439733e719b7060e Mon Sep 17 00:00:00 2001 From: Rick Mak Date: Wed, 19 Oct 2016 19:10:51 +0800 Subject: [PATCH 12/12] Kick start the go routine within NewBroker --- pkg/server/plugin/zmq/broker.go | 8 ++++++-- pkg/server/plugin/zmq/broker_test.go | 2 -- pkg/server/plugin/zmq/rpc.go | 7 ------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/server/plugin/zmq/broker.go b/pkg/server/plugin/zmq/broker.go index eec293982..fe6c2eaeb 100644 --- a/pkg/server/plugin/zmq/broker.go +++ b/pkg/server/plugin/zmq/broker.go @@ -91,7 +91,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) { "eaddr": backendAddr, }) - return &Broker{ + broker := &Broker{ name: name, backendAddr: backendAddr, frontend: make(chan [][]byte, 10), @@ -101,7 +101,11 @@ func NewBroker(name, backendAddr string) (*Broker, error) { workers: newWorkerQueue(), logger: namedLogger, stop: make(chan int), - }, nil + } + + go broker.Run() + go broker.Channeler() + return broker, nil } // Run the Broker and listens for zmq requests. diff --git a/pkg/server/plugin/zmq/broker_test.go b/pkg/server/plugin/zmq/broker_test.go index a7078c5ee..bebdd36b3 100644 --- a/pkg/server/plugin/zmq/broker_test.go +++ b/pkg/server/plugin/zmq/broker_test.go @@ -134,8 +134,6 @@ func TestBrokerWorker(t *testing.T) { if err != nil { t.Fatalf("Failed to init broker: %v", err) } - go broker.Run() - go broker.Channeler() Convey("Test Broker", t, func() { Convey("receive Ready signal will register the worker", func() { diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 75f63af77..8c35ec2fd 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -276,13 +276,6 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C config: config, } - go func() { - broker.Run() - }() - go func() { - broker.Channeler() - }() - return &p }