From e9ea87cb535720e77dec5e60e41d163b3407b7e1 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 28 Jul 2020 17:57:51 -0400 Subject: [PATCH] jobs: Implement job control for schedules. Add a `FOR SCHEDULES` clause to job control statements to enable control over jobs created by the scheduled jobs. ``` PAUSE JOBS FOR SCHEDULE 123 RESUME JOBS FOR SCHEDULES (SELECT schedule_id ....) CANCEL JOBS FOR SCHEDULE 321 ``` Release Notes (enterprise change): Add `FOR SCHEDULES` clause to the job control statements to enable management of the jobs created by schedules. --- docs/generated/sql/bnf/cancel_job.bnf | 1 + docs/generated/sql/bnf/pause_job.bnf | 1 + docs/generated/sql/bnf/pause_schedule.bnf | 1 - docs/generated/sql/bnf/resume_job.bnf | 1 + docs/generated/sql/bnf/stmt_block.bnf | 8 +- pkg/jobs/schedule_control_test.go | 103 ++++++++++++++++++++++ pkg/jobs/testutils_test.go | 30 +++++-- pkg/sql/delegate/delegate.go | 3 + pkg/sql/delegate/job_control.go | 30 +++++++ pkg/sql/parser/parse_test.go | 12 +++ pkg/sql/parser/sql.y | 29 +++++- pkg/sql/sem/tree/run_control.go | 21 ++++- pkg/sql/sem/tree/stmt.go | 15 +++- pkg/sql/sem/tree/walk.go | 15 ++-- 14 files changed, 244 insertions(+), 26 deletions(-) create mode 100644 pkg/sql/delegate/job_control.go diff --git a/docs/generated/sql/bnf/cancel_job.bnf b/docs/generated/sql/bnf/cancel_job.bnf index b088a10e07b0..c6e5e569359b 100644 --- a/docs/generated/sql/bnf/cancel_job.bnf +++ b/docs/generated/sql/bnf/cancel_job.bnf @@ -1,3 +1,4 @@ cancel_jobs_stmt ::= 'CANCEL' 'JOB' job_id | 'CANCEL' 'JOBS' select_stmt + | 'CANCEL' 'JOBS' for_schedules_clause diff --git a/docs/generated/sql/bnf/pause_job.bnf b/docs/generated/sql/bnf/pause_job.bnf index 6b9fbb521b1f..0f9fce485a67 100644 --- a/docs/generated/sql/bnf/pause_job.bnf +++ b/docs/generated/sql/bnf/pause_job.bnf @@ -1,3 +1,4 @@ pause_jobs_stmt ::= 'PAUSE' 'JOB' job_id | 'PAUSE' 'JOBS' select_stmt + | 'PAUSE' 'JOBS' for_schedules_clause diff --git a/docs/generated/sql/bnf/pause_schedule.bnf b/docs/generated/sql/bnf/pause_schedule.bnf index 0e4152dbbed5..845a4b06a0ac 100644 --- a/docs/generated/sql/bnf/pause_schedule.bnf +++ b/docs/generated/sql/bnf/pause_schedule.bnf @@ -1,4 +1,3 @@ pause_schedules_stmt ::= 'PAUSE' 'SCHEDULE' schedule_id | 'PAUSE' 'SCHEDULES' select_stmt - | with_clause 'PAUSE' 'SCHEDULES' diff --git a/docs/generated/sql/bnf/resume_job.bnf b/docs/generated/sql/bnf/resume_job.bnf index c7ef323f9f21..16d5876549a2 100644 --- a/docs/generated/sql/bnf/resume_job.bnf +++ b/docs/generated/sql/bnf/resume_job.bnf @@ -1,3 +1,4 @@ resume_jobs_stmt ::= 'RESUME' 'JOB' job_id | 'RESUME' 'JOBS' select_stmt + | 'RESUME' 'JOBS' for_schedules_clause diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index cda7dca4a98e..240e4f3a4360 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -338,6 +338,7 @@ opt_with_backup_options ::= cancel_jobs_stmt ::= 'CANCEL' 'JOB' a_expr | 'CANCEL' 'JOBS' select_stmt + | 'CANCEL' 'JOBS' for_schedules_clause cancel_queries_stmt ::= 'CANCEL' 'QUERY' a_expr @@ -449,11 +450,11 @@ on_conflict ::= pause_jobs_stmt ::= 'PAUSE' 'JOB' a_expr | 'PAUSE' 'JOBS' select_stmt + | 'PAUSE' 'JOBS' for_schedules_clause pause_schedules_stmt ::= 'PAUSE' 'SCHEDULE' a_expr | 'PAUSE' 'SCHEDULES' select_stmt - | with_clause 'PAUSE' 'SCHEDULES' reset_session_stmt ::= 'RESET' session_var @@ -468,6 +469,7 @@ partitioned_backup_list ::= resume_jobs_stmt ::= 'RESUME' 'JOB' a_expr | 'RESUME' 'JOBS' select_stmt + | 'RESUME' 'JOBS' for_schedules_clause resume_schedules_stmt ::= 'RESUME' 'SCHEDULE' a_expr @@ -1075,6 +1077,10 @@ backup_options_list ::= a_expr ::= ( c_expr | '+' a_expr | '-' a_expr | '~' a_expr | 'SQRT' a_expr | 'CBRT' a_expr | 'NOT' a_expr | 'NOT' a_expr | 'DEFAULT' ) ( ( 'TYPECAST' cast_target | 'TYPEANNOTATE' typename | 'COLLATE' collation_name | 'AT' 'TIME' 'ZONE' a_expr | '+' a_expr | '-' a_expr | '*' a_expr | '/' a_expr | 'FLOORDIV' a_expr | '%' a_expr | '^' a_expr | '#' a_expr | '&' a_expr | '|' a_expr | '<' a_expr | '>' a_expr | '?' a_expr | 'JSON_SOME_EXISTS' a_expr | 'JSON_ALL_EXISTS' a_expr | 'CONTAINS' a_expr | 'CONTAINED_BY' a_expr | '=' a_expr | 'CONCAT' a_expr | 'LSHIFT' a_expr | 'RSHIFT' a_expr | 'FETCHVAL' a_expr | 'FETCHTEXT' a_expr | 'FETCHVAL_PATH' a_expr | 'FETCHTEXT_PATH' a_expr | 'REMOVE_PATH' a_expr | 'INET_CONTAINED_BY_OR_EQUALS' a_expr | 'AND_AND' a_expr | 'INET_CONTAINS_OR_EQUALS' a_expr | 'LESS_EQUALS' a_expr | 'GREATER_EQUALS' a_expr | 'NOT_EQUALS' a_expr | 'AND' a_expr | 'OR' a_expr | 'LIKE' a_expr | 'LIKE' a_expr 'ESCAPE' a_expr | 'NOT' 'LIKE' a_expr | 'NOT' 'LIKE' a_expr 'ESCAPE' a_expr | 'ILIKE' a_expr | 'ILIKE' a_expr 'ESCAPE' a_expr | 'NOT' 'ILIKE' a_expr | 'NOT' 'ILIKE' a_expr 'ESCAPE' a_expr | 'SIMILAR' 'TO' a_expr | 'SIMILAR' 'TO' a_expr 'ESCAPE' a_expr | 'NOT' 'SIMILAR' 'TO' a_expr | 'NOT' 'SIMILAR' 'TO' a_expr 'ESCAPE' a_expr | '~' a_expr | 'NOT_REGMATCH' a_expr | 'REGIMATCH' a_expr | 'NOT_REGIMATCH' a_expr | 'IS' 'NAN' | 'IS' 'NOT' 'NAN' | 'IS' 'NULL' | 'ISNULL' | 'IS' 'NOT' 'NULL' | 'NOTNULL' | 'IS' 'TRUE' | 'IS' 'NOT' 'TRUE' | 'IS' 'FALSE' | 'IS' 'NOT' 'FALSE' | 'IS' 'UNKNOWN' | 'IS' 'NOT' 'UNKNOWN' | 'IS' 'DISTINCT' 'FROM' a_expr | 'IS' 'NOT' 'DISTINCT' 'FROM' a_expr | 'IS' 'OF' '(' type_list ')' | 'IS' 'NOT' 'OF' '(' type_list ')' | 'BETWEEN' opt_asymmetric b_expr 'AND' a_expr | 'NOT' 'BETWEEN' opt_asymmetric b_expr 'AND' a_expr | 'BETWEEN' 'SYMMETRIC' b_expr 'AND' a_expr | 'NOT' 'BETWEEN' 'SYMMETRIC' b_expr 'AND' a_expr | 'IN' in_expr | 'NOT' 'IN' in_expr | subquery_op sub_type a_expr ) )* +for_schedules_clause ::= + 'FOR' 'SCHEDULES' select_stmt + | 'FOR' 'SCHEDULE' a_expr + create_changefeed_stmt ::= 'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options diff --git a/pkg/jobs/schedule_control_test.go b/pkg/jobs/schedule_control_test.go index 7dd703647753..531d96492abe 100644 --- a/pkg/jobs/schedule_control_test.go +++ b/pkg/jobs/schedule_control_test.go @@ -13,8 +13,15 @@ package jobs import ( "context" "fmt" + "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/require" ) @@ -113,3 +120,99 @@ func TestScheduleControl(t *testing.T) { require.Equal(t, 0, len(th.sqlDB.QueryStr(t, querySchedules))) }) } + +func TestJobsControlForSchedules(t *testing.T) { + defer leaktest.AfterTest(t)() + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables) + defer cleanup() + + registry := th.server.JobRegistry().(*Registry) + blockResume := make(chan struct{}) + defer close(blockResume) + + // Our resume never completes any jobs, until this test completes. + // As such, the job does not undergo usual job state transitions + // (e.g. pause-request -> paused). + RegisterConstructor(jobspb.TypeImport, func(job *Job, _ *cluster.Settings) Resumer { + return FakeResumer{ + OnResume: func(_ context.Context, _ chan<- tree.Datums) error { + <-blockResume + return nil + }, + } + }) + + record := Record{ + Description: "fake job", + Username: "test", + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + } + + const numJobs = 5 + + // Create few jobs not started by any schedule. + for i := 0; i < numJobs; i++ { + require.NoError(t, registry.NewJob(record).Created(context.Background())) + } + + var scheduleID int64 = 123 + + for _, tc := range []struct { + command string + numSchedules int + }{ + {"pause", 1}, + {"resume", 1}, + {"cancel", 1}, + {"pause", 2}, + {"resume", 3}, + {"cancel", 4}, + } { + schedulesStr := &strings.Builder{} + for i := 0; i < tc.numSchedules; i++ { + scheduleID++ + if schedulesStr.Len() > 0 { + schedulesStr.WriteByte(',') + } + fmt.Fprintf(schedulesStr, "%d", scheduleID) + + for i := 0; i < numJobs; i++ { + record.CreatedBy = &CreatedByInfo{ + Name: CreatedByScheduledJobs, + ID: scheduleID, + } + require.NoError(t, registry.NewJob(record).Created(context.Background())) + + if tc.command == "resume" { + // Job has to be in paused state in order for it to be resumable; + // Alas, because we don't actually run real jobs (see comment above), + // We can't just pause the job (since it will stay in pause-requested state forever). + // So, just force set job status to paused. + th.sqlDB.Exec(t, "UPDATE system.jobs SET status=$1 WHERE id=$2", StatusPaused, scheduleID) + } + } + } + + jobControl := tc.command + " JOBS FOR " + if tc.numSchedules == 1 { + jobControl += "SCHEDULE " + schedulesStr.String() + } else { + jobControl += fmt.Sprintf("SCHEDULES SELECT unnest(array[%s])", schedulesStr) + } + + t.Run(jobControl, func(t *testing.T) { + // Go through internal executor to execute job control command. + // This correctly reports the number of effected rows. + numEffected, err := th.cfg.InternalExecutor.ExecEx( + context.Background(), + "test-num-effected", + nil, + sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, + jobControl, + ) + require.NoError(t, err) + require.Equal(t, numJobs*tc.numSchedules, numEffected) + }) + } +} diff --git a/pkg/jobs/testutils_test.go b/pkg/jobs/testutils_test.go index e51d167cb19f..47c64c14b250 100644 --- a/pkg/jobs/testutils_test.go +++ b/pkg/jobs/testutils_test.go @@ -33,9 +33,10 @@ import ( ) type testHelper struct { - env *jobstest.JobSchedulerTestEnv - cfg *scheduledjobs.JobExecutionConfig - sqlDB *sqlutils.SQLRunner + env *jobstest.JobSchedulerTestEnv + server serverutils.TestServerInterface + cfg *scheduledjobs.JobExecutionConfig + sqlDB *sqlutils.SQLRunner } // newTestHelper creates and initializes appropriate state for a test, @@ -46,6 +47,12 @@ type testHelper struct { // is disabled by this test helper. // If you want to run daemon, invoke it directly. func newTestHelper(t *testing.T) (*testHelper, func()) { + return newTestHelperForTables(t, jobstest.UseTestTables) +} + +func newTestHelperForTables( + t *testing.T, envTableType jobstest.EnvTablesType, +) (*testHelper, func()) { knobs := &TestingKnobs{ TakeOverJobsScheduling: func(_ func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error) { }, @@ -57,14 +64,17 @@ func newTestHelper(t *testing.T) (*testHelper, func()) { sqlDB := sqlutils.MakeSQLRunner(db) // Setup test scheduled jobs table. - env := jobstest.NewJobSchedulerTestEnv(jobstest.UseTestTables, timeutil.Now()) + env := jobstest.NewJobSchedulerTestEnv(envTableType, timeutil.Now()) - sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env)) - sqlDB.Exec(t, jobstest.GetJobsTableSchema(env)) + if envTableType == jobstest.UseTestTables { + sqlDB.Exec(t, jobstest.GetScheduledJobsTableSchema(env)) + sqlDB.Exec(t, jobstest.GetJobsTableSchema(env)) + } restoreRegistry := settings.TestingSaveRegistry() return &testHelper{ - env: env, + env: env, + server: s, cfg: &scheduledjobs.JobExecutionConfig{ Settings: s.ClusterSettings(), InternalExecutor: s.InternalExecutor().(sqlutil.InternalExecutor), @@ -73,8 +83,10 @@ func newTestHelper(t *testing.T) (*testHelper, func()) { }, sqlDB: sqlDB, }, func() { - sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName()) - sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName()) + if envTableType == jobstest.UseTestTables { + sqlDB.Exec(t, "DROP TABLE "+env.SystemJobsTableName()) + sqlDB.Exec(t, "DROP TABLE "+env.ScheduledJobsTableName()) + } s.Stopper().Stop(context.Background()) restoreRegistry() } diff --git a/pkg/sql/delegate/delegate.go b/pkg/sql/delegate/delegate.go index b6e81d4de597..df51e2b6d096 100644 --- a/pkg/sql/delegate/delegate.go +++ b/pkg/sql/delegate/delegate.go @@ -109,6 +109,9 @@ func TryDelegate( case *tree.ShowTransactionStatus: return d.delegateShowVar(&tree.ShowVar{Name: "transaction_status"}) + case *tree.ControlJobsForSchedules: + return d.delegateJobControl(t) + case *tree.ShowLastQueryStatistics: return nil, unimplemented.New( "show last query statistics", diff --git a/pkg/sql/delegate/job_control.go b/pkg/sql/delegate/job_control.go new file mode 100644 index 000000000000..387036adbfe9 --- /dev/null +++ b/pkg/sql/delegate/job_control.go @@ -0,0 +1,30 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package delegate + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" +) + +func (d *delegator) delegateJobControl(stmt *tree.ControlJobsForSchedules) (tree.Statement, error) { + // TODO(yevgeniy): it is very unfornate that we have to use the IN() clause + // in order to select matching jobs. + // It would be better if the job control statement had a better (planNode) implementation, + // so that we can rely on the optimizer to select relevant nodes. + return parse(fmt.Sprintf(` +%s JOBS +SELECT id FROM system.jobs +WHERE jobs.created_by_type = '%s' AND jobs.created_by_id IN (%s) +`, tree.JobCommandToStatement[stmt.Command], jobs.CreatedByScheduledJobs, stmt.Schedules)) +} diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index e39d86ae7a84..1883cad2591d 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -445,6 +445,12 @@ func TestParse(t *testing.T) { {`EXPLAIN SHOW JOBS SELECT a`}, {`SHOW JOBS WHEN COMPLETE SELECT a`}, {`EXPLAIN SHOW JOBS WHEN COMPLETE SELECT a`}, + {`PAUSE JOBS FOR SCHEDULES SELECT 1`}, + {`EXPLAIN PAUSE JOBS FOR SCHEDULES SELECT 1`}, + {`RESUME JOBS FOR SCHEDULES SELECT unnest(ARRAY[1, 2, 3])`}, + {`EXPLAIN RESUME JOBS FOR SCHEDULES SELECT unnest(ARRAY[1, 2, 3])`}, + {`CANCEL JOBS FOR SCHEDULES (SELECT schedule_id FROM somewhere WHERE something = true)`}, + {`EXPLAIN CANCEL JOBS FOR SCHEDULES (SELECT schedule_id FROM somewhere WHERE something = true)`}, {`EXPLAIN SELECT 1`}, {`EXPLAIN EXPLAIN SELECT 1`}, @@ -2022,10 +2028,16 @@ $function$`, {`CANCEL JOB a`, `CANCEL JOBS VALUES (a)`}, {`EXPLAIN CANCEL JOB a`, `EXPLAIN CANCEL JOBS VALUES (a)`}, + {`CANCEL JOBS FOR SCHEDULE a`, `CANCEL JOBS FOR SCHEDULES VALUES (a)`}, + {`EXPLAIN CANCEL JOBS FOR SCHEDULE a`, `EXPLAIN CANCEL JOBS FOR SCHEDULES VALUES (a)`}, {`RESUME JOB a`, `RESUME JOBS VALUES (a)`}, {`EXPLAIN RESUME JOB a`, `EXPLAIN RESUME JOBS VALUES (a)`}, + {`RESUME JOBS FOR SCHEDULE a`, `RESUME JOBS FOR SCHEDULES VALUES (a)`}, + {`EXPLAIN RESUME JOBS FOR SCHEDULE a`, `EXPLAIN RESUME JOBS FOR SCHEDULES VALUES (a)`}, {`PAUSE JOB a`, `PAUSE JOBS VALUES (a)`}, {`EXPLAIN PAUSE JOB a`, `EXPLAIN PAUSE JOBS VALUES (a)`}, + {`PAUSE JOBS FOR SCHEDULE a`, `PAUSE JOBS FOR SCHEDULES VALUES (a)`}, + {`EXPLAIN PAUSE JOBS FOR SCHEDULE a`, `EXPLAIN PAUSE JOBS FOR SCHEDULES VALUES (a)`}, {`PAUSE SCHEDULE a`, `PAUSE SCHEDULES VALUES (a)`}, {`EXPLAIN PAUSE SCHEDULE a`, `EXPLAIN PAUSE SCHEDULES VALUES (a)`}, {`RESUME SCHEDULE a`, `RESUME SCHEDULES VALUES (a)`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 2f28c96ad8ac..595e0f9fd75a 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -781,6 +781,7 @@ func (u *sqlSymUnion) alterTypeAddValuePlacement() *tree.AlterTypeAddValuePlacem %type insert_stmt %type import_stmt %type pause_stmt pause_jobs_stmt pause_schedules_stmt +%type <*tree.Select> for_schedules_clause %type release_stmt %type reset_stmt reset_session_stmt reset_csetting_stmt %type resume_stmt resume_jobs_stmt resume_schedules_stmt @@ -2528,6 +2529,10 @@ cancel_jobs_stmt: { $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.CancelJob} } +| CANCEL JOBS for_schedules_clause + { + $$.val = &tree.ControlJobsForSchedules{Schedules: $3.slct(), Command: tree.CancelJob} + } | CANCEL JOBS error // SHOW HELP: CANCEL JOBS // %Help: CANCEL QUERIES - cancel running queries @@ -4782,13 +4787,30 @@ pause_jobs_stmt: Command: tree.PauseJob, } } -| PAUSE JOB error // SHOW HELP: PAUSE JOBS +| PAUSE JOB error // SHOW HELP: PAUSE JOBS | PAUSE JOBS select_stmt { $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.PauseJob} } +| PAUSE JOBS for_schedules_clause + { + $$.val = &tree.ControlJobsForSchedules{Schedules: $3.slct(), Command: tree.PauseJob} + } | PAUSE JOBS error // SHOW HELP: PAUSE JOBS + +for_schedules_clause: + FOR SCHEDULES select_stmt + { + $$.val = $3.slct() + } +| FOR SCHEDULE a_expr + { + $$.val = &tree.Select{ + Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$3.expr()}}}, + } + } + // %Help: PAUSE SCHEDULES - pause scheduled jobs // %Category: Misc // %Text: @@ -4814,7 +4836,6 @@ pause_schedules_stmt: Command: tree.PauseSchedule, } } -| with_clause PAUSE SCHEDULES | PAUSE SCHEDULES error // SHOW HELP: PAUSE SCHEDULES // %Help: CREATE SCHEMA - create a new schema @@ -6254,6 +6275,10 @@ resume_jobs_stmt: { $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.ResumeJob} } +| RESUME JOBS for_schedules_clause + { + $$.val = &tree.ControlJobsForSchedules{Schedules: $3.slct(), Command: tree.ResumeJob} + } | RESUME JOBS error // SHOW HELP: RESUME JOBS // %Help: RESUME SCHEDULES - resume executing scheduled jobs diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index 29902b39c5e5..0794cefbf1e6 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -104,7 +104,22 @@ type ControlSchedules struct { var _ Statement = &ControlSchedules{} // Format implements NodeFormatter interface -func (c *ControlSchedules) Format(ctx *FmtCtx) { - fmt.Fprintf(ctx, "%s SCHEDULES ", c.Command) - c.Schedules.Format(ctx) +func (n *ControlSchedules) Format(ctx *FmtCtx) { + fmt.Fprintf(ctx, "%s SCHEDULES ", n.Command) + n.Schedules.Format(ctx) } + +// ControlJobsForSchedules represents PAUSE/RESUME/CANCEL clause +// which applies job command to the jobs matching specified schedule(s). +type ControlJobsForSchedules struct { + Schedules *Select + Command JobCommand +} + +// Format implements NodeFormatter interface +func (n *ControlJobsForSchedules) Format(ctx *FmtCtx) { + fmt.Fprintf(ctx, "%s JOBS FOR SCHEDULES %s", + JobCommandToStatement[n.Command], AsString(n.Schedules)) +} + +var _ Statement = &ControlJobsForSchedules{} diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 677f8fae6307..fc244ab5013c 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -254,8 +254,16 @@ func (n *ControlJobs) StatementTag() string { func (*ControlSchedules) StatementType() StatementType { return RowsAffected } // StatementTag returns a short string identifying the type of statement. -func (c *ControlSchedules) StatementTag() string { - return fmt.Sprintf("%s SCHEDULES", c.Command) +func (n *ControlSchedules) StatementTag() string { + return fmt.Sprintf("%s SCHEDULES", n.Command) +} + +// StatementType implements the Statement interface. +func (*ControlJobsForSchedules) StatementType() StatementType { return RowsAffected } + +// StatementTag returns a short string identifying the type of statement. +func (n *ControlJobsForSchedules) StatementTag() string { + return fmt.Sprintf("%s JOBS FOR SCHEDULES", JobCommandToStatement[n.Command]) } // StatementType implements the Statement interface. @@ -947,7 +955,8 @@ func (n *Analyze) String() string { return AsString(n) } func (n *Backup) String() string { return AsString(n) } func (n *BeginTransaction) String() string { return AsString(n) } func (n *ControlJobs) String() string { return AsString(n) } -func (c *ControlSchedules) String() string { return AsString(c) } +func (n *ControlSchedules) String() string { return AsString(n) } +func (n *ControlJobsForSchedules) String() string { return AsString(n) } func (n *CancelQueries) String() string { return AsString(n) } func (n *CancelSessions) String() string { return AsString(n) } func (n *CannedOptPlan) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index c6d8861abc51..a247bb070260 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -963,19 +963,19 @@ func (stmt *ControlJobs) walkStmt(v Visitor) Statement { } // copyNode makes a copy of this Statement without recursing in any child Statements. -func (stmt *ControlSchedules) copyNode() *ControlSchedules { - stmtCopy := *stmt +func (n *ControlSchedules) copyNode() *ControlSchedules { + stmtCopy := *n return &stmtCopy } // walkStmt is part of the walkableStmt interface. -func (stmt *ControlSchedules) walkStmt(v Visitor) Statement { - sel, changed := walkStmt(v, stmt.Schedules) +func (n *ControlSchedules) walkStmt(v Visitor) Statement { + sel, changed := walkStmt(v, n.Schedules) if changed { - stmt = stmt.copyNode() - stmt.Schedules = sel.(*Select) + n = n.copyNode() + n.Schedules = sel.(*Select) } - return stmt + return n } // copyNode makes a copy of this Statement without recursing in any child Statements. @@ -1419,6 +1419,7 @@ var _ walkableStmt = &ValuesClause{} var _ walkableStmt = &CancelQueries{} var _ walkableStmt = &CancelSessions{} var _ walkableStmt = &ControlJobs{} +var _ walkableStmt = &ControlSchedules{} var _ walkableStmt = &BeginTransaction{} // walkStmt walks the entire parsed stmt calling WalkExpr on each