Skip to content

Commit

Permalink
Enable sending event via exec, http and zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben Lei committed Nov 3, 2016
1 parent 0982212 commit 9b22fcf
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/server/plugin/exec/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (p *execTransport) RunInit() (out []byte, err error) {
return
}

func (p *execTransport) SendEvent(name string, in []byte) ([]byte, error) {
return p.runProc([]string{"event", name}, []string{}, in)
}

func (p *execTransport) RunLambda(ctx context.Context, name string, in []byte) (out []byte, err error) {
pluginCtx := skyplugin.ContextMap(ctx)
encodedCtx, err := common.EncodeBase64JSON(pluginCtx)
Expand Down
12 changes: 12 additions & 0 deletions pkg/server/plugin/exec/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ func TestRun(t *testing.T) {
So(string(out), ShouldEqual, `"op hello:world"`)
})

Convey("event", func() {
out, err := transport.SendEvent("foo:bar", []byte{})
So(err, ShouldBeNil)
So(string(out), ShouldEqual, `"event foo:bar"`)
})

Convey("handler", func() {
out, err := transport.RunHandler(nil, "hello:world", []byte{})
So(err, ShouldBeNil)
Expand All @@ -148,6 +154,12 @@ func TestRun(t *testing.T) {
So(string(out), ShouldEqual, `"hello world"`)
})

Convey("event", func() {
out, err := transport.SendEvent("foo:bar", []byte(`{"result": "haha"}`))
So(err, ShouldBeNil)
So(string(out), ShouldEqual, `"haha"`)
})

Convey("handler", func() {
out, err := transport.RunHandler(nil, "hello:world", []byte(`{"result": "hello world"}`))
So(err, ShouldBeNil)
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/plugin/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (p *httpTransport) RunInit() (out []byte, err error) {
}
}

func (p *httpTransport) SendEvent(name string, in []byte) (out []byte, err error) {
out, err = p.rpc(pluginrequest.NewEventRequest(name, in))
return
}

func (p *httpTransport) RunLambda(ctx context.Context, name string, in []byte) (out []byte, err error) {
out, err = p.rpc(pluginrequest.NewLambdaRequest(ctx, name, in))
return
Expand Down
54 changes: 54 additions & 0 deletions pkg/server/plugin/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/skygeario/skygear-server/pkg/server/router"
"github.com/skygeario/skygear-server/pkg/server/skyconfig"
"github.com/skygeario/skygear-server/pkg/server/skydb"
"github.com/skygeario/skygear-server/pkg/server/skyerr"
. "github.com/skygeario/skygear-server/pkg/server/skytest"
. "github.com/smartystreets/goconvey/convey"
)
Expand Down Expand Up @@ -74,6 +75,59 @@ func TestRun(t *testing.T) {
So(err, ShouldBeNil)
})

Convey("send event", func() {
Convey("success case", func() {
data := []byte(`{"data": "hello-world"}`)
httpmock.RegisterResponder(
"POST",
"http://localhost:8000",
func(req *http.Request) (*http.Response, error) {
bodyBytes, _ := ioutil.ReadAll(req.Body)
So(
bodyBytes,
ShouldEqualJSON,
`{"kind":"event","name":"foo","param":{"data":"hello-world"}}`,
)

return httpmock.NewJsonResponse(200, map[string]interface{}{
"result": map[string]interface{}{"data": "hello-world-resp"},
})
},
)

out, err := transport.SendEvent("foo", data)
So(out, ShouldEqualJSON, `{"data": "hello-world-resp"}`)
So(err, ShouldBeNil)
})

Convey("fail case", func() {
data := []byte(`{"data": "hello-world"}`)
httpmock.RegisterResponder(
"POST",
"http://localhost:8000",
func(req *http.Request) (*http.Response, error) {
bodyBytes, _ := ioutil.ReadAll(req.Body)
So(
bodyBytes,
ShouldEqualJSON,
`{"kind":"event","name":"foo2","param":{"data":"hello-world"}}`,
)

return httpmock.NewJsonResponse(500, map[string]interface{}{
"error": map[string]interface{}{
"code": skyerr.UnexpectedError,
"message": "test error",
},
})
},
)

out, err := transport.SendEvent("foo2", data)
So(out, ShouldBeNil)
So(err.Error(), ShouldEqual, "UnexpectedError: test error")
})
})

Convey("run lambda", func() {
ctx := context.WithValue(context.Background(), router.UserIDContextKey, "user")
data := `{"data": "bye"}`
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func RegisterTransport(name string, transport TransportFactory) {
transportFactories[name] = transport
}

// SupportedTransports tells all supported transport names
func SupportedTransports() []string {
var transports []string
for name := range transportFactories {
Expand Down Expand Up @@ -116,12 +117,14 @@ type InitContext struct {
Config skyconfig.Configuration
}

// AddPluginConfiguration creates and appends a plugin
func (c *InitContext) AddPluginConfiguration(name string, path string, args []string) *Plugin {
plug := NewPlugin(name, path, args, c.Config)
c.plugins = append(c.plugins, &plug)
return &plug
}

// InitPlugins initializes all plugins registered
func (c *InitContext) InitPlugins() {
for _, plug := range c.plugins {
plug.Init(c)
Expand Down Expand Up @@ -160,6 +163,7 @@ func (p *Plugin) Init(context *InitContext) {
go p.transport.RequestInit()
}

// IsReady tells whether the plugin is ready
func (p *Plugin) IsReady() bool {
return p.transport.State() == TransportStateReady
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/plugin/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func NewLambdaRequest(ctx context.Context, name string, args json.RawMessage) *R
return &Request{Kind: "op", Name: name, Param: args, Context: ctx}
}

// NewEventRequest creates a new event request
func NewEventRequest(name string, data json.RawMessage) *Request {
return &Request{Kind: "event", Name: name, Param: data}
}

// NewHandlerRequest creates a new handler request.
func NewHandlerRequest(ctx context.Context, name string, input json.RawMessage) *Request {
return &Request{Kind: "handler", Name: name, Param: input, Context: ctx}
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/plugin/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,20 @@ const (
TransportStateError
)

// TransportInitHandler models the handler for transport init
type TransportInitHandler func([]byte, error) error

// A Transport represents the interface of data transfer between skygear
// and remote process.
type Transport interface {
State() TransportState

SetInitHandler(TransportInitHandler)
RequestInit()
RunInit() ([]byte, error)

SendEvent(name string, in []byte) ([]byte, error)

RunLambda(ctx context.Context, name string, in []byte) ([]byte, error)
RunHandler(ctx context.Context, name string, in []byte) ([]byte, error)

Expand Down
4 changes: 4 additions & 0 deletions pkg/server/plugin/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (t nullTransport) RunInit() (out []byte, err error) {
out = []byte{}
return
}
func (t nullTransport) SendEvent(name string, in []byte) (out []byte, err error) {
out = in
return
}
func (t *nullTransport) RunLambda(ctx context.Context, name string, in []byte) (out []byte, err error) {
out = in
t.lastContext = ctx
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/plugin/zmq/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (p *zmqTransport) RunInit() (out []byte, err error) {
return
}

func (p *zmqTransport) SendEvent(name string, in []byte) ([]byte, error) {
return p.rpc(pluginrequest.NewEventRequest(name, in))
}

func (p *zmqTransport) RunLambda(ctx context.Context, name string, in []byte) (out []byte, err error) {
out, err = p.rpc(pluginrequest.NewLambdaRequest(ctx, name, in))
return
Expand Down

0 comments on commit 9b22fcf

Please sign in to comment.