Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Fix fast restart of skygear-server result on worker stale #161

Merged
merged 3 commits into from
Oct 14, 2016
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 69 additions & 60 deletions pkg/server/plugin/zmq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package zmq

import (
"bytes"
"errors"
"fmt"
"time"

"github.com/Sirupsen/logrus"
Expand All @@ -34,6 +36,8 @@ const (
HeartbeatLiveness = 3
)

var heartbeatIntervalMS = int(HeartbeatInterval.Seconds() * 1000)

const (
// Ready is sent by worker to signal broker that it is ready to receive
// jobs.
Expand All @@ -48,77 +52,100 @@ const (

// Broker implements the Paranoid Pirate queue described in the zguide:
// http://zguide.zeromq.org/py:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern
// Related RFC: https://rfc.zeromq.org/spec:6/PPP
// with the addition of:
//
// 1. Shutdown signal, which signifies a normal termination of worker to provide
// a fast path of worker removal
//
// NOTE(limouren): it might make a good interface
type Broker struct {
name string
// NOTE: goroutines are caller of plugin, so frontend is Go side,
// backend is plugin side
frontendAddr, backendAddr string
freshWorkers chan []byte
logger *logrus.Entry
frontend *goczmq.Sock
backend *goczmq.Sock
bothPoller *goczmq.Poller
backendPoller *goczmq.Poller
freshWorkers chan []byte
logger *logrus.Entry
}

// NewBroker returns a new *Broker.
func NewBroker(name, frontendAddr, backendAddr string) (*Broker, error) {
namedLogger := log.WithFields(logrus.Fields{"plugin": name})
frontend, err := goczmq.NewRouter(frontendAddr)
if err != nil {
panic(err)
}

backend, err := goczmq.NewRouter(backendAddr)
if err != nil {
panic(err)
}

backendPoller, err := goczmq.NewPoller(backend)
if err != nil {
panic(err)
}

bothPoller, err := goczmq.NewPoller(frontend, backend)
if err != nil {
panic(err)
}

return &Broker{
name: name,
frontendAddr: frontendAddr,
backendAddr: backendAddr,
freshWorkers: make(chan []byte, 1),
logger: log.WithFields(logrus.Fields{"plugin": name}),
name: name,
frontend: frontend,
backend: backend,
bothPoller: bothPoller,
backendPoller: backendPoller,
freshWorkers: make(chan []byte, 1),
logger: namedLogger,
}, nil
}

// Run kicks start the Broker and listens for requests. It blocks function
// execution.
func (lb *Broker) Run() {
frontend, backend := mustInitEndpoints(lb.frontendAddr, lb.backendAddr)
backendPoller, bothPoller := mustInitPollers(frontend, backend)

workers := workerQueue{}
heartbeatAt := time.Now().Add(HeartbeatInterval)
for {
var sock *goczmq.Sock
if workers.Len() == 0 {
sock = backendPoller.Wait(heartbeatIntervalMS)
sock = lb.backendPoller.Wait(heartbeatIntervalMS)
} else {
sock = bothPoller.Wait(heartbeatIntervalMS)
sock = lb.bothPoller.Wait(heartbeatIntervalMS)
}

switch sock {
case backend:
frames, err := backend.RecvMessage()
case lb.backend:
frames, err := lb.backend.RecvMessage()
if err != nil {
panic(err)
}

address := frames[0]
workers.Ready(newWorker(address))

tErr := workers.Tick(newWorker(address))
if tErr != nil {
lb.logger.Warnln(tErr)
}

msg := frames[1:]
if len(msg) == 1 {
status := string(msg[0])
handleWorkerStatus(&workers, address, status)
if status == Ready {
lb.freshWorkers <- address
}
lb.handleWorkerStatus(&workers, address, status)
} else {
frontend.SendMessage(msg)
lb.frontend.SendMessage(msg)
lb.logger.Debugf("zmq/broker: backend => frontend: %#x, %s\n", msg[0], msg)
}
case frontend:
frames, err := frontend.RecvMessage()
case lb.frontend:
frames, err := lb.frontend.RecvMessage()
if err != nil {
panic(err)
}

frames = append([][]byte{workers.Next()}, frames...)
backend.SendMessage(frames)
lb.backend.SendMessage(frames)
lb.logger.Debugf("zmq/broker: frontend => backend: %#x, %s\n", frames[0], frames)
case nil:
// do nothing
Expand All @@ -129,7 +156,7 @@ func (lb *Broker) Run() {
if heartbeatAt.Before(time.Now()) {
for _, worker := range workers {
msg := [][]byte{worker.address, []byte(Heartbeat)}
backend.SendMessage(msg)
lb.backend.SendMessage(msg)
}
heartbeatAt = time.Now().Add(HeartbeatInterval)
}
Expand All @@ -138,37 +165,11 @@ func (lb *Broker) Run() {
}
}

func mustInitEndpoints(frontendAddr, backendAddr string) (*goczmq.Sock, *goczmq.Sock) {
frontend, err := goczmq.NewRouter(frontendAddr)
if err != nil {
panic(err)
}

backend, err := goczmq.NewRouter(backendAddr)
if err != nil {
panic(err)
}

return frontend, backend
}

func mustInitPollers(frontend, backend *goczmq.Sock) (*goczmq.Poller, *goczmq.Poller) {
backendPoller, err := goczmq.NewPoller(backend)
if err != nil {
panic(err)
}

bothPoller, err := goczmq.NewPoller(frontend, backend)
if err != nil {
panic(err)
}

return backendPoller, bothPoller
}

func handleWorkerStatus(workers *workerQueue, address []byte, status string) {
func (lb *Broker) handleWorkerStatus(workers *workerQueue, address []byte, status string) {
switch status {
case Ready:
workers.Ready(newWorker(address))
lb.freshWorkers <- address
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like a good move to here

log.Infof("zmq/broker: ready worker = %s", address)
case Heartbeat:
// do nothing
Expand All @@ -180,8 +181,6 @@ func handleWorkerStatus(workers *workerQueue, address []byte, status string) {
}
}

var heartbeatIntervalMS = int(HeartbeatInterval.Seconds() * 1000)

type pworker struct {
address []byte
expiry time.Time
Expand All @@ -208,6 +207,15 @@ func (q *workerQueue) Next() []byte {
}

func (q *workerQueue) Ready(worker pworker) {
err := q.Tick(worker)
if err == nil {
return
}
workers := *q
*q = append(workers, worker)
}

func (q *workerQueue) Tick(worker pworker) error {
workers := *q

var (
Expand All @@ -217,10 +225,11 @@ func (q *workerQueue) Ready(worker pworker) {
for i, w = range workers {
if bytes.Equal(w.address, worker.address) {
*q = append(append(workers[:i], workers[i+1:]...), worker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an array that doesn’t work like any other arrays in golang
since we are changing the variable address here... are we sure all references to the array are updated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh you changed it in a later commit! good job!

return
return nil
}
}
*q = append(workers, worker)

return errors.New(fmt.Sprintf("zmq/broker: Ticking non-existing worker = %s", worker.address))
}

func (q *workerQueue) Purge() {
Expand Down