diff --git a/server/embed/serve.go b/server/embed/serve.go index 7f1a221974f0..9d68f1204411 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -159,7 +159,7 @@ func (sctx *serveCtx) serve( defer func(gs *grpc.Server) { if err != nil { sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - gs.Stop() + gs.GracefulStop() sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) } }(gs) @@ -202,16 +202,39 @@ func (sctx *serveCtx) serve( } if grpcEnabled { + // TODO(XXX): + // + // WaitForHandlers is experimental function to drain + // all the inflight handlers, including stream RPCs. + // For cmux mode, we can't call GracefulStop because of + // [1]. + // + // Actually, we do call http.Shutdown first in stopServers. + // We still need to drain all the inflight handlers to + // make sure that there is no leaky goroutines to + // use closed backend and panic. Add WaitForHandlers + // to force gs.Stop to drain. We can remove this option + // when we remove cmux [2]. + // + // [1]: https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 + // [2]: https://github.com/etcd-io/etcd/issues/15402 + gopts = append(gopts, grpc.WaitForHandlers(true)) + gs = v3rpc.Server(s, tlscfg, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { sctx.serviceRegister(gs) } + defer func(gs *grpc.Server) { if err != nil { sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - gs.Stop() + if httpEnabled { + gs.Stop() + } else { + gs.GracefulStop() + } sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) } }(gs) diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 4f55c7c74ede..29d5318179ea 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -103,6 +103,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe const snapshotSendBufferSize = 32 * 1024 func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { + // gofail: var v3rpcBeforeSnapshot struct{} ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx()) storageVersion := "" if ver != nil { diff --git a/tests/e2e/drain_in_shutdown_test.go b/tests/e2e/drain_in_shutdown_test.go new file mode 100644 index 000000000000..5af81a54f130 --- /dev/null +++ b/tests/e2e/drain_in_shutdown_test.go @@ -0,0 +1,171 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestShouldDrainRequestDuringShutdown(t *testing.T) { + e2e.BeforeTest(t) + + // defaultBuildSnapshotConn is to setup a database with 10 MiB and a + // inflight snapshot streaming RPC. + defaultBuildSnapshotConn := func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser { + t.Helper() + + require.NoError(t, fillEtcdWithData(ctx, cli, 10*1024*1024)) + + rc, err := cli.Snapshot(ctx) + require.NoError(t, err) + t.Cleanup(func() { rc.Close() }) + + // make sure that streaming RPC is in progress + buf := make([]byte, 1) + n, err := rc.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 1, n) + + return rc + } + + // defaultVerifySnapshotConn is to make sure that connection is still + // working even if the server is in shutdown state. + defaultVerifySnapshotConn := func(t *testing.T, rc io.ReadCloser) { + t.Helper() + + _, err := io.Copy(io.Discard, rc) + require.NoError(t, err) + } + + tcs := []struct { + name string + options []e2e.EPClusterOption + cliOpt e2e.ClientConfig + + buildSnapshotConn func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser + verifySnapshotConn func(t *testing.T, rc io.ReadCloser) + }{ + { + name: "no-tls", + options: []e2e.EPClusterOption{ + e2e.WithClusterSize(1), + e2e.WithClientAutoTLS(false), + }, + cliOpt: e2e.ClientConfig{ConnectionType: e2e.ClientNonTLS}, + + buildSnapshotConn: defaultBuildSnapshotConn, + verifySnapshotConn: defaultVerifySnapshotConn, + }, + { + name: "auto-tls_http_separated", + options: []e2e.EPClusterOption{ + e2e.WithClusterSize(1), + e2e.WithClientAutoTLS(true), + e2e.WithClientConnType(e2e.ClientTLS), + e2e.WithClientHTTPSeparate(true), + }, + cliOpt: e2e.ClientConfig{ + ConnectionType: e2e.ClientTLS, + AutoTLS: true, + }, + buildSnapshotConn: defaultBuildSnapshotConn, + verifySnapshotConn: defaultVerifySnapshotConn, + }, + { + name: "auto-tls_cmux", + options: []e2e.EPClusterOption{ + e2e.WithClusterSize(1), + e2e.WithClientAutoTLS(true), + e2e.WithClientConnType(e2e.ClientTLS), + e2e.WithClientHTTPSeparate(false), + e2e.WithGoFailEnabled(true), + // NOTE: Using failpoint is to make sure that + // the RPC handler won't exit because of closed + // connection. + e2e.WithEnvVars(map[string]string{ + "GOFAIL_FAILPOINTS": `v3rpcBeforeSnapshot=sleep("8s")`, + }), + }, + cliOpt: e2e.ClientConfig{ + ConnectionType: e2e.ClientTLS, + AutoTLS: true, + }, + buildSnapshotConn: func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser { + t.Helper() + + rc, err := cli.Snapshot(ctx) + require.NoError(t, err) + t.Cleanup(func() { rc.Close() }) + + // make sure server receives the RPC. + time.Sleep(2 * time.Second) + return rc + }, + verifySnapshotConn: func(t *testing.T, rc io.ReadCloser) { + t.Helper() + + _, err := io.Copy(io.Discard, rc) + require.Error(t, err) // connection will be closed forcely + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + epc, err := e2e.NewEtcdProcessCluster(ctx, t, tc.options...) + require.NoError(t, err) + t.Cleanup(func() { epc.Close() }) + + grpcEndpoint := epc.Procs[0].EndpointsGRPC()[0] + if tc.cliOpt.ConnectionType == e2e.ClientTLS { + grpcEndpoint = e2e.ToTLS(grpcEndpoint) + } + + cli := newClient(t, []string{grpcEndpoint}, tc.cliOpt) + + rc := tc.buildSnapshotConn(ctx, t, cli) + + errCh := make(chan error, 1) + go func() { + defer close(errCh) + errCh <- epc.Stop() + }() + + select { + case <-time.After(4 * time.Second): + case err := <-errCh: + t.Fatalf("should drain request but got error from cluster stop: %v", err) + } + + tc.verifySnapshotConn(t, rc) + + require.NoError(t, <-errCh) + }) + } +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 8f3a102c3059..3687b377e2e8 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -259,6 +259,10 @@ func WithClientAutoTLS(isClientAutoTLS bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.Client.AutoTLS = isClientAutoTLS } } +func WithClientHTTPSeparate(separate bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.ClientHTTPSeparate = separate } +} + func WithClientRevokeCerts(isClientCRL bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.Client.RevokeCerts = isClientCRL } }