Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analytics Dashboard #72

Open
wants to merge 104 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
902d6c7
start on admin dashboard
jordanschalm Jan 3, 2017
4236bac
fix imports
jordanschalm Jan 3, 2017
8906068
get websockets working reliably across reloads, add unsubscribe metho…
jordanschalm Jan 4, 2017
7ad2f0c
get logs into dashboard module
jordanschalm Jan 5, 2017
2ae4951
add log hooks to main package
jordanschalm Jan 24, 2017
1515594
giving up on plain js -- setup react
jordanschalm Jan 24, 2017
55eedaa
set up statik file system and move session management to occur over w…
jordanschalm Jan 24, 2017
f0f59f3
set up redux store and hook up to websockets
jordanschalm Jan 30, 2017
2bbfa29
time conversion on fe, and feed from be into charts
jordanschalm Feb 4, 2017
5ed2b25
styling of charts, and fix sync issue with event store
jordanschalm Feb 6, 2017
4da0bbb
add initSess stub
jordanschalm Feb 6, 2017
13191e0
glide.lock auto-update
jordanschalm Feb 6, 2017
31cb581
merge master into dashboard
jordanschalm Feb 6, 2017
dd5094c
add decorator pattern
flashmob Feb 9, 2017
20cf073
Merge branch 'master' into more-backends-refactoring
flashmob Feb 9, 2017
6476239
dummy backend uses the decorator pattern with two decorators: "debugg…
flashmob Feb 9, 2017
44ad353
dummy backend uses the decorator pattern with two decorators: "debugg…
flashmob Feb 9, 2017
b19bac2
more progress with decorators:
flashmob Feb 10, 2017
00dfc08
more progress with decorators:
flashmob Feb 10, 2017
87c442a
refactored Processor initialization and shutdown: now using initializ…
flashmob Feb 13, 2017
9127bc0
- got rid of abstract.go backend
flashmob Feb 14, 2017
e2d7375
- redis: when not compressed, delivery header was not added
flashmob Feb 14, 2017
91d4e66
- synchronise BackendService
flashmob Feb 14, 2017
0c84f5e
- backend logger refactoring
flashmob Feb 14, 2017
35f069b
forgot to set the log level
flashmob Feb 14, 2017
32c1e9e
Fixed tests
flashmob Feb 15, 2017
6c1c48b
- AddProcessor function to give more abstraction into the processor c…
flashmob Feb 16, 2017
0d1e35c
add error checking for backend processor initialization
flashmob Feb 16, 2017
7dbe397
trim whitespace from user part
flashmob Feb 16, 2017
0c160b6
rename process_line to process_stack to better reflect name
flashmob Feb 16, 2017
c4fea83
Send actions over websockets directly as redux actions
jordanschalm Feb 21, 2017
8e1e342
Refactoring
flashmob Feb 22, 2017
7aa52bf
Add ranking telemetry for top clients by helo, ip, domain and create …
jordanschalm Feb 23, 2017
b90508a
test travis
jordanschalm Feb 23, 2017
f56b3c8
test travis
jordanschalm Feb 23, 2017
85d6aef
test travis
jordanschalm Feb 23, 2017
0a4965f
test travis
jordanschalm Feb 23, 2017
462ebb2
test travis
jordanschalm Feb 23, 2017
2107c27
test travis
jordanschalm Feb 23, 2017
5f44d28
test travis
jordanschalm Feb 23, 2017
dfcd016
test travis
jordanschalm Feb 23, 2017
2db6573
test travis
jordanschalm Feb 23, 2017
1d59402
add old backend compatibility
flashmob Feb 23, 2017
5764ae4
old version of guerrilla-db-redis, to remove
flashmob Feb 23, 2017
9c09c57
ported guerrilla_db_redis to new backend system
flashmob Feb 23, 2017
60494c1
remove old deprecated backend system
flashmob Feb 23, 2017
f94e4f4
Reimplement ranking analytics to save (lots of) memory
jordanschalm Feb 23, 2017
f96ebc7
Add config options to dashboard and update sample config and readme
jordanschalm Feb 25, 2017
1da6c64
change enabled key to match rest of config
jordanschalm Feb 25, 2017
f7a2032
- renamed envelope package to mail because *envelope.Envelope didn't …
flashmob Feb 25, 2017
2f62ce7
return ErrProcessorNotFound error if procesor not found
flashmob Feb 25, 2017
98b297c
update readme to include info about the new backend system
flashmob Feb 25, 2017
0966eaa
- debugger processor name, make case insensitive
flashmob Feb 26, 2017
407dcd3
limit helo to 16 characters logged, update README
jordanschalm Feb 26, 2017
2a12a28
fix slice bound error
jordanschalm Feb 26, 2017
bc52714
- tweak debug messages
flashmob Feb 26, 2017
83f98cc
- clean up comments
flashmob Feb 27, 2017
224edf4
fix broken build
flashmob Feb 27, 2017
a0e78f7
Add dashboard deps and build to makefile
jordanschalm Feb 27, 2017
b1fff47
update clean in makefile
jordanschalm Feb 28, 2017
6e9d2ca
update build process in readme
jordanschalm Feb 28, 2017
c359a95
- Backend shutdown process re-written to use channels (workStoppers)…
flashmob Mar 3, 2017
3a31bd2
remove unneeded import from test
flashmob Mar 3, 2017
74b0540
fix format error string
flashmob Mar 3, 2017
7188363
update readme
flashmob Mar 3, 2017
28d7542
change log re-open signal to SIGUSR1 instead of SIGHUP
flashmob Mar 3, 2017
e9878f3
make sure to register to get SIGUSR1 signal
flashmob Mar 3, 2017
df08327
- moved backed config from CmdConfig to AppConfig, including backend …
flashmob Mar 5, 2017
4b9a781
- New API (with default configuration options)
flashmob Mar 7, 2017
7043262
refactor serve.go to use the new API
flashmob Mar 7, 2017
7d6d1fd
* api tests
flashmob Mar 8, 2017
565c3f9
use the makefile to run test
flashmob Mar 8, 2017
d4bcc13
make log public
flashmob Mar 9, 2017
3cb670d
- Envelopes now have their own pool. This is so that if processing ta…
flashmob Mar 10, 2017
a452c06
add testrace to makefile
flashmob Mar 10, 2017
ef7202c
update sample config
flashmob Mar 10, 2017
29db21a
separate process stack for validating recipients - it means it use a …
flashmob Mar 11, 2017
b532c91
- remove /sloonz/go-qprintable and replace with go stdlib equivalent
flashmob Mar 11, 2017
e82cf80
Readme update: edit description and add features list
flashmob Mar 11, 2017
5427265
merge backend refactor into dashboard and fix conflicts. Also fix a b…
jordanschalm Mar 12, 2017
dd8dceb
update gitignore in test
jordanschalm Mar 12, 2017
9e50efb
add dashboard glide deps
jordanschalm Mar 13, 2017
bfe58e2
Merge branch 'master' into dashboard
flashmob Mar 21, 2017
e3b8469
fix merge conflicts
flashmob Mar 21, 2017
8fcb6c2
- add Stop function
flashmob Mar 23, 2017
2ff69cf
- fix deadlock for when the http server for dashboard fails and Stop…
flashmob Mar 23, 2017
abf5280
fix statik build
flashmob Mar 23, 2017
ebf74b8
Fix dashboard target in Makefile
jordanschalm Mar 23, 2017
5c46f53
Fix bug where JS console prints error on INIT message from dashboard …
jordanschalm Mar 23, 2017
f1521b0
add a simulation test for the dashboard
flashmob Mar 24, 2017
b44b3bf
Merge branch 'dashboard' of github.com:flashmob/go-guerrilla into das…
flashmob Mar 24, 2017
ea7149d
remove line limit from dashboard simulation test
flashmob Mar 24, 2017
7e86a4e
- split hook to hook.go
flashmob Mar 25, 2017
21e9d2a
create heap structure for connection records
jordanschalm Apr 11, 2017
96340cd
Merge branch 'master' into dashboard
flashmob May 31, 2018
b1b7061
fix imports
flashmob May 31, 2018
ef7478b
fix deadlock
flashmob May 31, 2018
e1c505b
Merge branch 'master' into dashboard
flashmob Jan 30, 2019
2b4a8cd
- update readme
flashmob Jan 31, 2019
c5ca3e5
fix deadlog caused by debuf message that wasn't removed
flashmob Jan 31, 2019
b314384
fix race condition when incrementing number of clients
flashmob Feb 1, 2019
339f7df
fix tests
flashmob Feb 1, 2019
666750d
update dashboard readme
flashmob Feb 1, 2019
2e7c6db
add screenshot
flashmob Feb 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactored Processor initialization and shutdown: now using initializ…
…er and shutdowner
flashmob committed Feb 13, 2017
commit 87c442ae591026730d8df5199a963d057dcc721e
39 changes: 6 additions & 33 deletions backends/abstract.go
Original file line number Diff line number Diff line change
@@ -4,19 +4,15 @@ import (
"errors"
"fmt"
"github.com/flashmob/go-guerrilla/envelope"
"github.com/flashmob/go-guerrilla/ev"
"reflect"
"runtime/debug"
"strings"
)

type AbstractBackend struct {
config abstractConfig
Extend Worker
p Processor
configLoaders []ConfigLoaderFunc
configTesters []ConfigTesterFunc
initializers []DecoratorinitializeFunc
config abstractConfig
Extend Worker
p Processor
}

type abstractConfig struct {
@@ -25,24 +21,6 @@ type abstractConfig struct {

var ab AbstractBackend

type ConfigLoaderFunc func(backendConfig BackendConfig) error

func (b *AbstractBackend) AddConfigLoader(f ConfigLoaderFunc) {
b.configLoaders = append(b.configLoaders, f)
}

type ConfigTesterFunc func(backendConfig BackendConfig) error

func (b *AbstractBackend) AddConfigTester(f ConfigTesterFunc) {
b.configTesters = append(b.configTesters, f)
}

type DecoratorinitializeFunc func() error

func (b *AbstractBackend) AddInitializer(f DecoratorinitializeFunc) {
b.initializers = append(b.initializers, f)
}

// Your backend should implement this method and set b.config field with a custom config struct
// Therefore, your implementation would have your own custom config type instead of dummyConfig
func (b *AbstractBackend) loadConfig(backendConfig BackendConfig) (err error) {
@@ -69,14 +47,9 @@ func (b *AbstractBackend) SetProcessors(p ...Decorator) {
}

func (b *AbstractBackend) Initialize(config BackendConfig) error {
for _, loader := range b.configLoaders {
loader(config)
}
for i := range b.initializers {
b.initializers[i]()
}
//Service.Publish(ev.BackendProcConfigLoad, config)
Service.Publish(ev.BackendProcInitialize, config)

Service.Initialize(config)

return nil

// TODO delete
66 changes: 52 additions & 14 deletions backends/backend.go
Original file line number Diff line number Diff line change
@@ -3,18 +3,17 @@ package backends
import (
"fmt"
"github.com/flashmob/go-guerrilla/envelope"
"github.com/flashmob/go-guerrilla/ev"
"github.com/flashmob/go-guerrilla/log"
"strconv"
"strings"
)

var mainlog log.Logger

var Service BackendService
var Service *BackendService

func init() {
Service = BackendService{}
Service = &BackendService{}
}

// Backends process received mail. Depending on the implementation, they can store mail in the database,
@@ -38,23 +37,13 @@ type Worker interface {
// parse the configuration files
loadConfig(BackendConfig) error

AddConfigLoader(f ConfigLoaderFunc)
AddConfigTester(f ConfigTesterFunc)
AddInitializer(f DecoratorinitializeFunc)

Shutdown() error
Process(*envelope.Envelope) BackendResult
Initialize(BackendConfig) error

SetProcessors(p ...Decorator)
}

type DecoratorCallbacks struct {
loader ConfigLoaderFunc
tester ConfigTesterFunc
initialize DecoratorinitializeFunc
}

type BackendConfig map[string]interface{}

var backends = map[string]Worker{}
@@ -107,6 +96,55 @@ func NewBackendResult(message string) BackendResult {
return backendResult(message)
}

type ProcessorInitializer interface {
Initialize(backendConfig BackendConfig) error
}

type ProcessorShutdowner interface {
Shutdown() error
}

type Initialize func(backendConfig BackendConfig) error
type Shutdown func() error

// Satisfy ProcessorInitializer interface
// So we can now pass an anonymous function that implements ProcessorInitializer
func (i Initialize) Initialize(backendConfig BackendConfig) error {
// delegate to the anonymous function
return i(backendConfig)
}

// satisfy ProcessorShutdowner interface, same concept as Initialize type
func (s Shutdown) Shutdown() error {
// delegate
return s()
}

type BackendService struct {
ev.EventHandler
ProcessorHandlers
}

type ProcessorHandlers struct {
Initializers []ProcessorInitializer
Shutdowners []ProcessorShutdowner
}

func (b *BackendService) AddInitializer(i ProcessorInitializer) {
b.Initializers = append(b.Initializers, i)
}

func (b *BackendService) AddShutdowner(i ProcessorShutdowner) {
b.Shutdowners = append(b.Shutdowners, i)
}

func (b *BackendService) Initialize(backend BackendConfig) {
for i := range b.Initializers {
b.Initializers[i].Initialize(backend)
}
}

func (b *BackendService) Shutdown() {
for i := range b.Shutdowners {
b.Shutdowners[i].Shutdown()
}
}
8 changes: 1 addition & 7 deletions backends/dummy.go
Original file line number Diff line number Diff line change
@@ -2,14 +2,8 @@ package backends

func init() {
backends["dummy"] = &AbstractBackend{}
debuggercb := &DecoratorCallbacks{}
headerCB := &DecoratorCallbacks{}
redisCB := &DecoratorCallbacks{}
backends["dummy"].SetProcessors(
MySql(), Redis(redisCB), Compressor(), Header(headerCB), Hasher(), Debugger(debuggercb), HeadersParser())
backends["dummy"].AddConfigLoader(debuggercb.loader)
backends["dummy"].AddConfigLoader(headerCB.loader)
backends["dummy"].AddConfigLoader(redisCB.loader)
MySql(), Redis(), Compressor(), Header(), Hasher(), Debugger(), HeadersParser())
}

// custom configuration we will parse from the json
9 changes: 4 additions & 5 deletions backends/p_debugger.go
Original file line number Diff line number Diff line change
@@ -8,19 +8,18 @@ type debuggerConfig struct {
LogReceivedMails bool `json:"log_received_mails"`
}

func Debugger(dc *DecoratorCallbacks) Decorator {

func Debugger() Decorator {
var config *debuggerConfig
dc.loader = func(backendConfig BackendConfig) error {
initFunc := Initialize(func(backendConfig BackendConfig) error {
configType := baseConfig(&debuggerConfig{})
bcfg, err := ab.extractConfig(backendConfig, configType)
if err != nil {
return err
}
config = bcfg.(*debuggerConfig)
return nil
}

})
Service.AddInitializer(initFunc)
return func(c Processor) Processor {
return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
if config.LogReceivedMails {
10 changes: 5 additions & 5 deletions backends/p_header.go
Original file line number Diff line number Diff line change
@@ -11,20 +11,20 @@ type HeaderConfig struct {
}

// Generate the MTA delivery header
// Sets e.DeliveryHeader with the result
func Header(dc *DecoratorCallbacks) Decorator {
// Sets e.DeliveryHeader part of the envelope with the generated header
func Header() Decorator {

var config *HeaderConfig
dc.loader = func(backendConfig BackendConfig) error {

Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
configType := baseConfig(&HeaderConfig{})
bcfg, err := ab.extractConfig(backendConfig, configType)
if err != nil {
return err
}
config = bcfg.(*HeaderConfig)

return nil
}
}))

return func(c Processor) Processor {
return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
85 changes: 46 additions & 39 deletions backends/p_mysql.go
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@ import (
"github.com/flashmob/go-guerrilla/envelope"
"github.com/go-sql-driver/mysql"

"github.com/flashmob/go-guerrilla/ev"
"runtime/debug"
)

@@ -24,13 +23,35 @@ type MysqlProcessorConfig struct {
PrimaryHost string `json:"primary_mail_host"`
}

type MysqlProcessorDecorator struct {
type MysqlProcessor struct {
cache stmtCache
config *MysqlProcessorConfig
}

func (m *MysqlProcessor) connect(config *MysqlProcessorConfig) (*sql.DB, error) {
var db *sql.DB
var err error
conf := mysql.Config{
User: config.MysqlUser,
Passwd: config.MysqlPass,
DBName: config.MysqlDB,
Net: "tcp",
Addr: config.MysqlHost,
ReadTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
WriteTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
Params: map[string]string{"collation": "utf8_general_ci"},
}
if db, err = sql.Open("mysql", conf.FormatDSN()); err != nil {
mainlog.Error("cannot open mysql", err)
return nil, err
}
mainlog.Info("connected to mysql on tcp ", config.MysqlHost)
return db, err

}

// prepares the sql query with the number of rows that can be batched with it
func (g *MysqlProcessorDecorator) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
func (g *MysqlProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
if rows == 0 {
panic("rows argument cannot be 0")
}
@@ -58,7 +79,7 @@ func (g *MysqlProcessorDecorator) prepareInsertQuery(rows int, db *sql.DB) *sql.
return stmt
}

func (g *MysqlProcessorDecorator) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
var execErr error
defer func() {
if r := recover(); r != nil {
@@ -84,48 +105,34 @@ func (g *MysqlProcessorDecorator) doQuery(c int, db *sql.DB, insertStmt *sql.Stm

func MySql() Decorator {

decorator := MysqlProcessorDecorator{}

var config *MysqlProcessorConfig

var vals []interface{}
var db *sql.DB
mp := &MysqlProcessor{}

mysqlConnect := func() (*sql.DB, error) {
conf := mysql.Config{
User: config.MysqlUser,
Passwd: config.MysqlPass,
DBName: config.MysqlDB,
Net: "tcp",
Addr: config.MysqlHost,
ReadTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
WriteTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
Params: map[string]string{"collation": "utf8_general_ci"},
}
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
mainlog.Error("cannot open mysql", err)
return nil, err
} else {
mainlog.Info("connected to mysql on tcp ", config.MysqlHost)
return db, nil
}

}
Service.Subscribe(ev.BackendProcInitialize, func(backendConfig BackendConfig) {

Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
configType := baseConfig(&MysqlProcessorConfig{})
// TODO deal with error (supressed) push them on a channel? eg, Service.Errors <- service.ErrConfigLoad
bcfg, _ := ab.extractConfig(backendConfig, configType)
bcfg, err := ab.extractConfig(backendConfig, configType)
if err != nil {
return err
}
config = bcfg.(*MysqlProcessorConfig)
decorator.config = config

// todo backendErrors chan error
var err error
db, err = mysqlConnect()
mp.config = config
db, err = mp.connect(config)
if err != nil {
mainlog.Fatalf("cannot open mysql: %s", err)
return err
}
return nil
}))

// shutdown
Service.AddShutdowner(Shutdown(func() error {
if db != nil {
db.Close()
}
})
return nil
}))

return func(c Processor) Processor {
return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
@@ -174,8 +181,8 @@ func MySql() Decorator {
trimToLimit(e.MailFrom.String(), 255),
e.TLS)

stmt := decorator.prepareInsertQuery(1, db)
decorator.doQuery(1, db, stmt, &vals)
stmt := mp.prepareInsertQuery(1, db)
mp.doQuery(1, db, stmt, &vals)
// continue to the next Processor in the decorator chain
return c.Process(e)
})
42 changes: 37 additions & 5 deletions backends/p_redis.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,64 @@
package backends

import (
"fmt"

"github.com/flashmob/go-guerrilla/envelope"

"github.com/flashmob/go-guerrilla/response"
"github.com/garyburd/redigo/redis"
)

type RedisProcessorConfig struct {
RedisExpireSeconds int `json:"redis_expire_seconds"`
RedisInterface string `json:"redis_interface"`
}

type RedisProcessor struct {
isConnected bool
conn redis.Conn
}

func (r *RedisProcessor) redisConnection(redisInterface string) (err error) {
if r.isConnected == false {
r.conn, err = redis.Dial("tcp", redisInterface)
if err != nil {
// handle error
return err
}
r.isConnected = true
}
return nil
}

// The redis decorator stores the email data in redis

func Redis(dc *DecoratorCallbacks) Decorator {
func Redis() Decorator {

var config *RedisProcessorConfig
redisClient := &redisClient{}
dc.loader = func(backendConfig BackendConfig) error {
redisClient := &RedisProcessor{}
// read the config into RedisProcessorConfig
Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
configType := baseConfig(&RedisProcessorConfig{})
bcfg, err := ab.extractConfig(backendConfig, configType)
if err != nil {
return err
}
config = bcfg.(*RedisProcessorConfig)

if redisErr := redisClient.redisConnection(config.RedisInterface); redisErr != nil {
err := fmt.Errorf("Redis cannot connect, check your settings: %s", redisErr)
return err
}
return nil
}
}))
// When shutting down
Service.AddShutdowner(Shutdown(func() error {
if redisClient.isConnected {
redisClient.conn.Close()
}
return nil
}))

var redisErr error

return func(c Processor) Processor {
2 changes: 1 addition & 1 deletion backends/processor.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ type Processor interface {
// Signature of DoFunc
type ProcessorFunc func(*envelope.Envelope) (BackendResult, error)

// Add method to DoFunc type to satisfy Client interface
// Make ProcessorFunc will satisfy the Processor interface
func (f ProcessorFunc) Process(e *envelope.Envelope) (BackendResult, error) {
return f(e)
}
13 changes: 6 additions & 7 deletions cmd/guerrillad/serve.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import (
"fmt"
"github.com/flashmob/go-guerrilla"
"github.com/flashmob/go-guerrilla/backends"
"github.com/flashmob/go-guerrilla/ev"
"github.com/flashmob/go-guerrilla/log"
"github.com/spf13/cobra"
"io/ioutil"
@@ -86,7 +85,7 @@ func sigHandler(app guerrilla.Guerrilla) {
}
}

func subscribeBackendEvent(event ev.Event, backend backends.Backend, app guerrilla.Guerrilla) {
func subscribeBackendEvent(event guerrilla.Event, backend backends.Backend, app guerrilla.Guerrilla) {

app.Subscribe(event, func(cmdConfig *CmdConfig) {
logger, _ := log.GetLogger(cmdConfig.LogFile)
@@ -145,12 +144,12 @@ func serve(cmd *cobra.Command, args []string) {
if err != nil {
mainlog.WithError(err).Error("Error(s) when starting server(s)")
}
subscribeBackendEvent(ev.ConfigBackendConfig, backend, app)
subscribeBackendEvent(ev.ConfigBackendName, backend, app)
subscribeBackendEvent(guerrilla.EventConfigBackendConfig, backend, app)
subscribeBackendEvent(guerrilla.EventConfigBackendName, backend, app)
// Write out our PID
writePid(cmdConfig.PidFile)
// ...and write out our pid whenever the file name changes in the config
app.Subscribe(ev.ConfigPidFile, func(ac *guerrilla.AppConfig) {
app.Subscribe(guerrilla.EventConfigPidFile, func(ac *guerrilla.AppConfig) {
writePid(ac.PidFile)
})
// change the logger from stdrerr to one from config
@@ -185,10 +184,10 @@ func (c *CmdConfig) load(jsonBytes []byte) error {
func (c *CmdConfig) emitChangeEvents(oldConfig *CmdConfig, app guerrilla.Guerrilla) {
// has backend changed?
if !reflect.DeepEqual((*c).BackendConfig, (*oldConfig).BackendConfig) {
app.Publish(ev.ConfigBackendConfig, c)
app.Publish(guerrilla.EventConfigBackendConfig, c)
}
if c.BackendName != oldConfig.BackendName {
app.Publish(ev.ConfigBackendName, c)
app.Publish(guerrilla.EventConfigBackendName, c)
}
// call other emitChangeEvents
c.AppConfig.EmitChangeEvents(&oldConfig.AppConfig, app)
37 changes: 18 additions & 19 deletions config.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/flashmob/go-guerrilla/ev"
"os"
"reflect"
"strings"
@@ -67,26 +66,26 @@ func (c *AppConfig) Load(jsonBytes []byte) error {
func (c *AppConfig) EmitChangeEvents(oldConfig *AppConfig, app Guerrilla) {
// has config changed, general check
if !reflect.DeepEqual(oldConfig, c) {
app.Publish(ev.ConfigNewConfig, c)
app.Publish(EventConfigNewConfig, c)
}
// has 'allowed hosts' changed?
if !reflect.DeepEqual(oldConfig.AllowedHosts, c.AllowedHosts) {
app.Publish(ev.ConfigAllowedHosts, c)
app.Publish(EventConfigAllowedHosts, c)
}
// has pid file changed?
if strings.Compare(oldConfig.PidFile, c.PidFile) != 0 {
app.Publish(ev.ConfigPidFile, c)
app.Publish(EventConfigPidFile, c)
}
// has mainlog log changed?
if strings.Compare(oldConfig.LogFile, c.LogFile) != 0 {
app.Publish(ev.ConfigLogFile, c)
app.Publish(EventConfigLogFile, c)
} else {
// since config file has not changed, we reload it
app.Publish(ev.ConfigLogReopen, c)
app.Publish(EventConfigLogReopen, c)
}
// has log level changed?
if strings.Compare(oldConfig.LogLevel, c.LogLevel) != 0 {
app.Publish(ev.ConfigLogLevel, c)
app.Publish(EventConfigLogLevel, c)
}
// server config changes
oldServers := oldConfig.getServers()
@@ -99,21 +98,21 @@ func (c *AppConfig) EmitChangeEvents(oldConfig *AppConfig, app Guerrilla) {
newServer.emitChangeEvents(oldServer, app)
} else {
// start new server
app.Publish(ev.ConfigEvServerNew, newServer)
app.Publish(EventConfigEvServerNew, newServer)
}

}
// remove any servers that don't exist anymore
for _, oldserver := range oldServers {
app.Publish(ev.ConfigServerRemove, oldserver)
app.Publish(EventConfigServerRemove, oldserver)
}
}

// EmitLogReopen emits log reopen events using existing config
func (c *AppConfig) EmitLogReopenEvents(app Guerrilla) {
app.Publish(ev.ConfigLogReopen, c)
app.Publish(EventConfigLogReopen, c)
for _, sc := range c.getServers() {
app.Publish(ev.ConfigServerLogReopen, sc)
app.Publish(EventConfigServerLogReopen, sc)
}
}

@@ -136,33 +135,33 @@ func (sc *ServerConfig) emitChangeEvents(oldServer *ServerConfig, app Guerrilla)
)
if len(changes) > 0 {
// something changed in the server config
app.Publish(ev.ConfigServerConfig, sc)
app.Publish(EventConfigServerConfig, sc)
}

// enable or disable?
if _, ok := changes["IsEnabled"]; ok {
if sc.IsEnabled {
app.Publish(ev.ConfigServerStart, sc)
app.Publish(EventConfigServerStart, sc)
} else {
app.Publish(ev.ConfigServerStop, sc)
app.Publish(EventConfigServerStop, sc)
}
// do not emit any more events when IsEnabled changed
return
}
// log file change?
if _, ok := changes["LogFile"]; ok {
app.Publish(ev.ConfigServerLogFile, sc)
app.Publish(EventConfigServerLogFile, sc)
} else {
// since config file has not changed, we reload it
app.Publish(ev.ConfigServerLogReopen, sc)
app.Publish(EventConfigServerLogReopen, sc)
}
// timeout changed
if _, ok := changes["Timeout"]; ok {
app.Publish(ev.ConfigServerTimeout, sc)
app.Publish(EventConfigServerTimeout, sc)
}
// max_clients changed
if _, ok := changes["MaxClients"]; ok {
app.Publish(ev.ConfigServerMaxClients, sc)
app.Publish(EventConfigServerMaxClients, sc)
}

// tls changed
@@ -181,7 +180,7 @@ func (sc *ServerConfig) emitChangeEvents(oldServer *ServerConfig, app Guerrilla)
}
return false
}(); ok {
app.Publish(ev.ConfigServerTLSConfig, sc)
app.Publish(EventConfigServerTLSConfig, sc)
}
}

45 changes: 19 additions & 26 deletions ev/event.go → event.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ev
package guerrilla

import (
evbus "github.com/asaskevich/EventBus"
@@ -8,48 +8,41 @@ type Event int

const (
// when a new config was loaded
ConfigNewConfig Event = iota
EventConfigNewConfig Event = iota
// when allowed_hosts changed
ConfigAllowedHosts
EventConfigAllowedHosts
// when pid_file changed
ConfigPidFile
EventConfigPidFile
// when log_file changed
ConfigLogFile
EventConfigLogFile
// when it's time to reload the main log file
ConfigLogReopen
EventConfigLogReopen
// when log level changed
ConfigLogLevel
EventConfigLogLevel
// when the backend changed
ConfigBackendName
EventConfigBackendName
// when the backend's config changed
ConfigBackendConfig
EventConfigBackendConfig
// when a new server was added
ConfigEvServerNew
EventConfigEvServerNew
// when an existing server was removed
ConfigServerRemove
EventConfigServerRemove
// when a new server config was detected (general event)
ConfigServerConfig
EventConfigServerConfig
// when a server was enabled
ConfigServerStart
EventConfigServerStart
// when a server was disabled
ConfigServerStop
EventConfigServerStop
// when a server's log file changed
ConfigServerLogFile
EventConfigServerLogFile
// when it's time to reload the server's log
ConfigServerLogReopen
EventConfigServerLogReopen
// when a server's timeout changed
ConfigServerTimeout
EventConfigServerTimeout
// when a server's max clients changed
ConfigServerMaxClients
EventConfigServerMaxClients
// when a server's TLS config changed
ConfigServerTLSConfig

// Load a backend processor's config todo: dont need it?
BackendProcConfigLoad
// initialize a backend processor
BackendProcInitialize
// shutdown a backend processor
BackendProcShutdown
EventConfigServerTLSConfig
)

var eventList = [...]string{
39 changes: 19 additions & 20 deletions guerrilla.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ package guerrilla
import (
"errors"
"github.com/flashmob/go-guerrilla/backends"
"github.com/flashmob/go-guerrilla/ev"
"github.com/flashmob/go-guerrilla/log"
"sync"
"sync/atomic"
@@ -36,9 +35,9 @@ func (e Errors) Error() string {
type Guerrilla interface {
Start() error
Shutdown()
Subscribe(topic ev.Event, fn interface{}) error
Publish(topic ev.Event, args ...interface{})
Unsubscribe(topic ev.Event, handler interface{}) error
Subscribe(topic Event, fn interface{}) error
Publish(topic Event, args ...interface{})
Unsubscribe(topic Event, handler interface{}) error
SetLogger(log.Logger)
}

@@ -49,7 +48,7 @@ type guerrilla struct {
// guard controls access to g.servers
guard sync.Mutex
state int8
ev.EventHandler
EventHandler
logStore
}

@@ -173,20 +172,20 @@ func (g *guerrilla) mapServers(callback func(*server)) map[string]*server {
func (g *guerrilla) subscribeEvents() {

// main config changed
g.Subscribe(ev.ConfigNewConfig, func(c *AppConfig) {
g.Subscribe(EventConfigNewConfig, func(c *AppConfig) {
g.setConfig(c)
})

// allowed_hosts changed, set for all servers
g.Subscribe(ev.ConfigAllowedHosts, func(c *AppConfig) {
g.Subscribe(EventConfigAllowedHosts, func(c *AppConfig) {
g.mapServers(func(server *server) {
server.setAllowedHosts(c.AllowedHosts)
})
g.mainlog().Infof("allowed_hosts config changed, a new list was set")
})

// the main log file changed
g.Subscribe(ev.ConfigLogFile, func(c *AppConfig) {
g.Subscribe(EventConfigLogFile, func(c *AppConfig) {
var err error
var l log.Logger
if l, err = log.GetLogger(c.LogFile); err == nil {
@@ -203,13 +202,13 @@ func (g *guerrilla) subscribeEvents() {
})

// re-open the main log file (file not changed)
g.Subscribe(ev.ConfigLogReopen, func(c *AppConfig) {
g.Subscribe(EventConfigLogReopen, func(c *AppConfig) {
g.mainlog().Reopen()
g.mainlog().Infof("re-opened main log file [%s]", c.LogFile)
})

// when log level changes, apply to mainlog and server logs
g.Subscribe(ev.ConfigLogLevel, func(c *AppConfig) {
g.Subscribe(EventConfigLogLevel, func(c *AppConfig) {
g.mainlog().SetLevel(c.LogLevel)
g.mapServers(func(server *server) {
server.log.SetLevel(c.LogLevel)
@@ -218,12 +217,12 @@ func (g *guerrilla) subscribeEvents() {
})

// server config was updated
g.Subscribe(ev.ConfigServerConfig, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerConfig, func(sc *ServerConfig) {
g.setServerConfig(sc)
})

// add a new server to the config & start
g.Subscribe(ev.ConfigEvServerNew, func(sc *ServerConfig) {
g.Subscribe(EventConfigEvServerNew, func(sc *ServerConfig) {
if _, err := g.findServer(sc.ListenInterface); err != nil {
// not found, lets add it
if err := g.makeServers(); err != nil {
@@ -240,7 +239,7 @@ func (g *guerrilla) subscribeEvents() {
}
})
// start a server that already exists in the config and has been enabled
g.Subscribe(ev.ConfigServerStart, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerStart, func(sc *ServerConfig) {
if server, err := g.findServer(sc.ListenInterface); err == nil {
if server.state == ServerStateStopped || server.state == ServerStateNew {
g.mainlog().Infof("Starting server [%s]", server.listenInterface)
@@ -252,7 +251,7 @@ func (g *guerrilla) subscribeEvents() {
}
})
// stop running a server
g.Subscribe(ev.ConfigServerStop, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerStop, func(sc *ServerConfig) {
if server, err := g.findServer(sc.ListenInterface); err == nil {
if server.state == ServerStateRunning {
server.Shutdown()
@@ -261,7 +260,7 @@ func (g *guerrilla) subscribeEvents() {
}
})
// server was removed from config
g.Subscribe(ev.ConfigServerRemove, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerRemove, func(sc *ServerConfig) {
if server, err := g.findServer(sc.ListenInterface); err == nil {
server.Shutdown()
g.removeServer(sc.ListenInterface)
@@ -270,7 +269,7 @@ func (g *guerrilla) subscribeEvents() {
})

// TLS changes
g.Subscribe(ev.ConfigServerTLSConfig, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerTLSConfig, func(sc *ServerConfig) {
if server, err := g.findServer(sc.ListenInterface); err == nil {
if err := server.configureSSL(); err == nil {
g.mainlog().Infof("Server [%s] new TLS configuration loaded", sc.ListenInterface)
@@ -280,19 +279,19 @@ func (g *guerrilla) subscribeEvents() {
}
})
// when server's timeout change.
g.Subscribe(ev.ConfigServerTimeout, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerTimeout, func(sc *ServerConfig) {
g.mapServers(func(server *server) {
server.setTimeout(sc.Timeout)
})
})
// when server's max clients change.
g.Subscribe(ev.ConfigServerMaxClients, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerMaxClients, func(sc *ServerConfig) {
g.mapServers(func(server *server) {
// TODO resize the pool somehow
})
})
// when a server's log file changes
g.Subscribe(ev.ConfigServerLogFile, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerLogFile, func(sc *ServerConfig) {
if server, err := g.findServer(sc.ListenInterface); err == nil {
var err error
var l log.Logger
@@ -314,7 +313,7 @@ func (g *guerrilla) subscribeEvents() {
}
})
// when the daemon caught a sighup, event for individual server
g.Subscribe(ev.ConfigServerLogReopen, func(sc *ServerConfig) {
g.Subscribe(EventConfigServerLogReopen, func(sc *ServerConfig) {
if server, err := g.findServer(sc.ListenInterface); err == nil {
server.log.Reopen()
g.mainlog().Infof("Server [%s] re-opened log file [%s]", sc.ListenInterface, sc.LogFile)