Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
58422: changefeedccl: support primary key changes r=ajwerner a=ajwerner

This PR does a few things. One is that when errors occur due to unsupported
schema changes during the execution of a changefeed, relatively poor handling
ensues. Ideally we'd allow the changefeed to run its course right up to that
unsupported schema change's timestamp and then ensure that we persist the fact
that we've processed all of that data. That would permit a user to then restart
a changefeed after the unsupported change. There are some edge cases here worth
considering related to off-by-ones in the timestamp management. I probably
should go through that exercise before merging this PR.

The real feature this work is in support of is to allow for changefeeds to
successfully navigate changes to a primary index.

This PR works and support changes to the primary key of a table that also
include column set changes.

Release note (enterprise change): Support primary key changes in `CHANGEFEED`.

60825: kv/kvclient: fix ManualRefresh error handling r=ajwerner a=nvanbenschoten

Fixes #60760.
Fallout from #60567.

The refreshed BatchRequest was nil on the error path, which was
resulting in a nil-pointer exception. This commit fixes this by
passing the original BatchRequest to updateStateLocked, like the
TxnCoordSender normally does.

60908: Revert "vendor: bump pebble to 959663f8" r=nvanbenschoten a=nvanbenschoten

Informs #60828.

This reverts commit d8c3eef.

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
3 people committed Feb 22, 2021
4 parents 6a95063 + 3504342 + e5926f9 + 0b46328 commit bf9744b
Show file tree
Hide file tree
Showing 26 changed files with 1,007 additions and 508 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ def go_deps():
name = "com_github_cockroachdb_pebble",
build_file_proto_mode = "disable_global",
importpath = "github.com/cockroachdb/pebble",
sum = "h1:H3Cj4eJkj6nqLPogxjA8VTcUTg258Ra+vG0uxJc5SBg=",
version = "v0.0.0-20210219204632-959663f8ccbf",
sum = "h1:EyzONynthydmrlGVcEiyNmbLwDejSGb9Rzyn1NcEtNw=",
version = "v0.0.0-20210217155127-444296cfa2bb",
)
go_repository(
name = "com_github_cockroachdb_redact",
Expand Down
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ go_repository(
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sum = "h1:mDSj8NPponP6fRpRDblAGl5bpSHjPulHtk5lGl0gLSY=",
version = "v0.0.0-20210219172841-57ea560cfca1",
sum = "h1:2/QtM1mL37YmcsT8HaDNHDgTqqFVw+zr8UzMiBVLzYU=",
version = "v0.0.0-20210217105451-b926d437f341",
)

go_repository(
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,4 @@ timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set
version version 20.2-36 set the active cluster version in the format '<major>.<minor>'
version version 20.2-38 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-36</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-38</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.13.0
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f
github.com/cockroachdb/pebble v0.0.0-20210219204632-959663f8ccbf
github.com/cockroachdb/pebble v0.0.0-20210217155127-444296cfa2bb
github.com/cockroachdb/redact v1.0.9
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2
Expand Down Expand Up @@ -154,7 +154,7 @@ require (
golang.org/x/oauth2 v0.0.0-20190115181402-5dab4167f31c
golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210219172841-57ea560cfca1
golang.org/x/sys v0.0.0-20210217105451-b926d437f341
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
golang.org/x/text v0.3.5
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ github.com/cockroachdb/grpc-gateway v1.14.6-0.20200519165156-52697fc4a249 h1:pZu
github.com/cockroachdb/grpc-gateway v1.14.6-0.20200519165156-52697fc4a249/go.mod h1:UJ0EZAp832vCd54Wev9N1BMKEyvcZ5+IM0AwDrnlkEc=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/pebble v0.0.0-20210219204632-959663f8ccbf h1:H3Cj4eJkj6nqLPogxjA8VTcUTg258Ra+vG0uxJc5SBg=
github.com/cockroachdb/pebble v0.0.0-20210219204632-959663f8ccbf/go.mod h1:1XpB4cLQcF189RAcWi4gUc110zJgtOfT7SVNGY8sOe0=
github.com/cockroachdb/pebble v0.0.0-20210217155127-444296cfa2bb h1:EyzONynthydmrlGVcEiyNmbLwDejSGb9Rzyn1NcEtNw=
github.com/cockroachdb/pebble v0.0.0-20210217155127-444296cfa2bb/go.mod h1:1XpB4cLQcF189RAcWi4gUc110zJgtOfT7SVNGY8sOe0=
github.com/cockroachdb/redact v1.0.8 h1:8QG/764wK+vmEYoOlfobpe12EQcS81ukx/a4hdVMxNw=
github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.0.9 h1:sjlUvGorKMIVQfo+w2RqDi5eewCHn453C/vdIXMzjzI=
Expand Down Expand Up @@ -901,8 +901,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210219172841-57ea560cfca1 h1:mDSj8NPponP6fRpRDblAGl5bpSHjPulHtk5lGl0gLSY=
golang.org/x/sys v0.0.0-20210219172841-57ea560cfca1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210217105451-b926d437f341 h1:2/QtM1mL37YmcsT8HaDNHDgTqqFVw+zr8UzMiBVLzYU=
golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 h1:/ZHdbVpdR/jk3g30/d4yUL0JU9kksj8+F/bnQUVLGDM=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/ccl/changefeedccl/changefeeddist",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/utilccl",
"//pkg/clusterversion",
"//pkg/docs",
"//pkg/featureflag",
"//pkg/geo",
Expand Down Expand Up @@ -124,6 +125,7 @@ go_test(
"//pkg/ccl/importccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/timeutil",
"@com_github_cockroachdb_cockroach_go//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx//:pgx",
],
Expand Down
118 changes: 63 additions & 55 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -375,8 +376,8 @@ type TableFeed struct {
jobFeed
sinkURI string

rows *gosql.Rows
seen map[string]struct{}
toSend []*TestFeedMessage
seen map[string]struct{}
}

// ResetSeen is useful when manually pausing and resuming a TableFeed.
Expand All @@ -403,71 +404,78 @@ func (c *TableFeed) Next() (*TestFeedMessage, error) {
// by repeatedly fetching and deleting all rows in the table. Then it pages
// through the results until they are empty and repeats.
for {
if c.rows != nil && c.rows.Next() {
m := &TestFeedMessage{}
var msgID int64
if err := c.rows.Scan(
&m.Topic, &m.Partition, &msgID, &m.Key, &m.Value, &m.Resolved,
); err != nil {
return nil, err
}

// Scan turns NULL bytes columns into a 0-length, non-nil byte
// array, which is pretty unexpected. Nil them out before returning.
// Either key+value or payload will be set, but not both.
if len(m.Key) > 0 || len(m.Value) > 0 {
// TODO(dan): This skips duplicates, since they're allowed by the
// semantics of our changefeeds. Now that we're switching to RangeFeed,
// this can actually happen (usually because of splits) and cause
// flakes. However, we really should be de-duping key+ts, this is too
// coarse. Fixme.
seenKey := m.Topic + m.Partition + string(m.Key) + string(m.Value)
if _, ok := c.seen[seenKey]; ok {
continue
}
c.seen[seenKey] = struct{}{}

m.Resolved = nil
return m, nil
}
m.Key, m.Value = nil, nil
return m, nil
if len(c.toSend) > 0 {
toSend := c.toSend[0]
c.toSend = c.toSend[1:]
return toSend, nil
}
if c.rows != nil {
if err := c.rows.Close(); err != nil {
return nil, err
}
c.rows = nil
}

if err := c.fetchJobError(); err != nil {
return nil, c.jobErr
}
var toSend []*TestFeedMessage
if err := crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *gosql.Tx) error {

// TODO(dan): It's a bummer that this mutates the sqlsink table. I
// originally tried paging through message_id by repeatedly generating a
// new high-water with GenerateUniqueInt, but this was racy with rows
// being flushed out by the sink. An alternative is to steal the nanos
// part from `high_water_timestamp` in `crdb_internal.jobs` and run it
// through `builtins.GenerateUniqueID`, but that would mean we're only
// ever running tests on rows that have gotten a resolved timestamp,
// which seems limiting.
var err error
c.rows, err = c.db.Query(
`SELECT * FROM [DELETE FROM sqlsink RETURNING *] ORDER BY topic, partition, message_id`)
if err != nil {
// Avoid anything that might somehow look like deadlock under stressrace.
_, err := tx.Exec("SET TRANSACTION PRIORITY LOW")
if err != nil {
return err
}

toSend = nil // reset for this iteration
// TODO(dan): It's a bummer that this mutates the sqlsink table. I
// originally tried paging through message_id by repeatedly generating a
// new high-water with GenerateUniqueInt, but this was racy with rows
// being flushed out by the sink. An alternative is to steal the nanos
// part from `high_water_timestamp` in `crdb_internal.jobs` and run it
// through `builtins.GenerateUniqueID`, but that would mean we're only
// ever running tests on rows that have gotten a resolved timestamp,
// which seems limiting.
rows, err := tx.Query(
`SELECT * FROM [DELETE FROM sqlsink RETURNING *] ORDER BY topic, partition, message_id`)
if err != nil {
return err
}
for rows.Next() {

m := &TestFeedMessage{}
var msgID int64
if err := rows.Scan(
&m.Topic, &m.Partition, &msgID, &m.Key, &m.Value, &m.Resolved,
); err != nil {
return err
}

// Scan turns NULL bytes columns into a 0-length, non-nil byte
// array, which is pretty unexpected. Nil them out before returning.
// Either key+value or payload will be set, but not both.
if len(m.Key) > 0 || len(m.Value) > 0 {
// TODO(dan): This skips duplicates, since they're allowed by the
// semantics of our changefeeds. Now that we're switching to RangeFeed,
// this can actually happen (usually because of splits) and cause
// flakes. However, we really should be de-duping key+ts, this is too
// coarse. Fixme.
seenKey := m.Topic + m.Partition + string(m.Key) + string(m.Value)
if _, ok := c.seen[seenKey]; ok {
continue
}
c.seen[seenKey] = struct{}{}

m.Resolved = nil
} else {
m.Key, m.Value = nil, nil
}
toSend = append(toSend, m)
}
return rows.Err()
}); err != nil {
return nil, err
}
c.toSend = toSend
}
}

// Close implements the TestFeed interface.
func (c *TableFeed) Close() error {
if c.rows != nil {
if err := c.rows.Close(); err != nil {
return errors.Errorf(`could not close rows: %v`, err)
}
}
if _, err := c.db.Exec(`CANCEL JOB $1`, c.JobID); err != nil {
log.Infof(context.Background(), `could not cancel feed %d: %v`, c.JobID, err)
}
Expand Down
37 changes: 26 additions & 11 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,34 @@ func distChangefeedFlow(
}
}

spansTS := details.StatementTime
execCfg := execCtx.ExecCfg()
var initialHighWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
// If we have a high-water set, use it to compute the spans, since the
// ones at the statement time may have been garbage collected by now.
spansTS = initialHighWater
}
var trackedSpans []roachpb.Span
{
spansTS := details.StatementTime
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
// If we have a high-water set, use it to compute the spans, since the
// ones at the statement time may have been garbage collected by now.
spansTS = initialHighWater
}

execCfg := execCtx.ExecCfg()
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS)
if err != nil {
return err
// We want to fetch the target spans as of the timestamp following the
// highwater unless the highwater corresponds to a timestamp of an initial
// scan. This logic is irritatingly complex but extremely important. Namely,
// we may be here because the schema changed at the current resolved
// timestamp. However, an initial scan should be performed at exactly the
// timestamp specified; initial scans can be created at the timestamp of a
// schema change and thus should see the side-effect of the schema change.
isRestartAfterCheckpointOrNoInitialScan := progress.GetHighWater() != nil
if isRestartAfterCheckpointOrNoInitialScan {
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS)
if err != nil {
return err
}
}

return changefeeddist.StartDistChangefeed(
Expand Down
Loading

0 comments on commit bf9744b

Please sign in to comment.