Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Use channel in messaging between goroutine in stead of zmq.REQ #168

Closed
wants to merge 19 commits into from

Conversation

rickmak
Copy link
Member

@rickmak rickmak commented Oct 20, 2016

This will resolve socket exhaust problem on high concurrency.

connects #160

@cheungpat
Copy link
Contributor

timeout when running the test with the ./pkg/server/plugin/zmq package (used -timeout 5m)

i hope these are helpful:

time="2016-10-22T09:08:52Z" level=info msg="zmq/broker: disconnected worker = address1" logger=plugin
time="2016-10-22T09:08:52Z" level=info msg="zmq/broker: disconnected worker = address2" logger=plugin
.
16 total assertions

time="2016-10-22T09:08:52Z" level=info msg="zmq channler running 0xc820120080\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:52Z" level=info msg="zmq/broker: ready worker = ready" logger=plugin
.time="2016-10-22T09:08:53Z" level=info msg="zmq/broker: shutdown of worker = ready" logger=plugin
time="2016-10-22T09:08:53Z" level=info msg="zmq/broker: ready worker = ready1" logger=plugin
time="2016-10-22T09:08:53Z" level=info msg="zmq/broker: ready worker = ready2" logger=plugin
time="2016-10-22T09:08:53Z" level=warning msg="zmq/broker: Ticking non-registered worker = ready" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:53Z" level=info msg="zmq/broker: shutdown of worker = ready" logger=plugin
.time="2016-10-22T09:08:54Z" level=info msg="zmq/broker: shutdown of worker = ready1" logger=plugin
time="2016-10-22T09:08:54Z" level=info msg="zmq/broker: shutdown of worker = ready2" logger=plugin
time="2016-10-22T09:08:54Z" level=warning msg="zmq/broker: Ticking non-registered worker = heartbeat" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:54Z" level=warning msg="zmq/broker: Ticking non-registered worker = ready1" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:54Z" level=info msg="zmq/broker: shutdown of worker = ready1" logger=plugin
time="2016-10-22T09:08:54Z" level=warning msg="zmq/broker: Ticking non-registered worker = ready2" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:54Z" level=info msg="zmq/broker: shutdown of worker = ready2" logger=plugin
.time="2016-10-22T09:08:55Z" level=warning msg="zmq/broker: Ticking non-registered worker = heartbeat" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:55Z" level=info msg="zmq/broker: shutdown of worker = heartbeat" logger=plugin
time="2016-10-22T09:08:55Z" level=warning msg="zmq/broker: Ticking non-registered worker = unregistered" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:55Z" level=info msg="zmq/broker: chan not found for worker unregistered\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
.time="2016-10-22T09:08:56Z" level=warning msg="zmq/broker: Ticking non-registered worker = unregistered" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:08:56Z" level=info msg="zmq/broker: shutdown of worker = unregistered" logger=plugin
time="2016-10-22T09:08:56Z" level=info msg="zmq/broker: ready worker = timeout" logger=plugin
.time="2016-10-22T09:09:00Z" level=info msg="zmq/broker: chan time out for  worker timeout\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
.time="2016-10-22T09:09:00Z" level=info msg="zmq/broker: shutdown of worker = timeout" logger=plugin
time="2016-10-22T09:09:00Z" level=info msg="zmq/broker: no worker avaliable, retry 1...\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:01Z" level=info msg="zmq/broker: no worker avaliable, retry 2...\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:02Z" level=info msg="zmq/broker: no worker avaliable, retry 3...\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:03Z" level=info msg="zmq/broker: no worker avaliable, timeout.\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
..time="2016-10-22T09:09:03Z" level=info msg="zmq/broker: no worker avaliable, retry 1...\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:04Z" level=info msg="zmq/broker: no worker avaliable, retry 2...\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:04Z" level=info msg="zmq/broker: ready worker = lateworker" logger=plugin
...time="2016-10-22T09:09:05Z" level=info msg="zmq/broker: shutdown of worker = lateworker" logger=plugin
time="2016-10-22T09:09:05Z" level=info msg="zmq/broker: ready worker = worker" logger=plugin
time="2016-10-22T09:09:08Z" level=info msg="zmq/broker: chan time out for  worker lateworker\n" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:08Z" level=info msg="zmq/broker: disconnected worker = worker" logger=plugin
xtime="2016-10-22T09:09:10Z" level=warning msg="zmq/broker: Ticking non-registered worker = worker" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:10Z" level=info msg="zmq/broker: shutdown of worker = worker" logger=plugin
time="2016-10-22T09:09:10Z" level=info msg="zmq/broker: ready worker = worker1" logger=plugin
time="2016-10-22T09:09:10Z" level=info msg="zmq/broker: ready worker = worker2" logger=plugin
time="2016-10-22T09:09:13Z" level=info msg="zmq/broker: disconnected worker = worker1" logger=plugin
time="2016-10-22T09:09:13Z" level=info msg="zmq/broker: disconnected worker = worker2" logger=plugin
time="2016-10-22T09:09:15Z" level=warning msg="zmq/broker: Ticking non-registered worker = worker2" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:15Z" level=info msg="zmq/broker: shutdown of worker = worker2" logger=plugin
time="2016-10-22T09:09:15Z" level=warning msg="zmq/broker: Ticking non-registered worker = worker1" eaddr="inproc://plugin.test" logger=plugin plugin=test
time="2016-10-22T09:09:15Z" level=info msg="zmq/broker: shutdown of worker = worker1" logger=plugin
panic: test timed out after 5m0s

goroutine 35 [running]:
panic(0x7a17c0, 0xc820138f10)
    /usr/local/go/src/runtime/panic.go:481 +0x3e6
testing.startAlarm.func1()
    /usr/local/go/src/testing/testing.go:725 +0x14b
created by time.goFunc
    /usr/local/go/src/time/sleep.go:129 +0x3a

goroutine 1 [chan receive]:
testing.RunTests(0x9b2d60, 0xd21c40, 0x2, 0x2, 0xc82007ab01)
    /usr/local/go/src/testing/testing.go:583 +0x8d2
testing.(*M).Run(0xc820047ef8, 0xd3fc00)
    /usr/local/go/src/testing/testing.go:515 +0x81
main.main()
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_testmain.go:108 +0x253

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:1998 +0x1

goroutine 18 [chan receive]:
github.com/skygeario/skygear-server/pkg/server/plugin/zmq.TestBrokerWorker.func1.9(0x7f0e120efc70, 0xc82000eb40)
    /go/src/github.com/skygeario/skygear-server/pkg/server/plugin/zmq/broker_test.go:314 +0x1a5
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.(*context).conveyInner(0xc82000eb40, 0x977620, 0x2b, 0xc820165800)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/context.go:261 +0x21a
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.(*context).Convey.func1()
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/context.go:163 +0x40
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls._m(0x0, 0xc820165820)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:39 +0x2b
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.mark1(0x0, 0xc820165820)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:18 +0x2b
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls._m(0x1, 0xc820165820)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:41 +0x5a
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.markS(0x1, 0xc820165820)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:16 +0x2b
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.addStackTag(0x1, 0xc820165820)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:13 +0x37
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.(*ContextManager).SetValues(0xc8200bb200, 0xc82016ce70, 0xc820165820)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/context.go:92 +0x4b3
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.(*context).Convey(0xc82010c2a0, 0xc8200499c0, 0x2, 0x2)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/context.go:164 +0x36b
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.Convey(0xc8200499c0, 0x2, 0x2)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/doc.go:77 +0x62
github.com/skygeario/skygear-server/pkg/server/plugin/zmq.TestBrokerWorker.func1()
    /go/src/github.com/skygeario/skygear-server/pkg/server/plugin/zmq/broker_test.go:316 +0xd8b
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.parseAction.func1(0x7f0e120efc70, 0xc82010c2a0)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/discovery.go:80 +0x18
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.(*context).conveyInner(0xc82010c2a0, 0x8ed6f0, 0xb, 0xc820106390)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/context.go:261 +0x21a
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.rootConvey.func1()
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/context.go:110 +0x142
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls._m(0x0, 0xc82010e280)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:39 +0x2b
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.markS(0x0, 0xc82010e280)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:16 +0x2b
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.addStackTag(0x0, 0xc82010e280)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/stack_tags.go:13 +0x37
github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls.(*ContextManager).SetValues(0xc8200bb200, 0xc820114270, 0xc82010e280)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/jtolds/gls/context.go:92 +0x4b3
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.rootConvey(0xc820049f20, 0x3, 0x3)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/context.go:113 +0x3eb
github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey.Convey(0xc820049f20, 0x3, 0x3)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/smartystreets/goconvey/convey/doc.go:75 +0x45
github.com/skygeario/skygear-server/pkg/server/plugin/zmq.TestBrokerWorker(0xc820124000)
    /go/src/github.com/skygeario/skygear-server/pkg/server/plugin/zmq/broker_test.go:372 +0x337
testing.tRunner(0xc820124000, 0xd21c58)
    /usr/local/go/src/testing/testing.go:473 +0x98
created by testing.RunTests
    /usr/local/go/src/testing/testing.go:582 +0x892

goroutine 19 [syscall, locked to thread]:
github.com/skygeario/skygear-server/vendor/github.com/zeromq/goczmq._Cfunc_zpoller_wait(0x7f0e00009780, 0x7f0e000003e8, 0x0)
    github.com/skygeario/skygear-server/vendor/github.com/zeromq/goczmq/_obj/_cgo_gotypes.go:628 +0x42
github.com/skygeario/skygear-server/vendor/github.com/zeromq/goczmq.(*Poller).Wait(0xc82010e3e0, 0x3e8, 0xd3fcc0)
    /go/src/github.com/skygeario/skygear-server/vendor/github.com/zeromq/goczmq/poller.go:87 +0xd7
github.com/skygeario/skygear-server/pkg/server/plugin/zmq.(*Broker).Run(0xc820120080)
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_obj_test/broker.go:142 +0x4c2
created by github.com/skygeario/skygear-server/pkg/server/plugin/zmq.NewBroker
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_obj_test/broker.go:108 +0x5e0

goroutine 20 [chan send]:
github.com/skygeario/skygear-server/pkg/server/plugin/zmq.(*Broker).Channeler(0xc820120080)
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_obj_test/broker.go:289 +0x1251
created by github.com/skygeario/skygear-server/pkg/server/plugin/zmq.NewBroker
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_obj_test/broker.go:109 +0x602

goroutine 28 [chan send]:
github.com/skygeario/skygear-server/pkg/server/plugin/zmq.(*Broker).setTimeout(0xc820120080, 0xc82015f170, 0xa, 0xb2d05e00)
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_obj_test/broker.go:312 +0x67
created by github.com/skygeario/skygear-server/pkg/server/plugin/zmq.(*Broker).Channeler
    github.com/skygeario/skygear-server/pkg/server/plugin/zmq/_test/_obj_test/broker.go:278 +0xf19
FAIL    github.com/skygeario/skygear-server/pkg/server/plugin/zmq   300.016s

@cheungpat cheungpat changed the title Use channel in messging between goroutine in stead of zmq.REQ Use channel in messaging between goroutine in stead of zmq.REQ Oct 23, 2016
}
heartbeatAt = time.Now().Add(HeartbeatInterval)
}

lb.workers.Purge()
lb.workers.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should defer lb.workers.Unlock()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is within the for control. And the defer is bind to func closure scope. If we want to use defer here. We need to create a large function scope. Which I don't prefer.

delete(lb.addressChan, address)
respChan <- []byte{0}
case <-lb.stop:
break ChannelerLoop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think simply returns here

break ChannelerLoop
}
}
lb.logger.Infof("zmq channler stopped %p!\n", lb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be defer lb.logger.Infof(...)

frames = append([][]byte{lb.workers.Next()}, frames...)
lb.backend.SendMessage(frames)
lb.logger.Debugf("zmq/broker: server => plugin: %#x, %s\n", frames[0], frames)
backend.SendMessage(frames)
case nil:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can also quit the loop by checking the stop channel here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This switch case is only zmq socket. Not go chan. So we cannot consume the chan here.

timeout chan string
workers workerQueue
logger *logrus.Entry
stop chan int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while you still have fresh memory what this struct does, could you add some comments to the meaning of the struct members?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you have still got time: there are two components to this broker: one is a broker and the other is the channeler. There are some struct members that are only used in one component but not the other, perhaps we could make the distinction more obvious by creating separate structs for these two components.

@cheungpat
Copy link
Contributor

if you enable edits on the branch for the pull request, I could make assorted changes such as typos on the pull request on your behalf

// workerQueue is not goroutine safe. To use it safely across goroutine.
// Please use the Lock/Unlock interace before manupliate the queue item via
// methods like Add/Tick/Purge.
// Comsuming method Next is the only method will acquire the mutex lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“Comsuming”?

@rickmak
Copy link
Member Author

rickmak commented Oct 24, 2016

The timeout is because Travis running the test with GOMAXPROCS=1. When we call zmq code, the runtime internally calls LockOSThread since it is C code. So it result in a dead lock.

Setting GOMAXPROCS=2 (or larger number) will make the test pass.

@rickmak
Copy link
Member Author

rickmak commented Oct 24, 2016

Making another PR.

@rickmak rickmak closed this Oct 24, 2016
@rickmak rickmak removed the In Review label Oct 24, 2016
@rickmak rickmak deleted the zmq-chan2 branch July 10, 2017 06:10
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants