chore(jobsdb): latest job status query optimization
atzoum committed Nov 17, 2022
1 parent ad2626f commit 7cf1360
Showing 7 changed files with 86 additions and 77 deletions.
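In short: every "latest job status" lookup that previously scanned the whole status table with a MAX(id) ... GROUP BY job_id subquery is replaced by a per-dataset view v_last_<job_status_table>, defined with DISTINCT ON (job_id) and backed by a new (job_id ASC, id DESC) index. A minimal before/after sketch of the pattern (gw_job_status_1 is an illustrative dataset name; the index and view definitions mirror the ones created in the diff below):

-- before: aggregate over the whole status table to find the newest row per job
SELECT job_id, job_state, retry_time
FROM "gw_job_status_1"
WHERE id IN (SELECT MAX(id) FROM "gw_job_status_1" GROUP BY job_id);

-- after: an ordered index plus a DISTINCT ON view serve the same rows directly
CREATE INDEX "idx_gw_job_status_1_jid_id" ON "gw_job_status_1" (job_id ASC, id DESC);
CREATE VIEW "v_last_gw_job_status_1" AS
    SELECT DISTINCT ON (job_id) * FROM "gw_job_status_1" ORDER BY job_id ASC, id DESC;

SELECT job_id, job_state, retry_time FROM "v_last_gw_job_status_1";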
73 changes: 29 additions & 44 deletions jobsdb/jobsdb.go
@@ -1445,7 +1445,7 @@ func (jd *HandleT) createDSInTx(tx *Tx, newDS dataSetT) error {

// TODO : Evaluate a way to handle indexes only for particular tables
if jd.tablePrefix == "rt" {
sqlStatement = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "customval_workspace_%s" ON %q (custom_val,workspace_id)`, newDS.Index, newDS.JobTable)
sqlStatement = fmt.Sprintf(`CREATE INDEX "idx_%[1]s_cv_ws" ON %[1]q (custom_val,workspace_id)`, newDS.JobTable)
_, err = tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return err
@@ -1469,6 +1469,13 @@ func (jd *HandleT) createDSInTx(tx *Tx, newDS dataSetT) error {
return err
}

if _, err = tx.ExecContext(context.TODO(), fmt.Sprintf(`CREATE INDEX "idx_%[1]s_jid_id" ON %[1]q(job_id asc,id desc)`, newDS.JobStatusTable)); err != nil {
return err
}
if _, err = tx.ExecContext(context.TODO(), fmt.Sprintf(`CREATE VIEW "v_last_%[1]s" AS SELECT DISTINCT ON (job_id) * FROM %[1]q ORDER BY job_id ASC, id DESC`, newDS.JobStatusTable)); err != nil {
return err
}

err = jd.journalMarkDoneInTx(tx, opID)
if err != nil {
return err
@@ -1569,10 +1576,10 @@ func (jd *HandleT) dropDSInTx(tx *Tx, ds dataSetT) error {
if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN ACCESS EXCLUSIVE MODE;`, ds.JobTable)); err != nil {
return err
}
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, ds.JobStatusTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, ds.JobStatusTable)); err != nil {
return err
}
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, ds.JobTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, ds.JobTable)); err != nil {
return err
}
jd.postDropDs(ds)
@@ -1591,9 +1598,9 @@ func (jd *HandleT) dropDSForRecovery(ds dataSetT) {
sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS EXCLUSIVE MODE;`, ds.JobTable)
jd.prepareAndExecStmtInTxAllowMissing(tx, sqlStatement)

sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q`, ds.JobStatusTable)
sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q CASCADE`, ds.JobStatusTable)
jd.prepareAndExecStmtInTx(tx, sqlStatement)
sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q`, ds.JobTable)
sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q CASCADE`, ds.JobTable)
jd.prepareAndExecStmtInTx(tx, sqlStatement)
err = tx.Commit()
jd.assertError(err)
@@ -1660,10 +1667,10 @@ func (jd *HandleT) mustRenameDSInTx(tx *Tx, ds dataSetT) error {
return fmt.Errorf("could not rename job table %s to %s: %w", ds.JobTable, renamedJobTable, err)
}
if count == 0 {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, renamedJobStatusTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, renamedJobStatusTable)); err != nil {
return fmt.Errorf("could not drop empty pre_drop job status table %s: %w", renamedJobStatusTable, err)
}
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, renamedJobTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, renamedJobTable)); err != nil {
return fmt.Errorf("could not drop empty pre_drop job table %s: %w", renamedJobTable, err)
}
}
@@ -1769,10 +1776,7 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
defer queryStat.End()

compactDSQuery := fmt.Sprintf(
`with last_status as
(
select * from %[1]q where id in (select max(id) from %[1]q group by job_id)
),
`with last_status as (select * from "v_last_%[1]s"),
inserted_jobs as
(
insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
@@ -2064,28 +2068,11 @@ func (jd *HandleT) GetPileUpCounts(ctx context.Context) (map[string]map[string]i
j.workspace_id as workspace
from
%[1]q j
left join (
select * from (select
*,
ROW_NUMBER() OVER(
PARTITION BY rs.job_id
ORDER BY
rs.id DESC
) AS row_no
FROM
%[2]q as rs) nq1
where
nq1.row_no = 1
) s on j.job_id = s.job_id
where
(
s.job_state not in (
'aborted', 'succeeded',
'migrated'
)
or s.job_id is null
)
left join "v_last_%[2]s" s on j.job_id = s.job_id
where (
s.job_state not in ('aborted', 'succeeded', 'migrated')
or s.job_id is null
)
)
select
count(*),
@@ -2479,7 +2466,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G

var stateQuery string
if len(stateFilters) > 0 {
stateQuery = " AND " + constructQueryOR("job_state", stateFilters)
stateQuery = " AND " + constructQueryOR("job_latest_state.job_state", stateFilters)
}

var filterConditions []string
@@ -2516,12 +2503,9 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G
job_latest_state.exec_time, job_latest_state.retry_time,
job_latest_state.error_code, job_latest_state.error_response, job_latest_state.parameters
FROM
%[1]q AS jobs,
(SELECT job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response, parameters FROM %[2]q WHERE id IN
(SELECT MAX(id) from %[2]q GROUP BY job_id) %[3]s)
AS job_latest_state
WHERE jobs.job_id=job_latest_state.job_id
%[1]q AS jobs
JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
%[3]s
%[4]s
AND job_latest_state.retry_time < $1 ORDER BY jobs.job_id %[5]s`,
ds.JobTable, ds.JobStatusTable, stateQuery, filterQuery, limitQuery)
@@ -2656,7 +2640,7 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
` sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts, `+
` sum(pg_column_size(jobs.event_payload)) over (order by jobs.job_id) as running_payload_size `+
`FROM %[1]q AS jobs `+
`LEFT JOIN %[2]q AS job_status ON jobs.job_id=job_status.job_id `+
`LEFT JOIN %[2]q job_status ON jobs.job_id=job_status.job_id `+
`WHERE job_status.job_id is NULL `,
ds.JobTable, ds.JobStatusTable)

@@ -3807,9 +3791,10 @@ func (jd *HandleT) deleteJobStatusDSInTx(txHandler transactionHandler, ds dataSe
sqlFiltersString = "WHERE " + sqlFiltersString
}

sqlStatement := fmt.Sprintf(`DELETE FROM %[1]q WHERE id IN
(SELECT MAX(id) from %[1]q where job_id IN (SELECT job_id from %[2]q %[3]s) GROUP BY job_id) %[4]s
AND retry_time < $1`,
sqlStatement := fmt.Sprintf(`DELETE FROM %[1]q
WHERE id = ANY(
SELECT id from "v_last_%[1]s" where job_id IN (SELECT job_id from %[2]q %[3]s)
) %[4]s AND retry_time < $1`,
ds.JobStatusTable, ds.JobTable, sqlFiltersString, stateQuery)

stmt, err := txHandler.Prepare(sqlStatement)
31 changes: 10 additions & 21 deletions jobsdb/readonly_jobsdb.go
@@ -341,7 +341,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
var stateQuery, customValQuery, sourceQuery string

if len(stateFilters) > 0 {
stateQuery = " AND " + constructQueryOR("job_state", stateFilters)
stateQuery = " AND " + constructQueryOR("job_latest_state.job_state", stateFilters)
} else {
stateQuery = ""
}
@@ -393,11 +393,9 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
selectColumn = fmt.Sprintf("COUNT(%[1]s.job_id)", ds.JobTable)
}
sqlStatement = fmt.Sprintf(`SELECT %[6]s FROM
%[1]s,
(SELECT job_id, retry_time FROM %[2]s WHERE id IN
(SELECT MAX(id) from %[2]s GROUP BY job_id) %[3]s)
AS job_latest_state
WHERE %[1]s.job_id=job_latest_state.job_id
%[1]s
JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
%[3]s
%[4]s %[5]s
AND job_latest_state.retry_time < $1`,
ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, selectColumn)
@@ -505,10 +503,8 @@ func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error
%[1]s.custom_val ,%[1]s.parameters->'destination_id' as destination,
job_latest_state.job_state
FROM %[1]s
LEFT JOIN
(SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[2]s
WHERE id IN (SELECT MAX(id) from %[2]s GROUP BY job_id)) AS job_latest_state
ON %[1]s.job_id=job_latest_state.job_id GROUP BY job_latest_state.job_state,%[1]s.parameters->'source_id',%[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
GROUP BY job_latest_state.job_state, %[1]s.parameters->'source_id', %[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
row, err := jd.DbHandle.Query(sqlStatement)
if err != nil {
return "", err
@@ -561,11 +557,9 @@ func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, erro
sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.user_id, %[1]s.custom_val,
job_latest_state.exec_time,
job_latest_state.error_code, job_latest_state.error_response
FROM %[1]s,
(SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[2]s WHERE id IN
(SELECT MAX(id) from %[2]s GROUP BY job_id) AND (job_state = 'failed'))
AS job_latest_state
WHERE %[1]s.job_id=job_latest_state.job_id
FROM %[1]s
JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
WHERE job_latest_state.job_state = 'failed'
`, dsList.JobTableName, dsList.JobStatusTableName)
if argList[1] != "" {
sqlStatement = sqlStatement + fmt.Sprintf(`AND %[1]s.custom_val = '%[2]s'`, dsList.JobTableName, argList[1])
@@ -626,12 +620,7 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) {
job_latest_state.error_code, job_latest_state.error_response
FROM
%[1]s
LEFT JOIN
(SELECT job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response FROM %[2]s WHERE id IN
(SELECT MAX(id) from %[2]s GROUP BY job_id))
AS job_latest_state
ON %[1]s.job_id=job_latest_state.job_id
LEFT JOIN "%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
WHERE %[1]s.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id)

event := JobT{}
13 changes: 1 addition & 12 deletions jobsdb/unionQuery.go
@@ -363,18 +363,7 @@ func (*MultiTenantHandleT) getInitialSingleWorkspaceQueryString(ds dataSetT, con
job_latest_state.parameters as status_parameters
FROM
"%[1]s" AS jobs
LEFT JOIN (
SELECT
job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response, parameters
FROM "%[2]s"
WHERE
id IN (
SELECT MAX(id)
from "%[2]s"
GROUP BY job_id
)
) AS job_latest_state ON jobs.job_id = job_latest_state.job_id
LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id = job_latest_state.job_id
WHERE
jobs.workspace_id IN %[7]s %[3]s %[4]s %[5]s %[6]s`,
ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, limitQuery, workspaceString)
@@ -0,0 +1,4 @@
{{range .Datasets}}
DROP INDEX IF EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id";
DROP VIEW IF EXISTS "v_last_{{$.Prefix}}_job_status_{{.}}";
{{end}}
@@ -0,0 +1,4 @@
{{range .Datasets}}
CREATE INDEX IF NOT EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id" ON "{{$.Prefix}}_job_status_{{.}}" (job_id asc,id desc);
CREATE OR REPLACE VIEW "v_last_{{$.Prefix}}_job_status_{{.}}" AS SELECT DISTINCT ON (job_id) * FROM "{{$.Prefix}}_job_status_{{.}}" ORDER BY job_id ASC,id DESC;
{{end}}
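For reference, DISTINCT ON (job_id) keeps the first row of each job_id group under the ORDER BY job_id ASC, id DESC ordering, i.e. the status row with the highest id per job, and the matching (job_id asc, id desc) index lets Postgres satisfy that ordering from the index rather than sorting or aggregating the whole table. A tiny illustration with made-up rows:

-- "gw_job_status_1" (hypothetical contents):
--   id | job_id | job_state
--    1 |    100 | executing
--    2 |    100 | failed
--    3 |    100 | succeeded
--    4 |    101 | executing
-- SELECT * FROM "v_last_gw_job_status_1" returns only the rows with id 3 and 4,
-- the latest status of jobs 100 and 101.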
1 change: 1 addition & 0 deletions sql/migrations/node/000008_unionjobsdb_fn.down.sql
@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS unionjobsdb(prefix text, num int);
37 changes: 37 additions & 0 deletions sql/migrations/node/000008_unionjobsdb_fn.up.sql
@@ -0,0 +1,37 @@
-- unionjobsdb function automatically joins datasets and returns jobs
-- along with their latest jobs status (or null)
--
-- Parameters
-- prefix: table prefix, e.g. gw, rt, batch_rt
-- num: number of datasets to include in the query, e.g. 4
CREATE OR REPLACE FUNCTION unionjobsdb(prefix text, num int)
RETURNS table (
t_name text,
job_id bigint,
workspace_id text,
uuid uuid,
user_id text,
parameters jsonb,
custom_val character varying(64),
event_payload jsonb,
event_count integer,
created_at timestamp with time zone,
expire_at timestamp with time zone,
status_id bigint,
job_state character varying(64),
attempt smallint,
error_code character varying(32),
error_response jsonb
)
AS $$
DECLARE
qry text;
BEGIN
SELECT string_agg(
format('SELECT %1$L, j.job_id, j.workspace_id, j.uuid, j.user_id, j.parameters, j.custom_val, j.event_payload, j.event_count, j.created_at, j.expire_at, latest_status.id, latest_status.job_state, latest_status.attempt, latest_status.error_code, latest_status.error_response FROM %1$I j LEFT JOIN %2$I latest_status on latest_status.job_id = j.job_id', alltables.table_name, 'v_last_' || prefix || '_job_status_'|| substring(alltables.table_name, char_length(prefix)+7,30)),
' UNION ') INTO qry
FROM (select table_name from information_schema.tables
WHERE table_name LIKE prefix || '_jobs_%' order by table_name asc LIMIT num) alltables;
RETURN QUERY EXECUTE qry;
END;
$$ LANGUAGE plpgsql;
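A usage sketch (the 'gw' prefix and the dataset count of 4 are just the example values from the parameter comments above; actual dataset names depend on the installation):

-- list jobs from the first 4 gw datasets together with their latest status, if any
SELECT t_name, job_id, job_state
FROM unionjobsdb('gw', 4)
ORDER BY job_id;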
