-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.go
140 lines (112 loc) · 3.01 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/kelseyhightower/envconfig"
"google.golang.org/grpc"
grpclb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)
// cfg holds the runtime configuration of the load balancer. configure fills
// it from environment variables using the JAWLB prefix; the struct tags carry
// each setting's default value, whether it is required, and the description
// printed in the usage output.
var cfg = struct {
	// Address of the listening socket (see listen).
	Host string `default:"" desc:"Hostname to listen on"`
	Port int    `default:"8000" desc:"Port of the grpclb server"`

	// During this period we try to answer open requests but won't accept
	// new ones; awaitShutdown hard-stops the server once it has elapsed.
	ShutdownGracePeriod time.Duration `default:"25s" desc:"Duration of graceful shutdown period"`

	// Selection of the Kubernetes service whose endpoints are served.
	Namespace     string `default:"default" desc:"Kubernetes namespace in which to operate; empty for all namespaces"`
	Service       string `desc:"Name of the service in Kubernetes" required:"true"`
	LabelSelector string `desc:"Label selector for the service (foo=bar,baz=bang)"`
	TargetPort    string `default:"grpc" desc:"Target port name to forward to"`

	// Tuning of the Kubernetes watch.
	WatchMaxRetries int           `default:"60" desc:"Number of times to retry establishing the Kubernetes watch"`
	WatchRetryDelay time.Duration `default:"1s" desc:"Delay between retries"`
	WatchTimeout    time.Duration `default:"30s" desc:"Timeout for the watch until we acquire a new one"`
}{}
// main loads the configuration, starts the service watch, fans its updates
// out through a broadcast, serves them over gRPC and logs every change. It
// then blocks until a termination signal arrives and shuts the server down.
func main() {
	configure()

	// The root context is cancelled when main returns.
	rootCtx, stop := context.WithCancel(context.Background())
	defer stop()

	updates, watchErr := watchService(rootCtx)
	if watchErr != nil {
		log.Panic(watchErr)
	}

	hub := newBroadcast(rootCtx, updates)
	server := startServer(hub)
	logChanges(rootCtx, hub)

	log.Println("waiting for TERM")
	awaitTerm()

	awaitShutdown(server)
	log.Println("bye")
}
// logChanges registers a listener on bc and starts a goroutine that logs
// every server list received on it, one "IP:port" line per entry.
//
// The goroutine runs until ctx is done; at that point it removes the
// listener from bc, closes the listener channel and exits.
func logChanges(ctx context.Context, bc *broadcast) {
	updates := make(chan ServerList)
	bc.addListener(updates)

	// report writes a single server list to the log.
	report := func(list ServerList) {
		log.Print("endpoints:")
		for _, entry := range list {
			log.Printf("\t%s:%d", entry.IP, entry.Port)
		}
	}

	go func() {
		cancelled := ctx.Done()
		for {
			select {
			case list := <-updates:
				report(list)
			case <-cancelled:
				bc.remListener(updates)
				close(updates)
				return
			}
		}
	}()
}
// configure fills cfg from environment variables carrying the JAWLB prefix.
//
// If the first command-line argument is "help", the usage table is printed
// and the process exits with status 0 instead of starting the server. The
// help check deliberately runs before the environment is processed:
// MustProcess panics when a required variable (e.g. the service name) is
// missing, which would otherwise make the help output unreachable in exactly
// the situation where it is needed.
//
// Panics if the usage output cannot be written or if the environment cannot
// be processed.
func configure() {
	if len(os.Args) > 1 && os.Args[1] == "help" {
		if err := envconfig.Usage("JAWLB", &cfg); err != nil {
			log.Panic(err)
		}
		// Help was requested explicitly; do not go on to start the server.
		os.Exit(0)
	}

	envconfig.MustProcess("JAWLB", &cfg)
}
// startServer opens the listening socket, registers a load balancer service
// backed by bc on a new gRPC server and serves it from a background
// goroutine. The server is returned so the caller can stop it later.
//
// Panics if the listening socket cannot be opened.
func startServer(bc *broadcast) *grpc.Server {
	listener, listenErr := listen()
	if listenErr != nil {
		log.Panic(listenErr)
	}

	server := grpc.NewServer()
	grpclb.RegisterLoadBalancerServer(server, &lb{bc, 0})

	// Serve blocks, so it runs in its own goroutine; a non-nil result is
	// only logged.
	serve := func() {
		serveErr := server.Serve(listener)
		if serveErr == nil {
			return
		}
		log.Println("grpc connection closed", serveErr)
	}
	go serve()

	return server
}
// listen opens a TCP listener on the host and port taken from cfg.
func listen() (conn net.Listener, err error) {
	port := strconv.Itoa(cfg.Port)
	return net.Listen("tcp", net.JoinHostPort(cfg.Host, port))
}
// awaitTerm blocks until the process receives one of SIGTERM, SIGHUP, SIGINT
// or SIGQUIT.
func awaitTerm() {
	stopSignals := []os.Signal{
		syscall.SIGTERM,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGQUIT,
	}

	// Buffered by one so a signal delivered before the receive below is not
	// dropped.
	received := make(chan os.Signal, 1)
	signal.Notify(received, stopSignals...)
	<-received
}
// awaitShutdown stops server gracefully and waits for that to finish for at
// most cfg.ShutdownGracePeriod. If the grace period elapses first, the server
// is stopped hard.
func awaitShutdown(server *grpc.Server) {
	log.Println("performing graceful shutdown")

	// Completion is signalled by closing the channel rather than sending on
	// it: after a timeout nobody receives any more, and a send would leave
	// the goroutine blocked forever once GracefulStop finally returns.
	done := make(chan struct{})
	go func() {
		server.GracefulStop()
		close(done)
	}()

	timer := time.NewTimer(cfg.ShutdownGracePeriod)
	defer timer.Stop()

	select {
	case <-timer.C:
		log.Println("graceful shutdown failed -- hard stop")
		server.Stop()
	case <-done:
	}
}