From a798e864643e7fdad100794170643fe6566da8dc Mon Sep 17 00:00:00 2001
From: Siyuan Zhang
Date: Mon, 18 Dec 2023 14:37:37 -0800
Subject: [PATCH] [3.4] backport health check e2e tests.

Signed-off-by: Siyuan Zhang
---
 etcdserver/server.go                   |   1 +
 mvcc/backend/batch_tx.go               |   1 +
 pkg/expect/expect.go                   |   4 +
 tests/e2e/cluster_proxy_test.go        |  28 +-
 tests/e2e/cluster_test.go              |  61 +++-
 tests/e2e/cmux_test.go                 |   2 +-
 tests/e2e/ctl_v2_test.go               |   2 +-
 tests/e2e/ctl_v3_auth_no_proxy_test.go |   2 +-
 tests/e2e/ctl_v3_grpc_test.go          |   2 +-
 tests/e2e/ctl_v3_make_mirror_test.go   |   2 +-
 tests/e2e/ctl_v3_snapshot_test.go      |   4 +-
 tests/e2e/ctl_v3_test.go               |   2 +-
 tests/e2e/etcd_corrupt_test.go         |   2 +-
 tests/e2e/etcd_process.go              | 192 ++++++++++-
 tests/e2e/etcd_release_upgrade_test.go |   4 +-
 tests/e2e/etcdctl.go                   |  35 ++
 tests/e2e/gateway_test.go              |   2 +-
 tests/e2e/http_health_check_test.go    | 444 +++++++++++++++++++++++++
 tests/e2e/util.go                      |  64 ++++
 tests/e2e/v2_curl_test.go              |   2 +-
 tests/e2e/v3_curl_maxstream_test.go    |   2 +-
 tests/e2e/watch_delay_test.go          |   6 +-
 22 files changed, 815 insertions(+), 49 deletions(-)
 create mode 100644 tests/e2e/http_health_check_test.go

diff --git a/etcdserver/server.go b/etcdserver/server.go
index c044e3d44de1..49fb7f2efa62 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -2179,6 +2179,7 @@ func (s *EtcdServer) apply(
 		e := es[i]
 		switch e.Type {
 		case raftpb.EntryNormal:
+			// gofail: var beforeApplyOneEntryNormal struct{}
 			s.applyEntryNormal(&e)
 			s.setAppliedIndex(e.Index)
 			s.setTerm(e.Term)
diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go
index d5c8a88c353e..adebe7d14068 100644
--- a/mvcc/backend/batch_tx.go
+++ b/mvcc/backend/batch_tx.go
@@ -276,6 +276,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
 func (t *batchTxBuffered) Unlock() {
 	if t.pending != 0 {
 		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+		// gofail: var beforeWritebackBuf struct{}
 		t.buf.writeback(&t.backend.readTx.buf)
 		t.backend.readTx.Unlock()
 		if t.pending >= t.backend.batchLimit {
diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go
index 2e32eb526062..6f63f48fd489 100644
--- a/pkg/expect/expect.go
+++ b/pkg/expect/expect.go
@@ -167,3 +167,7 @@ func (ep *ExpectProcess) Send(command string) error {
 	_, err := io.WriteString(ep.fpty, command)
 	return err
 }
+
+func (ep *ExpectProcess) IsRunning() bool {
+	return ep.cmd != nil
+}
diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go
index 4387983484b3..51c319ff2b9f 100644
--- a/tests/e2e/cluster_proxy_test.go
+++ b/tests/e2e/cluster_proxy_test.go
@@ -30,9 +30,9 @@ import (
 )
 
 type proxyEtcdProcess struct {
-	etcdProc etcdProcess
-	proxyV2  *proxyV2Proc
-	proxyV3  *proxyV3Proc
+	*etcdServerProcess
+	proxyV2 *proxyV2Proc
+	proxyV3 *proxyV3Proc
 }
 
 func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
@@ -45,15 +45,13 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error
 		return nil, err
 	}
 	pep := &proxyEtcdProcess{
-		etcdProc: ep,
-		proxyV2:  newProxyV2Proc(cfg),
-		proxyV3:  newProxyV3Proc(cfg),
+		etcdServerProcess: ep,
+		proxyV2:           newProxyV2Proc(cfg),
+		proxyV3:           newProxyV3Proc(cfg),
 	}
 	return pep, nil
 }
 
-func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
-
 func (p *proxyEtcdProcess) EndpointsV2() []string   { return p.EndpointsHTTP() }
 func (p *proxyEtcdProcess) EndpointsV3() []string   { return p.EndpointsGRPC() }
 func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
@@ -63,7 +61,7 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string {
 }
 
 func (p *proxyEtcdProcess) Start() error {
-	if err := p.etcdProc.Start(); err != nil {
+	if err := p.etcdServerProcess.Start(); err != nil {
 		return err
 	}
 	if err := p.proxyV2.Start(); err != nil {
@@ -73,7 +71,7 @@ func (p *proxyEtcdProcess) Start() error {
 }
 
 func (p *proxyEtcdProcess) Restart() error {
-	if err := p.etcdProc.Restart(); err != nil {
+	if err := p.etcdServerProcess.Restart(); err != nil {
 		return err
 	}
 	if err := p.proxyV2.Restart(); err != nil {
@@ -87,7 +85,7 @@ func (p *proxyEtcdProcess) Stop() error {
 	if v3err := p.proxyV3.Stop(); err == nil {
 		err = v3err
 	}
-	if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
+	if eerr := p.etcdServerProcess.Stop(); eerr != nil && err == nil {
 		// fails on go-grpc issue #1384
 		if !strings.Contains(eerr.Error(), "exit status 2") {
 			err = eerr
@@ -101,7 +99,7 @@ func (p *proxyEtcdProcess) Close() error {
 	if v3err := p.proxyV3.Close(); err == nil {
 		err = v3err
 	}
-	if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
+	if eerr := p.etcdServerProcess.Close(); eerr != nil && err == nil {
 		// fails on go-grpc issue #1384
 		if !strings.Contains(eerr.Error(), "exit status 2") {
 			err = eerr
@@ -113,11 +111,7 @@ func (p *proxyEtcdProcess) Close() error {
 func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
 	p.proxyV3.WithStopSignal(sig)
 	p.proxyV3.WithStopSignal(sig)
-	return p.etcdProc.WithStopSignal(sig)
-}
-
-func (p *proxyEtcdProcess) Logs() logsExpect {
-	return p.etcdProc.Logs()
+	return p.etcdServerProcess.WithStopSignal(sig)
 }
 
 type proxyProc struct {
diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go
index 046eaa107fcd..a66f3219aadb 100644
--- a/tests/e2e/cluster_test.go
+++ b/tests/e2e/cluster_test.go
@@ -20,9 +20,12 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"testing"
 	"time"
 
 	"go.etcd.io/etcd/etcdserver"
+	"go.etcd.io/etcd/pkg/proxy"
+	"go.uber.org/zap"
 )
 
 const etcdProcessBasePort = 20000
@@ -97,10 +100,12 @@ type etcdProcessCluster struct {
 }
 
 type etcdProcessClusterConfig struct {
-	execPath    string
-	dataDirPath string
-	keepDataDir bool
-	envVars     map[string]string
+	execPath      string
+	dataDirPath   string
+	keepDataDir   bool
+	goFailEnabled bool
+	peerProxy     bool
+	envVars       map[string]string
 
 	clusterSize int
 
@@ -141,13 +146,13 @@ type etcdProcessClusterConfig struct {
 
 // newEtcdProcessCluster launches a new cluster from etcd processes, returning
 // a new etcdProcessCluster once all nodes are ready to accept client requests.
-func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
+func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
 	epc, err := initEtcdProcessCluster(cfg)
 	if err != nil {
 		return nil, err
 	}
 
-	return startEtcdProcessCluster(epc, cfg)
+	return startEtcdProcessCluster(t, epc, cfg)
 }
 
 // `initEtcdProcessCluster` initializes a new cluster based on the given config.
@@ -174,7 +179,7 @@ func initEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
 }
 
 // `startEtcdProcessCluster` launches a new cluster from etcd processes.
-func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
+func startEtcdProcessCluster(t testing.TB, epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
 	if err := epc.Start(); err != nil {
 		return nil, err
 	}
@@ -185,6 +190,12 @@ func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterCon
 			proc.WithStopSignal(cfg.stopSignal)
 		}
 	}
+	for _, proc := range epc.procs {
+		if cfg.goFailEnabled && !proc.Failpoints().Enabled() {
+			epc.Close()
+			t.Skip("please run test with 'FAILPOINTS=true'")
+		}
+	}
 	return epc, nil
 }
 
@@ -223,6 +234,8 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 		var curls []string
 		var curl string
 		port := cfg.basePort + 5*i
+		peerPort := port + 1
+		peer2Port := port + 3
 		clientPort := port
 		clientHttpPort := port + 4
 
@@ -235,6 +248,20 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 		}
 
 		purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
+		peerAdvertiseUrl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
+		var proxyCfg *proxy.ServerConfig
+		if cfg.peerProxy {
+			if !cfg.isPeerTLS {
+				panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
+			}
+			peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
+			proxyCfg = &proxy.ServerConfig{
+				Logger: zap.NewNop(),
+				To:     purl,
+				From:   peerAdvertiseUrl,
+			}
+		}
+
 		name := fmt.Sprintf("testname%d", i)
 		dataDirPath := cfg.dataDirPath
 		if cfg.dataDirPath == "" {
@@ -244,14 +271,14 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 				panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
 			}
 		}
 
-		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
+		initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String())
 		args := []string{
 			"--name", name,
 			"--listen-client-urls", strings.Join(curls, ","),
 			"--advertise-client-urls", strings.Join(curls, ","),
 			"--listen-peer-urls", purl.String(),
-			"--initial-advertise-peer-urls", purl.String(),
+			"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
 			"--initial-cluster-token", cfg.initialToken,
 			"--data-dir", dataDirPath,
 			"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
@@ -309,19 +336,31 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 			args = append(args, "--debug")
 		}
 
+		envVars := map[string]string{}
+		for key, value := range cfg.envVars {
+			envVars[key] = value
+		}
+		var gofailPort int
+		if cfg.goFailEnabled {
+			gofailPort = (i+1)*10000 + 2381
+			envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
+		}
+
 		etcdCfgs[i] = &etcdServerProcessConfig{
 			execPath:      cfg.execPath,
 			args:          args,
-			envVars:       cfg.envVars,
+			envVars:       envVars,
 			tlsArgs:       cfg.tlsArgs(),
 			dataDirPath:   dataDirPath,
 			keepDataDir:   cfg.keepDataDir,
 			name:          name,
-			purl:          purl,
+			purl:          peerAdvertiseUrl,
 			acurl:         curl,
 			murl:          murl,
 			initialToken:  cfg.initialToken,
 			clientHttpUrl: clientHttpUrl,
+			goFailPort:    gofailPort,
+			proxy:         proxyCfg,
 		}
 	}
 
diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go
index 8d394b31c663..dd6a1129b9a1 100644
--- a/tests/e2e/cmux_test.go
+++ b/tests/e2e/cmux_test.go
@@ -71,7 +71,7 @@ func TestConnectionMultiplexing(t *testing.T) {
 				clientHttpSeparate: tc.separateHttpPort,
 				stopSignal:         syscall.SIGTERM, // check graceful stop
 			}
-			clus, err := newEtcdProcessCluster(&cfg)
+			clus, err := newEtcdProcessCluster(t, &cfg)
 			require.NoError(t, err)
 
 			defer func() {
diff --git a/tests/e2e/ctl_v2_test.go b/tests/e2e/ctl_v2_test.go
index ce1f36bcc1fd..0f493f149287 100644
--- a/tests/e2e/ctl_v2_test.go
+++ b/tests/e2e/ctl_v2_test.go
@@ -547,7 +547,7 @@ func setupEtcdctlTest(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool)
 	if !quorum {
 		cfg = configStandalone(*cfg)
 	}
-	epc, err := newEtcdProcessCluster(cfg)
+	epc, err := newEtcdProcessCluster(t, cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
diff --git a/tests/e2e/ctl_v3_auth_no_proxy_test.go b/tests/e2e/ctl_v3_auth_no_proxy_test.go
index 58ead6e9b8e2..55ee8b4a6f1f 100644
--- a/tests/e2e/ctl_v3_auth_no_proxy_test.go
+++ b/tests/e2e/ctl_v3_auth_no_proxy_test.go
@@ -58,7 +58,7 @@ func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
 		initialToken: "new",
 	}
 
-	epc, err := newEtcdProcessCluster(&cx.cfg)
+	epc, err := newEtcdProcessCluster(t, &cx.cfg)
 	if err != nil {
 		t.Fatalf("Failed to start etcd cluster: %v", err)
 	}
diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go
index 3b3533f63498..c5d9b2626f20 100644
--- a/tests/e2e/ctl_v3_grpc_test.go
+++ b/tests/e2e/ctl_v3_grpc_test.go
@@ -93,7 +93,7 @@ func TestAuthority(t *testing.T) {
 			// Enable debug mode to get logs with http2 headers (including authority)
 			cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}
 
-			epc, err := newEtcdProcessCluster(&cfg)
+			epc, err := newEtcdProcessCluster(t, &cfg)
 			if err != nil {
 				t.Fatalf("could not start etcd process cluster (%v)", err)
 			}
diff --git a/tests/e2e/ctl_v3_make_mirror_test.go b/tests/e2e/ctl_v3_make_mirror_test.go
index ef1cf24c5939..0dcc25b3651d 100644
--- a/tests/e2e/ctl_v3_make_mirror_test.go
+++ b/tests/e2e/ctl_v3_make_mirror_test.go
@@ -68,7 +68,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
 		dialTimeout: 7 * time.Second,
 	}
 
-	mirrorepc, err := newEtcdProcessCluster(&mirrorctx.cfg)
+	mirrorepc, err := newEtcdProcessCluster(cx.t, &mirrorctx.cfg)
 	if err != nil {
 		cx.t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
diff --git a/tests/e2e/ctl_v3_snapshot_test.go b/tests/e2e/ctl_v3_snapshot_test.go
index 98c54beefeb7..b4e01e3ab6bf 100644
--- a/tests/e2e/ctl_v3_snapshot_test.go
+++ b/tests/e2e/ctl_v3_snapshot_test.go
@@ -164,7 +164,7 @@ func TestIssue6361(t *testing.T) {
 	os.Setenv("ETCDCTL_API", "3")
 	defer os.Unsetenv("ETCDCTL_API")
 
-	epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
+	epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
 		clusterSize:  1,
 		initialToken: "new",
 		keepDataDir:  true,
@@ -276,7 +276,7 @@ func TestIssue6361(t *testing.T) {
 func TestRestoreCompactionRevBump(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
+	epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
 		clusterSize:  1,
 		initialToken: "new",
 		keepDataDir:  true,
diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go
index b2a2c093e3c4..e451fa24b469 100644
--- a/tests/e2e/ctl_v3_test.go
+++ b/tests/e2e/ctl_v3_test.go
@@ -175,7 +175,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 		ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
 	}
 
-	epc, err := newEtcdProcessCluster(&ret.cfg)
+	epc, err := newEtcdProcessCluster(t, &ret.cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go
index 51f8ba534f47..3cab20fa9a46 100644
--- a/tests/e2e/etcd_corrupt_test.go
+++ b/tests/e2e/etcd_corrupt_test.go
@@ -147,7 +147,7 @@ func TestInPlaceRecovery(t *testing.T) {
 		corruptCheckTime: time.Second,
 		basePort:         basePort,
 	}
-	epcOld, err := newEtcdProcessCluster(&cfgOld)
+	epcOld, err := newEtcdProcessCluster(t, &cfgOld)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go
index ef46ff97265e..f8a6fac83de1 100644
--- a/tests/e2e/etcd_process.go
+++ b/tests/e2e/etcd_process.go
@@ -15,12 +15,20 @@
 package e2e
 
 import (
+	"bytes"
+	"context"
+	"errors"
 	"fmt"
+	"io"
+	"net/http"
 	"net/url"
 	"os"
+	"strings"
+	"time"
 
 	"go.etcd.io/etcd/pkg/expect"
 	"go.etcd.io/etcd/pkg/fileutil"
+	"go.etcd.io/etcd/pkg/proxy"
 )
 
 var (
@@ -45,6 +53,9 @@ type etcdProcess interface {
 
 	Config() *etcdServerProcessConfig
 	Logs() logsExpect
+	PeerProxy() proxy.Server
+	Failpoints() *BinaryFailpoints
+	IsRunning() bool
 }
 
 type logsExpect interface {
@@ -52,9 +63,11 @@ type logsExpect interface {
 }
 
 type etcdServerProcess struct {
-	cfg   *etcdServerProcessConfig
-	proc  *expect.ExpectProcess
-	donec chan struct{} // closed when Interact() terminates
+	cfg        *etcdServerProcessConfig
+	proc       *expect.ExpectProcess
+	proxy      proxy.Server
+	failpoints *BinaryFailpoints
+	donec      chan struct{} // closed when Interact() terminates
 }
 
 type etcdServerProcessConfig struct {
@@ -76,6 +89,9 @@ type etcdServerProcessConfig struct {
 
 	initialToken   string
 	initialCluster string
+
+	proxy      *proxy.ServerConfig
+	goFailPort int
 }
 
 func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) {
@@ -87,7 +103,11 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err
 			return nil, err
 		}
 	}
-	return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
+	ep := &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}
+	if cfg.goFailPort != 0 {
+		ep.failpoints = &BinaryFailpoints{member: ep}
+	}
+	return ep, nil
 }
 
 func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
@@ -105,6 +125,15 @@ func (ep *etcdServerProcess) Start() error {
 	if ep.proc != nil {
 		panic("already started")
 	}
+	if ep.cfg.proxy != nil && ep.proxy == nil {
+		// ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.name), zap.String("from", ep.cfg.proxy.From.String()), zap.String("to", ep.cfg.proxy.To.String()))
+		ep.proxy = proxy.NewServer(*ep.cfg.proxy)
+		select {
+		case <-ep.proxy.Ready():
+		case err := <-ep.proxy.Error():
+			return err
+		}
+	}
 	proc, err := spawnCmdWithEnv(append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars)
 	if err != nil {
 		return err
@@ -138,6 +167,13 @@ func (ep *etcdServerProcess) Stop() (err error) {
 			return err
 		}
 	}
+	if ep.proxy != nil {
+		err = ep.proxy.Close()
+		ep.proxy = nil
+		if err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
@@ -167,3 +203,151 @@ func (ep *etcdServerProcess) Logs() logsExpect {
 	}
 	return ep.proc
 }
+
+func (ep *etcdServerProcess) PeerProxy() proxy.Server {
+	return ep.proxy
+}
+
+func (ep *etcdServerProcess) Failpoints() *BinaryFailpoints {
+	return ep.failpoints
+}
+
+func (ep *etcdServerProcess) IsRunning() bool {
+	if ep.proc == nil {
+		return false
+	}
+
+	if ep.proc.IsRunning() {
+		return true
+	}
+	ep.proc = nil
+	return false
+}
+
+type BinaryFailpoints struct {
+	member         etcdProcess
+	availableCache map[string]string
+}
+
+func (f *BinaryFailpoints) SetupEnv(failpoint, payload string) error {
+	if f.member.IsRunning() {
+		return errors.New("cannot setup environment variable while process is running")
+	}
+	f.member.Config().envVars["GOFAIL_FAILPOINTS"] = fmt.Sprintf("%s=%s", failpoint, payload)
+	return nil
+}
+
+func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload string) error {
+	host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().goFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   failpoint,
+	}
+	r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload)))
+	if err != nil {
+		return err
+	}
+	resp, err := httpClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("bad status code: %d", resp.StatusCode)
+	}
+	return nil
+}
+
+func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) error {
+	host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().goFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   failpoint,
+	}
+	r, err := http.NewRequestWithContext(ctx, "DELETE", failpointUrl.String(), nil)
+	if err != nil {
+		return err
+	}
+	resp, err := httpClient.Do(r)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("bad status code: %d", resp.StatusCode)
+	}
+	return nil
+}
+
+var httpClient = http.Client{
+	Timeout: 1 * time.Second,
+}
+
+func (f *BinaryFailpoints) Enabled() bool {
+	_, err := failpoints(f.member)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+func (f *BinaryFailpoints) Available(failpoint string) bool {
+	if f.availableCache == nil {
+		fs, err := failpoints(f.member)
+		if err != nil {
+			panic(err)
+		}
+		f.availableCache = fs
+	}
+	_, found := f.availableCache[failpoint]
+	return found
+}
+
+func failpoints(member etcdProcess) (map[string]string, error) {
+	body, err := fetchFailpointsBody(member)
+	if err != nil {
+		return nil, err
+	}
+	defer body.Close()
+	return parseFailpointsBody(body)
+}
+
+func fetchFailpointsBody(member etcdProcess) (io.ReadCloser, error) {
+	address := fmt.Sprintf("127.0.0.1:%d", member.Config().goFailPort)
+	failpointUrl := url.URL{
+		Scheme: "http",
+		Host:   address,
+	}
+	resp, err := http.Get(failpointUrl.String())
+	if err != nil {
+		return nil, err
+	}
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return nil, fmt.Errorf("invalid status code, %d", resp.StatusCode)
+	}
+	return resp.Body, nil
+}
+
+func parseFailpointsBody(body io.Reader) (map[string]string, error) {
+	data, err := io.ReadAll(body)
+	if err != nil {
+		return nil, err
+	}
+	lines := strings.Split(string(data), "\n")
+	failpoints := map[string]string{}
+	for _, line := range lines {
+		// Format:
+		// failpoint=value
+		parts := strings.SplitN(line, "=", 2)
+		failpoint := parts[0]
+		var value string
+		if len(parts) == 2 {
+			value = parts[1]
+		}
+		failpoints[failpoint] = value
+	}
+	return failpoints, nil
+}
diff --git a/tests/e2e/etcd_release_upgrade_test.go b/tests/e2e/etcd_release_upgrade_test.go
index 6181bd7e4695..477ad30002ff 100644
--- a/tests/e2e/etcd_release_upgrade_test.go
+++ b/tests/e2e/etcd_release_upgrade_test.go
@@ -42,7 +42,7 @@ func TestReleaseUpgrade(t *testing.T) {
 	copiedCfg.snapshotCount = 3
 	copiedCfg.baseScheme = "unix" // to avoid port conflict
 
-	epc, err := newEtcdProcessCluster(&copiedCfg)
+	epc, err := newEtcdProcessCluster(t, &copiedCfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -127,7 +127,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
 	copiedCfg.snapshotCount = 10
 	copiedCfg.baseScheme = "unix"
 
-	epc, err := newEtcdProcessCluster(&copiedCfg)
+	epc, err := newEtcdProcessCluster(t, &copiedCfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go
index 0b7794b8f1d3..cd1d2b8ec7ba 100644
--- a/tests/e2e/etcdctl.go
+++ b/tests/e2e/etcdctl.go
@@ -53,6 +53,15 @@ func (ctl *Etcdctl) Put(key, value string) error {
 	return spawnWithExpect(args, "OK")
 }
 
+func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
+	if ctl.v2 {
+		panic("Unsupported method for v2")
+	}
+	args := ctl.cmdArgs()
+	args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value)
+	return spawnWithExpect(args, "OK")
+}
+
 func (ctl *Etcdctl) Set(key, value string) error {
 	if !ctl.v2 {
 		panic("Unsupported method for v3")
@@ -62,6 +71,32 @@ func (ctl *Etcdctl) Set(key, value string) error {
 	return spawnWithExpect(args, value)
 }
 
+func (ctl *Etcdctl) AuthEnable() error {
+	args := ctl.cmdArgs("auth", "enable")
+	return spawnWithExpect(args, "Authentication Enabled")
+}
+
+func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) {
+	var resp clientv3.AuthUserGrantRoleResponse
+	err := ctl.spawnJsonCmd(&resp, "", "user", "grant-role", user, role)
+	return &resp, err
+}
+
+func (ctl *Etcdctl) UserAdd(name, password string) (*clientv3.AuthUserAddResponse, error) {
+	args := []string{"user", "add"}
+	if password == "" {
+		args = append(args, name)
+		args = append(args, "--no-password")
+	} else {
+		args = append(args, fmt.Sprintf("%s:%s", name, password))
+	}
+	args = append(args, "--interactive=false")
+
+	var resp clientv3.AuthUserAddResponse
+	err := ctl.spawnJsonCmd(&resp, "", args...)
+	return &resp, err
+}
+
 func (ctl *Etcdctl) AlarmList() (*clientv3.AlarmResponse, error) {
 	if ctl.v2 {
 		panic("Unsupported method for v2")
diff --git a/tests/e2e/gateway_test.go b/tests/e2e/gateway_test.go
index ee7c415b0a49..cd50d5b1def0 100644
--- a/tests/e2e/gateway_test.go
+++ b/tests/e2e/gateway_test.go
@@ -27,7 +27,7 @@ var (
 )
 
 func TestGateway(t *testing.T) {
-	ec, err := newEtcdProcessCluster(&configNoTLS)
+	ec, err := newEtcdProcessCluster(t, &configNoTLS)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go
new file mode 100644
index 000000000000..f51766260327
--- /dev/null
+++ b/tests/e2e/http_health_check_test.go
@@ -0,0 +1,444 @@
+// Copyright 2023 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !cluster_proxy
+
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"path"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/pkg/testutil"
+)
+
+const (
+	healthCheckTimeout = 3 * time.Second
+	putCommandTimeout  = 200 * time.Millisecond
+)
+
+type healthCheckConfig struct {
+	url                    string
+	expectedStatusCode     int
+	expectedTimeoutError   bool
+	expectedRespSubStrings []string
+}
+
+type injectFailure func(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration)
+
+func TestHTTPHealthHandler(t *testing.T) {
+	client := &http.Client{}
+	tcs := []struct {
+		name          string
+		injectFailure injectFailure
+		clusterConfig etcdProcessClusterConfig
+		healthChecks  []healthCheckConfig
+	}{
+		{
+			name:          "no failures", // happy case
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health",
+					expectedStatusCode: http.StatusOK,
+				},
+			},
+		},
+		{
+			name:          "activated no space alarm",
+			injectFailure: triggerNoSpaceAlarm,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, quotaBackendBytes: int64(13 * os.Getpagesize())},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health",
+					expectedStatusCode: http.StatusServiceUnavailable,
+				},
+				{
+					url:                "/health?exclude=NOSPACE",
+					expectedStatusCode: http.StatusOK,
+				},
+			},
+		},
+		{
+			name:          "overloaded server slow apply",
+			injectFailure: triggerSlowApply,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health?serializable=true",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/health?serializable=false",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+		{
+			name:          "network partitioned",
+			injectFailure: blackhole,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 3, isPeerTLS: true, peerProxy: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                "/health?serializable=true",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/health?serializable=false",
+					expectedTimeoutError: true,
+					expectedStatusCode:   http.StatusServiceUnavailable,
+					// the old leader may return an "etcdserver: leader changed" error with 503 in ReadIndex leaderChangedNotifier
+				},
+			},
+		},
+		{
+			name:          "raft loop deadlock",
+			injectFailure: triggerRaftLoopDeadLock,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					// the current kubeadm etcd liveness check fails to detect raft loop deadlock in steady state
+					// ref. https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/phases/etcd/local.go#L225-L226
+					// it depends on the etcd /health check, which has a flaw that the new /livez check should resolve.
+					url:                "/health?serializable=true",
+					expectedStatusCode: http.StatusOK,
+				},
+				{
+					url:                  "/health?serializable=false",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+		// verify that auth enabled serializable read must go through mvcc
+		{
+			name:          "slow buffer write back with auth enabled",
+			injectFailure: triggerSlowBufferWriteBackWithAuth,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true},
+			healthChecks: []healthCheckConfig{
+				{
+					url:                  "/health?serializable=true",
+					expectedTimeoutError: true,
+				},
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			defer testutil.AfterTest(t)
+			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+			defer cancel()
+			cx := getDefaultCtlCtx(t)
+			cx.cfg = tc.clusterConfig
+
+			clus, err := newEtcdProcessCluster(t, &cx.cfg)
+			require.NoError(t, err)
+			defer clus.Close()
+			executeUntil(ctx, t, func() {
+				if tc.injectFailure != nil {
+					// guarantee that the failure point is active until all the health checks time out.
+					duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout
+					tc.injectFailure(ctx, t, clus, duration)
+				}
+
+				for _, hc := range tc.healthChecks {
+					requestURL := clus.procs[0].EndpointsHTTP()[0] + hc.url
+					t.Logf("health check URL is %s", requestURL)
+					doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings)
+				}
+			})
+		})
+	}
+}
+
+var (
+	defaultHealthCheckConfigs = []healthCheckConfig{
+		{
+			url:                    "/livez",
+			expectedStatusCode:     http.StatusOK,
+			expectedRespSubStrings: []string{`ok`},
+		},
+		{
+			url:                    "/readyz",
+			expectedStatusCode:     http.StatusOK,
+			expectedRespSubStrings: []string{`ok`},
+		},
+		{
+			url:                    "/livez?verbose=true",
+			expectedStatusCode:     http.StatusOK,
+			expectedRespSubStrings: []string{`[+]serializable_read ok`},
+		},
+		{
+			url:                "/readyz?verbose=true",
+			expectedStatusCode: http.StatusOK,
+			expectedRespSubStrings: []string{
+				`[+]serializable_read ok`,
+				`[+]data_corruption ok`,
+			},
+		},
+	}
+)
+
+func TestHTTPLivezReadyzHandler(t *testing.T) {
+	client := &http.Client{}
+	tcs := []struct {
+		name          string
+		injectFailure injectFailure
+		clusterConfig etcdProcessClusterConfig
+		healthChecks  []healthCheckConfig
+	}{
+		{
+			name:          "no failures", // happy case
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1},
+			healthChecks:  defaultHealthCheckConfigs,
+		},
+		{
+			name:          "activated no space alarm",
+			injectFailure: triggerNoSpaceAlarm,
+			clusterConfig: etcdProcessClusterConfig{clusterSize: 1, quotaBackendBytes: int64(13 * os.Getpagesize())},
+			healthChecks:  defaultHealthCheckConfigs,
+		},
+		// Readiness is not an indicator of performance. Slow response is not covered by readiness.
+		// refer to https://tinyurl.com/livez-readyz-design-doc or https://github.com/etcd-io/etcd/issues/16007#issuecomment-1726541091 in case tinyurl is down.
+ { + name: "overloaded server slow apply", + injectFailure: triggerSlowApply, + clusterConfig: etcdProcessClusterConfig{clusterSize: 3, goFailEnabled: true}, + // TODO expected behavior of readyz check should be 200 after ReadIndex check is implemented to replace linearizable read. + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + }, + }, + }, + { + name: "network partitioned", + injectFailure: blackhole, + clusterConfig: etcdProcessClusterConfig{clusterSize: 3, isPeerTLS: true, peerProxy: true}, + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + expectedRespSubStrings: []string{ + `[-]linearizable_read failed: etcdserver: leader changed`, + }, + }, + }, + }, + { + name: "raft loop deadlock", + injectFailure: triggerRaftLoopDeadLock, + clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true}, + // TODO expected behavior of livez check should be 503 or timeout after RaftLoopDeadLock check is implemented. + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + }, + }, + }, + // verify that auth enabled serializable read must go through mvcc + { + name: "slow buffer write back with auth enabled", + injectFailure: triggerSlowBufferWriteBackWithAuth, + clusterConfig: etcdProcessClusterConfig{clusterSize: 1, goFailEnabled: true}, + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedTimeoutError: true, + }, + { + url: "/readyz", + expectedTimeoutError: true, + }, + }, + }, + { + name: "corrupt", + injectFailure: triggerCorrupt, + clusterConfig: etcdProcessClusterConfig{clusterSize: 3, corruptCheckTime: time.Second}, + healthChecks: []healthCheckConfig{ + { + url: "/livez?verbose=true", + expectedStatusCode: http.StatusOK, + expectedRespSubStrings: []string{`[+]serializable_read ok`}, + }, + { + url: "/readyz", + expectedStatusCode: http.StatusServiceUnavailable, + expectedRespSubStrings: []string{ + `[+]serializable_read ok`, + `[-]data_corruption failed: alarm activated: CORRUPT`, + }, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + defer testutil.AfterTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + cx := getDefaultCtlCtx(t) + cx.cfg = tc.clusterConfig + + clus, err := newEtcdProcessCluster(t, &cx.cfg) + require.NoError(t, err) + defer clus.Close() + executeUntil(ctx, t, func() { + if tc.injectFailure != nil { + // guaranteed that failure point is active until all the health checks timeout. 
+					duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout
+					tc.injectFailure(ctx, t, clus, duration)
+				}
+
+				for _, hc := range tc.healthChecks {
+					requestURL := clus.procs[0].EndpointsHTTP()[0] + hc.url
+					t.Logf("health check URL is %s", requestURL)
+					doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings)
+				}
+			})
+		})
+	}
+}
+
+func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expectTimeoutError bool, expectStatusCode int, expectRespSubStrings []string) {
+	ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout)
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	require.NoErrorf(t, err, "failed to create request %+v", err)
+	resp, herr := client.Do(req)
+	cancel()
+	if expectTimeoutError {
+		if herr != nil && strings.Contains(herr.Error(), context.DeadlineExceeded.Error()) {
+			return
+		}
+	}
+	require.NoErrorf(t, herr, "failed to get response %+v", herr)
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	resp.Body.Close()
+	require.NoErrorf(t, err, "failed to read response %+v", err)
+
+	t.Logf("health check response body is:\n%s", body)
+	require.Equal(t, expectStatusCode, resp.StatusCode)
+	for _, expectRespSubString := range expectRespSubStrings {
+		require.Contains(t, string(body), expectRespSubString)
+	}
+}
+
+func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *etcdProcessCluster, _ time.Duration) {
+	buf := strings.Repeat("b", os.Getpagesize())
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+	for {
+		if err := etcdctl.Put("foo", buf); err != nil {
+			if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
+				t.Fatal(err)
+			}
+			break
+		}
+	}
+}
+
+func triggerSlowApply(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration) {
+	// the following proposal will be blocked at the applying stage;
+	// because apply index < committed index, a linearizable read would time out.
+	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration)))
+	etcdctl := NewEtcdctl(clus.procs[1].EndpointsV3(), clientNonTLS, false, false)
+	etcdctl.Put("foo", "bar")
+}
+
+func blackhole(_ context.Context, t *testing.T, clus *etcdProcessCluster, _ time.Duration) {
+	member := clus.procs[0]
+	proxy := member.PeerProxy()
+	t.Logf("Blackholing traffic from and to member %q", member.Config().name)
+	proxy.BlackholeTx()
+	proxy.BlackholeRx()
+}
+
+func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration) {
+	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration)))
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+	etcdctl.Put("foo", "bar")
+}
+
+func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *etcdProcessCluster, duration time.Duration) {
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+
+	_, err := etcdctl.UserAdd("root", "root")
+	require.NoError(t, err)
+	_, err = etcdctl.UserGrantRole("root", "root")
+	require.NoError(t, err)
+	require.NoError(t, etcdctl.AuthEnable())
+
+	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, "beforeWritebackBuf", fmt.Sprintf(`sleep("%s")`, duration)))
+	etcdctl.PutWithAuth("foo", "bar", "root", "root")
+}
+
+func triggerCorrupt(ctx context.Context, t *testing.T, clus *etcdProcessCluster, _ time.Duration) {
+	etcdctl := NewEtcdctl(clus.procs[0].EndpointsV3(), clientNonTLS, false, false)
+	for i := 0; i < 10; i++ {
+		require.NoError(t, etcdctl.Put("foo", "bar"))
+	}
+	err := clus.procs[0].Stop()
+	require.NoError(t, err)
+	err = corruptBBolt(path.Join(clus.procs[0].Config().dataDirPath, "member", "snap", "db"))
+	require.NoError(t, err)
+	err = clus.procs[0].Start()
+	for {
+		time.Sleep(time.Second)
+		select {
+		case <-ctx.Done():
+			require.NoError(t, err)
+		default:
+		}
+		response, err := etcdctl.AlarmList()
+		if err != nil {
+			continue
+		}
+		if len(response.Alarms) == 0 {
+			continue
+		}
+		require.Len(t, response.Alarms, 1)
+		if response.Alarms[0].Alarm == etcdserverpb.AlarmType_CORRUPT {
+			break
+		}
+	}
+}
diff --git a/tests/e2e/util.go b/tests/e2e/util.go
index ce7289ae94d2..4ae1c2db1789 100644
--- a/tests/e2e/util.go
+++ b/tests/e2e/util.go
@@ -15,13 +15,20 @@
 package e2e
 
 import (
+	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/rand"
+	"os"
 	"strings"
+	"testing"
 	"time"
 
+	"go.etcd.io/bbolt"
+	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.etcd.io/etcd/pkg/expect"
+	"go.etcd.io/etcd/pkg/testutil"
 )
 
 func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error {
@@ -108,3 +115,60 @@ func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
 func toTLS(s string) string {
 	return strings.Replace(s, "http://", "https://", 1)
 }
+
+func executeUntil(ctx context.Context, t *testing.T, f func()) {
+	deadline, deadlineSet := ctx.Deadline()
+	timeout := time.Until(deadline)
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		f()
+	}()
+
+	select {
+	case <-ctx.Done():
+		msg := ctx.Err().Error()
+		if deadlineSet {
+			msg = fmt.Sprintf("test timed out after %v, err: %v", timeout, msg)
+		}
+		testutil.FatalStack(t, msg)
+	case <-donec:
+	}
+}
+
+func corruptBBolt(fpath string) error {
+	db, derr := bbolt.Open(fpath, os.ModePerm, &bbolt.Options{})
+	if derr != nil {
+		return derr
+	}
+	defer db.Close()
+
+	return db.Update(func(tx *bbolt.Tx) error {
+		b := tx.Bucket([]byte("key"))
+		if b == nil {
+			return errors.New("got nil bucket for 'key'")
+		}
+		keys, vals := [][]byte{}, [][]byte{}
+		c := b.Cursor()
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			keys = append(keys, k)
+			var kv mvccpb.KeyValue
+			if uerr := kv.Unmarshal(v); uerr != nil {
+				return uerr
+			}
+			kv.Key[0]++
+			kv.Value[0]++
+			v2, v2err := kv.Marshal()
+			if v2err != nil {
+				return v2err
+			}
+			vals = append(vals, v2)
+		}
+		for i := range keys {
+			if perr := b.Put(keys[i], vals[i]); perr != nil {
+				return perr
+			}
+		}
+		return nil
+	})
+}
diff --git a/tests/e2e/v2_curl_test.go b/tests/e2e/v2_curl_test.go
index 223945449d43..54056351f996 100644
--- a/tests/e2e/v2_curl_test.go
+++ b/tests/e2e/v2_curl_test.go
@@ -38,7 +38,7 @@ func testCurlPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
 	cfg = configStandalone(*cfg)
 	cfg.enableV2 = true
 
-	epc, err := newEtcdProcessCluster(cfg)
+	epc, err := newEtcdProcessCluster(t, cfg)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go
index f1a7d0c1fdac..c4782b870437 100644
--- a/tests/e2e/v3_curl_maxstream_test.go
+++ b/tests/e2e/v3_curl_maxstream_test.go
@@ -87,7 +87,7 @@ func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) {
 
 	// Step 2: create the cluster
 	t.Log("Creating an etcd cluster")
-	epc, err := newEtcdProcessCluster(&cx.cfg)
+	epc, err := newEtcdProcessCluster(t, &cx.cfg)
 	if err != nil {
 		t.Fatalf("Failed to start etcd cluster: %v", err)
 	}
diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go
index 99e603de68c0..71627cb6d2aa 100644
--- a/tests/e2e/watch_delay_test.go
+++ b/tests/e2e/watch_delay_test.go
@@ -86,7 +86,7 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
 		tc := tc
 		tc.config.WatchProcessNotifyInterval = watchResponsePeriod
 		t.Run(tc.name, func(t *testing.T) {
-			clus, err := newEtcdProcessCluster(&tc.config)
+			clus, err := newEtcdProcessCluster(t, &tc.config)
 			require.NoError(t, err)
 			defer clus.Close()
 			c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
@@ -106,7 +106,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) {
 	defer testutil.AfterTest(t)
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
-			clus, err := newEtcdProcessCluster(&tc.config)
+			clus, err := newEtcdProcessCluster(t, &tc.config)
 			require.NoError(t, err)
 			defer clus.Close()
 			c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
@@ -138,7 +138,7 @@ func TestWatchDelayForEvent(t *testing.T) {
 	defer testutil.AfterTest(t)
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
-			clus, err := newEtcdProcessCluster(&tc.config)
+			clus, err := newEtcdProcessCluster(t, &tc.config)
 			require.NoError(t, err)
 			defer clus.Close()
 			c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS)
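
Editor's note (not part of the patch): a minimal sketch of how a test is expected to drive
the failpoint plumbing introduced above. It assumes the etcd binary under test was built with
FAILPOINTS=true, so gofail compiles in the beforeApplyOneEntryNormal failpoint and serves the
GOFAIL_HTTP endpoint; TestFailpointSketch is a hypothetical name used for illustration only.
All helpers referenced (newEtcdProcessCluster, Failpoints, SetupHTTP, DeactivateHTTP) are
defined by this backport and live in package e2e.

    package e2e

    import (
    	"context"
    	"testing"
    	"time"

    	"github.com/stretchr/testify/require"
    )

    // TestFailpointSketch is hypothetical; it shows the intended call sequence.
    func TestFailpointSketch(t *testing.T) {
    	// goFailEnabled makes the harness export GOFAIL_HTTP=127.0.0.1:<goFailPort>
    	// for each member; startEtcdProcessCluster skips the test when the binary
    	// was built without failpoints (Failpoints().Enabled() returns false).
    	clus, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
    		clusterSize:   1,
    		goFailEnabled: true,
    	})
    	require.NoError(t, err)
    	defer clus.Close()

    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()

    	// SetupHTTP issues PUT http://127.0.0.1:<goFailPort>/beforeApplyOneEntryNormal
    	// with the body sleep("3s"), so each subsequent apply of a normal raft entry
    	// pauses for three seconds, which is what stalls linearizable health checks.
    	fp := "beforeApplyOneEntryNormal"
    	require.NoError(t, clus.procs[0].Failpoints().SetupHTTP(ctx, fp, `sleep("3s")`))

    	// ... exercise /health, /livez, /readyz here, as the new tests above do ...

    	// DeactivateHTTP issues the matching DELETE to disarm the failpoint.
    	require.NoError(t, clus.procs[0].Failpoints().DeactivateHTTP(ctx, fp))
    }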