Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Commit

Permalink
Merge plugin init trigger to init event
Browse files Browse the repository at this point in the history
Ref. #199
  • Loading branch information
Ben Lei committed Nov 3, 2016
1 parent 9b22fcf commit 4676178
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 155 deletions.
23 changes: 1 addition & 22 deletions pkg/server/plugin/exec/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,35 +152,14 @@ func (p *execTransport) State() skyplugin.TransportState {
return p.state
}

func (p *execTransport) SetInitHandler(f skyplugin.TransportInitHandler) {
p.initHandler = f
}

func (p *execTransport) setState(state skyplugin.TransportState) {
func (p *execTransport) SetState(state skyplugin.TransportState) {
if state != p.state {
oldState := p.state
p.state = state
log.Infof("Transport state changes from %v to %v.", oldState, p.state)
}
}

func (p *execTransport) RequestInit() {
out, err := p.RunInit()
if p.initHandler != nil {
handlerError := p.initHandler(out, err)
if err != nil || handlerError != nil {
p.setState(skyplugin.TransportStateError)
return
}
}
p.setState(skyplugin.TransportStateReady)
}

func (p *execTransport) RunInit() (out []byte, err error) {
out, err = p.run([]string{"init"}, []string{}, []byte{})
return
}

func (p *execTransport) SendEvent(name string, in []byte) ([]byte, error) {
return p.runProc([]string{"event", name}, []string{}, in)
}
Expand Down
18 changes: 3 additions & 15 deletions pkg/server/plugin/exec/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,6 @@ func TestRun(t *testing.T) {
startCommand = originalCommand
}()

Convey("init", func() {
out, err := transport.RunInit()
So(err, ShouldBeNil)
So(string(out), ShouldEqual, "init")
})

startCommand = func(cmd *exec.Cmd, in []byte) (out []byte, err error) {
out, err = originalCommand(cmd, in)
out = append([]byte(`{"result":"`), out...)
Expand Down Expand Up @@ -142,12 +136,6 @@ func TestRun(t *testing.T) {
Args: []string{"-c", `"cat"`},
}

Convey("init", func() {
out, err := transport.RunInit()
So(err, ShouldBeNil)
So(string(out), ShouldEqual, "")
})

Convey("op", func() {
out, err := transport.RunLambda(nil, "hello:world", []byte(`{"result": "hello world"}`))
So(err, ShouldBeNil)
Expand Down Expand Up @@ -657,7 +645,7 @@ func TestRun(t *testing.T) {
Args: []string{},
}

_, err := transport.RunInit()
_, err := transport.SendEvent("init", []byte{})
So(err, ShouldNotBeNil)
})

Expand All @@ -667,7 +655,7 @@ func TestRun(t *testing.T) {
Args: []string{},
}

_, err := transport.RunInit()
_, err := transport.SendEvent("init", []byte{})
So(err, ShouldNotBeNil)
})

Expand All @@ -677,7 +665,7 @@ func TestRun(t *testing.T) {
Args: []string{},
}

_, err := transport.RunInit()
_, err := transport.SendEvent("init", []byte{})
So(err, ShouldNotBeNil)
})
})
Expand Down
35 changes: 1 addition & 34 deletions pkg/server/plugin/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/skygeario/skygear-server/pkg/server/logging"
skyplugin "github.com/skygeario/skygear-server/pkg/server/plugin"
Expand Down Expand Up @@ -94,46 +93,14 @@ func (p *httpTransport) State() skyplugin.TransportState {
return p.state
}

func (p *httpTransport) SetInitHandler(f skyplugin.TransportInitHandler) {
p.initHandler = f
}

func (p *httpTransport) setState(state skyplugin.TransportState) {
func (p *httpTransport) SetState(state skyplugin.TransportState) {
if state != p.state {
oldState := p.state
p.state = state
log.Infof("Transport state changes from %v to %v.", oldState, p.state)
}
}

func (p *httpTransport) RequestInit() {
out, err := p.RunInit()
if p.initHandler != nil {
handlerError := p.initHandler(out, err)
if err != nil || handlerError != nil {
p.setState(skyplugin.TransportStateError)
return
}
}
p.setState(skyplugin.TransportStateReady)
}

func (p *httpTransport) RunInit() (out []byte, err error) {
param := struct {
Config skyconfig.Configuration `json:"config"`
}{p.config}
req := pluginrequest.Request{Kind: "init", Param: param}
for {
out, err = p.ipc(&req)
if err == nil {
return
}
time.Sleep(time.Second)
log.WithField("err", err).
Warnf(`http: Unable to send init request to plugin "%s". Retrying...`, p.Path)
}
}

func (p *httpTransport) SendEvent(name string, in []byte) (out []byte, err error) {
out, err = p.rpc(pluginrequest.NewEventRequest(name, in))
return
Expand Down
16 changes: 0 additions & 16 deletions pkg/server/plugin/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,6 @@ func TestRun(t *testing.T) {
config: appconfig,
}

Convey("run init", func() {
httpmock.RegisterResponder("POST", "http://localhost:8000",
func(req *http.Request) (*http.Response, error) {
out, _ := ioutil.ReadAll(req.Body)
So(string(out), ShouldContainSubstring, "hello-world")
return httpmock.NewJsonResponse(200, map[string]interface{}{
"data": "hello",
})
},
)

out, err := transport.RunInit()
So(out, ShouldEqualJSON, `{"data": "hello"}`)
So(err, ShouldBeNil)
})

Convey("send event", func() {
Convey("success case", func() {
data := []byte(`{"data": "hello-world"}`)
Expand Down
76 changes: 62 additions & 14 deletions pkg/server/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/Sirupsen/logrus"

Expand All @@ -32,11 +33,17 @@ import (

var log = logging.LoggerEntry("plugin")

const (
// PluginInitMaxRetryCount defines the maximum retries for plugin initialization
PluginInitMaxRetryCount = 100
)

// Plugin represents a collection of handlers, hooks and lambda functions
// that extends or modifies functionality provided by skygear.
type Plugin struct {
transport Transport
gatewayMap map[string]*router.Gateway
initRetryCount int
transport Transport
gatewayMap map[string]*router.Gateway
}

type pluginHandlerInfo struct {
Expand Down Expand Up @@ -117,6 +124,11 @@ type InitContext struct {
Config skyconfig.Configuration
}

// InitPayload models the payload for plugin initialization
type InitPayload struct {
Config skyconfig.Configuration `json:"config"`
}

// AddPluginConfiguration creates and appends a plugin
func (c *InitContext) AddPluginConfiguration(name string, path string, args []string) *Plugin {
plug := NewPlugin(name, path, args, c.Config)
Expand All @@ -127,7 +139,7 @@ func (c *InitContext) AddPluginConfiguration(name string, path string, args []st
// InitPlugins initializes all plugins registered
func (c *InitContext) InitPlugins() {
for _, plug := range c.plugins {
plug.Init(c)
go plug.Init(c)
}
}

Expand All @@ -143,24 +155,60 @@ func (c *InitContext) IsReady() bool {

// Init instantiates a plugin. This sets up hooks and handlers.
func (p *Plugin) Init(context *InitContext) {
p.transport.SetInitHandler(func(out []byte, err error) error {
if err != nil {
panic(fmt.Sprintf("Unable to get registration info from plugin. Error: %v", err))
}
for {
log.
WithField("retry", p.initRetryCount).
Info("Sending init event to plugin")

regInfo := registrationInfo{}
if err := json.Unmarshal(out, &regInfo); err != nil {
panic(err)
p.transport.SetState(TransportStateUninitialized)
regInfo, err := p.requestInit(context)
if err != nil {
p.transport.SetState(TransportStateError)

p.initRetryCount++
if p.initRetryCount >= PluginInitMaxRetryCount {
log.Panic("Fail to initialize plugin")
}
time.Sleep(2 * time.Second)
continue
}

p.transport.SetState(TransportStateReady)
p.processRegistrationInfo(context, regInfo)
return nil
})

break
}
}

func (p *Plugin) requestInit(context *InitContext) (regInfo registrationInfo, initErr error) {
payload := InitPayload{
Config: context.Config,
}

in, err := json.Marshal(payload)
if err != nil {
initErr = fmt.Errorf("Cannot encode plugin initialization payload. Error: %v", err)
return
}

out, err := p.transport.SendEvent("init", in)
log.WithFields(logrus.Fields{
"out": string(out),
"err": err,
"plugin": p,
}).Debugln("request plugin to return configuration")
go p.transport.RequestInit()
}).Info("Get response from init")

if err != nil {
initErr = fmt.Errorf("Cannot encode plugin initialization payload. Error: %v", err)
return
}

if err := json.Unmarshal(out, &regInfo); err != nil {
initErr = fmt.Errorf("Unable to decode plugin initialization info. Error: %v", err)
return
}

return
}

// IsReady tells whether the plugin is ready
Expand Down
5 changes: 1 addition & 4 deletions pkg/server/plugin/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ type TransportInitHandler func([]byte, error) error
// and remote process.
type Transport interface {
State() TransportState

SetInitHandler(TransportInitHandler)
RequestInit()
RunInit() ([]byte, error)
SetState(TransportState)

SendEvent(name string, in []byte) ([]byte, error)

Expand Down
19 changes: 4 additions & 15 deletions pkg/server/plugin/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,16 @@ import (
)

type nullTransport struct {
state TransportState
initHandler TransportInitHandler
lastContext context.Context
}

func (t *nullTransport) State() TransportState {
return TransportStateReady
return t.state
}

func (t *nullTransport) SetInitHandler(f TransportInitHandler) {
t.initHandler = f
}

func (t *nullTransport) RequestInit() {
if t.initHandler != nil {
t.initHandler([]byte{}, nil)
}
return
}
func (t nullTransport) RunInit() (out []byte, err error) {
out = []byte{}
return
func (t *nullTransport) SetState(newState TransportState) {
t.state = newState
}
func (t nullTransport) SendEvent(name string, in []byte) (out []byte, err error) {
out = in
Expand Down
36 changes: 1 addition & 35 deletions pkg/server/plugin/zmq/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,48 +47,14 @@ func (p *zmqTransport) State() skyplugin.TransportState {
return p.state
}

func (p *zmqTransport) SetInitHandler(f skyplugin.TransportInitHandler) {
p.initHandler = f
}

func (p *zmqTransport) setState(state skyplugin.TransportState) {
func (p *zmqTransport) SetState(state skyplugin.TransportState) {
if state != p.state {
oldState := p.state
p.state = state
p.logger.Infof("Transport state changes from %v to %v.", oldState, p.state)
}
}

// RequestInit is expected to run in separate gorountine and called once to
// set it internal state with coordinate with broker.
func (p *zmqTransport) RequestInit() {
for {
out, err := p.RunInit()
if err != nil {
p.logger.WithField("err", err).
Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name)
continue
}
if p.initHandler != nil {
handlerError := p.initHandler(out, err)
if err != nil || handlerError != nil {
p.setState(skyplugin.TransportStateError)
}
}
p.setState(skyplugin.TransportStateReady)
break
}
}

func (p *zmqTransport) RunInit() (out []byte, err error) {
param := struct {
Config skyconfig.Configuration `json:"config"`
}{p.config}
req := pluginrequest.Request{Kind: "init", Param: param}
out, err = p.ipc(&req)
return
}

func (p *zmqTransport) SendEvent(name string, in []byte) ([]byte, error) {
return p.rpc(pluginrequest.NewEventRequest(name, in))
}
Expand Down

0 comments on commit 4676178

Please sign in to comment.