diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index d297ddf5755..3d33d3366c6 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -1445,7 +1445,7 @@ func (jd *HandleT) createDSInTx(tx *Tx, newDS dataSetT) error {
 
 	// TODO : Evaluate a way to handle indexes only for particular tables
 	if jd.tablePrefix == "rt" {
-		sqlStatement = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "customval_workspace_%s" ON %q (custom_val,workspace_id)`, newDS.Index, newDS.JobTable)
+		sqlStatement = fmt.Sprintf(`CREATE INDEX "idx_%[1]s_cv_ws" ON %[1]q (custom_val,workspace_id)`, newDS.JobTable)
 		_, err = tx.ExecContext(context.TODO(), sqlStatement)
 		if err != nil {
 			return err
@@ -1469,6 +1469,13 @@ func (jd *HandleT) createDSInTx(tx *Tx, newDS dataSetT) error {
 		return err
 	}
 
+	if _, err = tx.ExecContext(context.TODO(), fmt.Sprintf(`CREATE INDEX "idx_%[1]s_jid_id" ON %[1]q(job_id asc,id desc)`, newDS.JobStatusTable)); err != nil {
+		return err
+	}
+	if _, err = tx.ExecContext(context.TODO(), fmt.Sprintf(`CREATE VIEW "v_last_%[1]s" AS SELECT DISTINCT ON (job_id) * FROM %[1]q ORDER BY job_id ASC, id DESC`, newDS.JobStatusTable)); err != nil {
+		return err
+	}
+
 	err = jd.journalMarkDoneInTx(tx, opID)
 	if err != nil {
 		return err
@@ -1569,10 +1576,10 @@ func (jd *HandleT) dropDSInTx(tx *Tx, ds dataSetT) error {
 	if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN ACCESS EXCLUSIVE MODE;`, ds.JobTable)); err != nil {
 		return err
 	}
-	if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, ds.JobStatusTable)); err != nil {
+	if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, ds.JobStatusTable)); err != nil {
 		return err
 	}
-	if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, ds.JobTable)); err != nil {
+	if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, ds.JobTable)); err != nil {
 		return err
 	}
 	jd.postDropDs(ds)
@@ -1591,9 +1598,9 @@ func (jd *HandleT) dropDSForRecovery(ds dataSetT) {
 	sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS EXCLUSIVE MODE;`, ds.JobTable)
 	jd.prepareAndExecStmtInTxAllowMissing(tx, sqlStatement)
 
-	sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q`, ds.JobStatusTable)
+	sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q CASCADE`, ds.JobStatusTable)
 	jd.prepareAndExecStmtInTx(tx, sqlStatement)
-	sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q`, ds.JobTable)
+	sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q CASCADE`, ds.JobTable)
 	jd.prepareAndExecStmtInTx(tx, sqlStatement)
 	err = tx.Commit()
 	jd.assertError(err)
@@ -1660,10 +1667,10 @@ func (jd *HandleT) mustRenameDSInTx(tx *Tx, ds dataSetT) error {
 		return fmt.Errorf("could not rename job table %s to %s: %w", ds.JobTable, renamedJobTable, err)
 	}
 	if count == 0 {
-		if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, renamedJobStatusTable)); err != nil {
+		if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, renamedJobStatusTable)); err != nil {
 			return fmt.Errorf("could not drop empty pre_drop job status table %s: %w", renamedJobStatusTable, err)
 		}
-		if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, renamedJobTable)); err != nil {
+		if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, renamedJobTable)); err != nil {
 			return fmt.Errorf("could not drop empty pre_drop job table %s: %w", renamedJobTable, err)
 		}
 	}
@@ -1769,10 +1776,7 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
 	defer queryStat.End()
 
 	compactDSQuery := fmt.Sprintf(
-		`with last_status as
-		(
-			select * from %[1]q where id in (select max(id) from %[1]q group by job_id)
-		),
+		`with last_status as (select * from "v_last_%[1]s"),
 		inserted_jobs as
 		(
 			insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
@@ -2064,28 +2068,11 @@ func (jd *HandleT) GetPileUpCounts(ctx context.Context) (map[string]map[string]i
 			j.workspace_id as workspace
 		from
 			%[1]q j
-			left join (
-				select * from (select
-						*,
-						ROW_NUMBER() OVER(
-							PARTITION BY rs.job_id
-							ORDER BY
-								rs.id DESC
-						) AS row_no
-					FROM
-						%[2]q as rs) nq1
-				where
-					nq1.row_no = 1
-
-			) s on j.job_id = s.job_id
-		where
-			(
-				s.job_state not in (
-					'aborted', 'succeeded',
-					'migrated'
-				)
-				or s.job_id is null
-			)
+			left join "v_last_%[2]s" s on j.job_id = s.job_id
+		where (
+			s.job_state not in ('aborted', 'succeeded', 'migrated')
+			or s.job_id is null
+		)
 	)
 	select
 		count(*),
@@ -2479,7 +2466,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G
 
 	var stateQuery string
 	if len(stateFilters) > 0 {
-		stateQuery = " AND " + constructQueryOR("job_state", stateFilters)
+		stateQuery = " AND " + constructQueryOR("job_latest_state.job_state", stateFilters)
 	}
 
 	var filterConditions []string
@@ -2516,12 +2503,9 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G
 			job_latest_state.exec_time, job_latest_state.retry_time,
 			job_latest_state.error_code, job_latest_state.error_response, job_latest_state.parameters
 		FROM
-			%[1]q AS jobs,
-			(SELECT job_id, job_state, attempt, exec_time, retry_time,
-				error_code, error_response, parameters FROM %[2]q WHERE id IN
-					(SELECT MAX(id) from %[2]q GROUP BY job_id) %[3]s)
-			AS job_latest_state
-		WHERE jobs.job_id=job_latest_state.job_id
+			%[1]q AS jobs
+			JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
+			%[3]s
 			%[4]s
 			AND job_latest_state.retry_time < $1 ORDER BY jobs.job_id %[5]s`,
 		ds.JobTable, ds.JobStatusTable, stateQuery, filterQuery, limitQuery)
@@ -2656,7 +2640,7 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
 		` sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts, `+
 		` sum(pg_column_size(jobs.event_payload)) over (order by jobs.job_id) as running_payload_size `+
 		`FROM %[1]q AS jobs `+
-		`LEFT JOIN %[2]q AS job_status ON jobs.job_id=job_status.job_id `+
+		`LEFT JOIN %[2]q job_status ON jobs.job_id=job_status.job_id `+
 		`WHERE job_status.job_id is NULL `,
 		ds.JobTable, ds.JobStatusTable)
 
@@ -3807,9 +3791,10 @@ func (jd *HandleT) deleteJobStatusDSInTx(txHandler transactionHandler, ds dataSe
 		sqlFiltersString = "WHERE " + sqlFiltersString
 	}
 
-	sqlStatement := fmt.Sprintf(`DELETE FROM %[1]q WHERE id IN
-		(SELECT MAX(id) from %[1]q where job_id IN (SELECT job_id from %[2]q %[3]s) GROUP BY job_id) %[4]s
-		AND retry_time < $1`,
+	sqlStatement := fmt.Sprintf(`DELETE FROM %[1]q
+		WHERE id = ANY(
+			SELECT id from "v_last_%[1]s" where job_id IN (SELECT job_id from %[2]q %[3]s)
+		) %[4]s AND retry_time < $1`,
 		ds.JobStatusTable, ds.JobTable, sqlFiltersString, stateQuery)
 
 	stmt, err := txHandler.Prepare(sqlStatement)
diff --git a/jobsdb/readonly_jobsdb.go b/jobsdb/readonly_jobsdb.go
index 2d1f63d153f..495bc3d45ae 100644
--- a/jobsdb/readonly_jobsdb.go
+++ b/jobsdb/readonly_jobsdb.go
@@ -341,7 +341,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
 
 	var stateQuery, customValQuery, sourceQuery string
 	if len(stateFilters) > 0 {
-		stateQuery = " AND " + constructQueryOR("job_state", stateFilters)
+		stateQuery = " AND " + constructQueryOR("job_latest_state.job_state", stateFilters)
 	} else {
 		stateQuery = ""
 	}
@@ -393,11 +393,9 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
 		selectColumn = fmt.Sprintf("COUNT(%[1]s.job_id)", ds.JobTable)
 	}
 	sqlStatement = fmt.Sprintf(`SELECT %[6]s FROM
-		%[1]s,
-		(SELECT job_id, retry_time FROM %[2]s WHERE id IN
-			(SELECT MAX(id) from %[2]s GROUP BY job_id) %[3]s)
-		AS job_latest_state
-		WHERE %[1]s.job_id=job_latest_state.job_id
+		%[1]s
+		JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
+		%[3]s
 		%[4]s %[5]s
 		AND job_latest_state.retry_time < $1`,
 		ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, selectColumn)
@@ -505,10 +503,8 @@ func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error
 			%[1]s.custom_val ,%[1]s.parameters->'destination_id' as destination,
 			job_latest_state.job_state
 			FROM %[1]s
-			LEFT JOIN
-				(SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[2]s
-				WHERE id IN (SELECT MAX(id) from %[2]s GROUP BY job_id)) AS job_latest_state
-			ON %[1]s.job_id=job_latest_state.job_id GROUP BY job_latest_state.job_state,%[1]s.parameters->'source_id',%[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
+			LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
+			GROUP BY job_latest_state.job_state, %[1]s.parameters->'source_id', %[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
 		row, err := jd.DbHandle.Query(sqlStatement)
 		if err != nil {
 			return "", err
@@ -561,11 +557,9 @@ func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, erro
 	sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.user_id, %[1]s.custom_val,
 		job_latest_state.exec_time,
 		job_latest_state.error_code, job_latest_state.error_response
-		FROM %[1]s,
-		(SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[2]s WHERE id IN
-			(SELECT MAX(id) from %[2]s GROUP BY job_id) AND (job_state = 'failed'))
-		AS job_latest_state
-		WHERE %[1]s.job_id=job_latest_state.job_id
+		FROM %[1]s
+		JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
+		WHERE job_latest_state.job_state = 'failed'
 		`, dsList.JobTableName, dsList.JobStatusTableName)
 	if argList[1] != "" {
 		sqlStatement = sqlStatement + fmt.Sprintf(`AND %[1]s.custom_val = '%[2]s'`, dsList.JobTableName, argList[1])
@@ -626,12 +620,7 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) {
 			job_latest_state.error_code, job_latest_state.error_response
 		FROM
 			%[1]s
-			LEFT JOIN
-				(SELECT job_id, job_state, attempt, exec_time, retry_time,
-					error_code, error_response FROM %[2]s WHERE id IN
-						(SELECT MAX(id) from %[2]s GROUP BY job_id))
-				AS job_latest_state
-			ON %[1]s.job_id=job_latest_state.job_id
+			LEFT JOIN "%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
 		WHERE
 			%[1]s.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id)
 	event := JobT{}
diff --git a/jobsdb/unionQuery.go b/jobsdb/unionQuery.go
index c62a88b7d39..dd1a7c06125 100644
--- a/jobsdb/unionQuery.go
+++ b/jobsdb/unionQuery.go
@@ -363,18 +363,7 @@ func (*MultiTenantHandleT) getInitialSingleWorkspaceQueryString(ds dataSetT, con
 			job_latest_state.parameters as status_parameters
 		FROM
 			"%[1]s" AS jobs
-			LEFT JOIN (
-				SELECT
-					job_id, job_state, attempt, exec_time, retry_time,
-					error_code, error_response, parameters
-				FROM "%[2]s"
-				WHERE
-					id IN (
-						SELECT MAX(id)
-						from "%[2]s"
-						GROUP BY job_id
-					)
-			) AS job_latest_state ON jobs.job_id = job_latest_state.job_id
+			LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id = job_latest_state.job_id
 		WHERE
 			jobs.workspace_id IN %[7]s %[3]s %[4]s %[5]s %[6]s`,
 		ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, limitQuery, workspaceString)
diff --git a/sql/migrations/jobsdb/000010_add_index_job_status_table.down.tmpl b/sql/migrations/jobsdb/000010_add_index_job_status_table.down.tmpl
new file mode 100644
index 00000000000..64b7d940785
--- /dev/null
+++ b/sql/migrations/jobsdb/000010_add_index_job_status_table.down.tmpl
@@ -0,0 +1,4 @@
+{{range .Datasets}}
+    DROP INDEX IF EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id";
+    DROP VIEW IF EXISTS "v_last_{{$.Prefix}}_job_status_{{.}}";
+{{end}}
\ No newline at end of file
diff --git a/sql/migrations/jobsdb/000010_add_index_job_status_table.up.tmpl b/sql/migrations/jobsdb/000010_add_index_job_status_table.up.tmpl
new file mode 100644
index 00000000000..c5b48dfd862
--- /dev/null
+++ b/sql/migrations/jobsdb/000010_add_index_job_status_table.up.tmpl
@@ -0,0 +1,4 @@
+{{range .Datasets}}
+    CREATE INDEX IF NOT EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id" ON "{{$.Prefix}}_job_status_{{.}}" (job_id asc,id desc);
+    CREATE OR REPLACE VIEW "v_last_{{$.Prefix}}_job_status_{{.}}" AS SELECT DISTINCT ON (job_id) * FROM "{{$.Prefix}}_job_status_{{.}}" ORDER BY job_id ASC,id DESC;
+{{end}}
\ No newline at end of file
diff --git a/sql/migrations/node/000008_unionjobsdb_fn.down.sql b/sql/migrations/node/000008_unionjobsdb_fn.down.sql
new file mode 100644
index 00000000000..e94a647c632
--- /dev/null
+++ b/sql/migrations/node/000008_unionjobsdb_fn.down.sql
@@ -0,0 +1 @@
+DROP FUNCTION IF EXISTS unionjobsdb(prefix text, num int);
\ No newline at end of file
diff --git a/sql/migrations/node/000008_unionjobsdb_fn.up.sql b/sql/migrations/node/000008_unionjobsdb_fn.up.sql
new file mode 100644
index 00000000000..bff8d22b292
--- /dev/null
+++ b/sql/migrations/node/000008_unionjobsdb_fn.up.sql
@@ -0,0 +1,37 @@
+-- unionjobsdb function automatically joins datasets and returns jobs
+-- along with their latest jobs status (or null)
+--
+-- Parameters
+-- prefix: table prefix, e.g. gw, rt, batch_rt
+-- num: number of datasets to include in the query, e.g. 4
+CREATE OR REPLACE FUNCTION unionjobsdb(prefix text, num int)
+RETURNS table (
+  t_name text,
+  job_id bigint,
+  workspace_id text,
+  uuid uuid,
+  user_id text,
+  parameters jsonb,
+  custom_val character varying(64),
+  event_payload jsonb,
+  event_count integer,
+  created_at timestamp with time zone,
+  expire_at timestamp with time zone,
+  status_id bigint,
+  job_state character varying(64),
+  attempt smallint,
+  error_code character varying(32),
+  error_response jsonb
+)
+AS $$
+DECLARE
+  qry text;
+BEGIN
+SELECT string_agg(
+  format('SELECT %1$L, j.job_id, j.workspace_id, j.uuid, j.user_id, j.parameters, j.custom_val, j.event_payload, j.event_count, j.created_at, j.expire_at, latest_status.id, latest_status.job_state, latest_status.attempt, latest_status.error_code, latest_status.error_response FROM %1$I j LEFT JOIN %2$I latest_status on latest_status.job_id = j.job_id', alltables.table_name, 'v_last_' || prefix || '_job_status_'|| substring(alltables.table_name, char_length(prefix)+7,30)),
+  ' UNION ') INTO qry
+  FROM (select table_name from information_schema.tables
+WHERE table_name LIKE prefix || '_jobs_%' order by table_name asc LIMIT num) alltables;
+RETURN QUERY EXECUTE qry;
+END;
+$$ LANGUAGE plpgsql;
\ No newline at end of file
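For illustration only, not part of the patch: filling the format placeholders for a hypothetical dataset "1" with table prefix "gw" shows the objects createDSInTx now creates per job status table, and how the rewritten reads use the view in place of the old MAX(id) ... GROUP BY job_id subquery. All object names below are assumed examples.

-- Hypothetical per-dataset objects (dataset "1", prefix "gw"); the view keeps only
-- the newest status row per job_id, which the index makes cheap to compute.
CREATE INDEX "idx_gw_job_status_1_jid_id" ON "gw_job_status_1" (job_id ASC, id DESC);
CREATE VIEW "v_last_gw_job_status_1" AS
    SELECT DISTINCT ON (job_id) * FROM "gw_job_status_1" ORDER BY job_id ASC, id DESC;

-- Latest-status lookup in the style of the rewritten queries, e.g. failed jobs only.
SELECT jobs.job_id, job_latest_state.job_state, job_latest_state.retry_time
FROM "gw_jobs_1" AS jobs
JOIN "v_last_gw_job_status_1" job_latest_state ON jobs.job_id = job_latest_state.job_id
WHERE job_latest_state.job_state = 'failed';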
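Likewise, a minimal invocation sketch for the new unionjobsdb helper, assuming gw datasets and their v_last_ views already exist; the prefix and dataset count are example values.

-- Jobs from the first 4 gw datasets, each paired with its latest status row (or NULLs).
SELECT t_name, job_id, job_state, attempt, error_code
FROM unionjobsdb('gw', 4);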