Skip to content

Commit

Permalink
Merge #50240 #50249
Browse files Browse the repository at this point in the history
50240: jobs: Start scheduled jobs daemon on server startup. r=miretskiy a=miretskiy

Start scheduled job daemon when starting cockroach binary.

Informs #49346

Release notes: None

50249: logictest: introduce new configs for experimental distsql planning r=yuzefovich a=yuzefovich

This commit adds three new configurations `local-exp-planning`,
`fakedist-exp-planning`, and `5node-exp-planning` which override
`experimental_distsql_planning` cluster setting to `on`. The
corresponding config is included in `5node-defaults` list. And once
the new factory is significantly implemented, it will probably make
sense to add the configs to `default-configs`.

Release note: None

Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed Jun 17, 2020
3 parents 5ddcfde + 8d23371 + ceab620 commit bb4bf85
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 59 deletions.
7 changes: 4 additions & 3 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type jobSchedulerEnv interface {
// production jobSchedulerEnv implementation.
type prodJobSchedulerEnvImpl struct{}

var prodJobSchedulerEnv jobSchedulerEnv = &prodJobSchedulerEnvImpl{}
// ProdJobSchedulerEnv is a jobSchedulerEnv implementation suitable for production.
var ProdJobSchedulerEnv jobSchedulerEnv = &prodJobSchedulerEnvImpl{}

const createdByName = "crdb_schedule"

Expand Down Expand Up @@ -75,7 +76,7 @@ type jobScheduler struct {

func newJobScheduler(env jobSchedulerEnv, ex sqlutil.InternalExecutor) *jobScheduler {
if env == nil {
env = prodJobSchedulerEnv
env = ProdJobSchedulerEnv
}
return &jobScheduler{
env: env,
Expand Down Expand Up @@ -273,7 +274,7 @@ func StartJobSchedulerDaemon(
stopper.RunWorker(ctx, func(ctx context.Context) {
for waitPeriod := getInitialScanDelay(); ; waitPeriod = getWaitPeriod(sv) {
select {
case <-stopper.ShouldStop():
case <-stopper.ShouldQuiesce():
return
case <-time.After(waitPeriod):
if !schedulerEnabledSetting.Get(sv) {
Expand Down
138 changes: 92 additions & 46 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/fmtsafe/testdata/src/github.com/cockroachdb/errors"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
"github.com/gorhill/cronexpr"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -140,24 +146,32 @@ func TestJobSchedulerDaemonInitialScanDelay(t *testing.T) {
}
}

func getScopedSettings() (*settings.Values, func()) {
sv := &settings.Values{}
sv.Init(nil)
return sv, settings.TestingSaveRegistry()
}

func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
defer leaktest.AfterTest(t)()

sv := settings.Values{}
schedulerEnabledSetting.Override(&sv, false)
sv, cleanup := getScopedSettings()
defer cleanup()

schedulerEnabledSetting.Override(sv, false)

// When disabled, we wait 5 minutes before rechecking.
require.True(t, 5*time.Minute == getWaitPeriod(&sv))
schedulerEnabledSetting.Override(&sv, true)
require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv))
schedulerEnabledSetting.Override(sv, true)

// When pace is too low, we use something more reasonable.
schedulerPaceSetting.Override(&sv, time.Nanosecond)
require.True(t, minPacePeriod == getWaitPeriod(&sv))
schedulerPaceSetting.Override(sv, time.Nanosecond)
require.EqualValues(t, minPacePeriod, getWaitPeriod(sv))

// Otherwise, we use user specified setting.
pace := 42 * time.Second
schedulerPaceSetting.Override(&sv, pace)
require.True(t, pace == getWaitPeriod(&sv))
schedulerPaceSetting.Override(sv, pace)
require.EqualValues(t, pace, getWaitPeriod(sv))
}

type recordScheduleExecutor struct {
Expand All @@ -179,26 +193,30 @@ func (n *recordScheduleExecutor) NotifyJobTermination(

var _ ScheduledJobExecutor = &recordScheduleExecutor{}

func scanImmediately() func() {
oldScanDelay := getInitialScanDelay
getInitialScanDelay = func() time.Duration { return 0 }
return func() { getInitialScanDelay = oldScanDelay }
}

func TestJobSchedulerCanBeDisabledWhileSleeping(t *testing.T) {
defer leaktest.AfterTest(t)()

h, cleanup := newTestHelper(t)
defer cleanup()
ctx := context.Background()

sv := settings.Values{}
schedulerEnabledSetting.Override(&sv, true)
sv, cleanup := getScopedSettings()
defer cleanup()
schedulerEnabledSetting.Override(sv, true)

// Register executor which keeps track of schedules it executes.
const executorName = "record-execute"
neverExecute := &recordScheduleExecutor{}
defer registerScopedScheduledJobExecutor(executorName, neverExecute)()

// Disable initial scan delay.
defer func(f func() time.Duration) {
getInitialScanDelay = f
}(getInitialScanDelay)
getInitialScanDelay = func() time.Duration { return 0 }
defer scanImmediately()()

// Override getWaitPeriod to use small delay.
defer func(f func(_ *settings.Values) time.Duration) {
Expand Down Expand Up @@ -232,7 +250,7 @@ func TestJobSchedulerCanBeDisabledWhileSleeping(t *testing.T) {
}

// Run the daemon.
StartJobSchedulerDaemon(ctx, stopper, &sv, h.env, h.kvDB, h.ex)
StartJobSchedulerDaemon(ctx, stopper, sv, h.env, h.kvDB, h.ex)

// Wait for daemon to run it's scan loop few times.
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -271,6 +289,16 @@ func expectScheduledRuns(t *testing.T, h *testHelper, expected ...expectedRun) {
})
}

func overridePaceSetting(d time.Duration) func() {
oldPace := getWaitPeriod
getWaitPeriod = func(_ *settings.Values) time.Duration {
return d
}
return func() {
getWaitPeriod = oldPace
}
}

func TestJobSchedulerDaemonProcessesJobs(t *testing.T) {
defer leaktest.AfterTest(t)()
h, cleanup := newTestHelper(t)
Expand All @@ -293,22 +321,13 @@ func TestJobSchedulerDaemonProcessesJobs(t *testing.T) {
sort.Slice(scheduleIDs, func(i, j int) bool { return scheduleIDs[i] < scheduleIDs[j] })

// Make daemon run fast.
defer func(f func(_ *settings.Values) time.Duration) {
getWaitPeriod = f
}(getWaitPeriod)

getWaitPeriod = func(_ *settings.Values) time.Duration {
return 10 * time.Millisecond
}
defer func(f func() time.Duration) {
getInitialScanDelay = f
}(getInitialScanDelay)
getInitialScanDelay = func() time.Duration { return 0 }
defer overridePaceSetting(10 * time.Millisecond)()
defer scanImmediately()()

stopper := stop.NewStopper()
sv := settings.Values{}
schedulerEnabledSetting.Override(&sv, true)
StartJobSchedulerDaemon(ctx, stopper, &sv, h.env, h.kvDB, h.ex)
sv, cleanup := getScopedSettings()
defer cleanup()
StartJobSchedulerDaemon(ctx, stopper, sv, h.env, h.kvDB, h.ex)

// Advance our fake time 1 hour forward (plus a bit)
h.env.AdvanceTime(time.Hour + time.Second)
Expand Down Expand Up @@ -346,27 +365,17 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
sort.Slice(scheduleIDs, func(i, j int) bool { return scheduleIDs[i] < scheduleIDs[j] })

// Make daemon execute initial scan immediately, but block subsequent scans.
defer func(f func(_ *settings.Values) time.Duration) {
getWaitPeriod = f
}(getWaitPeriod)

getWaitPeriod = func(_ *settings.Values) time.Duration {
return time.Hour
}

defer func(f func() time.Duration) {
getInitialScanDelay = f
}(getInitialScanDelay)
getInitialScanDelay = func() time.Duration { return 0 }
defer scanImmediately()()
defer overridePaceSetting(time.Hour)()

// Advance our fake time 1 hour forward (plus a bit) so that the daemon finds matching jobs.
h.env.AdvanceTime(time.Hour + time.Second)

stopper := stop.NewStopper()
sv := settings.Values{}
schedulerEnabledSetting.Override(&sv, true)
schedulerMaxJobsPerIterationSetting.Override(&sv, 2)
StartJobSchedulerDaemon(ctx, stopper, &sv, h.env, h.kvDB, h.ex)
sv, cleanup := getScopedSettings()
defer cleanup()
schedulerMaxJobsPerIterationSetting.Override(sv, 2)
StartJobSchedulerDaemon(ctx, stopper, sv, h.env, h.kvDB, h.ex)

// Note: time is stored in the table with microsecond precision.
expectScheduledRuns(t, h,
Expand All @@ -379,3 +388,40 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {

stopper.Stop(ctx)
}

func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) {
defer leaktest.AfterTest(t)()
defer settings.TestingSaveRegistry()()

// Make daemon run quickly.
defer scanImmediately()()
defer overridePaceSetting(10 * time.Millisecond)()
ctx := context.Background()

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "CREATE TABLE defaultdb.foo(a int)")

// Create a one off job which writes some values into 'foo' table.
schedule := NewScheduledJob(ProdJobSchedulerEnv)
schedule.SetScheduleName("test schedule")
schedule.SetNextRun(timeutil.Now())
any, err := types.MarshalAny(
&jobspb.SqlStatementExecutionArg{Statement: "INSERT INTO defaultdb.foo VALUES (1), (2), (3)"})
require.NoError(t, err)
schedule.SetExecutionDetails(InlineExecutorName, jobspb.ExecutionArguments{Args: any})
require.NoError(t, schedule.Create(
ctx, s.InternalExecutor().(sqlutil.InternalExecutor), nil))

// Verify the schedule ran.
testutils.SucceedsSoon(t, func() error {
var count int
if err := db.QueryRow(
"SELECT count(*) FROM defaultdb.foo").Scan(&count); err != nil || count != 3 {
return errors.Newf("expected 3 rows, got %d (err=%+v)", count, err)
}
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/jobs/scheduled_job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NotifyJobTermination(
}

if env == nil {
env = prodJobSchedulerEnv
env = ProdJobSchedulerEnv
}

// Get the executor for this schedule.
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,10 @@ func (s *Server) Start(ctx context.Context) error {
return errors.Wrapf(err, "failed to register engines with debug server")
}

// Start scheduled jobs daemon.
jobs.StartJobSchedulerDaemon(
ctx, s.stopper, &s.st.SV, jobs.ProdJobSchedulerEnv, s.db, s.sqlServer.internalExecutor)

log.Event(ctx, "server ready")
return nil
}
Expand Down
47 changes: 40 additions & 7 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ type testClusterConfig struct {
overrideVectorize string
// if non-empty, overrides the default automatic statistics mode.
overrideAutoStats string
// if non-empty, overrides the default experimental DistSQL planning mode.
overrideExperimentalDistSQLPlanning string
// if set, queries using distSQL processors or vectorized operators that can
// fall back to disk do so immediately, using only their disk-based
// implementation.
Expand Down Expand Up @@ -479,13 +481,6 @@ var logicTestConfigs = []testClusterConfig{
overrideAutoStats: "false",
overrideVectorize: "201auto",
},
{
name: "fakedist",
numNodes: 3,
useFakeSpanResolver: true,
overrideDistSQLMode: "on",
overrideAutoStats: "false",
},
{
name: "local-mixed-19.2-20.1",
numNodes: 1,
Expand All @@ -495,6 +490,20 @@ var logicTestConfigs = []testClusterConfig{
binaryVersion: roachpb.Version{Major: 20, Minor: 1},
disableUpgrade: true,
},
{
name: "local-spec-planning",
numNodes: 1,
overrideDistSQLMode: "off",
overrideAutoStats: "false",
overrideExperimentalDistSQLPlanning: "on",
},
{
name: "fakedist",
numNodes: 3,
useFakeSpanResolver: true,
overrideDistSQLMode: "on",
overrideAutoStats: "false",
},
{
name: "fakedist-vec-off",
numNodes: 3,
Expand Down Expand Up @@ -539,6 +548,14 @@ var logicTestConfigs = []testClusterConfig{
sqlExecUseDisk: true,
skipShort: true,
},
{
name: "fakedist-spec-planning",
numNodes: 3,
useFakeSpanResolver: true,
overrideDistSQLMode: "on",
overrideAutoStats: "false",
overrideExperimentalDistSQLPlanning: "on",
},
{
name: "5node",
numNodes: 5,
Expand Down Expand Up @@ -577,6 +594,13 @@ var logicTestConfigs = []testClusterConfig{
sqlExecUseDisk: true,
skipShort: true,
},
{
name: "5node-spec-planning",
numNodes: 5,
overrideDistSQLMode: "on",
overrideAutoStats: "false",
overrideExperimentalDistSQLPlanning: "on",
},
{
name: "3node-tenant",
numNodes: 3,
Expand Down Expand Up @@ -630,6 +654,7 @@ var (
"5node-vec-disk-auto",
"5node-metadata",
"5node-disk",
"5node-spec-planning",
}
defaultConfig = parseTestConfig(defaultConfigNames)
fiveNodeDefaultConfig = parseTestConfig(fiveNodeDefaultConfigNames)
Expand Down Expand Up @@ -1361,6 +1386,14 @@ func (t *logicTest) setup(cfg testClusterConfig, serverArgs TestServerArgs) {
t.Fatal(err)
}
}

if cfg.overrideExperimentalDistSQLPlanning != "" {
if _, err := conn.Exec(
"SET CLUSTER SETTING sql.defaults.experimental_distsql_planning = $1::string", cfg.overrideExperimentalDistSQLPlanning,
); err != nil {
t.Fatal(err)
}
}
}

if cfg.overrideDistSQLMode != "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/explain
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# LogicTest: local local-vec-off local-vec-auto
# LogicTest: local local-vec-off local-vec-auto local-spec-planning

statement ok
CREATE TABLE t (a INT PRIMARY KEY)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# LogicTest: 5node
# LogicTest: 5node 5node-spec-planning

# These tests are different from explain_analyze because they require manual
# data placement.
Expand Down

0 comments on commit bb4bf85

Please sign in to comment.