diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index 0e70837d235..d22fd33ab48 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -46,7 +46,6 @@ vttablet \ --init_shard $shard \ --init_tablet_type $tablet_type \ --health_check_interval 5s \ - --enable_replication_reporter \ --backup_storage_implementation file \ --file_backup_storage_root $VTDATAROOT/backups \ --restore_from_backup \ @@ -56,6 +55,7 @@ vttablet \ --pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \ --vtctld_addr http://$hostname:$vtctld_web_port/ \ --disable_active_reparents \ + --throttler-config-via-topo --heartbeat_enable --heartbeat_interval=250ms --heartbeat_on_demand_duration=5s \ > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & # Block waiting for the tablet to be listening diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index cc3d4a275d0..32160387009 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -300,7 +300,7 @@ Usage of vttablet: --tablet_protocol string Protocol to use to make queryservice RPCs to vttablets. (default "grpc") --throttle_check_as_check_self Should throttler/check return a throttler/check-self result (changes throttler behavior for writes) --throttle_metrics_query SELECT Override default heartbeat/lag metric. Use either SELECT (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. Set -throttle_metrics_threshold respectively. - --throttle_metrics_threshold float Override default throttle threshold, respective to -throttle_metrics_query (default 1.7976931348623157e+308) + --throttle_metrics_threshold float Override default throttle threshold, respective to --throttle_metrics_query (default 1.7976931348623157e+308) --throttle_tablet_types string Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 
'replica' aways implicitly included (default "replica") --throttle_threshold duration Replication lag threshold for default lag throttling (default 1s) --throttler-config-via-topo When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index 001ded3d9cb..ec6eed14f17 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" @@ -259,13 +260,13 @@ func TestSchemaChange(t *testing.T) { err := clusterInstance.WaitForTabletsToHealthyInVtgate() require.NoError(t, err) - _, err = onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) + _, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) require.NoError(t, err) for _, ks := range clusterInstance.Keyspaces { for _, shard := range ks.Shards { for _, tablet := range shard.Vttablets { - onlineddl.WaitForThrottlerStatusEnabled(t, tablet, extendedMigrationWait) + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, nil, extendedMigrationWait) } } } diff --git a/go/test/endtoend/onlineddl/vtctlutil.go b/go/test/endtoend/onlineddl/vtctlutil.go index 62cc681eab1..d2da7327e3a 100644 --- a/go/test/endtoend/onlineddl/vtctlutil.go +++ b/go/test/endtoend/onlineddl/vtctlutil.go @@ -17,8 +17,6 @@ limitations under the License. 
package onlineddl import ( - "context" - "fmt" "testing" "time" @@ -36,51 +34,3 @@ func CheckCancelAllMigrationsViaVtctl(t *testing.T, vtctlclient *cluster.VtctlCl _, err := vtctlclient.ApplySchemaWithOutput(keyspace, cancelQuery, cluster.VtctlClientParams{SkipPreflight: true}) assert.NoError(t, err) } - -// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig. -// This retries the command until it succeeds or times out as the -// SrvKeyspace record may not yet exist for a newly created -// Keyspace that is still initializing before it becomes serving. -func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) { - args := []string{} - clientfunc := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput - if !viaVtctldClient { - args = append(args, "--") - clientfunc = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput - } - args = append(args, "UpdateThrottlerConfig") - if enable { - args = append(args, "--enable") - } - if disable { - args = append(args, "--disable") - } - if threshold > 0 { - args = append(args, "--threshold", fmt.Sprintf("%f", threshold)) - } - if metricsQuery != "" { - args = append(args, "--custom-query", metricsQuery) - args = append(args, "--check-as-check-self") - } else { - args = append(args, "--check-as-check-shard") - } - args = append(args, clusterInstance.Keyspaces[0].Name) - - ctx, cancel := context.WithTimeout(context.Background(), throttlerConfigTimeout) - defer cancel() - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - result, err = clientfunc(args...) - if err == nil { - return result, nil - } - select { - case <-ctx.Done(): - return "", fmt.Errorf("timed out waiting for UpdateThrottlerConfig to succeed after %v. 
Last seen value: %+v, error: %v", throttlerConfigTimeout, result, err) - case <-ticker.C: - } - } -} diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 78daaed63a0..fe87262a21f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -31,12 +31,22 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +const ( + customQuery = "show global status like 'threads_running'" + customThreshold = 5 * time.Second + unreasonablyLowThreshold = 1 * time.Millisecond + extremelyHighThreshold = 1 * time.Hour + onDemandHeartbeatDuration = 5 * time.Second + throttlerEnabledTimeout = 60 * time.Second + useDefaultQuery = "" +) + var ( clusterInstance *cluster.LocalProcessCluster primaryTablet *cluster.Vttablet @@ -77,16 +87,10 @@ var ( throttledAppsAPIPath = "throttler/throttled-apps" checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" - customQuery = "show global status like 'threads_running'" - customThreshold = 5 -) - -const ( - throttlerThreshold = 1 * time.Second // standard, tight threshold - unreasonablyLowThreshold = 1 * time.Millisecond - extremelyHighThreshold = 1 * time.Hour - onDemandHeartbeatDuration = 5 * time.Second - applyConfigWait = 15 * time.Second // time after which we're sure the throttler has refreshed config and tablets + getResponseBody = func(resp *http.Response) string { + body, _ := io.ReadAll(resp.Body) + return string(body) + } ) func TestMain(m *testing.M) { @@ -109,7 +113,6 @@ func TestMain(m *testing.M) { "--watch_replication_stream", "--enable_replication_reporter", "--throttler-config-via-topo", - 
"--throttle_threshold", throttlerThreshold.String(), "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), @@ -191,7 +194,7 @@ func warmUpHeartbeat(t *testing.T) (respStatus int) { // waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode int) { _ = warmUpHeartbeat(t) - ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration+applyConfigWait) + ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4) defer cancel() for { @@ -210,7 +213,7 @@ func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode require.NoError(t, err) resp.Body.Close() - assert.Equal(t, wantCode, resp.StatusCode, "body: %v", string(b)) + assert.Equalf(t, wantCode, resp.StatusCode, "body: %s", string(b)) return default: resp.Body.Close() @@ -243,37 +246,64 @@ func TestInitialThrottler(t *testing.T) { t.Run("validating OK response from disabled throttler", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) - t.Run("enabling throttler with low threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), "", false) + t.Run("enabling throttler with very low threshold", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with the new config. 
+ for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating pushback response from throttler", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("disabling throttler", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), "", false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false) assert.NoError(t, err) + + // Wait for the throttler to be disabled everywhere. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, false, nil, throttlerEnabledTimeout) + } }) t.Run("validating OK response from disabled throttler, again", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", true) + // Enable the throttler again with the default query which also moves us back + // to the default threshold. + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere again with the default config. 
+ for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) + } }) t.Run("validating pushback response from throttler, again", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("setting high threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with new config. + for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { + throttler.WaitForThrottlerStatusEnabled(t, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: extremelyHighThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating OK response from throttler with high threshold", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("setting low threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttlerThreshold.Seconds(), "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with new config. 
+ for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) + } }) t.Run("validating pushback response from throttler on low threshold", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) @@ -287,16 +317,17 @@ func TestInitialThrottler(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { time.Sleep(1 * time.Second) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { + time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) } @@ -306,7 +337,6 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { // By this time metrics will have been collected. 
We expect no lag, and something like: // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} - // t.Run("validating throttler OK", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) @@ -314,50 +344,55 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { resp, body, err := throttledApps(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) assert.Contains(t, body, "always-throttled-app") }) t.Run("validating primary check self", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating replica check self", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) } func TestLag(t *testing.T) { defer cluster.PanicHandler(t) + // Temporarily disable VTOrc recoveries because we want to + // STOP replication specifically in order to increase the + // lag and we DO NOT want VTOrc to try and fix this. 
+ clusterInstance.DisableVTOrcRecoveries(t) + defer clusterInstance.EnableVTOrcRecoveries(t) t.Run("stopping replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(2 * throttlerThreshold) + time.Sleep(2 * throttler.DefaultThreshold) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("primary self-check should still be fine", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() // self (on primary) is unaffected by replication lag - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("replica self-check should show error", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("starting replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias) @@ -371,13 +406,13 @@ func TestLag(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() // self (on primary) is unaffected by replication lag - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("replica self-check should be fine", func(t *testing.T) { 
resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) } @@ -392,7 +427,6 @@ func TestNoReplicas(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("restoring to REPLICA", func(t *testing.T) { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA") assert.NoError(t, err) @@ -403,25 +437,26 @@ func TestNoReplicas(t *testing.T) { func TestCustomQuery(t *testing.T) { defer cluster.PanicHandler(t) - t.Run("enabling throttler with low threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, float64(customThreshold), customQuery, false) + t.Run("enabling throttler with custom query and threshold", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold.Seconds(), customQuery, false) assert.NoError(t, err) - time.Sleep(applyConfigWait) + + // Wait for the throttler to be enabled everywhere with new custom config. 
+ for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: customQuery, Threshold: customThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating OK response from throttler with custom query", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode, "response: %v", string(b)) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("test threads running", func(t *testing.T) { - sleepDuration := 10 * time.Second + sleepDuration := 20 * time.Second var wg sync.WaitGroup - for i := 0; i < customThreshold; i++ { - // generate different Sleep() calls, all at minimum sleepDuration + for i := 0; i < int(customThreshold.Seconds()); i++ { + // Generate different Sleep() calls, all at minimum sleepDuration. wg.Add(1) go func(i int) { defer wg.Done() @@ -429,26 +464,24 @@ func TestCustomQuery(t *testing.T) { }(i) } t.Run("exceeds threshold", func(t *testing.T) { - time.Sleep(sleepDuration / 2) - // by this time we will have testThreshold+1 threads_running, and we should hit the threshold - // {"StatusCode":429,"Value":2,"Threshold":2,"Message":"Threshold exceeded"} + throttler.WaitForQueryResult(t, primaryTablet, + "select if(variable_value > 5, 'true', 'false') as result from performance_schema.global_status where variable_name='threads_running'", + "true", sleepDuration/3) + throttler.WaitForValidData(t, primaryTablet, sleepDuration-(5*time.Second)) + // Now we should be reporting ~ customThreshold*2 threads_running, and we should + // hit the threshold. 
For example: + // {"StatusCode":429,"Value":6,"Threshold":5,"Message":"Threshold exceeded"} { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "response: %v", string(b)) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "response: %v", string(b)) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } }) t.Run("wait for queries to terminate", func(t *testing.T) { @@ -460,34 +493,39 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } }) }) } func TestRestoreDefaultQuery(t *testing.T) { - // validte going back from custom-query to default-query (replication lag) still works defer cluster.PanicHandler(t) - t.Run("enabling throttler with standard threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttlerThreshold.Seconds(), "", false) + // Validate going back from custom-query to default-query (replication lag) still works. 
+ t.Run("enabling throttler with default query and threshold", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, false) assert.NoError(t, err) + + // Wait for the throttler to be up and running everywhere again with the default config. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) + } }) - t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { - waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) - }) - t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { - time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops + t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + }) + t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) { + time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops + waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) } diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go new file mode 100644 index 00000000000..e8769999fc1 --- /dev/null +++ b/go/test/endtoend/throttler/util.go @@ -0,0 +1,212 @@ +/* +Copyright 2023 The Vitess Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package throttler + +import ( + "context" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/buger/jsonparser" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +type Config struct { + Query string + Threshold float64 +} + +const ( + DefaultQuery = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from _vt.heartbeat" + DefaultThreshold = 1 * time.Second + ConfigTimeout = 60 * time.Second +) + +var DefaultConfig = &Config{ + Query: DefaultQuery, + Threshold: DefaultThreshold.Seconds(), +} + +// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig. +// This retries the command until it succeeds or times out as the +// SrvKeyspace record may not yet exist for a newly created +// Keyspace that is still initializing before it becomes serving. 
+func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) { + args := []string{} + clientfunc := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput + if !viaVtctldClient { + args = append(args, "--") + clientfunc = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput + } + args = append(args, "UpdateThrottlerConfig") + if enable { + args = append(args, "--enable") + } + if disable { + args = append(args, "--disable") + } + if threshold > 0 { + args = append(args, "--threshold", fmt.Sprintf("%f", threshold)) + } + args = append(args, "--custom-query", metricsQuery) + if metricsQuery != "" { + args = append(args, "--check-as-check-self") + } else { + args = append(args, "--check-as-check-shard") + } + args = append(args, clusterInstance.Keyspaces[0].Name) + + ctx, cancel := context.WithTimeout(context.Background(), ConfigTimeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + result, err = clientfunc(args...) + if err == nil { + return result, nil + } + select { + case <-ctx.Done(): + return "", fmt.Errorf("timed out waiting for UpdateThrottlerConfig to succeed after %v; last seen value: %+v, error: %v", ConfigTimeout, result, err) + case <-ticker.C: + } + } +} + +// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as +// enabled/disabled and have the provided config (if any) until the specified timeout. 
+func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabled bool, config *Config, timeout time.Duration) { + enabledJSONPath := "IsEnabled" + queryJSONPath := "Query" + thresholdJSONPath := "Threshold" + url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + body := getHTTPBody(url) + isEnabled, err := jsonparser.GetBoolean([]byte(body), enabledJSONPath) + require.NoError(t, err) + if isEnabled == enabled { + if config == nil { + return + } + query, err := jsonparser.GetString([]byte(body), queryJSONPath) + require.NoError(t, err) + threshold, err := jsonparser.GetFloat([]byte(body), thresholdJSONPath) + require.NoError(t, err) + if query == config.Query && threshold == config.Threshold { + return + } + } + select { + case <-ctx.Done(): + t.Errorf("timed out waiting for the %s tablet's throttler status enabled to be %t with the correct config after %v; last seen value: %s", + tablet.Alias, enabled, timeout, body) + return + case <-ticker.C: + } + } +} + +func getHTTPBody(url string) string { + resp, err := http.Get(url) + if err != nil { + log.Infof("http Get returns %+v", err) + return "" + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + log.Infof("http Get returns status %d", resp.StatusCode) + return "" + } + respByte, _ := io.ReadAll(resp.Body) + body := string(respByte) + return body +} + +// WaitForQueryResult waits for a tablet to return the given result for the given +// query until the specified timeout. +// This is for simple queries that return 1 column in 1 row. It compares the result +// for that column as a string with the provided result. 
+func WaitForQueryResult(t *testing.T, tablet *cluster.Vttablet, query, result string, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + res, err := tablet.VttabletProcess.QueryTablet(query, "", false) + require.NoError(t, err) + if res != nil && len(res.Rows) == 1 && res.Rows[0][0].ToString() == result { + return + } + select { + case <-ctx.Done(): + t.Errorf("timed out waiting for the %q query to produce a result of %q on tablet %s after %v; last seen value: %+v", + query, result, tablet.Alias, timeout, res) + return + case <-ticker.C: + } + } +} + +// WaitForValidData waits for a tablet's throttler checks to stop returning a 500 +// http response. A 500 indicates that the throttler is not able to provide valid +// results, most commonly because it is still gathering the initial results for +// the given configuration. +func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Duration) { + checkURL := fmt.Sprintf("http://localhost:%d/throttler/check", tablet.HTTPPort) + selfCheckURL := fmt.Sprintf("http://localhost:%d/throttler/check-self", tablet.HTTPPort) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + checkResp, checkErr := http.Get(checkURL) + if checkErr == nil { + checkResp.Body.Close() + } + selfCheckResp, selfCheckErr := http.Get(selfCheckURL) + if selfCheckErr == nil { + selfCheckResp.Body.Close() + } + if checkErr == nil && selfCheckErr == nil && + checkResp.StatusCode != http.StatusInternalServerError && + selfCheckResp.StatusCode != http.StatusInternalServerError { + return + } + select { + case <-ctx.Done(): + t.Errorf("timed out waiting for %s tablet's throttler to return a valid result after %v; last seen value: %+v", + tablet.Alias, timeout, checkResp) +
return + case <-ticker.C: + } + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index df415ddf582..9b04dd4bb21 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -81,7 +81,7 @@ func registerThrottlerFlags(fs *pflag.FlagSet) { fs.DurationVar(&throttleThreshold, "throttle_threshold", throttleThreshold, "Replication lag threshold for default lag throttling") fs.StringVar(&throttleMetricQuery, "throttle_metrics_query", throttleMetricQuery, "Override default heartbeat/lag metric. Use either `SELECT` (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. Set -throttle_metrics_threshold respectively.") - fs.Float64Var(&throttleMetricThreshold, "throttle_metrics_threshold", throttleMetricThreshold, "Override default throttle threshold, respective to -throttle_metrics_query") + fs.Float64Var(&throttleMetricThreshold, "throttle_metrics_threshold", throttleMetricThreshold, "Override default throttle threshold, respective to --throttle_metrics_query") fs.BoolVar(&throttlerCheckAsCheckSelf, "throttle_check_as_check_self", throttlerCheckAsCheckSelf, "Should throttler/check return a throttler/check-self result (changes throttler behavior for writes)") fs.BoolVar(&throttlerConfigViaTopo, "throttler-config-via-topo", throttlerConfigViaTopo, "When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self") } @@ -164,6 +164,9 @@ type ThrottlerStatus struct { IsEnabled bool IsDormant bool + Query string + Threshold float64 + AggregatedMetrics map[string]base.MetricResult MetricsHealth base.MetricHealthMap } @@ -256,6 +259,10 @@ func (throttler *Throttler) GetMetricsQuery() string { return throttler.metricsQuery.Load().(string) } +func (throttler *Throttler) GetMetricsThreshold() float64 { + return throttler.MetricsThreshold.Get()
+} + // initThrottler initializes config func (throttler *Throttler) initConfig() { log.Infof("Throttler: initializing config") @@ -310,8 +317,9 @@ func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspa } throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig) - if throttler.isEnabled > 0 { - // throttler is running and we should apply the config change through Operate() or else we get into race conditions + if throttler.IsEnabled() { + // Throttler is running and we should apply the config change through Operate() + // or else we get into race conditions. go func() { throttler.throttlerConfigChan <- throttlerConfig }() @@ -1031,6 +1039,9 @@ func (throttler *Throttler) Status() *ThrottlerStatus { IsEnabled: (atomic.LoadInt64(&throttler.isEnabled) > 0), IsDormant: throttler.isDormant(), + Query: throttler.GetMetricsQuery(), + Threshold: throttler.GetMetricsThreshold(), + AggregatedMetrics: throttler.aggregatedMetricsSnapshot(), MetricsHealth: throttler.metricsHealthSnapshot(), }