Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch random scheduler 3.4 #15478

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error {
// todo (ahrtr): should we support configuring other parameters in the future as well?
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
// Override to avoid using priority scheduler which is affected by https://github.com/golang/go/issues/58804.
NewWriteScheduler: http2.NewRandomWriteScheduler,
})
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/sirupsen/logrus v1.6.0 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems not correct. Why it's a indirect instead of a direct dependency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I added started using errgroup directly.

golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 // indirect
google.golang.org/protobuf v1.26.0-rc.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
6 changes: 3 additions & 3 deletions pkg/stringutil/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func UniqueStrings(slen uint, n int) (ss []string) {
exist := make(map[string]struct{})
ss = make([]string, 0, n)
for len(ss) < n {
s := randString(slen)
s := RandString(slen)
if _, ok := exist[s]; !ok {
ss = append(ss, s)
exist[s] = struct{}{}
Expand All @@ -37,14 +37,14 @@ func UniqueStrings(slen uint, n int) (ss []string) {
func RandomStrings(slen uint, n int) (ss []string) {
ss = make([]string, 0, n)
for i := 0; i < n; i++ {
ss = append(ss, randString(slen))
ss = append(ss, RandString(slen))
}
return ss
}

const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

func randString(l uint) string {
func RandString(l uint) string {
rand.Seed(time.Now().UnixNano())
s := make([]byte, l)
for i := 0; i < int(l); i++ {
Expand Down
8 changes: 7 additions & 1 deletion tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"os"
"strings"
"time"

"go.etcd.io/etcd/etcdserver"
)
Expand Down Expand Up @@ -135,7 +136,8 @@ type etcdProcessClusterConfig struct {
initialCorruptCheck bool
authTokenOpts string

MaxConcurrentStreams uint32 // default is math.MaxUint32
MaxConcurrentStreams uint32 // default is math.MaxUint32
WatchProcessNotifyInterval time.Duration
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -269,6 +271,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}

if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}

etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
Expand Down
277 changes: 277 additions & 0 deletions tests/e2e/watch_delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// These tests are performance sensitive, addition of cluster proxy makes them unstable.
//go:build !cluster_proxy

package e2e

import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/stringutil"
"go.etcd.io/etcd/pkg/testutil"
"go.etcd.io/etcd/pkg/transport"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

const (
watchResponsePeriod = 100 * time.Millisecond
watchTestDuration = 5 * time.Second
// TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed.
maxWatchDelay = 2 * time.Second
// Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402.
// Tweaked to pass on GitHub runner. For local runs please increase parameters.
// TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed.
numberOfPreexistingKeys = 100
sizeOfPreexistingValues = 5000
readLoadConcurrency = 10
)

type testCase struct {
name string
config etcdProcessClusterConfig
}

var tcs = []testCase{
{
name: "NoTLS",
config: etcdProcessClusterConfig{clusterSize: 1},
},
{
name: "ClientTLS",
config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS},
},
}

func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
defer testutil.AfterTest(t)
for _, tc := range tcs {
tc := tc
tc.config.WatchProcessNotifyInterval = watchResponsePeriod
t.Run(tc.name, func(t *testing.T) {
clus, err := newEtcdProcessCluster(&tc.config)
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))

ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
g := errgroup.Group{}
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()))
require.NoError(t, g.Wait())
})
}
}

func TestWatchDelayForManualProgressNotification(t *testing.T) {
defer testutil.AfterTest(t)
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
clus, err := newEtcdProcessCluster(&tc.config)
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))

ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
g := errgroup.Group{}
continuouslyExecuteGetAll(ctx, t, &g, c)
g.Go(func() error {
for {
err := c.RequestProgress(ctx)
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
time.Sleep(watchResponsePeriod)
}
})
validateWatchDelay(t, c.Watch(ctx, "fake-key"))
require.NoError(t, g.Wait())
})
}
}

func TestWatchDelayForEvent(t *testing.T) {
defer testutil.AfterTest(t)
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
clus, err := newEtcdProcessCluster(&tc.config)
require.NoError(t, err)
defer clus.Close()
c := newClient(t, clus, tc.config)
require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues))

ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration)
defer cancel()
g := errgroup.Group{}
g.Go(func() error {
i := 0
for {
_, err := c.Put(ctx, "key", fmt.Sprintf("%d", i))
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
time.Sleep(watchResponsePeriod)
}
})
continuouslyExecuteGetAll(ctx, t, &g, c)
validateWatchDelay(t, c.Watch(ctx, "key"))
require.NoError(t, g.Wait())
})
}
}

func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) {
start := time.Now()
var maxDelay time.Duration
for range watch {
sinceLast := time.Since(start)
if sinceLast > watchResponsePeriod+maxWatchDelay {
t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: %s", maxWatchDelay, sinceLast-watchResponsePeriod)
} else {
t.Logf("Got watch response, since last: %s", sinceLast)
}
if sinceLast > maxDelay {
maxDelay = sinceLast
}
start = time.Now()
}
sinceLast := time.Since(start)
if sinceLast > maxDelay && sinceLast > watchResponsePeriod+maxWatchDelay {
t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: unknown", maxWatchDelay)
t.Errorf("Test finished while in middle of delayed response, measured delay: %s", sinceLast-watchResponsePeriod)
t.Logf("Please increase the test duration to measure delay")
} else {
t.Logf("Max delay: %s", maxDelay-watchResponsePeriod)
}
}

func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
g := errgroup.Group{}
concurrency := 10
keysPerRoutine := keyCount / concurrency
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
if err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}

func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Group, c *clientv3.Client) {
mux := sync.RWMutex{}
size := 0
for i := 0; i < readLoadConcurrency; i++ {
g.Go(func() error {
for {
_, err := c.Get(ctx, "", clientv3.WithPrefix())
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
mux.Lock()
size += numberOfPreexistingKeys * sizeOfPreexistingValues
mux.Unlock()
}
})
}
g.Go(func() error {
lastSize := size
for range time.Tick(time.Second) {
select {
case <-ctx.Done():
return nil
default:
}
mux.RLock()
t.Logf("Generating read load around %.1f MB/s", float64(size-lastSize)/1000/1000)
lastSize = size
mux.RUnlock()
}
return nil
})
}

func newClient(t *testing.T, clus *etcdProcessCluster, cfg etcdProcessClusterConfig) *clientv3.Client {
tlscfg, err := tlsInfo(t, cfg)
if err != nil {
t.Fatal(err)
}
ccfg := clientv3.Config{
Endpoints: clus.EndpointsV3(),
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
if tlscfg != nil {
tls, err := tlscfg.ClientConfig()
if err != nil {
t.Fatal(err)
}
ccfg.TLS = tls
}
c, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
c.Close()
})
return c
}

func tlsInfo(t testing.TB, cfg etcdProcessClusterConfig) (*transport.TLSInfo, error) {
switch cfg.clientTLS {
case clientNonTLS, clientTLSAndNonTLS:
return nil, nil
case clientTLS:
if cfg.isClientAutoTLS {
tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1)
if err != nil {
return nil, fmt.Errorf("failed to generate cert: %s", err)
}
return &tls, nil
}
panic("Unsupported non-auto tls")
default:
return nil, fmt.Errorf("config %v not supported", cfg)
}
}