From 0d6a2ae4b61405bbfc8cfa6472e316c6b8298b95 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Thu, 2 Feb 2023 11:38:08 -0700 Subject: [PATCH] cache: add a synthetic delay to the cache server Signed-off-by: Steve Kuznetsov --- Makefile | 2 +- cmd/sharded-test-server/cache.go | 3 ++- cmd/sharded-test-server/main.go | 8 +++++--- pkg/cache/server/config.go | 1 + pkg/cache/server/handler.go | 9 +++++++++ pkg/cache/server/options/options.go | 5 +++++ 6 files changed, 23 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index e435d5e94c5..57a84799ecf 100644 --- a/Makefile +++ b/Makefile @@ -317,7 +317,7 @@ test-e2e-sharded: require-kind build-all build-kind-images kind get kubeconfig > "$(WORK_DIR)/.kcp/kind.kubeconfig" rm -f "$(WORK_DIR)/.kcp/ready-to-test" UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 \ - ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) 2>&1 & PID=$$!; echo "PID $$PID" && \ + ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false $(TEST_SERVER_ARGS) --number-of-shards=$(SHARDS) --cache-synthetic-delay=500ms 2>&1 & PID=$$!; echo "PID $$PID" && \ trap 'kill -TERM $$PID' TERM INT EXIT && \ while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \ NO_GORUN=1 GOOS=$(OS) GOARCH=$(ARCH) \ diff --git a/cmd/sharded-test-server/cache.go b/cmd/sharded-test-server/cache.go index 5903ad7b60f..a12ee496a33 100644 --- a/cmd/sharded-test-server/cache.go +++ b/cmd/sharded-test-server/cache.go @@ -39,7 +39,7 @@ import ( "github.com/kcp-dev/kcp/test/e2e/framework" ) -func startCacheServer(ctx context.Context, logDirPath, workingDir string) (<-chan error, string, error) { +func startCacheServer(ctx context.Context, logDirPath, workingDir string, syntheticDelay time.Duration) (<-chan error, string, error) { cyan := color.New(color.BgHiCyan, color.FgHiWhite).SprintFunc() inverse := color.New(color.BgHiWhite, color.FgHiCyan).SprintFunc() out := lineprefix.New( @@ -59,6 +59,7 @@ func startCacheServer(ctx context.Context, logDirPath, workingDir string) (<-cha "--embedded-etcd-client-port=8010", "--embedded-etcd-peer-port=8011", fmt.Sprintf("--secure-port=%d", cachePort), + fmt.Sprintf("--synthetic-delay=%s", syntheticDelay.String()), ) fmt.Fprintf(out, "running: %v\n", strings.Join(commandLine, " ")) cmd := exec.CommandContext(ctx, commandLine[0], commandLine[1:]...) //nolint:gosec diff --git a/cmd/sharded-test-server/main.go b/cmd/sharded-test-server/main.go index 98c1b2140e9..594005b71af 100644 --- a/cmd/sharded-test-server/main.go +++ b/cmd/sharded-test-server/main.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "strings" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -46,6 +47,7 @@ func main() { logDirPath := flag.String("log-dir-path", "", "Path to the log files. If empty, log files are stored in the dot directories.") workDirPath := flag.String("work-dir-path", "", "Path to the working directory where the .kcp* dot directories are created. If empty, the working directory is the current directory.") numberOfShards := flag.Int("number-of-shards", 1, "The number of shards to create. The first created is assumed root.") + cacheSyntheticDelay := flag.Duration("cache-synthetic-delay", 0, "The duration of time the cache server will inject a delay for to all inbound requests.") quiet := flag.Bool("quiet", false, "Suppress output of the subprocesses") // split flags into --proxy-*, --shard-* and everything else (generic). The former are @@ -62,13 +64,13 @@ func main() { } flag.CommandLine.Parse(genericFlags) //nolint:errcheck - if err := start(proxyFlags, shardFlags, *logDirPath, *workDirPath, *numberOfShards, *quiet); err != nil { + if err := start(proxyFlags, shardFlags, *logDirPath, *workDirPath, *numberOfShards, *quiet, *cacheSyntheticDelay); err != nil { fmt.Println(err.Error()) os.Exit(1) } } -func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numberOfShards int, quiet bool) error { +func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numberOfShards int, quiet bool, cacheSyntheticDelay time.Duration) error { // We use a shutdown context to know that it's time to gather metrics, before stopping the shards, proxy, etc. shutdownCtx, shutdownCancel := context.WithCancel(genericapiserver.SetupSignalContext()) defer shutdownCancel() @@ -186,7 +188,7 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb cacheServerErrCh := make(chan indexErrTuple) cacheServerConfigPath := "" - cacheServerCh, configPath, err := startCacheServer(ctx, logDirPath, workDirPath) + cacheServerCh, configPath, err := startCacheServer(ctx, logDirPath, workDirPath, cacheSyntheticDelay) if err != nil { return fmt.Errorf("error starting the cache server: %w", err) } diff --git a/pkg/cache/server/config.go b/pkg/cache/server/config.go index 35b99b7786a..807191073d2 100644 --- a/pkg/cache/server/config.go +++ b/pkg/cache/server/config.go @@ -141,6 +141,7 @@ func NewConfig(opts *cacheserveroptions.CompletedOptions, optionalLocalShardRest apiHandler = filters.WithClusterScope(apiHandler) apiHandler = WithShardScope(apiHandler) apiHandler = WithServiceScope(apiHandler) + apiHandler = WithSyntheticDelay(apiHandler, opts.SyntheticDelay) return apiHandler } diff --git a/pkg/cache/server/handler.go b/pkg/cache/server/handler.go index b801ccec05d..4a5f1d315fd 100644 --- a/pkg/cache/server/handler.go +++ b/pkg/cache/server/handler.go @@ -22,6 +22,7 @@ import ( "net/url" "regexp" "strings" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -141,3 +142,11 @@ func WithServiceScope(handler http.Handler) http.Handler { handler.ServeHTTP(w, req) }) } + +// WithSyntheticDelay injects a synthetic delay to calls, to exacerbate timing issues and expose inconsistent client behavior. +func WithSyntheticDelay(handler http.Handler, delay time.Duration) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + time.Sleep(delay) + handler.ServeHTTP(w, req) + }) +} diff --git a/pkg/cache/server/options/options.go b/pkg/cache/server/options/options.go index ef599b7eff1..aa78cddba5c 100644 --- a/pkg/cache/server/options/options.go +++ b/pkg/cache/server/options/options.go @@ -17,6 +17,8 @@ limitations under the License. package options import ( + "time" + "github.com/spf13/pflag" genericoptions "k8s.io/apiserver/pkg/server/options" @@ -34,6 +36,7 @@ type Options struct { Authorization *genericoptions.DelegatingAuthorizationOptions APIEnablement *genericoptions.APIEnablementOptions EmbeddedEtcd etcdoptions.Options + SyntheticDelay time.Duration } type completedOptions struct { @@ -44,6 +47,7 @@ type completedOptions struct { Authorization *genericoptions.DelegatingAuthorizationOptions APIEnablement *genericoptions.APIEnablementOptions EmbeddedEtcd etcdoptions.CompletedOptions + SyntheticDelay time.Duration } type CompletedOptions struct { @@ -113,4 +117,5 @@ func (o *Options) Complete() (*CompletedOptions, error) { func (o *Options) AddFlags(fs *pflag.FlagSet) { o.EmbeddedEtcd.AddFlags(fs) o.SecureServing.AddFlags(fs) + fs.DurationVar(&o.SyntheticDelay, "synthetic-delay", 0, "The duration of time the cache server will inject a delay for to all inbound requests. Useful for testing.") }