diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index f5d4f4a2e021..59f2f42d5e9d 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "admission_control_elastic_backup.go", "admission_control_elastic_cdc.go", "admission_control_elastic_io.go", + "admission_control_follower_overload.go", "admission_control_index_backfill.go", "admission_control_index_overload.go", "admission_control_intent_resolution.go", diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go index f2c23526d8f2..c4ca50c1fe80 100644 --- a/pkg/cmd/roachtest/tests/admission_control.go +++ b/pkg/cmd/roachtest/tests/admission_control.go @@ -28,6 +28,7 @@ func registerAdmission(r registry.Registry) { // roachperf. Need to munge with histogram data to compute % test run spent // over some latency threshold. Will be Useful to track over time. + registerFollowerOverload(r) registerElasticControlForBackups(r) registerElasticControlForCDC(r) registerElasticControlForRowLevelTTL(r) diff --git a/pkg/cmd/roachtest/tests/admission_control_follower_overload.go b/pkg/cmd/roachtest/tests/admission_control_follower_overload.go new file mode 100644 index 000000000000..4b5697d58256 --- /dev/null +++ b/pkg/cmd/roachtest/tests/admission_control_follower_overload.go @@ -0,0 +1,301 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/stretchr/testify/require" +) + +func registerFollowerOverload(r registry.Registry) { + spec := func(subtest string, cfg admissionControlFollowerOverloadOpts) registry.TestSpec { + return registry.TestSpec{ + Name: "admission/follower-overload/" + subtest, + Owner: registry.OwnerAdmissionControl, + Timeout: 3 * time.Hour, + CompatibleClouds: registry.AllExceptAWS, + Suites: registry.ManualOnly, + Tags: registry.Tags(`manual`), + // TODO(aaditya): Revisit this as part of #111614. + //Suites: registry.Suites(registry.Weekly), + //Tags: registry.Tags(`weekly`), + // Don't re-use the cluster, since we don't have any conventions + // about `wipe` removing any custom systemd units. + // + // NB: use 16vcpu machines to avoid getting anywhere close to EBS + // bandwidth limits on AWS, see: + // https://github.com/cockroachdb/cockroach/issues/82109#issuecomment-1154049976 + Cluster: r.MakeClusterSpec(4, spec.CPU(4), spec.ReuseNone()), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runAdmissionControlFollowerOverload(ctx, t, c, cfg) + }, + } + } + + // The control group - just the vanilla cluster workloads, no nemesis. 
+ // Running this and looking at performance blips can give us an idea of what + // normal looks like. This is most directly contrasted with + // presplit-with-leases but since the workload on n3 barely needs any + // resources, it should also compare well with presplit-no-leases. + r.Add(spec("presplit-control", admissionControlFollowerOverloadOpts{ + kv0N12: true, + kvN12ExtraArgs: "--splits 100", + kv50N3: true, + })) + + // n1 and n2 field all active work but replicate follower writes to n3. n3 + // has no leases but has a disk nemesis which, without regular traffic flow + // control, causes L0 overload. The workload should be steady with good p99s + // since there is no backpressure from n3: regular traffic flow control is + // not enabled, the proposal quota pools are not expected to empty (at the + // time of writing), and we're not sending n3 any foreground traffic. The + // quota pools shouldn't deplete since writes are spread out evenly across + // 100 ranges. + r.Add(spec("presplit-no-leases", admissionControlFollowerOverloadOpts{ + ioNemesis: true, + kv0N12: true, + kvN12ExtraArgs: "--splits 100", + })) + + // Everything as before, but now the writes all hit the same range. This + // could lead to the quota pool on that range running significantly emptier, + // possibly to the point of stalling foreground writes. + r.Add(spec("hotspot-no-leases", admissionControlFollowerOverloadOpts{ + ioNemesis: true, + kv0N12: true, + kvN12ExtraArgs: "--sequential", + kv50N3: true, + })) + + // This is identical to presplit-no-leases, but this time we are also + // running a (small) workload against n3. Looking at the performance of this + // workload gives us an idea of the impact of follower-writes overload on a + // foreground workload. + r.Add(spec("presplit-with-leases", admissionControlFollowerOverloadOpts{ + ioNemesis: true, + kv0N12: true, + kvN12ExtraArgs: "--splits=100", + kv50N3: true, + })) + + // TODO(irfansharif,aaditya): Add variants that enable regular traffic flow + // control. Run variants without follower pausing too. +} + +type admissionControlFollowerOverloadOpts struct { + ioNemesis bool // limit write throughput on s3 (n3) to 20MiB/s + kvN12ExtraArgs string + kv0N12 bool // run kv0 on n1 and n2 + kv50N3 bool // run kv50 on n3 +} + +func runAdmissionControlFollowerOverload( + ctx context.Context, t test.Test, c cluster.Cluster, cfg admissionControlFollowerOverloadOpts, +) { + require.False(t, c.IsLocal()) + + resetSystemdUnits := func() { + for _, cmd := range []string{"stop", "reset-failed"} { + _ = c.RunE(ctx, c.Node(4), "sudo", "systemctl", cmd, "kv-n12") + _ = c.RunE(ctx, c.Node(4), "sudo", "systemctl", cmd, "kv-n3") + } + } + + // Make cluster re-use possible to iterate on this test without making a new + // cluster every time. + resetSystemdUnits() + + // Set up prometheus. + { + clusNodes := c.Range(1, c.Spec().NodeCount-1) + workloadNode := c.Node(c.Spec().NodeCount) + cfg := (&prometheus.Config{}). + WithPrometheusNode(workloadNode.InstallNodes()[0]). + WithGrafanaDashboard("https://go.crdb.dev/p/index-admission-control-grafana"). + WithCluster(clusNodes.InstallNodes()). + WithNodeExporter(clusNodes.InstallNodes()). + WithWorkload("kv-n12", workloadNode.InstallNodes()[0], 2112).
// kv-n12 + WithWorkload("kv-n3", workloadNode.InstallNodes()[0], 2113) // kv-n3 (if present) + + require.NoError(t, c.StartGrafana(ctx, t.L(), cfg)) + cleanupFunc := func() { + if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil { + t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err) + } + } + defer cleanupFunc() + } + + phaseDuration := time.Hour + + nodes := c.Range(1, 3) + c.Put(ctx, t.Cockroach(), "cockroach") + c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), nodes) + db := c.Conn(ctx, t.L(), 1) + require.NoError(t, WaitFor3XReplication(ctx, t, db)) + + { + _, err := c.Conn(ctx, t.L(), 1).ExecContext(ctx, `SET CLUSTER SETTING admission.kv.pause_replication_io_threshold = 0.8`) + require.NoError(t, err) + } + + if cfg.kv0N12 { + args := strings.Fields("./cockroach workload init kv {pgurl:1}") + args = append(args, strings.Fields(cfg.kvN12ExtraArgs)...) + c.Run(ctx, c.Node(1), args...) + } + if cfg.kv50N3 { + args := strings.Fields("./cockroach workload init kv --db kvn3 {pgurl:1}") + c.Run(ctx, c.Node(1), args...) + } + + // Node 3 should not have any leases (excepting kvn3, if present). + runner := sqlutils.MakeSQLRunner(db) + for _, row := range runner.QueryStr( + t, `SELECT target FROM [ SHOW ZONE CONFIGURATIONS ]`, + ) { + q := `ALTER ` + row[0] + ` CONFIGURE ZONE USING lease_preferences = '[[-node3]]'` + t.L().Printf("%s", q) + _, err := db.Exec(q) + require.NoError(t, err) + } + if cfg.kv50N3 { + q := `ALTER DATABASE kvn3 CONFIGURE ZONE USING lease_preferences = '[[+node3]]', constraints = COPY FROM PARENT` + t.L().Printf("%s", q) + runner.Exec(t, q) + } + + { + var attempts int + for ctx.Err() == nil { + attempts++ + m1 := runner.QueryStr(t, `SELECT range_id FROM crdb_internal.ranges WHERE lease_holder=3 AND database_name != 'kvn3'`) + m2 := runner.QueryStr(t, `SELECT range_id FROM crdb_internal.ranges WHERE lease_holder!=3 AND database_name = 'kvn3'`) + if len(m1)+len(m2) == 0 { + t.L().Printf("done waiting for lease movement") + break + } + if len(m1) > 0 { + t.L().Printf("waiting for %d range leases to move off n3: %v", len(m1), m1) + } + if len(m2) > 0 { + t.L().Printf("waiting for %d range leases to move to n3: %v", len(m2), m2) + } + + time.Sleep(10 * time.Second) + require.Less(t, attempts, 100) + } + } + + if cfg.kv0N12 { + // Deploy workload against the default kv database (which has no leases on + // n3) and let it run for a phase duration. This does not block and keeps + // running even after the test tears down. Initially, the below workload was + // configured for 400 requests per second with 10k blocks, amounting to + // 4mb/s of goodput. Experimentally this was observed to cause (after ~8h) a + // per-store read throughput of ~60mb/s and write throughput of ~140mb/s for + // a total of close to 200mb/s (per store). This was too much for default + // EBS disks (see below) and there was unpredictable performance when + // reprovisioning such volumes with higher throughput, so we run at 2mb/s + // which should translate to ~100mb/s of max sustained combined throughput. + // + // NB: on GCE pd-ssd, we get 30 IOPS/GB of (combined) throughput and + // 0.45MB/(GB*s) for each GB provisioned, so for the 500GB volumes in this + // test 15k IOPS and 225MB/s. + // + // See: https://cloud.google.com/compute/docs/disks/performance#footnote-1 + // + // On AWS, the default EBS volumes have 3000 IOPS and 125MB/s combined + // throughput. 
Additionally, instances have a throughput limit for talking + // to EBS, see: + // + // https://github.com/cockroachdb/cockroach/issues/82109#issuecomment-1154049976 + deployWorkload := ` +mkdir -p logs && \ +sudo systemd-run --property=Type=exec \ +--property=StandardOutput=file:/home/ubuntu/logs/kv-n12.stdout.log \ +--property=StandardError=file:/home/ubuntu/logs/kv-n12.stderr.log \ +--remain-after-exit --unit kv-n12 -- ./cockroach workload run kv --read-percent 0 \ +--max-rate 400 --concurrency 400 --min-block-bytes 5000 --max-block-bytes 5000 --tolerate-errors {pgurl:1-2}` + c.Run(ctx, c.Node(4), deployWorkload) + } + if cfg.kv50N3 { + // On n3, we run a "trickle" workload that does not add much work to the + // system but which we can use to monitor the impact that overload on the + // follower has on its foreground traffic. All leases for this workload are + // held by n3. + const deployWorkload = ` +sudo systemd-run --property=Type=exec \ +--property=StandardOutput=file:/home/ubuntu/logs/kv-n3.stdout.log \ +--property=StandardError=file:/home/ubuntu/logs/kv-n3.stderr.log \ +--remain-after-exit --unit kv-n3 -- ./cockroach workload run kv --db kvn3 \ +--read-percent 50 --max-rate 100 --concurrency 1000 --min-block-bytes 100 --max-block-bytes 100 \ +--prometheus-port 2113 --tolerate-errors {pgurl:3}` + c.Run(ctx, c.Node(4), deployWorkload) + } + t.L().Printf("deployed workload") + + wait(c.NewMonitor(ctx, nodes), phaseDuration) + + if cfg.ioNemesis { + // Limit write throughput on s3 to 20MiB/s. This is not enough to keep up + // with the workload, at least not in the long run, due to write amp. + // + // NB: I happen to have tested this on RAID0 and it doesn't quite behave + // as expected: the limit will be set on the `md0` device: + // + // nvme1n1 259:0 0 500G 0 disk + // └─md0 9:0 0 872.3G 0 raid0 /mnt/data1 + // nvme2n1 259:1 0 372.5G 0 disk + // └─md0 9:0 0 872.3G 0 raid0 /mnt/data1 + // + // and so the actual write throttle is about 2x what was set. + c.Run(ctx, c.Node(3), "sudo", "systemctl", "set-property", "cockroach", "'IOWriteBandwidthMax={store-dir} 20971520'") + t.L().Printf("installed write throughput limit on n3") + } + + wait(c.NewMonitor(ctx, nodes), phaseDuration) + + // TODO(aaditya,irfansharif): collect, assert on, and export metrics, using: + // https://github.com/cockroachdb/cockroach/pull/80724. + // + // Things to check: + // - LSM health of follower (and, to be sure, on other replicas). + // - Latency of a benign read-only workload on the follower. + // - Comparison of baseline perf of the kv0 workload before and after the + // disk nemesis (i.e. run first without the nemesis, then with it, maybe + // again without, and make sure the results are comparable, or report all + // three). At first, probably just export the overall coefficient of + // variation and leave detailed interpretation to + // human eyes on roachperf.
+} + +func wait(m cluster.Monitor, duration time.Duration) { + m.Go(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(duration): + return nil + } + }) + m.Wait() +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index 5117c2c9f704..e266bc2b388c 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -64,7 +64,9 @@ type Controller struct { limit tokensPerWorkClass // We maintain flow token buckets for {regular,elastic} work along each - // stream. This is lazily instantiated. + // stream. This is lazily instantiated. mu is held when adding to the map. + // Readers only need to hold mu if they don't want to miss a concurrently + // added entry. // // TODO(irfansharif): Sort out the GC story for these buckets. When // streams get closed permanently (tenants get deleted, nodes removed) @@ -246,9 +248,8 @@ func (c *Controller) ReturnTokens( // Inspect is part of the kvflowcontrol.Controller interface. func (c *Controller) Inspect(ctx context.Context) []kvflowinspectpb.Stream { - c.mu.Lock() - defer c.mu.Unlock() - + // NB: we are not acquiring c.mu since we don't care about streams that are + // being concurrently added to the map. var streams []kvflowinspectpb.Stream c.mu.buckets.Range(func(key, value any) bool { stream := key.(kvflowcontrol.Stream) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go index 25edf843233a..40216dacacb1 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go @@ -161,8 +161,6 @@ func newMetrics(c *Controller) *metrics { annotateMetricTemplateWithWorkClass(wc, flowTokensAvailable), func() int64 { sum := int64(0) - c.mu.Lock() - defer c.mu.Unlock() c.mu.buckets.Range(func(key, value any) bool { b := value.(*bucket) sum += int64(b.tokens(wc)) @@ -221,16 +219,16 @@ func newMetrics(c *Controller) *metrics { }, ) + // blockedStreamLogger controls periodic logging of blocked streams in + // WorkClass wc. var blockedStreamLogger = log.Every(30 * time.Second) var buf strings.Builder m.BlockedStreamCount[wc] = metric.NewFunctionalGauge( annotateMetricTemplateWithWorkClass(wc, blockedStreamCount), func() int64 { - shouldLog := blockedStreamLogger.ShouldLog() - + shouldLogBlocked := blockedStreamLogger.ShouldLog() + // count is the metric value. count := int64(0) - c.mu.Lock() - defer c.mu.Unlock() streamStatsCount := 0 // TODO(sumeer): this cap is not ideal. Consider dynamically reducing @@ -241,47 +239,62 @@ b := value.(*bucket) if b.tokens(wc) <= 0 { - count += 1 + count++ - if shouldLog { - if count > 100 { - // TODO(sumeer): this cap is not ideal. - return false // cap output to 100 blocked streams - } + if shouldLogBlocked { + // TODO(sumeer): this cap is not ideal.
+ const blockedStreamCountCap = 100 if count == 1 { buf.Reset() - } - if count > 1 { + buf.WriteString(stream.String()) + } else if count <= blockedStreamCountCap { buf.WriteString(", ") + buf.WriteString(stream.String()) + } else if count == blockedStreamCountCap+1 { + buf.WriteString(" omitted some due to overflow") } - buf.WriteString(stream.String()) } } - if shouldLog { + // Log stats, which reflect both elastic and regular, when handling + // the elastic metric. The choice of wc == elastic is arbitrary. + // Every 30s this predicate will evaluate to true, and we will log + // all the streams (elastic and regular) that experienced some + // blocking since the last time such logging was done. If a + // high-enough log verbosity is specified, shouldLogBlocked will + // always be true, but since this method executes at the frequency + // of scraping the metric, we will still log at a reasonable rate. + if shouldLogBlocked && wc == elastic { + // Get and reset stats regardless of whether we will log this + // stream or not. We want stats to reflect only the last metric + // interval. regularStats, elasticStats := b.getAndResetStats(c.clock.PhysicalTime()) + logStream := false if regularStats.noTokenDuration > 0 || elasticStats.noTokenDuration > 0 { + logStream = true streamStatsCount++ } - if streamStatsCount <= streamStatsCountCap { - var b strings.Builder - fmt.Fprintf(&b, "stream %s was blocked: durations:", stream.String()) - if regularStats.noTokenDuration > 0 { - fmt.Fprintf(&b, " regular %s", regularStats.noTokenDuration.String()) - } - if elasticStats.noTokenDuration > 0 { - fmt.Fprintf(&b, " elastic %s", elasticStats.noTokenDuration.String()) + if logStream { + if streamStatsCount <= streamStatsCountCap { + var b strings.Builder + fmt.Fprintf(&b, "stream %s was blocked: durations:", stream.String()) + if regularStats.noTokenDuration > 0 { + fmt.Fprintf(&b, " regular %s", regularStats.noTokenDuration.String()) + } + if elasticStats.noTokenDuration > 0 { + fmt.Fprintf(&b, " elastic %s", elasticStats.noTokenDuration.String()) + } + fmt.Fprintf(&b, " tokens deducted: regular %s elastic %s", + humanize.IBytes(uint64(regularStats.tokensDeducted)), + humanize.IBytes(uint64(elasticStats.tokensDeducted))) + log.Infof(context.Background(), "%s", redact.SafeString(b.String())) + } else if streamStatsCount == streamStatsCountCap+1 { + log.Infof(context.Background(), "skipped logging some streams that were blocked") } - fmt.Fprintf(&b, " tokens deducted: regular %s elastic %s", - humanize.Bytes(uint64(regularStats.tokensDeducted)), - humanize.Bytes(uint64(elasticStats.tokensDeducted))) - log.Infof(context.Background(), "%s", redact.SafeString(b.String())) - } else if streamStatsCount == streamStatsCountCap+1 { - log.Infof(context.Background(), "skipped logging some streams that were blocked") } } return true }) - if shouldLog && count > 0 { + if shouldLogBlocked && count > 0 { log.Warningf(context.Background(), "%d blocked %s replication stream(s): %s", count, wc, redact.SafeString(buf.String())) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go index a1b2bd6a6689..dd937721935e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go @@ -13,6 +13,8 @@ package kvflowcontroller import ( "context" "fmt" + "math" + "regexp" "strings" "testing" "time" @@ -444,6 +446,248 @@ func
TestInspectController(t *testing.T) { makeInspectStream(2, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */)) } +func TestControllerLogging(t *testing.T) { + defer leaktest.AfterTest(t)() + s := log.ScopeWithoutShowLogs(t) + // Causes every call to retrieve the elastic metric to log. + prevVModule := log.GetVModule() + _ = log.SetVModule("kvflowcontroller_metrics=2") + defer func() { _ = log.SetVModule(prevVModule) }() + defer s.Close(t) + + ctx := context.Background() + testStartTs := timeutil.Now() + + makeStream := func(id uint64) kvflowcontrol.Stream { + return kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(id), + StoreID: roachpb.StoreID(id), + } + } + makeConnectedStream := func(id uint64) kvflowcontrol.ConnectedStream { + return &mockConnectedStream{ + stream: makeStream(id), + } + } + + st := cluster.MakeTestingClusterSettings() + const numTokens = 1 << 20 /* 1 MiB */ + elasticTokensPerStream.Override(ctx, &st.SV, numTokens) + regularTokensPerStream.Override(ctx, &st.SV, numTokens) + kvflowcontrol.Mode.Override(ctx, &st.SV, int64(kvflowcontrol.ApplyToAll)) + controller := New(metric.NewRegistry(), st, hlc.NewClockForTesting(nil)) + + numBlocked := 0 + createStreamAndExhaustTokens := func(id uint64, checkMetric bool) { + admitted, err := controller.Admit( + ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(id)) + require.NoError(t, err) + require.True(t, admitted) + controller.DeductTokens( + ctx, admissionpb.BulkNormalPri, kvflowcontrol.Tokens(numTokens), makeStream(id)) + controller.DeductTokens( + ctx, admissionpb.NormalPri, kvflowcontrol.Tokens(numTokens), makeStream(id)) + if checkMetric { + // This first call will also log. + require.Equal(t, int64(numBlocked+1), controller.metrics.BlockedStreamCount[elastic].Value()) + require.Equal(t, int64(numBlocked+1), controller.metrics.BlockedStreamCount[regular].Value()) + } + numBlocked++ + } + // 1 stream that is blocked. + id := uint64(1) + createStreamAndExhaustTokens(id, true) + // Total 24 streams are blocked. + for id++; id < 25; id++ { + createStreamAndExhaustTokens(id, false) + } + // 25th stream will also be blocked. The detailed stats will only cover an + // arbitrary subset of 20 streams. + log.Infof(ctx, "creating stream id %d", id) + createStreamAndExhaustTokens(id, true) + + // Total 104 streams are blocked. + for id++; id < 105; id++ { + createStreamAndExhaustTokens(id, false) + } + // 105th stream will also be blocked. The blocked stream names will only + // list 100 streams. 
+ log.Infof(ctx, "creating stream id %d", id) + createStreamAndExhaustTokens(id, true) + + log.FlushFiles() + entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(), + math.MaxInt64, 2000, + regexp.MustCompile(`kvflowcontroller_metrics\.go|kvflowcontroller_test\.go`), + log.WithMarkedSensitiveData) + require.NoError(t, err) + /* + Log output is: + + stream t1/s1 was blocked: durations: regular 16.083µs elastic 17.875µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + 1 blocked elastic replication stream(s): t1/s1 + 1 blocked regular replication stream(s): t1/s1 + creating stream id 25 + stream t18/s18 was blocked: durations: regular 39.708µs elastic 40.208µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t2/s2 was blocked: durations: regular 113.75µs elastic 219.792µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t14/s14 was blocked: durations: regular 77.625µs elastic 78.167µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t24/s24 was blocked: durations: regular 56.166µs elastic 56.708µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t12/s12 was blocked: durations: regular 107.792µs elastic 108.334µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t3/s3 was blocked: durations: regular 150.916µs elastic 151.5µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t10/s10 was blocked: durations: regular 134.167µs elastic 134.667µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t17/s17 was blocked: durations: regular 121.583µs elastic 122.125µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t4/s4 was blocked: durations: regular 178.417µs elastic 179µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t16/s16 was blocked: durations: regular 146.667µs elastic 147.208µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t1/s1 was blocked: durations: regular 809.833µs elastic 809.833µs tokens deducted: regular 0 B elastic 0 B + stream t20/s20 was blocked: durations: regular 154.083µs elastic 154.625µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t7/s7 was blocked: durations: regular 209.208µs elastic 209.708µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t13/s13 was blocked: durations: regular 199µs elastic 199.583µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t21/s21 was blocked: durations: regular 182.667µs elastic 183.25µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t23/s23 was blocked: durations: regular 188.042µs elastic 188.583µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t5/s5 was blocked: durations: regular 263.75µs elastic 264.375µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t8/s8 was blocked: durations: regular 262µs elastic 262.541µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t19/s19 was blocked: durations: regular 233.334µs elastic 233.875µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t22/s22 was blocked: durations: regular 235µs elastic 235.542µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + skipped logging some streams that were blocked + 25 blocked elastic replication stream(s): t18/s18, t2/s2, t14/s14, t24/s24, t12/s12, t3/s3, t10/s10, t17/s17, t4/s4, t16/s16, t1/s1, t20/s20, t7/s7, t13/s13, t21/s21, t23/s23, t5/s5, t8/s8, t19/s19, t22/s22, t25/s25, t11/s11, t6/s6, t9/s9, t15/s15 + 25 blocked regular replication stream(s): t14/s14, t24/s24, t12/s12, t3/s3, t18/s18, t2/s2, t4/s4, t16/s16, t1/s1, t20/s20, t7/s7, t13/s13, t10/s10, t17/s17, t21/s21, t23/s23, 
t19/s19, t22/s22, t25/s25, t11/s11, t6/s6, t5/s5, t8/s8, t9/s9, t15/s15 + creating stream id 105 + stream t104/s104 was blocked: durations: regular 14.416µs elastic 14.916µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t3/s3 was blocked: durations: regular 535.75µs elastic 535.75µs tokens deducted: regular 0 B elastic 0 B + stream t90/s90 was blocked: durations: regular 76.25µs elastic 76.834µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t1/s1 was blocked: durations: regular 500.667µs elastic 500.667µs tokens deducted: regular 0 B elastic 0 B + stream t105/s105 was blocked: durations: regular 43.708µs elastic 44.416µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t54/s54 was blocked: durations: regular 239.542µs elastic 240.083µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t96/s96 was blocked: durations: regular 100.708µs elastic 101.25µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t97/s97 was blocked: durations: regular 109.416µs elastic 110µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t68/s68 was blocked: durations: regular 222.417µs elastic 223µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t45/s45 was blocked: durations: regular 320.25µs elastic 320.792µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t94/s94 was blocked: durations: regular 153.167µs elastic 153.75µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t65/s65 was blocked: durations: regular 266.916µs elastic 267.458µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t14/s14 was blocked: durations: regular 690.875µs elastic 690.875µs tokens deducted: regular 0 B elastic 0 B + stream t8/s8 was blocked: durations: regular 547.167µs elastic 547.167µs tokens deducted: regular 0 B elastic 0 B + stream t6/s6 was blocked: durations: regular 517.875µs elastic 517.875µs tokens deducted: regular 0 B elastic 0 B + stream t55/s55 was blocked: durations: regular 360.583µs elastic 361.166µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + stream t5/s5 was blocked: durations: regular 600.042µs elastic 600.042µs tokens deducted: regular 0 B elastic 0 B + stream t15/s15 was blocked: durations: regular 566.875µs elastic 566.875µs tokens deducted: regular 0 B elastic 0 B + stream t13/s13 was blocked: durations: regular 674.459µs elastic 674.459µs tokens deducted: regular 0 B elastic 0 B + stream t32/s32 was blocked: durations: regular 517µs elastic 517.542µs tokens deducted: regular 1.0 MiB elastic 2.0 MiB + skipped logging some streams that were blocked + 105 blocked elastic replication stream(s): t104/s104, t3/s3, t90/s90, t1/s1, t105/s105, t54/s54, t96/s96, t97/s97, t68/s68, t45/s45, t94/s94, t65/s65, t14/s14, t8/s8, t6/s6, t55/s55, t5/s5, t15/s15, t13/s13, t32/s32, t79/s79, t20/s20, t11/s11, t24/s24, t48/s48, t84/s84, t99/s99, t46/s46, t37/s37, t12/s12, t26/s26, t44/s44, t75/s75, t21/s21, t49/s49, t82/s82, t91/s91, t81/s81, t51/s51, t74/s74, t22/s22, t19/s19, t25/s25, t83/s83, t47/s47, t58/s58, t76/s76, t9/s9, t103/s103, t33/s33, t62/s62, t38/s38, t95/s95, t52/s52, t85/s85, t53/s53, t72/s72, t102/s102, t10/s10, t59/s59, t73/s73, t42/s42, t66/s66, t29/s29, t92/s92, t100/s100, t27/s27, t86/s86, t2/s2, t63/s63, t43/s43, t101/s101, t69/s69, t39/s39, t28/s28, t36/s36, t35/s35, t93/s93, t87/s87, t57/s57, t56/s56, t98/s98, t41/s41, t50/s50, t78/s78, t88/s88, t89/s89, t4/s4, t60/s60, t70/s70, t67/s67, t23/s23, t71/s71, t34/s34, t16/s16, t64/s64, t17/s17, t40/s40, t7/s7, t61/s61 omitted some due to overflow + 105 blocked 
regular replication stream(s): t11/s11, t84/s84, t24/s24, t48/s48, t99/s99, t37/s37, t12/s12, t46/s46, t21/s21, t49/s49, t82/s82, t26/s26, t44/s44, t75/s75, t91/s91, t51/s51, t81/s81, t74/s74, t25/s25, t83/s83, t47/s47, t22/s22, t19/s19, t58/s58, t76/s76, t9/s9, t103/s103, t62/s62, t33/s33, t52/s52, t85/s85, t38/s38, t95/s95, t10/s10, t59/s59, t53/s53, t72/s72, t102/s102, t66/s66, t29/s29, t73/s73, t42/s42, t100/s100, t27/s27, t92/s92, t2/s2, t86/s86, t43/s43, t101/s101, t69/s69, t63/s63, t28/s28, t36/s36, t35/s35, t39/s39, t87/s87, t93/s93, t57/s57, t56/s56, t41/s41, t98/s98, t78/s78, t88/s88, t89/s89, t4/s4, t50/s50, t60/s60, t70/s70, t67/s67, t34/s34, t16/s16, t64/s64, t17/s17, t23/s23, t71/s71, t7/s7, t61/s61, t40/s40, t18/s18, t30/s30, t31/s31, t77/s77, t80/s80, t104/s104, t3/s3, t90/s90, t1/s1, t105/s105, t97/s97, t68/s68, t54/s54, t96/s96, t65/s65, t45/s45, t94/s94, t6/s6, t14/s14, t8/s8, t15/s15, t13/s13 omitted some due to overflow + */ + + blockedStreamRegexp, err := regexp.Compile( + "stream .* was blocked: durations: regular .* elastic .* tokens deducted: regular .* elastic .*") + require.NoError(t, err) + blockedStreamSkippedRegexp, err := regexp.Compile( + "skipped logging some streams that were blocked") + require.NoError(t, err) + const blockedCountElasticRegexp = "%d blocked elastic replication stream" + const blockedCountRegularRegexp = "%d blocked regular replication stream" + blocked1ElasticRegexp, err := regexp.Compile(fmt.Sprintf(blockedCountElasticRegexp, 1)) + require.NoError(t, err) + blocked1RegularRegexp, err := regexp.Compile(fmt.Sprintf(blockedCountRegularRegexp, 1)) + require.NoError(t, err) + blocked25ElasticRegexp, err := regexp.Compile(fmt.Sprintf(blockedCountElasticRegexp, 25)) + require.NoError(t, err) + blocked25RegularRegexp, err := regexp.Compile(fmt.Sprintf(blockedCountRegularRegexp, 25)) + require.NoError(t, err) + blocked105ElasticRegexp, err := regexp.Compile( + "105 blocked elastic replication stream.* omitted some due to overflow") + require.NoError(t, err) + blocked105RegularRegexp, err := regexp.Compile( + "105 blocked regular replication stream.* omitted some due to overflow") + require.NoError(t, err) + + const creatingRegexp = "creating stream id %d" + creating25Regexp, err := regexp.Compile(fmt.Sprintf(creatingRegexp, 25)) + require.NoError(t, err) + creating105Regexp, err := regexp.Compile(fmt.Sprintf(creatingRegexp, 105)) + require.NoError(t, err) + + blockedStreamCount := 0 + foundBlockedElastic := false + foundBlockedRegular := false + foundBlockedStreamSkipped := false + // First section of the log where 1 stream blocked. Entries are in reverse + // chronological order. + index := len(entries) - 1 + for ; index >= 0; index-- { + entry := entries[index] + if creating25Regexp.MatchString(entry.Message) { + break + } + if blockedStreamRegexp.MatchString(entry.Message) { + blockedStreamCount++ + } + if blocked1ElasticRegexp.MatchString(entry.Message) { + foundBlockedElastic = true + } + if blocked1RegularRegexp.MatchString(entry.Message) { + foundBlockedRegular = true + } + if blockedStreamSkippedRegexp.MatchString(entry.Message) { + foundBlockedStreamSkipped = true + } + } + require.Equal(t, 1, blockedStreamCount) + require.True(t, foundBlockedElastic) + require.True(t, foundBlockedRegular) + require.False(t, foundBlockedStreamSkipped) + + blockedStreamCount = 0 + foundBlockedElastic = false + foundBlockedRegular = false + // Second section of the log where 25 streams blocked. 
+ for ; index >= 0; index-- { + entry := entries[index] + if creating105Regexp.MatchString(entry.Message) { + break + } + if blockedStreamRegexp.MatchString(entry.Message) { + blockedStreamCount++ + } + if blocked25ElasticRegexp.MatchString(entry.Message) { + foundBlockedElastic = true + } + if blocked25RegularRegexp.MatchString(entry.Message) { + foundBlockedRegular = true + } + if blockedStreamSkippedRegexp.MatchString(entry.Message) { + foundBlockedStreamSkipped = true + } + } + require.Equal(t, 20, blockedStreamCount) + require.True(t, foundBlockedElastic) + require.True(t, foundBlockedRegular) + require.True(t, foundBlockedStreamSkipped) + + blockedStreamCount = 0 + foundBlockedElastic = false + foundBlockedRegular = false + // Third section of the log where 105 streams blocked. + for ; index >= 0; index-- { + entry := entries[index] + if blockedStreamRegexp.MatchString(entry.Message) { + blockedStreamCount++ + } + if blocked105ElasticRegexp.MatchString(entry.Message) { + foundBlockedElastic = true + } + if blocked105RegularRegexp.MatchString(entry.Message) { + foundBlockedRegular = true + } + if blockedStreamSkippedRegexp.MatchString(entry.Message) { + foundBlockedStreamSkipped = true + } + } + require.Equal(t, 20, blockedStreamCount) + require.True(t, foundBlockedElastic) + require.True(t, foundBlockedRegular) + require.True(t, foundBlockedStreamSkipped) +} + type mockConnectedStream struct { stream kvflowcontrol.Stream } diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index c23f2a77e29e..53d7f01c311d 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -208,9 +208,9 @@ func (w WorkClass) SafeFormat(p redact.SafePrinter, verb rune) { case RegularWorkClass: p.Printf("regular") case ElasticWorkClass: - p.Print("elastic") + p.Printf("elastic") default: - p.Print("") + p.Printf("") } }
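A note on the admissionpb.go change above (p.Print to p.Printf): the likely intent, which is an assumption on my part rather than something stated in the diff, is that a constant Printf format string is treated by the redact-aware printer as a safe literal, whereas a plain string passed as a Print argument is treated as an unsafe value and wrapped in redaction markers. A minimal standalone sketch of that distinction using the cockroachdb/redact package directly (not part of the patch):

package main

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

func main() {
	// A plain string argument is considered unsafe and is wrapped in
	// redaction markers when rendered.
	fmt.Println(redact.Sprint("elastic")) // ‹elastic›

	// A constant format string with no arguments is kept as a safe literal.
	fmt.Println(redact.Sprintf("elastic")) // elastic
}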