Skip to content

Commit 0c8e7b1

Browse files
committed
Ensure all goroutines created by StartEtcd exit before closing the errc
Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
1 parent a7ab766 commit 0c8e7b1

File tree

2 files changed

+67
-13
lines changed

2 files changed

+67
-13
lines changed

server/embed/etcd.go

+39-7
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,23 @@ type Etcd struct {
8181

8282
Server *etcdserver.EtcdServer
8383

84-
cfg Config
85-
stopc chan struct{}
86-
errc chan error
84+
cfg Config
8785

86+
// closeOnce is to ensure `stopc` is closed only once, no matter
87+
// how many times the Close() method is called.
8888
closeOnce sync.Once
89-
wg sync.WaitGroup
89+
// stopc is used to notify the sub goroutines not to send
90+
// any errors to `errc`.
91+
stopc chan struct{}
92+
// errc is used to receive error from sub goroutines (including
93+
// client handler, peer handler and metrics handler). It's closed
94+
// after all these sub goroutines exit (checked via `wg`). Writers
95+
// should avoid writing after `stopc` is closed by selecting on
96+
// reading from `stopc`.
97+
errc chan error
98+
99+
// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
100+
wg sync.WaitGroup
90101
}
91102

92103
type peerListener struct {
@@ -368,6 +379,24 @@ func (e *Etcd) Config() Config {
368379
// Close gracefully shuts down all servers/listeners.
369380
// Client requests will be terminated with request timeout.
370381
// After timeout, enforce remaining requests be closed immediately.
382+
//
383+
// The rough workflow to shut down etcd:
384+
// 1. close the `stopc` channel, so that all error handlers (child
385+
// goroutines) won't send back any errors anymore;
386+
// 2. stop the http and grpc servers gracefully, within request timeout;
387+
// 3. close all client and metrics listeners, so that etcd server
388+
// stops receiving any new connection;
389+
// 4. call the cancel function to close the gateway context, so that
390+
// all gateway connections are closed.
391+
// 5. stop etcd server gracefully, and ensure the main raft loop
392+
// goroutine is stopped;
393+
// 6. stop all peer listeners, so that it stops receiving peer connections
394+
// and messages (wait up to 1-second);
395+
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
396+
// and metrics handlers) to exit;
397+
// 8. close the `errc` channel to release the resource. Note that it's only
398+
// safe to close the `errc` after step 7 above is done, otherwise the
399+
// child goroutines may send errors back to already closed `errc` channel.
371400
func (e *Etcd) Close() {
372401
fields := []zap.Field{
373402
zap.String("name", e.cfg.Name),
@@ -597,7 +626,9 @@ func (e *Etcd) servePeers() (err error) {
597626

598627
// start peer servers in a goroutine
599628
for _, pl := range e.Peers {
629+
e.wg.Add(1)
600630
go func(l *peerListener) {
631+
defer e.wg.Done()
601632
u := l.Addr().String()
602633
e.cfg.logger.Info(
603634
"serving peer traffic",
@@ -781,7 +812,9 @@ func (e *Etcd) serveClients() (err error) {
781812

782813
// start client servers in each goroutine
783814
for _, sctx := range e.sctxs {
815+
e.wg.Add(1)
784816
go func(s *serveCtx) {
817+
defer e.wg.Done()
785818
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
786819
}(sctx)
787820
}
@@ -869,7 +902,9 @@ func (e *Etcd) serveMetrics() (err error) {
869902
return err
870903
}
871904
e.metricsListeners = append(e.metricsListeners, ml)
905+
e.wg.Add(1)
872906
go func(u url.URL, ln net.Listener) {
907+
defer e.wg.Done()
873908
e.cfg.logger.Info(
874909
"serving metrics",
875910
zap.String("address", u.String()),
@@ -882,9 +917,6 @@ func (e *Etcd) serveMetrics() (err error) {
882917
}
883918

884919
func (e *Etcd) errHandler(err error) {
885-
e.wg.Add(1)
886-
defer e.wg.Done()
887-
888920
select {
889921
case <-e.stopc:
890922
return

server/embed/serve.go

+28-6
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,23 @@ type serveCtx struct {
6060
insecure bool
6161
httpOnly bool
6262

63+
// ctx is used to control the grpc gateway. Terminate the grpc gateway
64+
// by calling `cancel` when shutting down the etcd.
6365
ctx context.Context
6466
cancel context.CancelFunc
6567

6668
userHandlers map[string]http.Handler
6769
serviceRegister func(*grpc.Server)
68-
serversC chan *servers
69-
closeOnce sync.Once
70+
71+
// serversC is used to receive the http and grpc server objects (created
72+
// in `serve`), both of which will be closed when shutting down the etcd.
73+
// Close it when `serve` returns or when etcd fails to bootstrap.
74+
serversC chan *servers
75+
// closeOnce is to ensure `serversC` is closed only once.
76+
closeOnce sync.Once
77+
78+
// wg is used to track the lifecycle of all sub goroutines created by `serve`.
79+
wg sync.WaitGroup
7080
}
7181

7282
type servers struct {
@@ -180,13 +190,17 @@ func (sctx *serveCtx) serve(
180190
server = m.Serve
181191

182192
httpl := m.Match(cmux.HTTP1())
193+
sctx.wg.Add(1)
183194
go func(srvhttp *http.Server, tlsLis net.Listener) {
195+
defer sctx.wg.Done()
184196
errHandler(srvhttp.Serve(tlsLis))
185197
}(srv, httpl)
186198

187199
if grpcEnabled {
188200
grpcl := m.Match(cmux.HTTP2())
201+
sctx.wg.Add(1)
189202
go func(gs *grpc.Server, l net.Listener) {
203+
defer sctx.wg.Done()
190204
errHandler(gs.Serve(l))
191205
}(gs, grpcl)
192206
}
@@ -246,11 +260,13 @@ func (sctx *serveCtx) serve(
246260
} else {
247261
server = m.Serve
248262

249-
tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
250-
if err != nil {
251-
return err
263+
tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
264+
if tlsErr != nil {
265+
return tlsErr
252266
}
267+
sctx.wg.Add(1)
253268
go func(srvhttp *http.Server, tlsl net.Listener) {
269+
defer sctx.wg.Done()
254270
errHandler(srvhttp.Serve(tlsl))
255271
}(srv, tlsl)
256272
}
@@ -263,7 +279,11 @@ func (sctx *serveCtx) serve(
263279
)
264280
}
265281

266-
return server()
282+
err = server()
283+
sctx.close()
284+
// ensure all goroutines created by this method complete before this method returns.
285+
sctx.wg.Wait()
286+
return err
267287
}
268288

269289
func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
@@ -316,7 +336,9 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
316336
return nil, err
317337
}
318338
}
339+
sctx.wg.Add(1)
319340
go func() {
341+
defer sctx.wg.Done()
320342
<-ctx.Done()
321343
if cerr := conn.Close(); cerr != nil {
322344
sctx.lg.Warn(

0 commit comments

Comments
 (0)