diff --git a/pkg/server/plugin/exec/process.go b/pkg/server/plugin/exec/process.go index c0965a723..d4fb02415 100644 --- a/pkg/server/plugin/exec/process.go +++ b/pkg/server/plugin/exec/process.go @@ -152,11 +152,7 @@ func (p *execTransport) State() skyplugin.TransportState { return p.state } -func (p *execTransport) SetInitHandler(f skyplugin.TransportInitHandler) { - p.initHandler = f -} - -func (p *execTransport) setState(state skyplugin.TransportState) { +func (p *execTransport) SetState(state skyplugin.TransportState) { if state != p.state { oldState := p.state p.state = state @@ -164,23 +160,6 @@ func (p *execTransport) setState(state skyplugin.TransportState) { } } -func (p *execTransport) RequestInit() { - out, err := p.RunInit() - if p.initHandler != nil { - handlerError := p.initHandler(out, err) - if err != nil || handlerError != nil { - p.setState(skyplugin.TransportStateError) - return - } - } - p.setState(skyplugin.TransportStateReady) -} - -func (p *execTransport) RunInit() (out []byte, err error) { - out, err = p.run([]string{"init"}, []string{}, []byte{}) - return -} - func (p *execTransport) SendEvent(name string, in []byte) ([]byte, error) { return p.runProc([]string{"event", name}, []string{}, in) } diff --git a/pkg/server/plugin/exec/process_test.go b/pkg/server/plugin/exec/process_test.go index 0a3e46543..f6c0a5b61 100644 --- a/pkg/server/plugin/exec/process_test.go +++ b/pkg/server/plugin/exec/process_test.go @@ -104,12 +104,6 @@ func TestRun(t *testing.T) { startCommand = originalCommand }() - Convey("init", func() { - out, err := transport.RunInit() - So(err, ShouldBeNil) - So(string(out), ShouldEqual, "init") - }) - startCommand = func(cmd *exec.Cmd, in []byte) (out []byte, err error) { out, err = originalCommand(cmd, in) out = append([]byte(`{"result":"`), out...) @@ -142,12 +136,6 @@ func TestRun(t *testing.T) { Args: []string{"-c", `"cat"`}, } - Convey("init", func() { - out, err := transport.RunInit() - So(err, ShouldBeNil) - So(string(out), ShouldEqual, "") - }) - Convey("op", func() { out, err := transport.RunLambda(nil, "hello:world", []byte(`{"result": "hello world"}`)) So(err, ShouldBeNil) @@ -657,7 +645,7 @@ func TestRun(t *testing.T) { Args: []string{}, } - _, err := transport.RunInit() + _, err := transport.SendEvent("init", []byte{}) So(err, ShouldNotBeNil) }) @@ -667,7 +655,7 @@ func TestRun(t *testing.T) { Args: []string{}, } - _, err := transport.RunInit() + _, err := transport.SendEvent("init", []byte{}) So(err, ShouldNotBeNil) }) @@ -677,7 +665,7 @@ func TestRun(t *testing.T) { Args: []string{}, } - _, err := transport.RunInit() + _, err := transport.SendEvent("init", []byte{}) So(err, ShouldNotBeNil) }) }) diff --git a/pkg/server/plugin/http/http.go b/pkg/server/plugin/http/http.go index d9dfbe0b8..5626e2444 100644 --- a/pkg/server/plugin/http/http.go +++ b/pkg/server/plugin/http/http.go @@ -20,7 +20,6 @@ import ( "fmt" "io/ioutil" "net/http" - "time" "github.com/skygeario/skygear-server/pkg/server/logging" skyplugin "github.com/skygeario/skygear-server/pkg/server/plugin" @@ -94,11 +93,7 @@ func (p *httpTransport) State() skyplugin.TransportState { return p.state } -func (p *httpTransport) SetInitHandler(f skyplugin.TransportInitHandler) { - p.initHandler = f -} - -func (p *httpTransport) setState(state skyplugin.TransportState) { +func (p *httpTransport) SetState(state skyplugin.TransportState) { if state != p.state { oldState := p.state p.state = state @@ -106,34 +101,6 @@ func (p *httpTransport) setState(state skyplugin.TransportState) { } } -func (p *httpTransport) RequestInit() { - out, err := p.RunInit() - if p.initHandler != nil { - handlerError := p.initHandler(out, err) - if err != nil || handlerError != nil { - p.setState(skyplugin.TransportStateError) - return - } - } - p.setState(skyplugin.TransportStateReady) -} - -func (p *httpTransport) RunInit() (out []byte, err error) { - param := struct { - Config skyconfig.Configuration `json:"config"` - }{p.config} - req := pluginrequest.Request{Kind: "init", Param: param} - for { - out, err = p.ipc(&req) - if err == nil { - return - } - time.Sleep(time.Second) - log.WithField("err", err). - Warnf(`http: Unable to send init request to plugin "%s". Retrying...`, p.Path) - } -} - func (p *httpTransport) SendEvent(name string, in []byte) (out []byte, err error) { out, err = p.rpc(pluginrequest.NewEventRequest(name, in)) return diff --git a/pkg/server/plugin/http/http_test.go b/pkg/server/plugin/http/http_test.go index 2e20daff2..02144d21b 100644 --- a/pkg/server/plugin/http/http_test.go +++ b/pkg/server/plugin/http/http_test.go @@ -59,22 +59,6 @@ func TestRun(t *testing.T) { config: appconfig, } - Convey("run init", func() { - httpmock.RegisterResponder("POST", "http://localhost:8000", - func(req *http.Request) (*http.Response, error) { - out, _ := ioutil.ReadAll(req.Body) - So(string(out), ShouldContainSubstring, "hello-world") - return httpmock.NewJsonResponse(200, map[string]interface{}{ - "data": "hello", - }) - }, - ) - - out, err := transport.RunInit() - So(out, ShouldEqualJSON, `{"data": "hello"}`) - So(err, ShouldBeNil) - }) - Convey("send event", func() { Convey("success case", func() { data := []byte(`{"data": "hello-world"}`) diff --git a/pkg/server/plugin/plugin.go b/pkg/server/plugin/plugin.go index c29652722..12d5d274e 100644 --- a/pkg/server/plugin/plugin.go +++ b/pkg/server/plugin/plugin.go @@ -19,6 +19,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/Sirupsen/logrus" @@ -32,11 +33,17 @@ import ( var log = logging.LoggerEntry("plugin") +const ( + // PluginInitMaxRetryCount defines the maximum retries for plugin initialization + PluginInitMaxRetryCount = 100 +) + // Plugin represents a collection of handlers, hooks and lambda functions // that extends or modifies functionality provided by skygear. type Plugin struct { - transport Transport - gatewayMap map[string]*router.Gateway + initRetryCount int + transport Transport + gatewayMap map[string]*router.Gateway } type pluginHandlerInfo struct { @@ -117,6 +124,11 @@ type InitContext struct { Config skyconfig.Configuration } +// InitPayload models the payload for plugin initialization +type InitPayload struct { + Config skyconfig.Configuration `json:"config"` +} + // AddPluginConfiguration creates and appends a plugin func (c *InitContext) AddPluginConfiguration(name string, path string, args []string) *Plugin { plug := NewPlugin(name, path, args, c.Config) @@ -127,7 +139,7 @@ func (c *InitContext) AddPluginConfiguration(name string, path string, args []st // InitPlugins initializes all plugins registered func (c *InitContext) InitPlugins() { for _, plug := range c.plugins { - plug.Init(c) + go plug.Init(c) } } @@ -143,24 +155,60 @@ func (c *InitContext) IsReady() bool { // Init instantiates a plugin. This sets up hooks and handlers. func (p *Plugin) Init(context *InitContext) { - p.transport.SetInitHandler(func(out []byte, err error) error { - if err != nil { - panic(fmt.Sprintf("Unable to get registration info from plugin. Error: %v", err)) - } + for { + log. + WithField("retry", p.initRetryCount). + Info("Sending init event to plugin") - regInfo := registrationInfo{} - if err := json.Unmarshal(out, ®Info); err != nil { - panic(err) + p.transport.SetState(TransportStateUninitialized) + regInfo, err := p.requestInit(context) + if err != nil { + p.transport.SetState(TransportStateError) + + p.initRetryCount++ + if p.initRetryCount >= PluginInitMaxRetryCount { + log.Panic("Fail to initialize plugin") + } + time.Sleep(2 * time.Second) + continue } + p.transport.SetState(TransportStateReady) p.processRegistrationInfo(context, regInfo) - return nil - }) + break + } +} + +func (p *Plugin) requestInit(context *InitContext) (regInfo registrationInfo, initErr error) { + payload := InitPayload{ + Config: context.Config, + } + + in, err := json.Marshal(payload) + if err != nil { + initErr = fmt.Errorf("Cannot encode plugin initialization payload. Error: %v", err) + return + } + + out, err := p.transport.SendEvent("init", in) log.WithFields(logrus.Fields{ + "out": string(out), + "err": err, "plugin": p, - }).Debugln("request plugin to return configuration") - go p.transport.RequestInit() + }).Info("Get response from init") + + if err != nil { + initErr = fmt.Errorf("Cannot encode plugin initialization payload. Error: %v", err) + return + } + + if err := json.Unmarshal(out, ®Info); err != nil { + initErr = fmt.Errorf("Unable to decode plugin initialization info. Error: %v", err) + return + } + + return } // IsReady tells whether the plugin is ready diff --git a/pkg/server/plugin/transport.go b/pkg/server/plugin/transport.go index 230848116..9146b87ab 100644 --- a/pkg/server/plugin/transport.go +++ b/pkg/server/plugin/transport.go @@ -63,10 +63,7 @@ type TransportInitHandler func([]byte, error) error // and remote process. type Transport interface { State() TransportState - - SetInitHandler(TransportInitHandler) - RequestInit() - RunInit() ([]byte, error) + SetState(TransportState) SendEvent(name string, in []byte) ([]byte, error) diff --git a/pkg/server/plugin/transport_test.go b/pkg/server/plugin/transport_test.go index fd79782c6..688e7a49b 100644 --- a/pkg/server/plugin/transport_test.go +++ b/pkg/server/plugin/transport_test.go @@ -26,27 +26,16 @@ import ( ) type nullTransport struct { + state TransportState initHandler TransportInitHandler lastContext context.Context } func (t *nullTransport) State() TransportState { - return TransportStateReady + return t.state } - -func (t *nullTransport) SetInitHandler(f TransportInitHandler) { - t.initHandler = f -} - -func (t *nullTransport) RequestInit() { - if t.initHandler != nil { - t.initHandler([]byte{}, nil) - } - return -} -func (t nullTransport) RunInit() (out []byte, err error) { - out = []byte{} - return +func (t *nullTransport) SetState(newState TransportState) { + t.state = newState } func (t nullTransport) SendEvent(name string, in []byte) (out []byte, err error) { out = in diff --git a/pkg/server/plugin/zmq/rpc.go b/pkg/server/plugin/zmq/rpc.go index 8ba5d2ce1..b22c601d1 100644 --- a/pkg/server/plugin/zmq/rpc.go +++ b/pkg/server/plugin/zmq/rpc.go @@ -47,11 +47,7 @@ func (p *zmqTransport) State() skyplugin.TransportState { return p.state } -func (p *zmqTransport) SetInitHandler(f skyplugin.TransportInitHandler) { - p.initHandler = f -} - -func (p *zmqTransport) setState(state skyplugin.TransportState) { +func (p *zmqTransport) SetState(state skyplugin.TransportState) { if state != p.state { oldState := p.state p.state = state @@ -59,36 +55,6 @@ func (p *zmqTransport) setState(state skyplugin.TransportState) { } } -// RequestInit is expected to run in separate gorountine and called once to -// set it internal state with coordinate with broker. -func (p *zmqTransport) RequestInit() { - for { - out, err := p.RunInit() - if err != nil { - p.logger.WithField("err", err). - Warnf(`zmq/rpc: Unable to send init request to plugin "%s". Retrying...`, p.name) - continue - } - if p.initHandler != nil { - handlerError := p.initHandler(out, err) - if err != nil || handlerError != nil { - p.setState(skyplugin.TransportStateError) - } - } - p.setState(skyplugin.TransportStateReady) - break - } -} - -func (p *zmqTransport) RunInit() (out []byte, err error) { - param := struct { - Config skyconfig.Configuration `json:"config"` - }{p.config} - req := pluginrequest.Request{Kind: "init", Param: param} - out, err = p.ipc(&req) - return -} - func (p *zmqTransport) SendEvent(name string, in []byte) ([]byte, error) { return p.rpc(pluginrequest.NewEventRequest(name, in)) }