Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

builtins: change complete_stream builtin to take a timestamp #60543

Merged
merged 1 commit into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2274,7 +2274,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the latest resolved timestamp and leave the cluster in a consistent state. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The specified timestamp can only be specified up to the microsecond. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td></tr></tbody>
</table>

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/streamingccl/streamingutils",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//pkg/kv",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/streamingccl/streamingutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand All @@ -23,7 +24,9 @@ func init() {
streaming.CompleteIngestionHook = doCompleteIngestion
}

func doCompleteIngestion(evalCtx *tree.EvalContext, txn *kv.Txn, jobID int) error {
func doCompleteIngestion(
evalCtx *tree.EvalContext, txn *kv.Txn, jobID int, cutoverTimestamp hlc.Timestamp,
) error {
// Get the job payload for job_id.
const jobsQuery = `SELECT progress FROM system.jobs WHERE id=$1 FOR UPDATE`
row, err := evalCtx.InternalExecutor.QueryRow(evalCtx.Context,
Expand All @@ -49,7 +52,7 @@ func doCompleteIngestion(evalCtx *tree.EvalContext, txn *kv.Txn, jobID int) erro

// Update the sentinel being polled by the stream ingestion job to
// check if a complete has been signaled.
sp.StreamIngest.MarkedForCompletion = true
sp.StreamIngest.CutoverTime = cutoverTimestamp
progress.ModifiedMicros = timeutil.ToUnixMicros(txn.ReadTimestamp().GoTime())
progressBytes, err := protoutil.Marshal(progress)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/streamingccl/streamingutils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ func TestCutoverBuiltin(t *testing.T) {
progress := job.Progress()
sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest)
require.True(t, ok)
require.False(t, sp.StreamIngest.MarkedForCompletion)
require.True(t, sp.StreamIngest.CutoverTime.IsEmpty())

cutoverTime := timeutil.Now()
var jobID int
err = db.QueryRowContext(
ctx,
`SELECT crdb_internal.complete_stream_ingestion_job($1)`,
*job.ID()).Scan(&jobID)
`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
*job.ID(), cutoverTime).Scan(&jobID)
require.NoError(t, err)
require.Equal(t, *job.ID(), int64(jobID))

Expand All @@ -73,5 +74,8 @@ func TestCutoverBuiltin(t *testing.T) {
progress = sj.Progress()
sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest)
require.True(t, ok)
require.True(t, sp.StreamIngest.MarkedForCompletion)
// The builtin only offers microsecond precision and so we must account for
// that when comparing against our chosen time.
cutoverTime = cutoverTime.Round(time.Microsecond)
require.Equal(t, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}, sp.StreamIngest.CutoverTime)
}
Loading