Skip to content

Commit

Permalink
Merge #69591 #69592
Browse files Browse the repository at this point in the history
69591: sql: deflake TestStatusAPIStatements and TestStatusAPICombinedStatements r=xinhaoz a=xinhaoz

Fixes: #69557

As mentioned in #69533, we have a race condition in tests
where we request statements stats with start=now. In these
tests we expect to see no results, but because in-memory
stats have the aggregated_ts field set on iterator return,
depending on the time the test is run we might see
results returned.

For example, suppose we insert stats at 15:45. If we then request
stats at 16:00, requesting only stats aggregated after or at the
current time, i.e. start=16:00, the aggregated_ts for in-memory
stats will be set to 16:00 and thus we will see results
returned.

To deflake these tests, we stub the aggregated_ts field to
a predetermined value.

Release justification: non-production code changes
Release note: None

69592: sql: proper version gate sql stats r=maryliag,ajwerner a=Azhng

Previously, SQL Stats's implementation for version gating is faulty.
This means that SQL Stats's job monitor would attempt to start sql
stats compaction job in an incompatible cluster.
This commit fixed the faulty implementation.

Resolves #69459
Resolves #69544
Resolves #69565

Release justification: Category 2: Bug fixes and low-risk updates to
new functionality

Release note: None

Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
Co-authored-by: Azhng <archer.xn@gmail.com>
  • Loading branch information
3 people committed Aug 30, 2021
3 parents d6b2ecb + c724737 + 3ea128d commit e76382b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 14 deletions.
39 changes: 28 additions & 11 deletions pkg/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1813,9 +1813,17 @@ func TestStatusAPIStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

params, _ := tests.CreateTestServerParams()
// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: params,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLStatsKnobs: &sqlstats.TestingKnobs{
AOSTClause: "AS OF SYSTEM TIME '-1us'",
StubTimeNow: func() time.Time { return timeutil.Unix(aggregatedTs, 0) },
},
},
},
})
defer testCluster.Stopper().Stop(context.Background())

Expand Down Expand Up @@ -1897,17 +1905,24 @@ func TestStatusAPIStatements(t *testing.T) {
// Test no params
testPath("statements", expectedStatements)
// Test combined=true forwards to CombinedStatements
nowInSecs := timeutil.Now().Unix()
testPath(fmt.Sprintf("statements?combined=true&start=%d", nowInSecs), nil)
testPath(fmt.Sprintf("statements?combined=true&start=%d", aggregatedTs+60), nil)
}

func TestStatusAPICombinedStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

params, _ := tests.CreateTestServerParams()
// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: params,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLStatsKnobs: &sqlstats.TestingKnobs{
AOSTClause: "AS OF SYSTEM TIME '-1us'",
StubTimeNow: func() time.Time { return timeutil.Unix(aggregatedTs, 0) },
},
},
},
})
defer testCluster.Stopper().Stop(context.Background())

Expand Down Expand Up @@ -1989,11 +2004,13 @@ func TestStatusAPICombinedStatements(t *testing.T) {
// Test with no query params
testPath("combinedstmts", expectedStatements)

nowInSecs := timeutil.Now().Unix()
// Test with end = now; should give the same results as get all.
testPath(fmt.Sprintf("combinedstmts?end=%d", nowInSecs), expectedStatements)
// Test with start = 1 hour ago end = now; should give the same results as get all.
testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", nowInSecs-3600, nowInSecs), expectedStatements)
oneMinAfterAggregatedTs := aggregatedTs + 60
// Test with end = 1 min after aggregatedTs; should give the same results as get all.
testPath(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements)
// Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all.
testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs), expectedStatements)
// Test with start = 1 min after aggregatedTs; should give no results
testPath(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil)
}

func TestListSessionsSecurity(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ go_test(
"compaction_test.go",
"controller_test.go",
"flush_test.go",
"job_monitor_test.go",
"main_test.go",
"reader_test.go",
"scheduled_sql_stats_compaction_test.go",
],
deps = [
":persistedsqlstats",
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
Expand Down
63 changes: 63 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package persistedsqlstats_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func TestVersionGating(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
params.Knobs.Server = &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
BinaryVersionOverride: clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob - 1),
}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: params,
})

defer tc.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */))
sqlDB.CheckQueryResults(t,
"SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]",
[][]string{{"0"}})

sqlDB.Exec(t, `SET CLUSTER SETTING version = $1`,
clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob).String())

// Change the recurrence cluster setting to force job monitor to create the
// sql stats compaction schedule.
sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = '@daily'")

// Wait for the change to be picked up by the job monitor.
sqlDB.CheckQueryResultsRetry(t,
"SELECT recurrence FROM [SHOW SCHEDULES FOR SQL STATISTICS]",
[][]string{{"@daily"}})

sqlDB.CheckQueryResults(t,
"SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]",
[][]string{{"1"}})
}
15 changes: 12 additions & 3 deletions pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (j *jobMonitor) start(ctx context.Context, stopper *stop.Stopper) {

func (j *jobMonitor) registerClusterSettingHook() {
SQLStatsCleanupRecurrence.SetOnChange(&j.st.SV, func(ctx context.Context) {
if !j.isVersionCompatible(ctx) {
return
}
j.ensureSchedule(ctx)
if err := j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
sj, err := j.getSchedule(ctx, txn)
if err != nil {
Expand Down Expand Up @@ -137,9 +141,9 @@ func (j *jobMonitor) getSchedule(
}

func (j *jobMonitor) ensureSchedule(ctx context.Context) {
clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx)
if !clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) {
log.Warningf(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low")
if !j.isVersionCompatible(ctx) {
log.Infof(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low")
return
}

var sj *jobs.ScheduledJob
Expand Down Expand Up @@ -168,6 +172,11 @@ func (j *jobMonitor) ensureSchedule(ctx context.Context) {
}
}

func (j *jobMonitor) isVersionCompatible(ctx context.Context) bool {
clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx)
return clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob)
}

// CheckScheduleAnomaly checks a given schedule to see if it is either paused
// or has unusually long run interval.
func CheckScheduleAnomaly(sj *jobs.ScheduledJob) error {
Expand Down

0 comments on commit e76382b

Please sign in to comment.