Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: Implement job control for schedules. #52038

Merged
merged 1 commit into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/cancel_job.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cancel_jobs_stmt ::=
'CANCEL' 'JOB' job_id
| 'CANCEL' 'JOBS' select_stmt
| 'CANCEL' 'JOBS' for_schedules_clause
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/pause_job.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pause_jobs_stmt ::=
'PAUSE' 'JOB' job_id
| 'PAUSE' 'JOBS' select_stmt
| 'PAUSE' 'JOBS' for_schedules_clause
1 change: 0 additions & 1 deletion docs/generated/sql/bnf/pause_schedule.bnf
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pause_schedules_stmt ::=
'PAUSE' 'SCHEDULE' schedule_id
| 'PAUSE' 'SCHEDULES' select_stmt
| with_clause 'PAUSE' 'SCHEDULES'
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/resume_job.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
resume_jobs_stmt ::=
'RESUME' 'JOB' job_id
| 'RESUME' 'JOBS' select_stmt
| 'RESUME' 'JOBS' for_schedules_clause
8 changes: 7 additions & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ opt_with_backup_options ::=
cancel_jobs_stmt ::=
'CANCEL' 'JOB' a_expr
| 'CANCEL' 'JOBS' select_stmt
| 'CANCEL' 'JOBS' for_schedules_clause

cancel_queries_stmt ::=
'CANCEL' 'QUERY' a_expr
Expand Down Expand Up @@ -449,11 +450,11 @@ on_conflict ::=
pause_jobs_stmt ::=
'PAUSE' 'JOB' a_expr
| 'PAUSE' 'JOBS' select_stmt
| 'PAUSE' 'JOBS' for_schedules_clause

pause_schedules_stmt ::=
'PAUSE' 'SCHEDULE' a_expr
| 'PAUSE' 'SCHEDULES' select_stmt
| with_clause 'PAUSE' 'SCHEDULES'

reset_session_stmt ::=
'RESET' session_var
Expand All @@ -468,6 +469,7 @@ partitioned_backup_list ::=
resume_jobs_stmt ::=
'RESUME' 'JOB' a_expr
| 'RESUME' 'JOBS' select_stmt
| 'RESUME' 'JOBS' for_schedules_clause

resume_schedules_stmt ::=
'RESUME' 'SCHEDULE' a_expr
Expand Down Expand Up @@ -1075,6 +1077,10 @@ backup_options_list ::=
a_expr ::=
( c_expr | '+' a_expr | '-' a_expr | '~' a_expr | 'SQRT' a_expr | 'CBRT' a_expr | 'NOT' a_expr | 'NOT' a_expr | 'DEFAULT' ) ( ( 'TYPECAST' cast_target | 'TYPEANNOTATE' typename | 'COLLATE' collation_name | 'AT' 'TIME' 'ZONE' a_expr | '+' a_expr | '-' a_expr | '*' a_expr | '/' a_expr | 'FLOORDIV' a_expr | '%' a_expr | '^' a_expr | '#' a_expr | '&' a_expr | '|' a_expr | '<' a_expr | '>' a_expr | '?' a_expr | 'JSON_SOME_EXISTS' a_expr | 'JSON_ALL_EXISTS' a_expr | 'CONTAINS' a_expr | 'CONTAINED_BY' a_expr | '=' a_expr | 'CONCAT' a_expr | 'LSHIFT' a_expr | 'RSHIFT' a_expr | 'FETCHVAL' a_expr | 'FETCHTEXT' a_expr | 'FETCHVAL_PATH' a_expr | 'FETCHTEXT_PATH' a_expr | 'REMOVE_PATH' a_expr | 'INET_CONTAINED_BY_OR_EQUALS' a_expr | 'AND_AND' a_expr | 'INET_CONTAINS_OR_EQUALS' a_expr | 'LESS_EQUALS' a_expr | 'GREATER_EQUALS' a_expr | 'NOT_EQUALS' a_expr | 'AND' a_expr | 'OR' a_expr | 'LIKE' a_expr | 'LIKE' a_expr 'ESCAPE' a_expr | 'NOT' 'LIKE' a_expr | 'NOT' 'LIKE' a_expr 'ESCAPE' a_expr | 'ILIKE' a_expr | 'ILIKE' a_expr 'ESCAPE' a_expr | 'NOT' 'ILIKE' a_expr | 'NOT' 'ILIKE' a_expr 'ESCAPE' a_expr | 'SIMILAR' 'TO' a_expr | 'SIMILAR' 'TO' a_expr 'ESCAPE' a_expr | 'NOT' 'SIMILAR' 'TO' a_expr | 'NOT' 'SIMILAR' 'TO' a_expr 'ESCAPE' a_expr | '~' a_expr | 'NOT_REGMATCH' a_expr | 'REGIMATCH' a_expr | 'NOT_REGIMATCH' a_expr | 'IS' 'NAN' | 'IS' 'NOT' 'NAN' | 'IS' 'NULL' | 'ISNULL' | 'IS' 'NOT' 'NULL' | 'NOTNULL' | 'IS' 'TRUE' | 'IS' 'NOT' 'TRUE' | 'IS' 'FALSE' | 'IS' 'NOT' 'FALSE' | 'IS' 'UNKNOWN' | 'IS' 'NOT' 'UNKNOWN' | 'IS' 'DISTINCT' 'FROM' a_expr | 'IS' 'NOT' 'DISTINCT' 'FROM' a_expr | 'IS' 'OF' '(' type_list ')' | 'IS' 'NOT' 'OF' '(' type_list ')' | 'BETWEEN' opt_asymmetric b_expr 'AND' a_expr | 'NOT' 'BETWEEN' opt_asymmetric b_expr 'AND' a_expr | 'BETWEEN' 'SYMMETRIC' b_expr 'AND' a_expr | 'NOT' 'BETWEEN' 'SYMMETRIC' b_expr 'AND' a_expr | 'IN' in_expr | 'NOT' 'IN' in_expr | subquery_op sub_type a_expr ) )*

for_schedules_clause ::=
'FOR' 'SCHEDULES' select_stmt
| 'FOR' 'SCHEDULE' a_expr

create_changefeed_stmt ::=
'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options

Expand Down
103 changes: 103 additions & 0 deletions pkg/jobs/schedule_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ package jobs
import (
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -113,3 +120,99 @@ func TestScheduleControl(t *testing.T) {
require.Equal(t, 0, len(th.sqlDB.QueryStr(t, querySchedules)))
})
}

func TestJobsControlForSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables)
defer cleanup()

registry := th.server.JobRegistry().(*Registry)
blockResume := make(chan struct{})
defer close(blockResume)

// Our resume never completes any jobs, until this test completes.
// As such, the job does not undergo usual job state transitions
// (e.g. pause-request -> paused).
RegisterConstructor(jobspb.TypeImport, func(job *Job, _ *cluster.Settings) Resumer {
return FakeResumer{
OnResume: func(_ context.Context, _ chan<- tree.Datums) error {
<-blockResume
return nil
},
}
})

record := Record{
Description: "fake job",
Username: "test",
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
}

const numJobs = 5

// Create few jobs not started by any schedule.
for i := 0; i < numJobs; i++ {
require.NoError(t, registry.NewJob(record).Created(context.Background()))
}

var scheduleID int64 = 123

for _, tc := range []struct {
command string
numSchedules int
}{
{"pause", 1},
{"resume", 1},
{"cancel", 1},
{"pause", 2},
{"resume", 3},
{"cancel", 4},
} {
schedulesStr := &strings.Builder{}
for i := 0; i < tc.numSchedules; i++ {
scheduleID++
if schedulesStr.Len() > 0 {
schedulesStr.WriteByte(',')
}
fmt.Fprintf(schedulesStr, "%d", scheduleID)

for i := 0; i < numJobs; i++ {
record.CreatedBy = &CreatedByInfo{
Name: CreatedByScheduledJobs,
ID: scheduleID,
}
require.NoError(t, registry.NewJob(record).Created(context.Background()))

if tc.command == "resume" {
// Job has to be in paused state in order for it to be resumable;
// Alas, because we don't actually run real jobs (see comment above),
// We can't just pause the job (since it will stay in pause-requested state forever).
// So, just force set job status to paused.
th.sqlDB.Exec(t, "UPDATE system.jobs SET status=$1 WHERE id=$2", StatusPaused, scheduleID)
}
}
}

jobControl := tc.command + " JOBS FOR "
if tc.numSchedules == 1 {
jobControl += "SCHEDULE " + schedulesStr.String()
} else {
jobControl += fmt.Sprintf("SCHEDULES SELECT unnest(array[%s])", schedulesStr)
}

t.Run(jobControl, func(t *testing.T) {
// Go through internal executor to execute job control command.
// This correctly reports the number of effected rows.
numEffected, err := th.cfg.InternalExecutor.ExecEx(
context.Background(),
"test-num-effected",
nil,
sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
jobControl,
)
require.NoError(t, err)
require.Equal(t, numJobs*tc.numSchedules, numEffected)
})
}
}
30 changes: 21 additions & 9 deletions pkg/jobs/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (
)

type testHelper struct {
env *jobstest.JobSchedulerTestEnv
cfg *scheduledjobs.JobExecutionConfig
sqlDB *sqlutils.SQLRunner
env *jobstest.JobSchedulerTestEnv
server serverutils.TestServerInterface
cfg *scheduledjobs.JobExecutionConfig
sqlDB *sqlutils.SQLRunner
}

// newTestHelper creates and initializes appropriate state for a test,
Expand All @@ -46,6 +47,12 @@ type testHelper struct {
// is disabled by this test helper.
// If you want to run daemon, invoke it directly.
func newTestHelper(t *testing.T) (*testHelper, func()) {
return newTestHelperForTables(t, jobstest.UseTestTables)
}

func newTestHelperForTables(
t *testing.T, envTableType jobstest.EnvTablesType,
) (*testHelper, func()) {
knobs := &TestingKnobs{
TakeOverJobsScheduling: func(_ func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) {
},
Expand All @@ -57,14 +64,17 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
sqlDB := sqlutils.MakeSQLRunner(db)

// Setup test scheduled jobs table.
env := jobstest.NewJobSchedulerTestEnv(jobstest.UseTestTables, timeutil.Now())
env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now())

sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env))
sqlDB.Exec(t, jobstest.GetJobsTableSchema(env))
if envTableType == jobstest.UseTestTables {
sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env))
sqlDB.Exec(t, jobstest.GetJobsTableSchema(env))
}

restoreRegistry := settings.TestingSaveRegistry()
return &testHelper{
env: env,
env: env,
server: s,
cfg: &scheduledjobs.JobExecutionConfig{
Settings: s.ClusterSettings(),
InternalExecutor: s.InternalExecutor().(sqlutil.InternalExecutor),
Expand All @@ -73,8 +83,10 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
},
sqlDB: sqlDB,
}, func() {
sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName())
sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName())
if envTableType == jobstest.UseTestTables {
sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName())
sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName())
}
s.Stopper().Stop(context.Background())
restoreRegistry()
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/delegate/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func TryDelegate(
case *tree.ShowTransactionStatus:
return d.delegateShowVar(&tree.ShowVar{Name: "transaction_status"})

case *tree.ControlJobsForSchedules:
return d.delegateJobControl(t)

case *tree.ShowLastQueryStatistics:
return nil, unimplemented.New(
"show last query statistics",
Expand Down
30 changes: 30 additions & 0 deletions pkg/sql/delegate/job_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package delegate

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func (d *delegator) delegateJobControl(stmt *tree.ControlJobsForSchedules) (tree.Statement, error) {
// TODO(yevgeniy): it is very unfornate that we have to use the IN() clause
// in order to select matching jobs.
// It would be better if the job control statement had a better (planNode) implementation,
// so that we can rely on the optimizer to select relevant nodes.
return parse(fmt.Sprintf(`
%s JOBS
SELECT id FROM system.jobs
WHERE jobs.created_by_type = '%s' AND jobs.created_by_id IN (%s)
`, tree.JobCommandToStatement[stmt.Command], jobs.CreatedByScheduledJobs, stmt.Schedules))
}
12 changes: 12 additions & 0 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ func TestParse(t *testing.T) {
{`EXPLAIN SHOW JOBS SELECT a`},
{`SHOW JOBS WHEN COMPLETE SELECT a`},
{`EXPLAIN SHOW JOBS WHEN COMPLETE SELECT a`},
{`PAUSE JOBS FOR SCHEDULES SELECT 1`},
{`EXPLAIN PAUSE JOBS FOR SCHEDULES SELECT 1`},
{`RESUME JOBS FOR SCHEDULES SELECT unnest(ARRAY[1, 2, 3])`},
{`EXPLAIN RESUME JOBS FOR SCHEDULES SELECT unnest(ARRAY[1, 2, 3])`},
{`CANCEL JOBS FOR SCHEDULES (SELECT schedule_id FROM somewhere WHERE something = true)`},
{`EXPLAIN CANCEL JOBS FOR SCHEDULES (SELECT schedule_id FROM somewhere WHERE something = true)`},

{`EXPLAIN SELECT 1`},
{`EXPLAIN EXPLAIN SELECT 1`},
Expand Down Expand Up @@ -2022,10 +2028,16 @@ $function$`,

{`CANCEL JOB a`, `CANCEL JOBS VALUES (a)`},
{`EXPLAIN CANCEL JOB a`, `EXPLAIN CANCEL JOBS VALUES (a)`},
{`CANCEL JOBS FOR SCHEDULE a`, `CANCEL JOBS FOR SCHEDULES VALUES (a)`},
{`EXPLAIN CANCEL JOBS FOR SCHEDULE a`, `EXPLAIN CANCEL JOBS FOR SCHEDULES VALUES (a)`},
{`RESUME JOB a`, `RESUME JOBS VALUES (a)`},
{`EXPLAIN RESUME JOB a`, `EXPLAIN RESUME JOBS VALUES (a)`},
{`RESUME JOBS FOR SCHEDULE a`, `RESUME JOBS FOR SCHEDULES VALUES (a)`},
{`EXPLAIN RESUME JOBS FOR SCHEDULE a`, `EXPLAIN RESUME JOBS FOR SCHEDULES VALUES (a)`},
{`PAUSE JOB a`, `PAUSE JOBS VALUES (a)`},
{`EXPLAIN PAUSE JOB a`, `EXPLAIN PAUSE JOBS VALUES (a)`},
{`PAUSE JOBS FOR SCHEDULE a`, `PAUSE JOBS FOR SCHEDULES VALUES (a)`},
{`EXPLAIN PAUSE JOBS FOR SCHEDULE a`, `EXPLAIN PAUSE JOBS FOR SCHEDULES VALUES (a)`},
{`PAUSE SCHEDULE a`, `PAUSE SCHEDULES VALUES (a)`},
{`EXPLAIN PAUSE SCHEDULE a`, `EXPLAIN PAUSE SCHEDULES VALUES (a)`},
{`RESUME SCHEDULE a`, `RESUME SCHEDULES VALUES (a)`},
Expand Down
29 changes: 27 additions & 2 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ func (u *sqlSymUnion) alterTypeAddValuePlacement() *tree.AlterTypeAddValuePlacem
%type <tree.Statement> insert_stmt
%type <tree.Statement> import_stmt
%type <tree.Statement> pause_stmt pause_jobs_stmt pause_schedules_stmt
%type <*tree.Select> for_schedules_clause
%type <tree.Statement> release_stmt
%type <tree.Statement> reset_stmt reset_session_stmt reset_csetting_stmt
%type <tree.Statement> resume_stmt resume_jobs_stmt resume_schedules_stmt
Expand Down Expand Up @@ -2528,6 +2529,10 @@ cancel_jobs_stmt:
{
$$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.CancelJob}
}
| CANCEL JOBS for_schedules_clause
{
$$.val = &tree.ControlJobsForSchedules{Schedules: $3.slct(), Command: tree.CancelJob}
}
| CANCEL JOBS error // SHOW HELP: CANCEL JOBS

// %Help: CANCEL QUERIES - cancel running queries
Expand Down Expand Up @@ -4782,13 +4787,30 @@ pause_jobs_stmt:
Command: tree.PauseJob,
}
}
| PAUSE JOB error // SHOW HELP: PAUSE JOBS
| PAUSE JOB error // SHOW HELP: PAUSE JOBS
| PAUSE JOBS select_stmt
{
$$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.PauseJob}
}
| PAUSE JOBS for_schedules_clause
{
$$.val = &tree.ControlJobsForSchedules{Schedules: $3.slct(), Command: tree.PauseJob}
}
| PAUSE JOBS error // SHOW HELP: PAUSE JOBS


for_schedules_clause:
FOR SCHEDULES select_stmt
{
$$.val = $3.slct()
}
| FOR SCHEDULE a_expr
{
$$.val = &tree.Select{
Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$3.expr()}}},
}
}

// %Help: PAUSE SCHEDULES - pause scheduled jobs
// %Category: Misc
// %Text:
Expand All @@ -4814,7 +4836,6 @@ pause_schedules_stmt:
Command: tree.PauseSchedule,
}
}
| with_clause PAUSE SCHEDULES
| PAUSE SCHEDULES error // SHOW HELP: PAUSE SCHEDULES

// %Help: CREATE SCHEMA - create a new schema
Expand Down Expand Up @@ -6254,6 +6275,10 @@ resume_jobs_stmt:
{
$$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.ResumeJob}
}
| RESUME JOBS for_schedules_clause
{
$$.val = &tree.ControlJobsForSchedules{Schedules: $3.slct(), Command: tree.ResumeJob}
}
| RESUME JOBS error // SHOW HELP: RESUME JOBS

// %Help: RESUME SCHEDULES - resume executing scheduled jobs
Expand Down
Loading