diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 3472012e7df9..aa09eee7ade0 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1813,9 +1813,17 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - params, _ := tests.CreateTestServerParams() + // Aug 30 2021 19:50:00 GMT+0000 + aggregatedTs := int64(1630353000) testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: params, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLStatsKnobs: &sqlstats.TestingKnobs{ + AOSTClause: "AS OF SYSTEM TIME '-1us'", + StubTimeNow: func() time.Time { return timeutil.Unix(aggregatedTs, 0) }, + }, + }, + }, }) defer testCluster.Stopper().Stop(context.Background()) @@ -1897,17 +1905,24 @@ func TestStatusAPIStatements(t *testing.T) { // Test no params testPath("statements", expectedStatements) // Test combined=true forwards to CombinedStatements - nowInSecs := timeutil.Now().Unix() - testPath(fmt.Sprintf("statements?combined=true&start=%d", nowInSecs), nil) + testPath(fmt.Sprintf("statements?combined=true&start=%d", aggregatedTs+60), nil) } func TestStatusAPICombinedStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - params, _ := tests.CreateTestServerParams() + // Aug 30 2021 19:50:00 GMT+0000 + aggregatedTs := int64(1630353000) testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: params, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLStatsKnobs: &sqlstats.TestingKnobs{ + AOSTClause: "AS OF SYSTEM TIME '-1us'", + StubTimeNow: func() time.Time { return timeutil.Unix(aggregatedTs, 0) }, + }, + }, + }, }) defer testCluster.Stopper().Stop(context.Background()) @@ -1989,11 +2004,13 @@ func TestStatusAPICombinedStatements(t *testing.T) { // Test with no query params testPath("combinedstmts", expectedStatements) - nowInSecs := timeutil.Now().Unix() - // Test with end = now; should give the same results as get all. - testPath(fmt.Sprintf("combinedstmts?end=%d", nowInSecs), expectedStatements) - // Test with start = 1 hour ago end = now; should give the same results as get all. - testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", nowInSecs-3600, nowInSecs), expectedStatements) + oneMinAfterAggregatedTs := aggregatedTs + 60 + // Test with end = 1 min after aggregatedTs; should give the same results as get all. + testPath(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements) + // Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all. + testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs), expectedStatements) + // Test with start = 1 min after aggregatedTs; should give no results + testPath(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil) } func TestListSessionsSecurity(t *testing.T) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index b3aa0ad65fed..b1cf70be32f8 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "compaction_test.go", "controller_test.go", "flush_test.go", + "job_monitor_test.go", "main_test.go", "reader_test.go", "scheduled_sql_stats_compaction_test.go", @@ -65,6 +66,7 @@ go_test( deps = [ ":persistedsqlstats", "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", diff --git a/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go b/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go new file mode 100644 index 000000000000..3837da4ff626 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func TestVersionGating(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + params.Knobs.Server = &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob - 1), + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: params, + }) + + defer tc.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */)) + sqlDB.CheckQueryResults(t, + "SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"0"}}) + + sqlDB.Exec(t, `SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob).String()) + + // Change the recurrence cluster setting to force job monitor to create the + // sql stats compaction schedule. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = '@daily'") + + // Wait for the change to be picked up by the job monitor. + sqlDB.CheckQueryResultsRetry(t, + "SELECT recurrence FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"@daily"}}) + + sqlDB.CheckQueryResults(t, + "SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"1"}}) +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go index 7f766df0a133..d6139a2f8f29 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go +++ b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go @@ -86,6 +86,10 @@ func (j *jobMonitor) start(ctx context.Context, stopper *stop.Stopper) { func (j *jobMonitor) registerClusterSettingHook() { SQLStatsCleanupRecurrence.SetOnChange(&j.st.SV, func(ctx context.Context) { + if !j.isVersionCompatible(ctx) { + return + } + j.ensureSchedule(ctx) if err := j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { sj, err := j.getSchedule(ctx, txn) if err != nil { @@ -137,9 +141,9 @@ func (j *jobMonitor) getSchedule( } func (j *jobMonitor) ensureSchedule(ctx context.Context) { - clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx) - if !clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) { - log.Warningf(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low") + if !j.isVersionCompatible(ctx) { + log.Infof(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low") + return } var sj *jobs.ScheduledJob @@ -168,6 +172,11 @@ func (j *jobMonitor) ensureSchedule(ctx context.Context) { } } +func (j *jobMonitor) isVersionCompatible(ctx context.Context) bool { + clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx) + return clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) +} + // CheckScheduleAnomaly checks a given schedule to see if it is either paused // or has unusually long run interval. func CheckScheduleAnomaly(sj *jobs.ScheduledJob) error {