Skip to content

Commit 5d22781

Browse files
authored
Merge pull request #19167 from joshuazh-x/fix-embed-close-deadlock-3.5
[3.5] Avoid deadlock in etcd.Close when stopping during bootstrapping
2 parents 6349cb8 + 80b0a73 commit 5d22781

File tree

4 files changed

+82
-5
lines changed

4 files changed

+82
-5
lines changed

server/embed/etcd.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type Etcd struct {
8686
errc chan error
8787

8888
closeOnce sync.Once
89+
wg sync.WaitGroup
8990
}
9091

9192
type peerListener struct {
@@ -111,7 +112,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
111112
if !serving {
112113
// errored before starting gRPC server for serveCtx.serversC
113114
for _, sctx := range e.sctxs {
114-
close(sctx.serversC)
115+
sctx.close()
115116
}
116117
}
117118
e.Close()
@@ -436,6 +437,7 @@ func (e *Etcd) Close() {
436437
}
437438
}
438439
if e.errc != nil {
440+
e.wg.Wait()
439441
close(e.errc)
440442
}
441443
}
@@ -880,6 +882,9 @@ func (e *Etcd) serveMetrics() (err error) {
880882
}
881883

882884
func (e *Etcd) errHandler(err error) {
885+
e.wg.Add(1)
886+
defer e.wg.Done()
887+
883888
select {
884889
case <-e.stopc:
885890
return

server/embed/serve.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ package embed
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"io/ioutil"
2122
defaultLog "log"
2223
"net"
2324
"net/http"
2425
"strings"
26+
"sync"
2527

2628
etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
2729
"go.etcd.io/etcd/client/pkg/v3/transport"
@@ -64,6 +66,7 @@ type serveCtx struct {
6466
userHandlers map[string]http.Handler
6567
serviceRegister func(*grpc.Server)
6668
serversC chan *servers
69+
closeOnce sync.Once
6770
}
6871

6972
type servers struct {
@@ -98,7 +101,15 @@ func (sctx *serveCtx) serve(
98101
splitHttp bool,
99102
gopts ...grpc.ServerOption) (err error) {
100103
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
101-
<-s.ReadyNotify()
104+
105+
// Make sure serversC is closed even if we prematurely exit the function.
106+
defer sctx.close()
107+
108+
select {
109+
case <-s.StoppingNotify():
110+
return errors.New("server is stopping")
111+
case <-s.ReadyNotify():
112+
}
102113

103114
sctx.lg.Info("ready to serve client requests")
104115

@@ -113,8 +124,6 @@ func (sctx *serveCtx) serve(
113124
servElection := v3election.NewElectionServer(v3c)
114125
servLock := v3lock.NewLockServer(v3c)
115126

116-
// Make sure serversC is closed even if we prematurely exit the function.
117-
defer close(sctx.serversC)
118127
var gwmux *gw.ServeMux
119128
if s.Cfg.EnableGRPCGateway {
120129
// GRPC gateway connects to grpc server via connection provided by grpc dial.
@@ -497,3 +506,9 @@ func (sctx *serveCtx) registerTrace() {
497506
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
498507
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
499508
}
509+
510+
func (sctx *serveCtx) close() {
511+
sctx.closeOnce.Do(func() {
512+
close(sctx.serversC)
513+
})
514+
}

server/etcdserver/server.go

+1
Original file line numberDiff line numberDiff line change
@@ -2130,6 +2130,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
21302130
Val: string(b),
21312131
}
21322132

2133+
// gofail: var beforePublishing struct{}
21332134
for {
21342135
ctx, cancel := context.WithTimeout(s.ctx, timeout)
21352136
_, err := s.Do(ctx, req)

tests/integration/embed/embed_test.go

+57-1
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@ import (
3030
"testing"
3131
"time"
3232

33+
"github.com/stretchr/testify/require"
34+
3335
"go.etcd.io/etcd/client/pkg/v3/testutil"
3436
"go.etcd.io/etcd/client/pkg/v3/transport"
35-
"go.etcd.io/etcd/client/v3"
37+
clientv3 "go.etcd.io/etcd/client/v3"
3638
"go.etcd.io/etcd/server/v3/embed"
3739
"go.etcd.io/etcd/tests/v3/integration"
40+
gofail "go.etcd.io/gofail/runtime"
3841
)
3942

4043
var (
@@ -210,3 +213,56 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) {
210213
}
211214
cfg.InitialCluster = cfg.InitialCluster[1:]
212215
}
216+
217+
func TestEmbedEtcdStopDuringBootstrapping(t *testing.T) {
218+
if len(gofail.List()) == 0 {
219+
t.Skip("please run 'make gofail-enable' before running the test")
220+
}
221+
222+
fpName := "beforePublishing"
223+
require.NoError(t, gofail.Enable(fpName, `sleep("2s")`))
224+
t.Cleanup(func() {
225+
terr := gofail.Disable(fpName)
226+
if terr != nil && terr != gofail.ErrDisabled {
227+
t.Fatalf("failed to disable %s: %v", fpName, terr)
228+
}
229+
})
230+
231+
done := make(chan struct{})
232+
go func() {
233+
defer close(done)
234+
235+
cfg := embed.NewConfig()
236+
urls := newEmbedURLs(false, 2)
237+
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
238+
cfg.Dir = filepath.Join(t.TempDir(), "embed-etcd")
239+
240+
e, err := embed.StartEtcd(cfg)
241+
if err != nil {
242+
t.Errorf("Failed to start etcd, got error %v", err)
243+
}
244+
defer e.Close()
245+
246+
go func() {
247+
time.Sleep(time.Second)
248+
e.Server.Stop()
249+
t.Log("Stopped server during bootstrapping")
250+
}()
251+
252+
select {
253+
case <-e.Server.ReadyNotify():
254+
t.Log("Server is ready!")
255+
case <-e.Server.StopNotify():
256+
t.Log("Server is stopped")
257+
case <-time.After(20 * time.Second):
258+
e.Server.Stop() // trigger a shutdown
259+
t.Error("Server took too long to start!")
260+
}
261+
}()
262+
263+
select {
264+
case <-done:
265+
case <-time.After(10 * time.Second):
266+
t.Error("timeout in bootstrapping etcd")
267+
}
268+
}

0 commit comments

Comments
 (0)