Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Use channel in messaging between goroutine instead of zmq.REQ #181

Closed
wants to merge 20 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/server/plugin/zmq/broker.go
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ func NewBroker(name, backendAddr string) (*Broker, error) {
"eaddr": backendAddr,
})

return &Broker{
broker := &Broker{
name: name,
backendAddr: backendAddr,
frontend: make(chan [][]byte, 10),
@@ -101,7 +101,11 @@ func NewBroker(name, backendAddr string) (*Broker, error) {
workers: newWorkerQueue(),
logger: namedLogger,
stop: make(chan int),
}, nil
}

go broker.Run()
go broker.Channeler()
return broker, nil
}

// Run the Broker and listens for zmq requests.
2 changes: 0 additions & 2 deletions pkg/server/plugin/zmq/broker_test.go
Original file line number Diff line number Diff line change
@@ -134,8 +134,6 @@ func TestBrokerWorker(t *testing.T) {
if err != nil {
t.Fatalf("Failed to init broker: %v", err)
}
go broker.Run()
go broker.Channeler()

Convey("Test Broker", t, func() {
Convey("receive Ready signal will register the worker", func() {
7 changes: 0 additions & 7 deletions pkg/server/plugin/zmq/rpc.go
Original file line number Diff line number Diff line change
@@ -276,13 +276,6 @@ func (f zmqTransportFactory) Open(name string, args []string, config skyconfig.C
config: config,
}

go func() {
broker.Run()
}()
go func() {
broker.Channeler()
}()

return &p
}