forked from phalaaxx/milter
-
Notifications
You must be signed in to change notification settings - Fork 3
/
server.go
139 lines (125 loc) · 3.09 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Package milter is a Go library for milter support.
package milter
import (
"errors"
"fmt"
"net"
"sync"
"time"
)
// defaultServer is the package-level Server instance that the package-level
// Close function operates on.
var defaultServer Server
// MilterFactory initializes the milter and its options.
// The server calls it once for every accepted connection and builds the
// session from its results: the SessionHandler, the OptAction and OptProtocol
// values (multiple options can be set using a bitmask) and the RequestMacros
// the milter wants to receive.
type MilterFactory func() (SessionHandler, OptAction, OptProtocol, RequestMacros)
// Close shuts down the package-level default server by delegating to its
// Close method, which stops the listener and waits for in-flight connections.
// The result of that call is discarded because this function returns nothing.
func Close() {
	_ = defaultServer.Close()
}
// Server is a milter server that accepts incoming connections on a listener
// and handles each of them in its own goroutine.
// Panics raised while handling a connection are recovered and passed to the
// registered error handlers; several func(error) handlers may be provided.
// When no handler is registered, panics are not recovered.
type Server struct {
// listener is where Run accepts connections from; Run returns
// ErrNoListenAddr while it is nil.
listener net.Listener
// milterFactory is called once per accepted connection to obtain the
// handler and options for that session.
milterFactory MilterFactory
// errHandlers receive the error built from a panic recovered in a
// connection goroutine.
errHandlers []func(error)
// logger receives accept-loop error messages and is handed to every session.
logger CustomLogger
// wg counts the running connection goroutines so shutdown can wait for them.
wg sync.WaitGroup
// quit is created by Run and closed by Close to request shutdown.
quit chan struct{}
// exited is created by Run and closed by Run once all connection
// goroutines have finished; Close blocks on it.
exited chan struct{}
}
// New builds a Server that creates its sessions with milterfactory.
//
// The server starts out logging through stdoutLogger. The listener option is
// applied first, followed by every additional option in the order given; all
// of them run after the defaults have been set.
func New(milterfactory MilterFactory, lopt ListenerOption, opts ...Option) *Server {
	srv := new(Server)
	srv.milterFactory = milterfactory
	srv.logger = stdoutLogger{}
	// srv.wg needs no initialisation: the zero WaitGroup is ready to use.

	lopt.lapply(srv)
	for i := range opts {
		opts[i].apply(srv)
	}
	return srv
}
// RequestMacros (also known as SetSymList) is the list of macros that the
// milter wants to receive from the MTA for each protocol Stage (stage
// constants have the prefix SMFIM_).
// If nil, no macros are requested and the default macros from the MTA are used.
type RequestMacros map[Stage][]Macro
// Close gracefully shuts the server down.
//
// It closes the quit channel to tell Run to stop, closes the listener so that
// a blocked Accept returns, and then blocks until Run closes the exited
// channel, which Run does only after every connection goroutine has finished.
//
// If Run has not been started (the quit channel is still nil) there is nothing
// to wait for, so only the listener, when one is set, is closed. Close must be
// called at most once per Run: a second call panics on the already-closed quit
// channel. It must also not race with the start of Run.
//
// The returned error is the one reported by the listener's Close method.
func (s *Server) Close() error {
	if s.quit == nil {
		// Run never created the channels: closing a nil channel would panic
		// and receiving from a nil exited channel would block forever.
		if s.listener == nil {
			return nil
		}
		return s.listener.Close()
	}

	close(s.quit)
	// Run only creates quit after checking that the listener is non-nil.
	err := s.listener.Close()
	// Wait for Run to drain the in-flight connections and acknowledge.
	<-s.exited
	return err
}
// Run starts the milter server on the configured listener and blocks,
// accepting connections until Close is called.
//
// Every accepted connection is served in its own goroutine. A panic in such a
// goroutine is passed to handlePanic together with the server's error
// handlers.
//
// Run returns ErrNoListenAddr when no listener is configured. After Close has
// been called it waits for all connection goroutines to finish, closes the
// exited channel to release Close, and returns nil.
func (s *Server) Run() error {
	if s.listener == nil {
		return ErrNoListenAddr
	}
	s.quit = make(chan struct{})
	s.exited = make(chan struct{})
	for {
		// Non-blocking shutdown check before every Accept.
		select {
		case <-s.quit:
			// Let in-flight connections finish, then release Close.
			s.wg.Wait()
			close(s.exited)
			return nil
		default:
		}

		conn, err := s.listener.Accept()
		if err != nil {
			select {
			case <-s.quit:
				// Close closed the listener to unblock Accept, so this error
				// is the shutdown signal rather than a failure: skip the log
				// line and the back-off and go straight to the shutdown
				// branch at the top of the loop.
				continue
			default:
			}
			var opErr *net.OpError
			if errors.As(err, &opErr) && opErr.Timeout() {
				continue
			}
			s.logger.Printf("Error: Failed to accept connection: %s", err.Error())
			// Back off briefly so a persistent accept error does not spin.
			time.Sleep(200 * time.Millisecond)
			continue
		}
		if conn == nil {
			s.logger.Printf("Error: conn is nil")
			continue
		}

		s.wg.Add(1)
		go func() {
			// Deferred calls run last-in first-out: wg.Done runs first, then
			// handlePanic. handlePanic is deferred directly because recover
			// only works in the deferred function itself.
			defer handlePanic(s.errHandlers)
			defer s.wg.Done()
			s.handleCon(conn)
		}()
	}
}
// handleCon serves one accepted connection. It asks the milter factory for a
// handler with its action, protocol and macro settings, stores them in a
// milterSession together with the connection and the server's logger, and
// then hands control to the session's HandleMilterCommands.
func (s *Server) handleCon(conn net.Conn) {
	handler, actions, protocol, macros := s.milterFactory()

	var session milterSession
	session.actions = actions
	session.protocol = protocol
	session.sock = conn
	session.milter = handler
	session.logger = s.logger
	session.symlists = macros

	session.HandleMilterCommands()
}
// handlePanic recovers a panic of the calling goroutine and reports it to
// every handler in handlers, in order.
//
// It must be deferred directly (defer handlePanic(h)), because recover only
// has an effect when called by the deferred function itself.
//
// If no handler is provided (handlers is nil or empty) the panic is NOT
// recovered and keeps propagating. When there is no panic in flight the
// handlers are not called.
//
// A panic value that already is an error is passed on unchanged; any other
// value is converted to an error whose message is fmt.Sprint of that value.
func handlePanic(handlers []func(error)) {
	// len covers both a nil and an empty slice, so a panic is never swallowed
	// without at least one handler seeing it.
	if len(handlers) == 0 {
		return
	}

	var err error
	switch v := recover().(type) {
	case nil:
		return
	case error:
		err = v
	default:
		err = errors.New(fmt.Sprint(v))
	}

	for _, handle := range handlers {
		handle(err)
	}
}