-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
utils.go
79 lines (71 loc) · 1.95 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package jobs
import (
"context"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)
// RunningJobExists reports whether some job other than jobID that satisfies
// payloadPredicate precedes jobID among the non-terminal jobs in system.jobs.
//
// The jobs considered are those whose status is one of StatusPending,
// StatusRunning, StatusPaused, StatusCancelRequested, StatusPauseRequested or
// StatusReverting. They are visited in the order of their created column, and
// only rows whose decoded payload satisfies payloadPredicate take part in the
// decision:
//
//   - if the first such row belongs to a job other than jobID, the result is
//     true;
//   - if the first such row is jobID itself, or there is no such row at all,
//     the result is false.
//
// The query is issued through ie using txn. A non-nil error is returned when
// the query or the iteration fails, when a payload cannot be unmarshaled, or
// when the id column does not hold an integer datum. Any error from closing
// the iterator is combined into the returned error.
func RunningJobExists(
	ctx context.Context,
	jobID jobspb.JobID,
	ie sqlutil.InternalExecutor,
	txn *kv.Txn,
	payloadPredicate func(payload *jobspb.Payload) bool,
) (exists bool, retErr error) {
	const stmt = `
SELECT
id, payload
FROM
system.jobs
WHERE
status IN ($1, $2, $3, $4, $5, $6)
ORDER BY created`
	it, err := ie.QueryIterator(
		ctx,
		"get-jobs",
		txn,
		stmt,
		StatusPending,
		StatusRunning,
		StatusPaused,
		StatusCancelRequested,
		StatusPauseRequested,
		StatusReverting,
	)
	if err != nil {
		return false /* exists */, err
	}
	// We have to make sure to close the iterator since we might return from the
	// for loop early (before Next() returns false).
	defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()

	var ok bool
	for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
		row := it.Cur()

		// Use a distinct name for the decoding error so that it does not shadow
		// the iteration error that is returned once the loop is over.
		payload, unmarshalErr := UnmarshalPayload(row[1])
		if unmarshalErr != nil {
			return false /* exists */, unmarshalErr
		}
		if !payloadPredicate(payload) {
			continue
		}

		// Surface an unexpected datum type as an error rather than panicking on
		// a failed type assertion.
		idDatum, isInt := row[0].(*tree.DInt)
		if !isInt {
			return false /* exists */, errors.AssertionFailedf(
				"expected job id to be an integer datum, found %T", row[0],
			)
		}
		if jobspb.JobID(*idDatum) == jobID {
			// The first matching job is the one we were asked about, so no
			// other matching job comes before it.
			break
		}
		return true /* exists */, nil /* retErr */
	}
	// err is the result of the most recent it.Next call.
	return false /* exists */, err
}