Analytics Dashboard #72

Open · wants to merge 104 commits into base: master

Changes from 1 commit (of 104 commits)
902d6c7
start on admin dashboard
jordanschalm Jan 3, 2017
4236bac
fix imports
jordanschalm Jan 3, 2017
8906068
get websockets working reliably across reloads, add unsubscribe metho…
jordanschalm Jan 4, 2017
7ad2f0c
get logs into dashboard module
jordanschalm Jan 5, 2017
2ae4951
add log hooks to main package
jordanschalm Jan 24, 2017
1515594
giving up on plain js -- setup react
jordanschalm Jan 24, 2017
55eedaa
set up statik file system and move session management to occur over w…
jordanschalm Jan 24, 2017
f0f59f3
set up redux store and hook up to websockets
jordanschalm Jan 30, 2017
2bbfa29
time conversion on fe, and feed from be into charts
jordanschalm Feb 4, 2017
5ed2b25
styling of charts, and fix sync issue with event store
jordanschalm Feb 6, 2017
4da0bbb
add initSess stub
jordanschalm Feb 6, 2017
13191e0
glide.lock auto-update
jordanschalm Feb 6, 2017
31cb581
merge master into dashboard
jordanschalm Feb 6, 2017
dd5094c
add decorator pattern
flashmob Feb 9, 2017
20cf073
Merge branch 'master' into more-backends-refactoring
flashmob Feb 9, 2017
6476239
dummy backend uses the decorator pattern with two decorators: "debugg…
flashmob Feb 9, 2017
44ad353
dummy backend uses the decorator pattern with two decorators: "debugg…
flashmob Feb 9, 2017
b19bac2
more progress with decorators:
flashmob Feb 10, 2017
00dfc08
more progress with decorators:
flashmob Feb 10, 2017
87c442a
refactored Processor initialization and shutdown: now using initializ…
flashmob Feb 13, 2017
9127bc0
- got rid of abstract.go backend
flashmob Feb 14, 2017
e2d7375
- redis: when not compressed, delivery header was not added
flashmob Feb 14, 2017
91d4e66
- synchronise BackendService
flashmob Feb 14, 2017
0c84f5e
- backend logger refactoring
flashmob Feb 14, 2017
35f069b
forgot to set the log level
flashmob Feb 14, 2017
32c1e9e
Fixed tests
flashmob Feb 15, 2017
6c1c48b
- AddProcessor function to give more abstraction into the processor c…
flashmob Feb 16, 2017
0d1e35c
add error checking for backend processor initialization
flashmob Feb 16, 2017
7dbe397
trim whitespace from user part
flashmob Feb 16, 2017
0c160b6
rename process_line to process_stack to better reflect name
flashmob Feb 16, 2017
c4fea83
Send actions over websockets directly as redux actions
jordanschalm Feb 21, 2017
8e1e342
Refactoring
flashmob Feb 22, 2017
7aa52bf
Add ranking telemetry for top clients by helo, ip, domain and create …
jordanschalm Feb 23, 2017
b90508a
test travis
jordanschalm Feb 23, 2017
f56b3c8
test travis
jordanschalm Feb 23, 2017
85d6aef
test travis
jordanschalm Feb 23, 2017
0a4965f
test travis
jordanschalm Feb 23, 2017
462ebb2
test travis
jordanschalm Feb 23, 2017
2107c27
test travis
jordanschalm Feb 23, 2017
5f44d28
test travis
jordanschalm Feb 23, 2017
dfcd016
test travis
jordanschalm Feb 23, 2017
2db6573
test travis
jordanschalm Feb 23, 2017
1d59402
add old backend compatibility
flashmob Feb 23, 2017
5764ae4
old version of guerrilla-db-redis, to remove
flashmob Feb 23, 2017
9c09c57
ported guerrilla_db_redis to new backend system
flashmob Feb 23, 2017
60494c1
remove old deprecated backend system
flashmob Feb 23, 2017
f94e4f4
Reimplement ranking analytics to save (lots of) memory
jordanschalm Feb 23, 2017
f96ebc7
Add config options to dashboard and update sample config and readme
jordanschalm Feb 25, 2017
1da6c64
change enabled key to match rest of config
jordanschalm Feb 25, 2017
f7a2032
- renamed envelope package to mail because *envelope.Envelope didn't …
flashmob Feb 25, 2017
2f62ce7
return ErrProcessorNotFound error if processor not found
flashmob Feb 25, 2017
98b297c
update readme to include info about the new backend system
flashmob Feb 25, 2017
0966eaa
- debugger processor name, make case insensitive
flashmob Feb 26, 2017
407dcd3
limit helo to 16 characters logged, update README
jordanschalm Feb 26, 2017
2a12a28
fix slice bound error
jordanschalm Feb 26, 2017
bc52714
- tweak debug messages
flashmob Feb 26, 2017
83f98cc
- clean up comments
flashmob Feb 27, 2017
224edf4
fix broken build
flashmob Feb 27, 2017
a0e78f7
Add dashboard deps and build to makefile
jordanschalm Feb 27, 2017
b1fff47
update clean in makefile
jordanschalm Feb 28, 2017
6e9d2ca
update build process in readme
jordanschalm Feb 28, 2017
c359a95
- Backend shutdown process re-written to use channels (workStoppers)…
flashmob Mar 3, 2017
3a31bd2
remove unneeded import from test
flashmob Mar 3, 2017
74b0540
fix format error string
flashmob Mar 3, 2017
7188363
update readme
flashmob Mar 3, 2017
28d7542
change log re-open signal to SIGUSR1 instead of SIGHUP
flashmob Mar 3, 2017
e9878f3
make sure to register to get SIGUSR1 signal
flashmob Mar 3, 2017
df08327
- moved backed config from CmdConfig to AppConfig, including backend …
flashmob Mar 5, 2017
4b9a781
- New API (with default configuration options)
flashmob Mar 7, 2017
7043262
refactor serve.go to use the new API
flashmob Mar 7, 2017
7d6d1fd
* api tests
flashmob Mar 8, 2017
565c3f9
use the makefile to run test
flashmob Mar 8, 2017
d4bcc13
make log public
flashmob Mar 9, 2017
3cb670d
- Envelopes now have their own pool. This is so that if processing ta…
flashmob Mar 10, 2017
a452c06
add testrace to makefile
flashmob Mar 10, 2017
ef7202c
update sample config
flashmob Mar 10, 2017
29db21a
separate process stack for validating recipients - it means it use a …
flashmob Mar 11, 2017
b532c91
- remove /sloonz/go-qprintable and replace with go stdlib equivalent
flashmob Mar 11, 2017
e82cf80
Readme update: edit description and add features list
flashmob Mar 11, 2017
5427265
merge backend refactor into dashboard and fix conflicts. Also fix a b…
jordanschalm Mar 12, 2017
dd8dceb
update gitignore in test
jordanschalm Mar 12, 2017
9e50efb
add dashboard glide deps
jordanschalm Mar 13, 2017
bfe58e2
Merge branch 'master' into dashboard
flashmob Mar 21, 2017
e3b8469
fix merge conflicts
flashmob Mar 21, 2017
8fcb6c2
- add Stop function
flashmob Mar 23, 2017
2ff69cf
- fix deadlock for when the http server for dashboard fails and Stop…
flashmob Mar 23, 2017
abf5280
fix statik build
flashmob Mar 23, 2017
ebf74b8
Fix dashboard target in Makefile
jordanschalm Mar 23, 2017
5c46f53
Fix bug where JS console prints error on INIT message from dashboard …
jordanschalm Mar 23, 2017
f1521b0
add a simulation test for the dashboard
flashmob Mar 24, 2017
b44b3bf
Merge branch 'dashboard' of github.com:flashmob/go-guerrilla into das…
flashmob Mar 24, 2017
ea7149d
remove line limit from dashboard simulation test
flashmob Mar 24, 2017
7e86a4e
- split hook to hook.go
flashmob Mar 25, 2017
21e9d2a
create heap structure for connection records
jordanschalm Apr 11, 2017
96340cd
Merge branch 'master' into dashboard
flashmob May 31, 2018
b1b7061
fix imports
flashmob May 31, 2018
ef7478b
fix deadlock
flashmob May 31, 2018
e1c505b
Merge branch 'master' into dashboard
flashmob Jan 30, 2019
2b4a8cd
- update readme
flashmob Jan 31, 2019
c5ca3e5
fix deadlock caused by debug message that wasn't removed
flashmob Jan 31, 2019
b314384
fix race condition when incrementing number of clients
flashmob Feb 1, 2019
339f7df
fix tests
flashmob Feb 1, 2019
666750d
update dashboard readme
flashmob Feb 1, 2019
2e7c6db
add screenshot
flashmob Feb 1, 2019
- Envelopes now have their own pool. This is so that if processing takes longer than the backend timeout, the client can still be reused for the next connection while the envelope is detached. Previously the server could sometimes panic when it reused a client whose envelope the backend still held a pointer to; 'detaching' the envelope from the client avoids this problem.
- Gateway has new config options to control timeouts: gw_save_timeout and gw_val_rcpt_timeout
- spent some time on improving reliability of guerrilla_db_redis processor example
flashmob committed Mar 10, 2017
commit 3cb670d13747ca1d8c2ff0001f7acc4a6adfa185
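As a rough orientation before the diff, here is a minimal sketch of how the two new gateway timeout options could be supplied through the backend config. The key names and their 30-second/5-second defaults come from the GatewayConfig changes below; the surrounding AppConfig values, the allowed host, and the use of Daemon.Start as the entry point are illustrative assumptions, not part of this commit.

package main

import (
	"fmt"

	guerrilla "github.com/flashmob/go-guerrilla"
	"github.com/flashmob/go-guerrilla/backends"
)

func main() {
	cfg := &guerrilla.AppConfig{
		LogFile:      "stderr",
		AllowedHosts: []string{"example.com"},
		BackendConfig: backends.BackendConfig{
			"process_stack":     "HeadersParser|Debugger",
			"save_workers_size": 1,
			// seconds before a save (TaskSaveMail) times out; defaults to 30 when omitted
			"gw_save_timeout": 30,
			// seconds before recipient validation (TaskValidateRcpt) times out; defaults to 5
			"gw_val_rcpt_timeout": 5,
		},
	}
	d := guerrilla.Daemon{Config: cfg}
	if err := d.Start(); err != nil {
		fmt.Println("start error:", err)
	}
}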
20 changes: 13 additions & 7 deletions api_test.go
@@ -342,7 +342,7 @@ var funkyLogger = func() backends.Decorator {
}),
)

return func(c backends.Processor) backends.Processor {
return func(p backends.Processor) backends.Processor {
return backends.ProcessWith(
func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
if task == backends.TaskValidateRcpt {
@@ -351,13 +351,13 @@ var funkyLogger = func() backends.Decorator {
"another funky recipient [%s]",
e.RcptTo[len(e.RcptTo)-1])
// if valid then forward call to the next processor in the chain
return c.Process(e, task)
return p.Process(e, task)
// if invalid, return a backend result
//return backends.NewResult(response.Canned.FailRcptCmd), nil
} else if task == backends.TaskSaveMail {
backends.Log().Info("Another funky email!")
}
return c.Process(e, task)
return p.Process(e, task)
})
}
}
@@ -411,19 +411,25 @@ func talkToServer(address string) {
}
in := bufio.NewReader(conn)
str, err := in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "HELO maildiranasaurustester\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "MAIL FROM:<test@example.com>r\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "RCPT TO:test@grr.la\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "DATA\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
fmt.Fprint(conn, "Subject: Test subject\r\n")
fmt.Fprint(conn, "\r\n")
fmt.Fprint(conn, "A an email body\r\n")
fmt.Fprint(conn, ".\r\n")
str, err = in.ReadString('\n')
// fmt.Println(str)
_ = str
}

@@ -455,7 +461,7 @@ func TestPubSubAPI(t *testing.T) {

// new config
cfg := &AppConfig{
PidFile: "tests/pidfile`.pid",
PidFile: "tests/pidfilex.pid",
LogFile: "tests/testlog",
AllowedHosts: []string{"grr.la"},
BackendConfig: backends.BackendConfig{"process_stack": "HeadersParser|Debugger|FunkyLogger"},
@@ -494,14 +500,14 @@ func TestAPILog(t *testing.T) {
os.Truncate("tests/testlog", 0)
d := Daemon{}
l := d.Log()
l.Info("hai") // to stderr
l.Info("logtest1") // to stderr
if l.GetLevel() != "info" {
t.Error("Log level does not eq info, it is ", l.GetLevel())
}
d.Logger = nil
d.Config = &AppConfig{LogFile: "tests/testlog"}
l = d.Log()
l.Info("hai") // to tests/testlog
l.Info("logtest1") // to tests/testlog

//
l = d.Log()
@@ -515,7 +521,7 @@ func TestAPILog(t *testing.T) {
return
}
// lets interrogate the log
if strings.Index(string(b), "hai") < 0 {
if strings.Index(string(b), "logtest1") < 0 {
t.Error("hai was not found in the log, it should have been in tests/testlog")
}
}
42 changes: 34 additions & 8 deletions backends/gateway.go
@@ -37,8 +37,14 @@ type BackendGateway struct {
}

type GatewayConfig struct {
WorkersSize int `json:"save_workers_size,omitempty"`
// WorkersSize controls how many concurrent workers to start. Defaults to 1
WorkersSize int `json:"save_workers_size,omitempty"`
// ProcessorStack controls which processors to chain in a stack.
ProcessorStack string `json:"process_stack,omitempty"`
// TimeoutSave is the number of seconds before timeout when saving an email
TimeoutSave int `json:"gw_save_timeout,omitempty"`
// TimeoutValidateRcpt is how many seconds before timeout when validating a recipient
TimeoutValidateRcpt int `json:"gw_val_rcpt_timeout,omitempty"`
}

// workerMsg is what gets placed on the BackendGateway.saveMailChan channel
@@ -61,8 +67,11 @@ const (
BackendStateError
BackendStateInitialized

processTimeout = time.Second * 30
defaultProcessor = "Debugger"
// default timeout for saving email, if 'gw_save_timeout' not present in config
saveTimeout = time.Second * 30
// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
validateRcptTimeout = time.Second * 5
defaultProcessor = "Debugger"
)

func (s backendState) String() string {
@@ -114,11 +123,10 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
}
return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)

case <-time.After(processTimeout):
Log().Infof("Backend has timed out")
case <-time.After(gw.saveTimeout()):
Log().Error("Backend has timed out while saving eamil")
return NewResult(response.Canned.FailBackendTimeout)
}

}

// ValidateRcpt asks one of the workers to validate the recipient
@@ -139,8 +147,8 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
}
return nil

case <-time.After(time.Second):
Log().Infof("Backend has timed out")
case <-time.After(gw.validateRcptTimeout()):
Log().Error("Backend has timed out while validating rcpt")
return StorageTimeout
}
}
@@ -295,6 +303,22 @@ func (gw *BackendGateway) workersSize() int {
return gw.gwConfig.WorkersSize
}

// saveTimeout returns the maximum amount of seconds to wait before timing out a save processing task
func (gw *BackendGateway) saveTimeout() time.Duration {
if gw.gwConfig.TimeoutSave == 0 {
return saveTimeout
}
return time.Duration(gw.gwConfig.TimeoutSave)
}

// validateRcptTimeout returns the maximum amount of seconds to wait before timing out a recipient validation task
func (gw *BackendGateway) validateRcptTimeout() time.Duration {
if gw.gwConfig.TimeoutValidateRcpt == 0 {
return validateRcptTimeout
}
return time.Duration(gw.gwConfig.TimeoutValidateRcpt)
}

func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, workerId int, stop chan bool) {

defer func() {
@@ -317,6 +341,7 @@ func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, wo
Log().Debugf("worker stopped (#%d)", workerId)
return
}
msg.e.Lock()
if msg.task == TaskSaveMail {
// process the email here
// TODO we should check the err
@@ -338,6 +363,7 @@ func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, wo
msg.notifyMe <- &notifyMsg{err: nil}
}
}
msg.e.Unlock()
}
}
}
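The gateway now derives its save and recipient-validation timeouts from config, falling back to package defaults when the options are absent. A self-contained illustration of that config-with-default pattern follows (the names are made up, this is not the gateway's own code, and it assumes the configured value is meant as whole seconds, as the GatewayConfig field comments state):

package main

import (
	"fmt"
	"time"
)

const defaultSaveTimeout = 30 * time.Second

type gatewayConfig struct {
	TimeoutSave int // whole seconds; 0 means "not set, use the default"
}

// saveTimeout converts the configured seconds into a time.Duration,
// falling back to the package default when the option is absent.
func saveTimeout(cfg gatewayConfig) time.Duration {
	if cfg.TimeoutSave == 0 {
		return defaultSaveTimeout
	}
	return time.Duration(cfg.TimeoutSave) * time.Second
}

func main() {
	fmt.Println(saveTimeout(gatewayConfig{}))                // 30s (default)
	fmt.Println(saveTimeout(gatewayConfig{TimeoutSave: 10})) // 10s (from config)
}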
6 changes: 3 additions & 3 deletions backends/p_compressor.go
@@ -90,17 +90,17 @@ func (c *compressor) clear() {
}

func Compressor() Decorator {
return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
if task == TaskSaveMail {
compressor := newCompressor()
compressor.set([]byte(e.DeliveryHeader), &e.Data)
// put the pointer in there for other processors to use later in the line
e.Values["zlib-compressor"] = compressor
// continue to the next Processor in the decorator stack
return c.Process(e, task)
return p.Process(e, task)
} else {
return c.Process(e, task)
return p.Process(e, task)
}
})
}
6 changes: 3 additions & 3 deletions backends/p_debugger.go
@@ -38,17 +38,17 @@ func Debugger() Decorator {
return nil
})
Svc.AddInitializer(initFunc)
return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
if task == TaskSaveMail {
if config.LogReceivedMails {
Log().Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
Log().Info("Headers are:", e.Header)
}
// continue to the next Processor in the decorator stack
return c.Process(e, task)
return p.Process(e, task)
} else {
return c.Process(e, task)
return p.Process(e, task)
}
})
}
128 changes: 92 additions & 36 deletions backends/p_guerrilla_db_redis.go
@@ -9,6 +9,7 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/go-sql-driver/mysql"
"io"
"math/rand"
"runtime/debug"
"strings"
"sync"
@@ -36,7 +37,7 @@ func init() {
var queryBatcherId = 0

// how many rows to batch at a time
const GuerrillaDBAndRedisBatchMax = 2
const GuerrillaDBAndRedisBatchMax = 50

// tick on every...
const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
@@ -46,6 +47,8 @@ type GuerrillaDBAndRedisBackend struct {
batcherWg sync.WaitGroup
// cache prepared queries
cache stmtCache

batcherStoppers []chan bool
}

// statement cache. It's an array, not slice
@@ -61,6 +64,7 @@ type guerrillaDBAndRedisConfig struct {
RedisExpireSeconds int `json:"redis_expire_seconds"`
RedisInterface string `json:"redis_interface"`
PrimaryHost string `json:"primary_mail_host"`
BatchTimeout int `json:"redis_mysql_batch_timeout,omitempty"`
}

// Load the backend config for the backend. It has already been unmarshalled
@@ -170,7 +174,7 @@ func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *sql.DB) *s
return stmt
}

func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) error {
var execErr error
defer func() {
if r := recover(); r != nil {
@@ -189,9 +193,13 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
// prepare the query used to insert when rows reaches batchMax
insertStmt = g.prepareInsertQuery(c, db)
_, execErr = insertStmt.Exec(*vals...)
//if rand.Intn(2) == 1 {
// return errors.New("uggabooka")
//}
if execErr != nil {
Log().WithError(execErr).Error("There was a problem with the insert")
}
return execErr
}

// Batches the rows from the feeder chan in to a single INSERT statement.
@@ -201,30 +209,53 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
// The goroutine can either exit if there's a panic or feeder channel closes
// it returns feederOk, which signals whether the feeder channel was still open when returning
// if feederOk is false, it means the feeder channel was closed
func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{}, db *sql.DB) (feederOk bool) {
func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(
feeder feedChan,
db *sql.DB,
batcherId int,
stop chan bool) (feederOk bool) {

// controls shutdown
defer g.batcherWg.Done()
g.batcherWg.Add(1)
// vals is where values are batched to
var vals []interface{}
// how many rows were batched
count := 0
// The timer will tick every second.
// The timer will tick every x seconds.
// Interrupting the select clause when there's no data on the feeder channel
t := time.NewTimer(GuerrillaDBAndRedisBatchTimeout)
timeo := GuerrillaDBAndRedisBatchTimeout
if g.config.BatchTimeout > 0 {
timeo = time.Duration(g.config.BatchTimeout)
}
t := time.NewTimer(timeo)
// prepare the query used to insert when rows reaches batchMax
insertStmt := g.prepareInsertQuery(GuerrillaDBAndRedisBatchMax, db)
// inserts executes a batched insert query, clears the vals and resets the count
insert := func(c int) {
inserter := func(c int) {
if c > 0 {
g.doQuery(c, db, insertStmt, &vals)
err := g.doQuery(c, db, insertStmt, &vals)
if err != nil {
// maybe connection prob?
// retry the sql query
attempts := 3
for i := 0; i < attempts; i++ {
Log().Infof("retrying query query rows[%c] ", c)
time.Sleep(time.Second)
err = g.doQuery(c, db, insertStmt, &vals)
if err == nil {
break
}
}
}
}
vals = nil
count = 0
}
rand.Seed(time.Now().UnixNano())
defer func() {
if r := recover(); r != nil {
Log().Error("insertQueryBatcher caught a panic", r)
Log().Error("insertQueryBatcher caught a panic", r, string(debug.Stack()))
}
}()
// Keep getting values from feeder and add to batch.
@@ -234,30 +265,33 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
for {
select {
// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
case row, feederOk := <-feeder:
if row == nil {
Log().Infof("MySQL query batcher exiting (#%d)", queryBatcherId)
// Insert any remaining rows
insert(count)
return feederOk
}
case <-stop:
Log().Infof("MySQL query batcher stopped (#%d)", batcherId)
// Insert any remaining rows
inserter(count)
feederOk = false
close(feeder)
return
case row := <-feeder:

vals = append(vals, row...)
count++
Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", queryBatcherId)
Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", batcherId)
if count >= GuerrillaDBAndRedisBatchMax {
insert(GuerrillaDBAndRedisBatchMax)
inserter(GuerrillaDBAndRedisBatchMax)
}
// stop timer from firing (reset the interrupt)
if !t.Stop() {
// drain the timer
<-t.C
}
t.Reset(GuerrillaDBAndRedisBatchTimeout)
t.Reset(timeo)
case <-t.C:
// anything to insert?
if n := len(vals); n > 0 {
insert(count)
inserter(count)
}
t.Reset(GuerrillaDBAndRedisBatchTimeout)
t.Reset(timeo)
}
}
}
@@ -271,14 +305,23 @@ func trimToLimit(str string, limit int) string {
}

func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
tOut := GuerrillaDBAndRedisBatchTimeout
if g.config.BatchTimeout > 0 {
tOut = time.Duration(g.config.BatchTimeout)
}
tOut += 10
// don't go to 30 sec or more
if tOut >= 30 {
tOut = 29
}
conf := mysql.Config{
User: g.config.MysqlUser,
Passwd: g.config.MysqlPass,
DBName: g.config.MysqlDB,
Net: "tcp",
Addr: g.config.MysqlHost,
ReadTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
WriteTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
ReadTimeout: tOut * time.Second,
WriteTimeout: tOut * time.Second,
Params: map[string]string{"collation": "utf8_general_ci"},
}
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
@@ -307,6 +350,8 @@ func (c *redisClient) redisConnection(redisInterface string) (err error) {
return nil
}

type feedChan chan []interface{}

// GuerrillaDbReddis is a specialized processor for Guerrilla mail. It is here as an example.
// It's an example of a 'monolithic' processor.
func GuerrillaDbReddis() Decorator {
@@ -319,9 +364,12 @@ func GuerrillaDbReddis() Decorator {

var redisErr error

feeder := make(chan []interface{}, 1)
var feeders []feedChan

g.batcherStoppers = make([]chan bool, 0)

Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {

configType := BaseConfig(&guerrillaDBAndRedisConfig{})
bcfg, err := Svc.ExtractConfig(backendConfig, configType)
if err != nil {
@@ -332,37 +380,45 @@ func GuerrillaDbReddis() Decorator {
if err != nil {
return err
}
queryBatcherId++
// start the query SQL batching where we will send data via the feeder channel
go func() {
queryBatcherId++
stop := make(chan bool)
feeder := make(feedChan, 1)
go func(qbID int, stop chan bool) {
// we loop so that if insertQueryBatcher panics, it can recover and go in again
for {
if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
Log().Debugf("insertQueryBatcher exited (#%d)", queryBatcherId)
if feederOK := g.insertQueryBatcher(feeder, db, qbID, stop); !feederOK {
Log().Debugf("insertQueryBatcher exited (#%d)", qbID)
return
}
// if insertQueryBatcher panics, it can recover and go in again
Log().Debug("resuming insertQueryBatcher")
}
}()
}(queryBatcherId, stop)
g.batcherStoppers = append(g.batcherStoppers, stop)
feeders = append(feeders, feeder)
return nil
}))

Svc.AddShutdowner(ShutdownWith(func() error {
db.Close()
Log().Infof("closed mysql")
if redisClient.conn != nil {
Log().Infof("closed redis")
redisClient.conn.Close()
}
// close the feeder & wait for query batcher to exit.
close(feeder)
// send a close signal to all query batchers to exit.
for i := range g.batcherStoppers {
g.batcherStoppers[i] <- true
}
g.batcherWg.Wait()

return nil
}))

var vals []interface{}
data := newCompressedData()

return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
if task == TaskSaveMail {
Log().Debug("Got mail from chan,", e.RemoteIP)
@@ -414,12 +470,12 @@ func GuerrillaDbReddis() Decorator {
e.RemoteIP,
trimToLimit(e.MailFrom.String(), 255),
e.TLS)
// give the values to the query batcher
feeder <- vals
return c.Process(e, task)
// give the values to a random query batcher
feeders[rand.Intn(len(feeders))] <- vals
return p.Process(e, task)

} else {
return c.Process(e, task)
return p.Process(e, task)
}
})
}
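The batcher above follows a common Go pattern: accumulate rows from a channel and flush them either when the batch is full, when a timer fires, or when a stop signal arrives. Below is a compact, runnable sketch of that pattern in isolation; the function name, its flush callback, and the sample values are illustrative and not part of this processor's API.

package main

import (
	"fmt"
	"time"
)

// batchLoop collects rows from feeder and flushes them when the batch reaches
// max rows, when the timer fires, or when a stop signal arrives.
func batchLoop(feeder chan []interface{}, stop chan bool, max int, timeout time.Duration,
	flush func(rows [][]interface{})) {

	var batch [][]interface{}
	t := time.NewTimer(timeout)
	for {
		select {
		case <-stop:
			if len(batch) > 0 {
				flush(batch) // flush whatever is left before exiting
			}
			return
		case row := <-feeder:
			batch = append(batch, row)
			if len(batch) >= max {
				flush(batch)
				batch = nil
			}
			if !t.Stop() { // drain the timer before resetting it
				<-t.C
			}
			t.Reset(timeout)
		case <-t.C:
			if len(batch) > 0 {
				flush(batch)
				batch = nil
			}
			t.Reset(timeout)
		}
	}
}

func main() {
	feeder := make(chan []interface{}, 1)
	stop := make(chan bool)
	go batchLoop(feeder, stop, 2, 500*time.Millisecond, func(rows [][]interface{}) {
		fmt.Println("flushing", len(rows), "rows")
	})
	feeder <- []interface{}{"a@example.com", "subject one"}
	feeder <- []interface{}{"b@example.com", "subject two"}
	time.Sleep(time.Second) // let the batcher flush and the timer tick once
	stop <- true
}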
6 changes: 3 additions & 3 deletions backends/p_hasher.go
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ func init() {
// The hasher decorator computes a hash of the email for each recipient
// It appends the hashes to envelope's Hashes slice.
func Hasher() Decorator {
return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {

if task == TaskSaveMail {
@@ -48,9 +48,9 @@ func Hasher() Decorator {
sum := h2.Sum([]byte{})
e.Hashes = append(e.Hashes, fmt.Sprintf("%x", sum))
}
return c.Process(e, task)
return p.Process(e, task)
} else {
return c.Process(e, task)
return p.Process(e, task)
}

})
6 changes: 3 additions & 3 deletions backends/p_header.go
@@ -46,7 +46,7 @@ func Header() Decorator {
return nil
}))

return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
if task == TaskSaveMail {
to := strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
@@ -64,10 +64,10 @@ func Header() Decorator {
// save the result
e.DeliveryHeader = addHead
// next processor
return c.Process(e, task)
return p.Process(e, task)

} else {
return c.Process(e, task)
return p.Process(e, task)
}
})
}
6 changes: 3 additions & 3 deletions backends/p_headers_parser.go
@@ -22,15 +22,15 @@ func init() {
}

func HeadersParser() Decorator {
return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
if task == TaskSaveMail {
e.ParseHeaders()
// next processor
return c.Process(e, task)
return p.Process(e, task)
} else {
// next processor
return c.Process(e, task)
return p.Process(e, task)
}
})
}
8 changes: 4 additions & 4 deletions backends/p_mysql.go
@@ -166,7 +166,7 @@ func MySql() Decorator {
return nil
}))

return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {

if task == TaskSaveMail {
@@ -217,7 +217,7 @@ func MySql() Decorator {
stmt := mp.prepareInsertQuery(1, db)
mp.doQuery(1, db, stmt, &vals)
// continue to the next Processor in the decorator chain
return c.Process(e, task)
return p.Process(e, task)
} else if task == TaskValidateRcpt {
// if you need to validate the e.Rcpt then change to:
if len(e.RcptTo) > 0 {
@@ -229,9 +229,9 @@ func MySql() Decorator {
return NewResult(response.Canned.FailNoSenderDataCmd), NoSuchUser
}
}
return c.Process(e, task)
return p.Process(e, task)
} else {
return c.Process(e, task)
return p.Process(e, task)
}

})
6 changes: 3 additions & 3 deletions backends/p_redis.go
@@ -84,7 +84,7 @@ func Redis() Decorator {

var redisErr error

return func(c Processor) Processor {
return func(p Processor) Processor {
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {

if task == TaskSaveMail {
@@ -119,10 +119,10 @@ func Redis() Decorator {
Log().Error("Redis needs a Hash() process before it")
}

return c.Process(e, task)
return p.Process(e, task)
} else {
// nothing to do for this task
return c.Process(e, task)
return p.Process(e, task)
}

})
2 changes: 1 addition & 1 deletion backends/processor.go
@@ -31,7 +31,7 @@ type Processor interface {
// Signature of Processor
type ProcessWith func(*mail.Envelope, SelectTask) (Result, error)

// Make ProcessorFunc will satisfy the Processor interface
// Make ProcessWith will satisfy the Processor interface
func (f ProcessWith) Process(e *mail.Envelope, task SelectTask) (Result, error) {
// delegate to the anonymous function
return f(e, task)
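The ProcessWith adapter above is what lets an ordinary function satisfy the Processor interface, and a decorator is simply a function that wraps the next Processor in the chain. A minimal sketch of a custom decorator in that style follows; UppercaseSubject is made up for illustration, it would live inside the backends package, and the Svc.AddProcessor registration mirrors the built-in processors but the exact API at this commit is an assumption.

package backends

import (
	"strings"

	"github.com/flashmob/go-guerrilla/mail"
)

func init() {
	// register so it can be referenced from the "process_stack" config string
	Svc.AddProcessor("UppercaseSubject", UppercaseSubject)
}

// UppercaseSubject is an illustrative decorator: on save it upper-cases the
// parsed Subject header, then always hands the envelope to the next Processor.
func UppercaseSubject() Decorator {
	return func(p Processor) Processor {
		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
			if task == TaskSaveMail {
				e.Subject = strings.ToUpper(e.Subject)
			}
			// continue to the next Processor in the decorator stack
			return p.Process(e, task)
		})
	}
}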
16 changes: 9 additions & 7 deletions client.go
@@ -50,11 +50,13 @@ type client struct {
log log.Logger
}

// Allocate a new client
func NewClient(conn net.Conn, clientID uint64, logger log.Logger) *client {
// NewClient allocates a new client.
func NewClient(conn net.Conn, clientID uint64, logger log.Logger, envelope *mail.Pool) *client {
c := &client{
conn: conn,
Envelope: mail.NewEnvelope(getRemoteAddr(conn), clientID),
conn: conn,
// Envelope will be borrowed from the envelope pool
// the envelope could be 'detached' from the client later when processing
Envelope: envelope.Borrow(getRemoteAddr(conn), clientID),
ConnectedAt: time.Now(),
bufin: newSMTPBufferedReader(conn),
bufout: bufio.NewWriter(conn),
@@ -153,7 +155,7 @@ func (c *client) closeConn() {
}

// init is called after the client is borrowed from the pool, to get it ready for the connection
func (c *client) init(conn net.Conn, clientID uint64) {
func (c *client) init(conn net.Conn, clientID uint64, ep *mail.Pool) {
c.conn = conn
// reset our reader & writer
c.bufout.Reset(conn)
@@ -164,8 +166,8 @@ func (c *client) init(conn net.Conn, clientID uint64) {
c.ConnectedAt = time.Now()
c.ID = clientID
c.errors = 0
c.Envelope.Reseed(getRemoteAddr(conn), clientID)

// borrow an envelope from the envelope pool
c.Envelope = ep.Borrow(getRemoteAddr(conn), clientID)
}

// getID returns the client's unique ID
9 changes: 9 additions & 0 deletions cmd/guerrillad/serve.go
@@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"syscall"
"time"
)

const (
@@ -66,6 +67,14 @@ func sigHandler() {
d.ReopenLogs()
} else if sig == syscall.SIGTERM || sig == syscall.SIGQUIT || sig == syscall.SIGINT {
mainlog.Infof("Shutdown signal caught")
go func() {
select {
// exit if graceful shutdown not finished in 60 sec.
case <-time.After(time.Second * 60):
mainlog.Error("graceful shutdown timed out")
os.Exit(1)
}
}()
d.Shutdown()
mainlog.Infof("Shutdown completed, exiting.")
return
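The serve.go hunk above adds a watchdog so that a stalled graceful shutdown cannot hang the process forever. A standalone sketch of the same idea follows; the 10-second deadline and the shutdown placeholder are illustrative, whereas guerrillad uses a 60-second deadline around d.Shutdown().

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func shutdown() {
	// placeholder for the real cleanup work (closing listeners, flushing backends, ...)
	time.Sleep(time.Second)
}

func main() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
	<-sig

	// Watchdog: if graceful shutdown stalls, force-exit after a deadline.
	go func() {
		time.Sleep(10 * time.Second)
		fmt.Fprintln(os.Stderr, "graceful shutdown timed out")
		os.Exit(1)
	}()

	shutdown()
	fmt.Println("shutdown completed, exiting")
}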
67 changes: 67 additions & 0 deletions mail/envelope.go
@@ -14,6 +14,7 @@ import (
"net/textproto"
"regexp"
"strings"
"sync"
"time"
)

@@ -59,6 +60,8 @@ type Envelope struct {
DeliveryHeader string
// Email(s) will be queued with this id
QueuedId string
// When locked, it means that the envelope is being processed by the backend
sync.Mutex
}

func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
@@ -264,3 +267,67 @@ func fixCharset(charset string) string {
}
return charset
}

// Envelopes have their own pool

type Pool struct {
// envelopes that are ready to be borrowed
pool chan *Envelope
// semaphore to control number of maximum borrowed envelopes
sem chan bool
}

func NewPool(poolSize int) *Pool {
return &Pool{
pool: make(chan *Envelope, poolSize),
sem: make(chan bool, poolSize),
}
}

func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
var e *Envelope
p.sem <- true // block here until there is room for another envelope
select {
case e = <-p.pool:
e.Reseed(remoteAddr, clientID)
default:
e = NewEnvelope(remoteAddr, clientID)
}
return e
}

// Return returns an envelope back to the envelope pool
// Note that an envelope will not be recycled while it still is
// processing
func (p *Pool) Return(e *Envelope) {
// we don't want to recycle an envelope that may still be processing
isUnlocked := func() <-chan bool {
signal := make(chan bool)
// make sure envelope finished processing
go func() {
// lock will block if still processing
e.Lock()
// got the lock, it means processing finished
e.Unlock()
// generate a signal
signal <- true
}()
return signal
}()

select {
case <-time.After(time.Second * 30):
// envelope still processing, we can't recycle it.
case <-isUnlocked:
// The envelope was _unlocked_, it finished processing
// put back in the pool or destroy
select {
case p.pool <- e:
//placed envelope back in pool
default:
// pool is full, don't return
}
}
// take a value off the semaphore to make room for more envelopes
<-p.sem
}
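To make the Borrow/Return life-cycle concrete, here is a small standalone usage sketch. The Lock/Unlock in the goroutine stands in for a backend that is still processing the envelope; in the server this is wired up through handleClient and the client pool in the hunks further down, so the remote address and timings here are illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/flashmob/go-guerrilla/mail"
)

func main() {
	pool := mail.NewPool(2) // at most two envelopes on loan at a time

	e := pool.Borrow("127.0.0.1", 1)
	fmt.Println("borrowed an envelope for", e.RemoteIP)

	// Simulate a backend that still holds the envelope while it processes.
	e.Lock()
	go func() {
		time.Sleep(100 * time.Millisecond) // pretend to save the email
		e.Unlock()                         // processing finished; safe to recycle
	}()

	// Return waits (up to 30 seconds) for the envelope to be unlocked, then
	// puts it back in the pool, or drops it if the pool is already full.
	pool.Return(e)
	fmt.Println("envelope handed back to the pool")
}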
9 changes: 5 additions & 4 deletions pool.go
@@ -3,6 +3,7 @@ package guerrilla
import (
"errors"
"github.com/flashmob/go-guerrilla/log"
"github.com/flashmob/go-guerrilla/mail"
"net"
"sync"
"sync/atomic"
@@ -18,7 +19,7 @@ type Poolable interface {
// ability to set read/write timeout
setTimeout(t time.Duration)
// set a new connection and client id
init(c net.Conn, clientID uint64)
init(c net.Conn, clientID uint64, ep *mail.Pool)
// get a unique id
getID() uint64
}
@@ -121,7 +122,7 @@ func (p *Pool) GetActiveClientsCount() int {
}

// Borrow a Client from the pool. Will block if len(activeClients) > maxClients
func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger) (Poolable, error) {
func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger, ep *mail.Pool) (Poolable, error) {
p.poolGuard.Lock()
defer p.poolGuard.Unlock()

@@ -134,9 +135,9 @@ func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger) (Poolab
case p.sem <- true: // block the client from serving until there is room
select {
case c = <-p.pool:
c.init(conn, clientID)
c.init(conn, clientID, ep)
default:
c = NewClient(conn, clientID, logger)
c = NewClient(conn, clientID, logger, ep)
}
p.activeClientsAdd(c)

5 changes: 4 additions & 1 deletion server.go
@@ -61,6 +61,7 @@ type server struct {
logStore atomic.Value
mainlogStore atomic.Value
backendStore atomic.Value
envelopePool *mail.Pool
}

type allowedHosts struct {
@@ -76,6 +77,7 @@ func newServer(sc *ServerConfig, b backends.Backend, l log.Logger) (*server, err
listenInterface: sc.ListenInterface,
state: ServerStateNew,
mainlog: l,
envelopePool: mail.NewPool(sc.MaxClients),
}
server.backendStore.Store(b)
var logOpenError error
@@ -216,6 +218,7 @@ func (server *server) Start(startWG *sync.WaitGroup) error {
c := p.(*client)
if borrow_err == nil {
server.handleClient(c)
server.envelopePool.Return(c.Envelope)
server.clientPool.Return(c)
} else {
server.log.WithError(borrow_err).Info("couldn't borrow a new client")
@@ -225,7 +228,7 @@ func (server *server) Start(startWG *sync.WaitGroup) error {
}
// intentionally placed Borrow in args so that it's called in the
// same main goroutine.
}(server.clientPool.Borrow(conn, clientID, server.log))
}(server.clientPool.Borrow(conn, clientID, server.log, server.envelopePool))

}
}
3 changes: 2 additions & 1 deletion server_test.go
@@ -10,6 +10,7 @@ import (

"github.com/flashmob/go-guerrilla/backends"
"github.com/flashmob/go-guerrilla/log"
"github.com/flashmob/go-guerrilla/mail"
"github.com/flashmob/go-guerrilla/mocks"
)

@@ -67,7 +68,7 @@ func TestHandleClient(t *testing.T) {
}
conn, server := getMockServerConn(sc, t)
// call the serve.handleClient() func in a goroutine.
client := NewClient(conn.Server, 1, mainlog)
client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5))
var wg sync.WaitGroup
wg.Add(1)
go func() {