-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
124 lines (106 loc) · 2.83 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package provider
import (
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
log "github.com/Sirupsen/logrus"
"github.com/mistifyio/acomm"
)
// Server is the main server struct. It holds the configuration, the set of
// registered tasks keyed by task name, and the request/response tracker.
type Server struct {
// config is the configuration the server was created with.
config *Config
// tasks maps a task name to its registered task.
tasks map[string]*task
// tracker is the request/response tracker exposed through Tracker.
tracker *acomm.Tracker
}
// Provider is an interface to allow a provider to register its tasks with a
// Server.
type Provider interface {
// RegisterTasks is handed the Server on which the provider should register
// its tasks.
// NOTE(review): the Server is passed by value rather than as *Server, while
// Server's methods use pointer receivers — confirm this is intended.
RegisterTasks(Server)
}
// NewServer validates config and builds a Server around it. The server starts
// with no registered tasks and owns a tracker listening on
// <socket dir>/response/<service name>.sock, configured with the request
// timeout from config. An error from validation or tracker creation is
// returned unchanged.
func NewServer(config *Config) (*Server, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}

	socketDir := config.SocketDir()
	socketName := config.ServiceName() + ".sock"
	responseSocket := filepath.Join(socketDir, "response", socketName)

	tracker, err := acomm.NewTracker(responseSocket, nil, config.RequestTimeout())
	if err != nil {
		return nil, err
	}

	srv := &Server{
		config:  config,
		tasks:   map[string]*task{},
		tracker: tracker,
	}
	return srv, nil
}
// Tracker exposes the request/response tracker held by the Server.
func (s *Server) Tracker() *acomm.Tracker {
	return s.tracker
}
// RegisterTask creates a task for taskName backed by handler and stores it on
// the server. The task is given the socket path from TaskSocketPath and the
// timeout the config reports for that task name. Registering a name that is
// already present replaces the earlier entry.
func (s *Server) RegisterTask(taskName string, handler TaskHandler) {
	socketPath := s.TaskSocketPath(taskName)
	timeout := s.config.TaskTimeout(taskName)
	s.tasks[taskName] = newTask(taskName, socketPath, timeout, handler)
}
// TaskSocketPath builds the unix socket path for a task:
// <socket dir>/<task name>/<task priority>-<service name>.sock, with the
// socket dir, priority and service name all taken from the server's config.
func (s *Server) TaskSocketPath(taskName string) string {
	socketDir := s.config.SocketDir()
	priority := strconv.Itoa(s.config.TaskPriority(taskName))
	socketName := priority + "-" + s.config.ServiceName() + ".sock"
	return filepath.Join(socketDir, taskName, socketName)
}
// RegisteredTasks lists the names of all tasks currently registered. The
// result follows map iteration order, so its ordering is not stable between
// calls.
func (s *Server) RegisteredTasks() []string {
	names := make([]string, len(s.tasks))
	next := 0
	for name := range s.tasks {
		names[next] = name
		next++
	}
	return names
}
// Start brings up response handling (the tracker) first and then every
// registered task. It returns the first error encountered; in that case the
// tracker and any tasks that were already started are left running.
func (s *Server) Start() error {
	err := s.tracker.Start()
	if err != nil {
		return err
	}

	for _, registered := range s.tasks {
		if err = registered.start(); err != nil {
			return err
		}
	}
	return nil
}
// Stop shuts down every registered task and then the response tracker. Tasks
// are stopped concurrently, one goroutine each, and Stop does not return
// until all of them and the tracker have finished stopping.
func (s *Server) Stop() {
	var pending sync.WaitGroup
	pending.Add(len(s.tasks))

	for _, registered := range s.tasks {
		go func(t *task) {
			defer pending.Done()
			t.stop()
		}(registered)
	}

	// The tracker is only stopped once no task is still shutting down.
	pending.Wait()
	s.tracker.Stop()
}
// StopOnSignal blocks until one of the specified signals is received and
// then stops the server. If no signals are specified, it waits for
// os.Interrupt or syscall.SIGTERM. The signal registration is released when
// the method returns.
func (s *Server) StopOnSignal(signals ...os.Signal) {
	if len(signals) == 0 {
		// os.Kill (SIGKILL) is deliberately not in the defaults: it can never
		// be caught by the program, so listening for it has no effect.
		signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
	}

	// signal.Notify never blocks when delivering, so the channel must be
	// buffered or a signal arriving before the receive below would be lost.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, signals...)
	defer signal.Stop(sigChan)

	sig := <-sigChan
	log.WithFields(log.Fields{
		"signal": sig,
	}).Info("signal received, stopping")

	s.Stop()
}