From c724737593281935890ccd66fa4248f8e565eea2 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Mon, 30 Aug 2021 11:19:37 -0400 Subject: [PATCH 1/2] sql: deflake TestStatusAPIStatements and TestStatusAPICombinedStatements Fixes: #69557 As mentioned in #69533, we have a race condition in tests where we request statements stats with start=now. In these tests we expect to see no results, but because in-memory stats have the aggregated_ts field set on iterator return, depending on the time the test is run we might see results returned. For example, suppose we insert stats at 15:45. If we then request stats at 16:00, requesting only stats aggregated after or at the current time, i.e. start=16:00, the aggregated_ts for in-memory stats will be set to 16:00 and thus we will see results returned. To deflake these tests, we stub the aggregated_ts field to a predetermined value. Release justification: non-production code changes Release note: None --- pkg/server/status_test.go | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 3472012e7df9..aa09eee7ade0 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1813,9 +1813,17 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - params, _ := tests.CreateTestServerParams() + // Aug 30 2021 19:50:00 GMT+0000 + aggregatedTs := int64(1630353000) testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: params, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLStatsKnobs: &sqlstats.TestingKnobs{ + AOSTClause: "AS OF SYSTEM TIME '-1us'", + StubTimeNow: func() time.Time { return timeutil.Unix(aggregatedTs, 0) }, + }, + }, + }, }) defer testCluster.Stopper().Stop(context.Background()) @@ -1897,17 +1905,24 @@ func TestStatusAPIStatements(t *testing.T) { // Test no params testPath("statements", 
expectedStatements) // Test combined=true forwards to CombinedStatements - nowInSecs := timeutil.Now().Unix() - testPath(fmt.Sprintf("statements?combined=true&start=%d", nowInSecs), nil) + testPath(fmt.Sprintf("statements?combined=true&start=%d", aggregatedTs+60), nil) } func TestStatusAPICombinedStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - params, _ := tests.CreateTestServerParams() + // Aug 30 2021 19:50:00 GMT+0000 + aggregatedTs := int64(1630353000) testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: params, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLStatsKnobs: &sqlstats.TestingKnobs{ + AOSTClause: "AS OF SYSTEM TIME '-1us'", + StubTimeNow: func() time.Time { return timeutil.Unix(aggregatedTs, 0) }, + }, + }, + }, }) defer testCluster.Stopper().Stop(context.Background()) @@ -1989,11 +2004,13 @@ func TestStatusAPICombinedStatements(t *testing.T) { // Test with no query params testPath("combinedstmts", expectedStatements) - nowInSecs := timeutil.Now().Unix() - // Test with end = now; should give the same results as get all. - testPath(fmt.Sprintf("combinedstmts?end=%d", nowInSecs), expectedStatements) - // Test with start = 1 hour ago end = now; should give the same results as get all. - testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", nowInSecs-3600, nowInSecs), expectedStatements) + oneMinAfterAggregatedTs := aggregatedTs + 60 + // Test with end = 1 min after aggregatedTs; should give the same results as get all. + testPath(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements) + // Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all. 
+ testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs), expectedStatements) + // Test with start = 1 min after aggregatedTs; should give no results + testPath(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil) } func TestListSessionsSecurity(t *testing.T) { From 3ea128d53acb53ab2d79cdbbc0770376f0697921 Mon Sep 17 00:00:00 2001 From: Azhng Date: Mon, 30 Aug 2021 11:54:07 -0400 Subject: [PATCH 2/2] sql: proper version gate sql stats Previously, SQL Stats's implementation for version gating was faulty. This meant that SQL Stats's job monitor would attempt to start the sql stats compaction job in an incompatible cluster. This commit fixes the faulty implementation. Resolves #69459 Resolves #69541 Resolves #69544 Resolves #69565 Release justification: Category 2: Bug fixes and low-risk updates to new functionality Release note: None --- .../sqlstats/persistedsqlstats/BUILD.bazel | 2 + .../persistedsqlstats/job_monitor_test.go | 63 +++++++++++++++++++ .../scheduled_job_monitor.go | 15 ++++- 3 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index b3aa0ad65fed..b1cf70be32f8 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "compaction_test.go", "controller_test.go", "flush_test.go", + "job_monitor_test.go", "main_test.go", "reader_test.go", "scheduled_sql_stats_compaction_test.go", @@ -65,6 +66,7 @@ go_test( deps = [ ":persistedsqlstats", "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", diff --git a/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go b/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go new file mode 100644 index 000000000000..3837da4ff626 --- /dev/null +++ 
b/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func TestVersionGating(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + params.Knobs.Server = &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob - 1), + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: params, + }) + + defer tc.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */)) + sqlDB.CheckQueryResults(t, + "SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"0"}}) + + sqlDB.Exec(t, `SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob).String()) + + // Change the recurrence cluster setting to force job monitor to create the + // sql stats compaction schedule. 
+ sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = '@daily'") + + // Wait for the change to be picked up by the job monitor. + sqlDB.CheckQueryResultsRetry(t, + "SELECT recurrence FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"@daily"}}) + + sqlDB.CheckQueryResults(t, + "SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"1"}}) +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go index 7f766df0a133..d6139a2f8f29 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go +++ b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go @@ -86,6 +86,10 @@ func (j *jobMonitor) start(ctx context.Context, stopper *stop.Stopper) { func (j *jobMonitor) registerClusterSettingHook() { SQLStatsCleanupRecurrence.SetOnChange(&j.st.SV, func(ctx context.Context) { + if !j.isVersionCompatible(ctx) { + return + } + j.ensureSchedule(ctx) if err := j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { sj, err := j.getSchedule(ctx, txn) if err != nil { @@ -137,9 +141,9 @@ func (j *jobMonitor) getSchedule( } func (j *jobMonitor) ensureSchedule(ctx context.Context) { - clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx) - if !clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) { - log.Warningf(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low") + if !j.isVersionCompatible(ctx) { + log.Infof(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low") + return } var sj *jobs.ScheduledJob @@ -168,6 +172,11 @@ func (j *jobMonitor) ensureSchedule(ctx context.Context) { } } +func (j *jobMonitor) isVersionCompatible(ctx context.Context) bool { + clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx) + return clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) +} + // CheckScheduleAnomaly checks a given 
schedule to see if it is either paused // or has unusually long run interval. func CheckScheduleAnomaly(sj *jobs.ScheduledJob) error {