Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Commit

Permalink
Add lifecycle related events
Browse files Browse the repository at this point in the history
Ref. #199
  • Loading branch information
Ben Lei committed Nov 3, 2016
1 parent 18623cf commit 5138c85
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 27 deletions.
32 changes: 19 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
if !config.App.Slave {
cronjob = cron.New()
}
initContext := plugin.InitContext{
pluginContext := plugin.Context{
Router: r,
Mux: serveMux,
Preprocessors: preprocessorRegistry,
Expand Down Expand Up @@ -133,19 +133,23 @@ func main() {
Option: config.DB.Option,
DevMode: config.App.DevMode,
}
preprocessorRegistry["plugin"] = &pp.EnsurePluginReadyPreprocessor{&initContext}
preprocessorRegistry["plugin"] = &pp.EnsurePluginReadyPreprocessor{
PluginContext: &pluginContext,
}
preprocessorRegistry["inject_user"] = &pp.InjectUserIfPresent{}
preprocessorRegistry["require_user"] = &pp.RequireUserForWrite{}
preprocessorRegistry["inject_db"] = &pp.InjectDatabase{}
preprocessorRegistry["inject_public_db"] = &pp.InjectPublicDatabase{}
preprocessorRegistry["dev_only"] = &pp.DevOnlyProcessor{config.App.DevMode}
preprocessorRegistry["dev_only"] = &pp.DevOnlyProcessor{
DevMode: config.App.DevMode,
}

r.Map("", &handler.HomeHandler{})

g := &inject.Graph{}
injectErr := g.Provide(
&inject.Object{Value: initContext.ProviderRegistry, Complete: true, Name: "ProviderRegistry"},
&inject.Object{Value: initContext.HookRegistry, Complete: true, Name: "HookRegistry"},
&inject.Object{Value: pluginContext.ProviderRegistry, Complete: true, Name: "ProviderRegistry"},
&inject.Object{Value: pluginContext.HookRegistry, Complete: true, Name: "HookRegistry"},
&inject.Object{Value: tokenStore, Complete: true, Name: "TokenStore"},
&inject.Object{Value: initAssetStore(config), Complete: true, Name: "AssetStore"},
&inject.Object{Value: pushSender, Complete: true, Name: "PushSender"},
Expand All @@ -160,8 +164,8 @@ func main() {
}

injector := router.HandlerInjector{
g,
&preprocessorRegistry,
ServiceGraph: g,
PreprocessorMap: &preprocessorRegistry,
}

r.Map("auth:signup", injector.Inject(&handler.SignupHandler{}))
Expand Down Expand Up @@ -259,7 +263,9 @@ func main() {
}

// Bootstrap finished, starting services
initPlugin(config, &initContext)
initPlugin(config, &pluginContext)

pluginContext.SendEvents("server-ready", []byte{}, false)

log.Printf("Listening on %v...", config.HTTP.Host)
err := http.ListenAndServe(config.HTTP.Host, finalMux)
Expand Down Expand Up @@ -414,18 +420,18 @@ func initSubscription(config skyconfig.Configuration, connOpener func() (skydb.C
go subscriptionService.Run()
}

func initPlugin(config skyconfig.Configuration, initContext *plugin.InitContext) {
func initPlugin(config skyconfig.Configuration, ctx *plugin.Context) {
log.Infof("Supported plugin transports: %s", strings.Join(plugin.SupportedTransports(), ", "))

if initContext.Scheduler != nil {
initContext.Scheduler.Start()
if ctx.Scheduler != nil {
ctx.Scheduler.Start()
}

for _, pluginConfig := range config.Plugin {
initContext.AddPluginConfiguration(pluginConfig.Transport, pluginConfig.Path, pluginConfig.Args)
ctx.AddPluginConfiguration(pluginConfig.Transport, pluginConfig.Path, pluginConfig.Args)
}

initContext.InitPlugins()
ctx.InitPlugins()
}

func initLogger(config skyconfig.Configuration) {
Expand Down
54 changes: 42 additions & 12 deletions pkg/server/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -112,8 +113,8 @@ func NewPlugin(name string, path string, args []string, config skyconfig.Configu
return p
}

// InitContext contains reference to structs that will be initialized by plugin.
type InitContext struct {
// Context contains reference to structs that will be initialized by plugin.
type Context struct {
plugins []*Plugin
Router *router.Router
Mux *http.ServeMux
Expand All @@ -125,31 +126,59 @@ type InitContext struct {
}

// AddPluginConfiguration creates and appends a plugin
func (c *InitContext) AddPluginConfiguration(name string, path string, args []string) *Plugin {
func (c *Context) AddPluginConfiguration(name string, path string, args []string) *Plugin {
plug := NewPlugin(name, path, args, c.Config)
c.plugins = append(c.plugins, &plug)
return &plug
}

// InitPlugins initializes all plugins registered
func (c *InitContext) InitPlugins() {
for _, plug := range c.plugins {
go plug.Init(c)
func (c *Context) InitPlugins() {
wg := sync.WaitGroup{}
for _, eachPlugin := range c.plugins {
wg.Add(1)
go func(plug *Plugin) {
defer wg.Done()
plug.Init(c)
}(eachPlugin)
}

log.
WithField("count", len(c.plugins)).
Info("Wait for all plugin configurations")
wg.Wait()
c.SendEvents("before-plugins-ready", []byte{}, false)
c.SendEvents("after-plugins-ready", []byte{}, false)
}

// IsReady returns true if all the configured plugins are available
func (c *InitContext) IsReady() bool {
for _, plug := range c.plugins {
if !plug.IsReady() {
func (c *Context) IsReady() bool {
for _, eachPlugin := range c.plugins {
if !eachPlugin.IsReady() {
return false
}
}
return true
}

// SendEvents sends event to all plugins
func (c *Context) SendEvents(name string, data []byte, async bool) {
sendEventFunc := func(plugin *Plugin, name string, data []byte) {
plugin.transport.SendEvent(name, data)
}

for _, eachPlugin := range c.plugins {
if async {
go sendEventFunc(eachPlugin, name, data)
} else {
sendEventFunc(eachPlugin, name, data)
}
}
}

// Init instantiates a plugin. This sets up hooks and handlers.
func (p *Plugin) Init(context *InitContext) {
func (p *Plugin) Init(context *Context) {
p.transport.SendEvent("before-config", []byte{})
for {
log.
WithField("retry", p.initRetryCount).
Expand All @@ -173,9 +202,10 @@ func (p *Plugin) Init(context *InitContext) {

break
}
p.transport.SendEvent("after-config", []byte{})
}

func (p *Plugin) requestInit(context *InitContext) (regInfo registrationInfo, initErr error) {
func (p *Plugin) requestInit(context *Context) (regInfo registrationInfo, initErr error) {
payload := struct {
Config skyconfig.Configuration `json:"config"`
}{context.Config}
Expand Down Expand Up @@ -211,7 +241,7 @@ func (p *Plugin) IsReady() bool {
return p.transport.State() == TransportStateReady
}

func (p *Plugin) processRegistrationInfo(context *InitContext, regInfo registrationInfo) {
func (p *Plugin) processRegistrationInfo(context *Context, regInfo registrationInfo) {
log.WithFields(logrus.Fields{
"regInfo": regInfo,
"transport": p.transport,
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/preprocessor/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
)

type EnsurePluginReadyPreprocessor struct {
PluginInitContext *plugin.InitContext
PluginContext *plugin.Context
}

func (p *EnsurePluginReadyPreprocessor) Preprocess(payload *router.Payload, response *router.Response) int {
if !p.PluginInitContext.IsReady() {
if !p.PluginContext.IsReady() {
log.Errorf("Request cannot be handled because plugins are unavailable at the moment.")
response.Err = skyerr.NewError(skyerr.PluginUnavailable, "plugins are unavailable at the moment")
return http.StatusServiceUnavailable
Expand Down

0 comments on commit 5138c85

Please sign in to comment.