Skip to content

Commit

Permalink
[3.4] backport health check e2e tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Dec 19, 2023
1 parent 2a07f80 commit a798e86
Show file tree
Hide file tree
Showing 22 changed files with 815 additions and 49 deletions.
1 change: 1 addition & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,7 @@ func (s *EtcdServer) apply(
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
// gofail: var beforeApplyOneEntryNormal struct{}
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
Expand Down
1 change: 1 addition & 0 deletions mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
// gofail: var beforeWritebackBuf struct{}
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
Expand Down
4 changes: 4 additions & 0 deletions pkg/expect/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,7 @@ func (ep *ExpectProcess) Send(command string) error {
_, err := io.WriteString(ep.fpty, command)
return err
}

func (ep *ExpectProcess) IsRunning() bool {
return ep.cmd != nil
}
28 changes: 11 additions & 17 deletions tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
)

type proxyEtcdProcess struct {
etcdProc etcdProcess
proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
*etcdServerProcess
proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
}

func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
Expand All @@ -45,15 +45,13 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error
return nil, err
}
pep := &proxyEtcdProcess{
etcdProc: ep,
proxyV2: newProxyV2Proc(cfg),
proxyV3: newProxyV3Proc(cfg),
etcdServerProcess: ep,
proxyV2: newProxyV2Proc(cfg),
proxyV3: newProxyV3Proc(cfg),
}
return pep, nil
}

func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }

func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
Expand All @@ -63,7 +61,7 @@ func (p *proxyEtcdProcess) EndpointsMetrics() []string {
}

func (p *proxyEtcdProcess) Start() error {
if err := p.etcdProc.Start(); err != nil {
if err := p.etcdServerProcess.Start(); err != nil {
return err
}
if err := p.proxyV2.Start(); err != nil {
Expand All @@ -73,7 +71,7 @@ func (p *proxyEtcdProcess) Start() error {
}

func (p *proxyEtcdProcess) Restart() error {
if err := p.etcdProc.Restart(); err != nil {
if err := p.etcdServerProcess.Restart(); err != nil {
return err
}
if err := p.proxyV2.Restart(); err != nil {
Expand All @@ -87,7 +85,7 @@ func (p *proxyEtcdProcess) Stop() error {
if v3err := p.proxyV3.Stop(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
if eerr := p.etcdServerProcess.Stop(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
Expand All @@ -101,7 +99,7 @@ func (p *proxyEtcdProcess) Close() error {
if v3err := p.proxyV3.Close(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
if eerr := p.etcdServerProcess.Close(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
Expand All @@ -113,11 +111,7 @@ func (p *proxyEtcdProcess) Close() error {
func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
p.proxyV3.WithStopSignal(sig)
p.proxyV3.WithStopSignal(sig)
return p.etcdProc.WithStopSignal(sig)
}

func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
return p.etcdServerProcess.WithStopSignal(sig)
}

type proxyProc struct {
Expand Down
61 changes: 50 additions & 11 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"net/url"
"os"
"strings"
"testing"
"time"

"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/pkg/proxy"
"go.uber.org/zap"
)

const etcdProcessBasePort = 20000
Expand Down Expand Up @@ -97,10 +100,12 @@ type etcdProcessCluster struct {
}

type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string
execPath string
dataDirPath string
keepDataDir bool
goFailEnabled bool
peerProxy bool
envVars map[string]string

clusterSize int

Expand Down Expand Up @@ -141,13 +146,13 @@ type etcdProcessClusterConfig struct {

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
epc, err := initEtcdProcessCluster(cfg)
if err != nil {
return nil, err
}

return startEtcdProcessCluster(epc, cfg)
return startEtcdProcessCluster(t, epc, cfg)
}

// `initEtcdProcessCluster` initializes a new cluster based on the given config.
Expand All @@ -174,7 +179,7 @@ func initEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
}

// `startEtcdProcessCluster` launches a new cluster from etcd processes.
func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
func startEtcdProcessCluster(t testing.TB, epc *etcdProcessCluster, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
if err := epc.Start(); err != nil {
return nil, err
}
Expand All @@ -185,6 +190,12 @@ func startEtcdProcessCluster(epc *etcdProcessCluster, cfg *etcdProcessClusterCon
proc.WithStopSignal(cfg.stopSignal)
}
}
for _, proc := range epc.procs {
if cfg.goFailEnabled && !proc.Failpoints().Enabled() {
epc.Close()
t.Skip("please run test with 'FAILPOINTS=true'")
}
}
return epc, nil
}

Expand Down Expand Up @@ -223,6 +234,8 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
var curls []string
var curl string
port := cfg.basePort + 5*i
peerPort := port + 1
peer2Port := port + 3
clientPort := port
clientHttpPort := port + 4

Expand All @@ -235,6 +248,20 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
}

purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
peerAdvertiseUrl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
if cfg.peerProxy {
if !cfg.isPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}
peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
proxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
To: purl,
From: peerAdvertiseUrl,
}
}

name := fmt.Sprintf("testname%d", i)
dataDirPath := cfg.dataDirPath
if cfg.dataDirPath == "" {
Expand All @@ -244,14 +271,14 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
}
}
initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String())

args := []string{
"--name", name,
"--listen-client-urls", strings.Join(curls, ","),
"--advertise-client-urls", strings.Join(curls, ","),
"--listen-peer-urls", purl.String(),
"--initial-advertise-peer-urls", purl.String(),
"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
"--initial-cluster-token", cfg.initialToken,
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
Expand Down Expand Up @@ -309,19 +336,31 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
args = append(args, "--debug")
}

envVars := map[string]string{}
for key, value := range cfg.envVars {
envVars[key] = value
}
var gofailPort int
if cfg.goFailEnabled {
gofailPort = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
envVars: envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
purl: peerAdvertiseUrl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
clientHttpUrl: clientHttpUrl,
goFailPort: gofailPort,
proxy: proxyCfg,
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestConnectionMultiplexing(t *testing.T) {
clientHttpSeparate: tc.separateHttpPort,
stopSignal: syscall.SIGTERM, // check graceful stop
}
clus, err := newEtcdProcessCluster(&cfg)
clus, err := newEtcdProcessCluster(t, &cfg)
require.NoError(t, err)

defer func() {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func setupEtcdctlTest(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool)
if !quorum {
cfg = configStandalone(*cfg)
}
epc, err := newEtcdProcessCluster(cfg)
epc, err := newEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_auth_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
initialToken: "new",
}

epc, err := newEtcdProcessCluster(&cx.cfg)
epc, err := newEtcdProcessCluster(t, &cx.cfg)
if err != nil {
t.Fatalf("Failed to start etcd cluster: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestAuthority(t *testing.T) {
// Enable debug mode to get logs with http2 headers (including authority)
cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}

epc, err := newEtcdProcessCluster(&cfg)
epc, err := newEtcdProcessCluster(t, &cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_make_mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
dialTimeout: 7 * time.Second,
}

mirrorepc, err := newEtcdProcessCluster(&mirrorctx.cfg)
mirrorepc, err := newEtcdProcessCluster(cx.t, &mirrorctx.cfg)
if err != nil {
cx.t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/ctl_v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestIssue6361(t *testing.T) {
os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")

epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 1,
initialToken: "new",
keepDataDir: true,
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestIssue6361(t *testing.T) {
func TestRestoreCompactionRevBump(t *testing.T) {
defer testutil.AfterTest(t)

epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 1,
initialToken: "new",
keepDataDir: true,
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
}

epc, err := newEtcdProcessCluster(&ret.cfg)
epc, err := newEtcdProcessCluster(t, &ret.cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/etcd_corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestInPlaceRecovery(t *testing.T) {
corruptCheckTime: time.Second,
basePort: basePort,
}
epcOld, err := newEtcdProcessCluster(&cfgOld)
epcOld, err := newEtcdProcessCluster(t, &cfgOld)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand Down
Loading

0 comments on commit a798e86

Please sign in to comment.