Skip to content

Commit

Permalink
changefeedccl: scope backfills to only the tables affected by a schema change
Browse files Browse the repository at this point in the history

Previously, if a changefeed was defined on multiple targets, backfills (if
configured) would be run for every target after a schema change that affected
only some of them. This is unnecessary work. This patch has the kv_feed identify
the affected spans after a schema change and kick off backfills only on those spans.

Release note (bug fix): Changefeeds defined on multiple tables will now
only backfill affected tables after a schema change.
  • Loading branch information
HonoreDB committed Oct 8, 2020
1 parent 43e02ad commit a34edd7
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func makeKVFeedCfg(
Sink: buf,
Settings: cfg.Settings,
DB: cfg.DB,
Codec: cfg.Codec,
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
Expand Down
52 changes: 52 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,58 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
}
}

// TestChangefeedSchemaChangeBackfillScope tests schema changes that require a
// backfill on only some of the tables watched by a single changefeed.
//
// The scenario creates one changefeed over two tables, alters only one of
// them, and lists only the altered table's rows as the expected output
// following the schema change. After the subtests finish, the test also scans
// the log files for "cdc ux violation" entries and fails if any are found.
func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// testFn holds the scenario body; it is passed to both the sinklessTest and
// enterpriseTest wrappers below.
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

t.Run(`add column with default`, func(t *testing.T) {
// Two tables: add_column_def is the one that gets altered, no_def_change
// is never altered after creation.
sqlDB.Exec(t, `CREATE TABLE add_column_def (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE no_def_change (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (1)`)
sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (2)`)
sqlDB.Exec(t, `INSERT INTO no_def_change VALUES (3)`)
// A single changefeed watches both tables.
combinedFeed := feed(t, f, `CREATE CHANGEFEED FOR add_column_def, no_def_change WITH updated`)
defer closeFeed(t, combinedFeed)
// Initial output: every pre-existing row from both tables.
// NOTE(review): assertPayloadsStripTs presumably removes the "updated"
// timestamp before comparing, since the expected strings omit it — confirm.
assertPayloadsStripTs(t, combinedFeed, []string{
`add_column_def: [1]->{"after": {"a": 1}}`,
`add_column_def: [2]->{"after": {"a": 2}}`,
`no_def_change: [3]->{"after": {"a": 3}}`,
})
// Alter only add_column_def; no_def_change is left untouched.
sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`)
// ts is the modification time of version 4 of add_column_def's descriptor.
// NOTE(review): version 4 is presumably the descriptor version at which
// column b becomes visible — confirm against the schema change steps.
ts := fetchDescVersionModificationTime(t, db, f, `add_column_def`, 4)
// Schema change backfill: the add_column_def rows are expected again, still
// without column b. No no_def_change rows are listed.
assertPayloadsStripTs(t, combinedFeed, []string{
`add_column_def: [1]->{"after": {"a": 1}}`,
`add_column_def: [2]->{"after": {"a": 2}}`,
})
// Changefeed level backfill: the add_column_def rows are expected with the
// new column populated and "updated" equal to ts. Again, no no_def_change
// rows are listed.
assertPayloads(t, combinedFeed, []string{
fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`,
ts.AsOfSystemTime()),
fmt.Sprintf(`add_column_def: [2]->{"after": {"a": 2, "b": "d"}, "updated": "%s"}`,
ts.AsOfSystemTime()),
})
})

}

t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
// Flush the logs, then search the log files for entries matching
// "cdc ux violation"; any match fails the test.
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
t.Fatal(err)
}
if len(entries) > 0 {
t.Fatalf("Found violation of CDC's guarantees: %v", entries)
}
}

// fetchDescVersionModificationTime fetches the `ModificationTime` of the specified
// `version` of `tableName`'s table descriptor.
func fetchDescVersionModificationTime(
Expand Down
21 changes: 18 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -37,6 +38,7 @@ import (
type Config struct {
Settings *cluster.Settings
DB *kv.DB
Codec keys.SQLCodec
Clock *hlc.Clock
Gossip gossip.OptionalGossip
Spans []roachpb.Span
Expand Down Expand Up @@ -94,6 +96,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff,
cfg.InitialHighWater,
cfg.Codec,
sf, sc, pff, bf)
g.GoCtx(f.run)
err := g.Wait()
Expand Down Expand Up @@ -131,6 +134,7 @@ type kvFeed struct {
withInitialBackfill bool
initialHighWater hlc.Timestamp
sink EventBufferWriter
codec keys.SQLCodec

schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy
Expand All @@ -149,6 +153,7 @@ func newKVFeed(
schemaChangePolicy changefeedbase.SchemaChangePolicy,
withInitialBackfill, withDiff bool,
initialHighWater hlc.Timestamp,
codec keys.SQLCodec,
tf schemaFeed,
sc kvScanner,
pff physicalFeedFactory,
Expand All @@ -162,6 +167,7 @@ func newKVFeed(
initialHighWater: initialHighWater,
schemaChangeEvents: schemaChangeEvents,
schemaChangePolicy: schemaChangePolicy,
codec: codec,
tableFeed: tf,
scanner: sc,
physicalFeed: pff,
Expand Down Expand Up @@ -212,12 +218,21 @@ func (f *kvFeed) scanIfShould(
// time with an initial backfill but if you use a cursor then you will get the
// updates after that timestamp.
isInitialScan := initialScan && f.withInitialBackfill
var spansToBackfill []roachpb.Span
if isInitialScan {
scanTime = highWater
spansToBackfill = f.spans
} else if len(events) > 0 {
// TODO(ajwerner): In this case we should only backfill for the tables
// which have events which may not be all of the targets.
// Only backfill for the tables which have events which may not be all
// of the targets.
for _, ev := range events {
tablePrefix := f.codec.TablePrefix(uint32(ev.After.ID))
tableSpan := roachpb.Span{Key: tablePrefix, EndKey: tablePrefix.PrefixEnd()}
for _, sp := range f.spans {
if tableSpan.Overlaps(sp) {
spansToBackfill = append(spansToBackfill, sp)
}
}
if !scanTime.Equal(ev.After.ModificationTime) {
log.Fatalf(ctx, "found event in shouldScan which did not occur at the scan time %v: %v",
scanTime, ev)
Expand All @@ -237,7 +252,7 @@ func (f *kvFeed) scanIfShould(
}

if err := f.scanner.Scan(ctx, f.sink, physicalConfig{
Spans: f.spans,
Spans: spansToBackfill,
Timestamp: scanTime,
WithDiff: !isInitialScan && f.withDiff,
}); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestKVFeed(t *testing.T) {
tc.schemaChangeEvents, tc.schemaChangePolicy,
tc.needsInitialScan, tc.withDiff,
tc.initialHighWater,
keys.SystemSQLCodec,
&tf, sf, rangefeedFactory(ref.run), bufferFactory)
ctx, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx)
Expand Down

0 comments on commit a34edd7

Please sign in to comment.