Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(jobsdb): latest job status query optimization #2693

Merged
merged 3 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 41 additions & 51 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,16 @@ func (jd *HandleT) init() {
jd.dbHandle = sqlDB
}
jd.workersAndAuxSetup()

// Database schema migration should happen early, even before jobsdb is started,
// so that we can be sure that all the necessary tables are created and considered to be in
// the latest schema version, before rudder-migrator starts introducing new tables.
jd.dsListLock.WithLock(func(l lock.LockToken) {
switch jd.ownerType {
case Write, ReadWrite:
jd.setupDatabaseTables(l, jd.clearAll)
Copy link
Contributor Author

@atzoum atzoum Nov 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: all gateways will need to be upgraded to the latest version so that the necessary schema migration will be performed on gw datasets.

}
})
}

func (jd *HandleT) workersAndAuxSetup() {
Expand Down Expand Up @@ -898,24 +908,19 @@ func (jd *HandleT) Start() error {
jd.backgroundGroup = g

if !jd.skipSetupDBSetup {
jd.setUpForOwnerType(ctx, jd.ownerType, jd.clearAll)

// Avoid clearing the database, if .Start() is called again.
jd.clearAll = false
jd.setUpForOwnerType(ctx, jd.ownerType)
}
return nil
}

func (jd *HandleT) setUpForOwnerType(ctx context.Context, ownerType OwnerType, clearAll bool) {
func (jd *HandleT) setUpForOwnerType(ctx context.Context, ownerType OwnerType) {
jd.dsListLock.WithLock(func(l lock.LockToken) {
switch ownerType {
case Read:
jd.readerSetup(ctx, l)
case Write:
jd.setupDatabaseTables(l, clearAll)
jd.writerSetup(ctx, l)
case ReadWrite:
jd.setupDatabaseTables(l, clearAll)
jd.readerWriterSetup(ctx, l)
}
})
Expand Down Expand Up @@ -1445,7 +1450,7 @@ func (jd *HandleT) createDSInTx(tx *Tx, newDS dataSetT) error {

// TODO : Evaluate a way to handle indexes only for particular tables
if jd.tablePrefix == "rt" {
sqlStatement = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "customval_workspace_%s" ON %q (custom_val,workspace_id)`, newDS.Index, newDS.JobTable)
sqlStatement = fmt.Sprintf(`CREATE INDEX "idx_%[1]s_cv_ws" ON %[1]q (custom_val,workspace_id)`, newDS.JobTable)
_, err = tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return err
Expand All @@ -1469,6 +1474,13 @@ func (jd *HandleT) createDSInTx(tx *Tx, newDS dataSetT) error {
return err
}

if _, err = tx.ExecContext(context.TODO(), fmt.Sprintf(`CREATE INDEX "idx_%[1]s_jid_id" ON %[1]q(job_id asc,id desc)`, newDS.JobStatusTable)); err != nil {
return err
}
if _, err = tx.ExecContext(context.TODO(), fmt.Sprintf(`CREATE VIEW "v_last_%[1]s" AS SELECT DISTINCT ON (job_id) * FROM %[1]q ORDER BY job_id ASC, id DESC`, newDS.JobStatusTable)); err != nil {
return err
}

err = jd.journalMarkDoneInTx(tx, opID)
if err != nil {
return err
Expand Down Expand Up @@ -1569,10 +1581,10 @@ func (jd *HandleT) dropDSInTx(tx *Tx, ds dataSetT) error {
if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN ACCESS EXCLUSIVE MODE;`, ds.JobTable)); err != nil {
return err
}
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, ds.JobStatusTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, ds.JobStatusTable)); err != nil {
return err
}
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, ds.JobTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, ds.JobTable)); err != nil {
return err
}
jd.postDropDs(ds)
Expand All @@ -1591,9 +1603,9 @@ func (jd *HandleT) dropDSForRecovery(ds dataSetT) {
sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS EXCLUSIVE MODE;`, ds.JobTable)
jd.prepareAndExecStmtInTxAllowMissing(tx, sqlStatement)

sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q`, ds.JobStatusTable)
sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q CASCADE`, ds.JobStatusTable)
jd.prepareAndExecStmtInTx(tx, sqlStatement)
sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q`, ds.JobTable)
sqlStatement = fmt.Sprintf(`DROP TABLE IF EXISTS %q CASCADE`, ds.JobTable)
jd.prepareAndExecStmtInTx(tx, sqlStatement)
err = tx.Commit()
jd.assertError(err)
Expand Down Expand Up @@ -1660,10 +1672,10 @@ func (jd *HandleT) mustRenameDSInTx(tx *Tx, ds dataSetT) error {
return fmt.Errorf("could not rename job table %s to %s: %w", ds.JobTable, renamedJobTable, err)
}
if count == 0 {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, renamedJobStatusTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, renamedJobStatusTable)); err != nil {
return fmt.Errorf("could not drop empty pre_drop job status table %s: %w", renamedJobStatusTable, err)
}
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q`, renamedJobTable)); err != nil {
if _, err = tx.Exec(fmt.Sprintf(`DROP TABLE %q CASCADE`, renamedJobTable)); err != nil {
return fmt.Errorf("could not drop empty pre_drop job table %s: %w", renamedJobTable, err)
}
}
Expand Down Expand Up @@ -1769,10 +1781,7 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
defer queryStat.End()

compactDSQuery := fmt.Sprintf(
`with last_status as
(
select * from %[1]q where id in (select max(id) from %[1]q group by job_id)
),
`with last_status as (select * from "v_last_%[1]s"),
inserted_jobs as
(
insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
Expand Down Expand Up @@ -2064,28 +2073,11 @@ func (jd *HandleT) GetPileUpCounts(ctx context.Context) (map[string]map[string]i
j.workspace_id as workspace
from
%[1]q j
left join (
select * from (select
*,
ROW_NUMBER() OVER(
PARTITION BY rs.job_id
ORDER BY
rs.id DESC
) AS row_no
FROM
%[2]q as rs) nq1
where
nq1.row_no = 1

) s on j.job_id = s.job_id
where
(
s.job_state not in (
'aborted', 'succeeded',
'migrated'
)
or s.job_id is null
)
left join "v_last_%[2]s" s on j.job_id = s.job_id
where (
s.job_state not in ('aborted', 'succeeded', 'migrated')
or s.job_id is null
)
)
select
count(*),
Expand Down Expand Up @@ -2479,7 +2471,7 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G

var stateQuery string
if len(stateFilters) > 0 {
stateQuery = " AND " + constructQueryOR("job_state", stateFilters)
stateQuery = " AND " + constructQueryOR("job_latest_state.job_state", stateFilters)
}

var filterConditions []string
Expand Down Expand Up @@ -2516,12 +2508,9 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G
job_latest_state.exec_time, job_latest_state.retry_time,
job_latest_state.error_code, job_latest_state.error_response, job_latest_state.parameters
FROM
%[1]q AS jobs,
(SELECT job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response, parameters FROM %[2]q WHERE id IN
(SELECT MAX(id) from %[2]q GROUP BY job_id) %[3]s)
AS job_latest_state
WHERE jobs.job_id=job_latest_state.job_id
%[1]q AS jobs
JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
%[3]s
%[4]s
AND job_latest_state.retry_time < $1 ORDER BY jobs.job_id %[5]s`,
ds.JobTable, ds.JobStatusTable, stateQuery, filterQuery, limitQuery)
Expand Down Expand Up @@ -2656,7 +2645,7 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
` sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts, `+
` sum(pg_column_size(jobs.event_payload)) over (order by jobs.job_id) as running_payload_size `+
`FROM %[1]q AS jobs `+
`LEFT JOIN %[2]q AS job_status ON jobs.job_id=job_status.job_id `+
`LEFT JOIN %[2]q job_status ON jobs.job_id=job_status.job_id `+
`WHERE job_status.job_id is NULL `,
ds.JobTable, ds.JobStatusTable)

Expand Down Expand Up @@ -3807,9 +3796,10 @@ func (jd *HandleT) deleteJobStatusDSInTx(txHandler transactionHandler, ds dataSe
sqlFiltersString = "WHERE " + sqlFiltersString
}

sqlStatement := fmt.Sprintf(`DELETE FROM %[1]q WHERE id IN
(SELECT MAX(id) from %[1]q where job_id IN (SELECT job_id from %[2]q %[3]s) GROUP BY job_id) %[4]s
AND retry_time < $1`,
sqlStatement := fmt.Sprintf(`DELETE FROM %[1]q
WHERE id = ANY(
SELECT id from "v_last_%[1]s" where job_id IN (SELECT job_id from %[2]q %[3]s)
) %[4]s AND retry_time < $1`,
ds.JobStatusTable, ds.JobTable, sqlFiltersString, stateQuery)

stmt, err := txHandler.Prepare(sqlStatement)
Expand Down
31 changes: 10 additions & 21 deletions jobsdb/readonly_jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
var stateQuery, customValQuery, sourceQuery string

if len(stateFilters) > 0 {
stateQuery = " AND " + constructQueryOR("job_state", stateFilters)
stateQuery = " AND " + constructQueryOR("job_latest_state.job_state", stateFilters)
} else {
stateQuery = ""
}
Expand Down Expand Up @@ -393,11 +393,9 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
selectColumn = fmt.Sprintf("COUNT(%[1]s.job_id)", ds.JobTable)
}
sqlStatement = fmt.Sprintf(`SELECT %[6]s FROM
%[1]s,
(SELECT job_id, retry_time FROM %[2]s WHERE id IN
(SELECT MAX(id) from %[2]s GROUP BY job_id) %[3]s)
AS job_latest_state
WHERE %[1]s.job_id=job_latest_state.job_id
%[1]s
JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
%[3]s
%[4]s %[5]s
AND job_latest_state.retry_time < $1`,
ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, selectColumn)
Expand Down Expand Up @@ -505,10 +503,8 @@ func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error
%[1]s.custom_val ,%[1]s.parameters->'destination_id' as destination,
job_latest_state.job_state
FROM %[1]s
LEFT JOIN
(SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[2]s
WHERE id IN (SELECT MAX(id) from %[2]s GROUP BY job_id)) AS job_latest_state
ON %[1]s.job_id=job_latest_state.job_id GROUP BY job_latest_state.job_state,%[1]s.parameters->'source_id',%[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
GROUP BY job_latest_state.job_state, %[1]s.parameters->'source_id', %[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
row, err := jd.DbHandle.Query(sqlStatement)
if err != nil {
return "", err
Expand Down Expand Up @@ -561,11 +557,9 @@ func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, erro
sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.user_id, %[1]s.custom_val,
job_latest_state.exec_time,
job_latest_state.error_code, job_latest_state.error_response
FROM %[1]s,
(SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[2]s WHERE id IN
(SELECT MAX(id) from %[2]s GROUP BY job_id) AND (job_state = 'failed'))
AS job_latest_state
WHERE %[1]s.job_id=job_latest_state.job_id
FROM %[1]s
JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
WHERE job_latest_state.job_state = 'failed'
`, dsList.JobTableName, dsList.JobStatusTableName)
if argList[1] != "" {
sqlStatement = sqlStatement + fmt.Sprintf(`AND %[1]s.custom_val = '%[2]s'`, dsList.JobTableName, argList[1])
Expand Down Expand Up @@ -626,12 +620,7 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) {
job_latest_state.error_code, job_latest_state.error_response
FROM
%[1]s
LEFT JOIN
(SELECT job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response FROM %[2]s WHERE id IN
(SELECT MAX(id) from %[2]s GROUP BY job_id))
AS job_latest_state
ON %[1]s.job_id=job_latest_state.job_id
LEFT JOIN "%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
atzoum marked this conversation as resolved.
Show resolved Hide resolved
WHERE %[1]s.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id)

event := JobT{}
Expand Down
13 changes: 1 addition & 12 deletions jobsdb/unionQuery.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,18 +363,7 @@ func (*MultiTenantHandleT) getInitialSingleWorkspaceQueryString(ds dataSetT, con
job_latest_state.parameters as status_parameters
FROM
"%[1]s" AS jobs
LEFT JOIN (
SELECT
job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response, parameters
FROM "%[2]s"
WHERE
id IN (
SELECT MAX(id)
from "%[2]s"
GROUP BY job_id
)
) AS job_latest_state ON jobs.job_id = job_latest_state.job_id
LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id = job_latest_state.job_id
WHERE
jobs.workspace_id IN %[7]s %[3]s %[4]s %[5]s %[6]s`,
ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, limitQuery, workspaceString)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Down migration (Go text/template, expanded once per existing dataset suffix):
-- removes the (job_id asc, id desc) index and the "latest status per job"
-- view that the matching up migration created for each job-status table.
-- IF EXISTS keeps the rollback idempotent when an object was never created.
{{range .Datasets}}
DROP INDEX IF EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id";
DROP VIEW IF EXISTS "v_last_{{$.Prefix}}_job_status_{{.}}";
{{end}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Up migration (Go text/template, expanded once per existing dataset suffix):
-- retrofits pre-existing job-status tables with the same objects that
-- createDSInTx now creates for new datasets:
--   1. a (job_id asc, id desc) index, so the view below can be served by an
--      index scan instead of a per-query GROUP BY/MAX(id) aggregation;
--   2. a "v_last_…" view returning only the latest status row per job_id
--      (DISTINCT ON keeps the first row per job_id, i.e. the highest id
--      given the ORDER BY job_id ASC, id DESC).
-- IF NOT EXISTS / OR REPLACE keep the migration idempotent on re-runs.
{{range .Datasets}}
CREATE INDEX IF NOT EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id" ON "{{$.Prefix}}_job_status_{{.}}" (job_id asc,id desc);
CREATE OR REPLACE VIEW "v_last_{{$.Prefix}}_job_status_{{.}}" AS SELECT DISTINCT ON (job_id) * FROM "{{$.Prefix}}_job_status_{{.}}" ORDER BY job_id ASC,id DESC;
{{end}}
1 change: 1 addition & 0 deletions sql/migrations/node/000008_unionjobsdb_fn.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Down migration: removes the unionjobsdb helper created by the matching up
-- migration. The full signature is required because PostgreSQL allows
-- overloaded functions; IF EXISTS keeps the rollback idempotent.
DROP FUNCTION IF EXISTS unionjobsdb(prefix text, num int);
37 changes: 37 additions & 0 deletions sql/migrations/node/000008_unionjobsdb_fn.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- unionjobsdb automatically joins the first <num> job datasets with their
-- corresponding "v_last_…" latest-status views and returns all jobs along
-- with their latest job status (status columns are NULL for jobs that have
-- no status row yet).
--
-- Parameters
--   prefix: table prefix, e.g. gw, rt, batch_rt
--   num:    number of datasets to include in the query, e.g. 4
--
-- NOTE(review): datasets are selected in lexical table_name order, so a
-- dataset index such as "10" sorts before "2" — confirm callers only use
-- this as a debugging aid where that ordering is acceptable.
-- NOTE(review): '_' in the LIKE pattern is a single-character wildcard, so
-- prefixes containing underscores (e.g. batch_rt) match slightly more
-- loosely than a literal comparison would — presumed harmless in practice.
CREATE OR REPLACE FUNCTION unionjobsdb(prefix text, num int)
RETURNS table (
t_name text,
job_id bigint,
workspace_id text,
uuid uuid,
user_id text,
parameters jsonb,
custom_val character varying(64),
event_payload jsonb,
event_count integer,
created_at timestamp with time zone,
expire_at timestamp with time zone,
status_id bigint,
job_state character varying(64),
attempt smallint,
error_code character varying(32),
error_response jsonb
)
AS $$
DECLARE
qry text;
BEGIN
-- Build one SELECT per dataset (jobs table LEFT JOINed to its latest-status
-- view) and glue them together with UNION. The view name is derived from the
-- jobs table name: "<prefix>_jobs_<idx>" -> "v_last_<prefix>_job_status_<idx>"
-- (char_length(prefix)+7 skips past "<prefix>_jobs_").
SELECT string_agg(
format('SELECT %1$L, j.job_id, j.workspace_id, j.uuid, j.user_id, j.parameters, j.custom_val, j.event_payload, j.event_count, j.created_at, j.expire_at, latest_status.id, latest_status.job_state, latest_status.attempt, latest_status.error_code, latest_status.error_response FROM %1$I j LEFT JOIN %2$I latest_status on latest_status.job_id = j.job_id', alltables.table_name, 'v_last_' || prefix || '_job_status_'|| substring(alltables.table_name, char_length(prefix)+7,30)),
' UNION ') INTO qry
FROM (select table_name from information_schema.tables
WHERE table_name LIKE prefix || '_jobs_%' order by table_name asc LIMIT num) alltables;

-- Robustness: when no matching datasets exist, string_agg over zero rows
-- yields NULL and EXECUTE NULL would raise "query string argument of EXECUTE
-- is null"; return an empty result set instead.
IF qry IS NULL THEN
RETURN;
END IF;

RETURN QUERY EXECUTE qry;
END;
$$ LANGUAGE plpgsql;