Skip to content

Commit

Permalink
Merge #80499
Browse files Browse the repository at this point in the history
80499: changefeedccl: Support filter over primary key span. r=miretskiy a=miretskiy

Extend changefeeds to support a filtering expression over the primary key
span via the `primary_key_filter` option.

When specified, this option is treated as an SQL filter expression
over primary key columns.  This filter can then be used to constrain
the spans watched by the changefeed to only those that satisfy
the expression.  The use of this filter is restricted to
changefeeds over a single table.

Release Notes (enterprise change): The `primary_key_filter` option can be used
to restrict the span watched by a changefeed to only the portion that
satisfies the filtering predicate.

Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Apr 29, 2022
2 parents 41e2600 + e862248 commit 97e72ba
Show file tree
Hide file tree
Showing 14 changed files with 623 additions and 35 deletions.
39 changes: 13 additions & 26 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,7 @@ func generateNewTargets(
for _, targetDesc := range newTableDescs {
existingTargetDescs = append(existingTargetDescs, targetDesc)
}
existingTargetSpans, err := fetchSpansForDescs(ctx, p, opts, statementTime, existingTargetDescs)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}

existingTargetSpans := fetchSpansForDescs(p, existingTargetDescs)
var newTargetDescs []catalog.Descriptor
for _, target := range v.Targets {
desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName)
Expand All @@ -415,10 +411,7 @@ func generateNewTargets(
newTargetDescs = append(newTargetDescs, desc)
}

addedTargetSpans, err := fetchSpansForDescs(ctx, p, opts, statementTime, newTargetDescs)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
addedTargetSpans := fetchSpansForDescs(p, newTargetDescs)

// By default, we will not perform an initial scan on newly added
// targets. Hence, the user must explicitly state that they want an
Expand Down Expand Up @@ -484,10 +477,7 @@ func generateNewTargets(
}
}
if len(droppedTargetDescs) > 0 {
droppedTargetSpans, err := fetchSpansForDescs(ctx, p, opts, statementTime, droppedTargetDescs)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
droppedTargetSpans := fetchSpansForDescs(p, droppedTargetDescs)
removeSpansFromProgress(newJobProgress, droppedTargetSpans)
}
}
Expand Down Expand Up @@ -670,20 +660,17 @@ func removeSpansFromProgress(prevProgress jobspb.Progress, spansToRemove []roach
}

// fetchSpansForDescs collects the primary index span of each descriptor in
// descs, visiting every descriptor ID at most once (tracked via the seen set),
// and returns the spans in the order the descriptors were first encountered.
// Each descriptor is asserted to be a catalog.TableDescriptor without a check.
//
// NOTE(review): this hunk shows removed and added lines interleaved without
// +/- markers. The lines that build `targets`, call fetchSpansForTargets and
// return an error appear to belong to the removed version -- confirm against
// the actual file before relying on this description.
func fetchSpansForDescs(
ctx context.Context,
p sql.PlanHookState,
opts map[string]string,
statementTime hlc.Timestamp,
descs []catalog.Descriptor,
) ([]roachpb.Span, error) {
targets := make([]jobspb.ChangefeedTargetSpecification, len(descs))
for i, d := range descs {
targets[i] = jobspb.ChangefeedTargetSpecification{TableID: d.GetID()}
p sql.PlanHookState, descs []catalog.Descriptor,
) (primarySpans []roachpb.Span) {
seen := make(map[descpb.ID]struct{})
for _, d := range descs {
if _, isDup := seen[d.GetID()]; isDup {
continue
}
seen[d.GetID()] = struct{}{}
primarySpans = append(primarySpans, d.(catalog.TableDescriptor).PrimaryIndexSpan(p.ExtendedEvalContext().Codec))
}

spans, err := fetchSpansForTargets(ctx, p.ExecCfg(), targets, statementTime)

return spans, err
return primarySpans
}

func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
Expand Down
71 changes: 71 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,3 +1277,74 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

t.Run(`kafka`, kafkaTest(testFn, feedTestNoTenants))
}

// TestAlterChangefeedUpdateFilter checks that the primary_key_filter option can
// be set and later unset on a paused changefeed through ALTER CHANGEFEED, and
// that the rows emitted after each resume match the filter in effect.
func TestAlterChangefeedUpdateFilter(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	runFilterUpdates := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
		runner := sqlutils.MakeSQLRunner(db)
		runner.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

		testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
		defer closeFeed(t, testFeed)

		// With no filter configured, every inserted row must be emitted.
		initialRows := []string{
			`foo: [1]->{"after": {"a": 1, "b": "initial"}}`,
			`foo: [2]->{"after": {"a": 2, "b": "initial"}}`,
			`foo: [3]->{"after": {"a": 3, "b": "initial"}}`,
			`foo: [4]->{"after": {"a": 4, "b": "initial"}}`,
			`foo: [5]->{"after": {"a": 5, "b": "initial"}}`,
		}
		runner.Exec(t, `INSERT INTO foo SELECT *, 'initial' FROM generate_series(1, 5)`)
		assertPayloads(t, testFeed, initialRows)

		// ALTER CHANGEFEED needs the job-level controls of an enterprise feed.
		entFeed, isEnterprise := testFeed.(cdctest.EnterpriseTestFeed)
		require.True(t, isEnterprise)

		// pauseOnceCaughtUp waits for the job's high-water mark to pass the
		// current clock reading and then pauses the feed.
		pauseOnceCaughtUp := func() {
			require.NoError(t, entFeed.TickHighWaterMark(f.Server().Clock().Now()))
			require.NoError(t, entFeed.Pause())
		}

		pauseOnceCaughtUp()

		// A filter that references column b, which is not part of the primary
		// key, must be rejected.
		runner.ExpectErr(t, "cannot be fully constrained",
			fmt.Sprintf(`ALTER CHANGEFEED %d SET schema_change_policy='stop', primary_key_filter='b IS NULL'`, entFeed.JobID()))

		// Restrict the feed to a > 4; from here on only rows 5 and above
		// should be emitted.
		runner.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET schema_change_policy='stop', primary_key_filter='a > 4'`, entFeed.JobID()))
		require.NoError(t, entFeed.Resume())

		// Rows 1-10 are upserted, but only 5-10 satisfy the filter.
		filteredRows := []string{
			`foo: [5]->{"after": {"a": 5, "b": "updated"}}`,
			`foo: [6]->{"after": {"a": 6, "b": "updated"}}`,
			`foo: [7]->{"after": {"a": 7, "b": "updated"}}`,
			`foo: [8]->{"after": {"a": 8, "b": "updated"}}`,
			`foo: [9]->{"after": {"a": 9, "b": "updated"}}`,
			`foo: [10]->{"after": {"a": 10, "b": "updated"}}`,
		}
		runner.Exec(t, `UPSERT INTO foo SELECT *, 'updated' FROM generate_series(1, 10)`)
		assertPayloads(t, testFeed, filteredRows)

		// Pause again and remove the filter; all rows should be emitted once
		// the feed resumes.
		pauseOnceCaughtUp()
		runner.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d UNSET primary_key_filter`, entFeed.JobID()))
		require.NoError(t, entFeed.Resume())

		unfilteredRows := []string{
			`foo: [1]->{"after": {"a": 1, "b": "new value"}}`,
			`foo: [2]->{"after": {"a": 2, "b": "new value"}}`,
			`foo: [3]->{"after": {"a": 3, "b": "new value"}}`,
			`foo: [4]->{"after": {"a": 4, "b": "new value"}}`,
			`foo: [5]->{"after": {"a": 5, "b": "new value"}}`,
			`foo: [6]->{"after": {"a": 6, "b": "new value"}}`,
			`foo: [7]->{"after": {"a": 7, "b": "new value"}}`,
			`foo: [8]->{"after": {"a": 8, "b": "new value"}}`,
			`foo: [9]->{"after": {"a": 9, "b": "new value"}}`,
			`foo: [10]->{"after": {"a": 10, "b": "new value"}}`,
		}
		runner.Exec(t, `UPSERT INTO foo SELECT *, 'new value' FROM generate_series(1, 10)`)
		assertPayloads(t, testFeed, unfilteredRows)
	}

	t.Run(`kafka`, kafkaTest(runFilterUpdates))
}
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TestFeedFactory is an interface to create changefeeds.
Expand Down Expand Up @@ -76,4 +77,8 @@ type EnterpriseTestFeed interface {
FetchRunningStatus() (string, error)
// Details returns changefeed details for this feed.
Details() (*jobspb.ChangefeedDetails, error)
// HighWaterMark returns feed highwatermark.
HighWaterMark() (hlc.Timestamp, error)
// TickHighWaterMark waits until job highwatermark progresses beyond specified threshold.
TickHighWaterMark(minHWM hlc.Timestamp) error
}
57 changes: 50 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -110,10 +114,28 @@ func distChangefeedFlow(
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS)
var tableDescs []catalog.TableDescriptor
tableDescs, err = fetchTableDescriptors(ctx, execCfg, AllTargets(details), spansTS)
if err != nil {
return err
}

if filterExpr, isSet := details.Opts[changefeedbase.OptPrimaryKeyFilter]; isSet {
if len(tableDescs) > 1 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"option %s can only be used with 1 changefeed target (found %d)",
changefeedbase.OptPrimaryKeyFilter, len(tableDescs),
)
}
trackedSpans, err = constrainSpansByExpression(ctx, execCtx, filterExpr, tableDescs[0])
if err != nil {
return err
}
} else {
for _, d := range tableDescs {
trackedSpans = append(trackedSpans, d.PrimaryIndexSpan(execCfg.Codec))
}
}
}

var checkpoint jobspb.ChangefeedProgress_Checkpoint
Expand All @@ -130,17 +152,18 @@ func distChangefeedFlow(
ctx, execCtx, jobID, details, trackedSpans, initialHighWater, checkpoint, resultsCh, distflowKnobs)
}

func fetchSpansForTargets(
func fetchTableDescriptors(
ctx context.Context,
execCfg *sql.ExecutorConfig,
targets []jobspb.ChangefeedTargetSpecification,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
) ([]catalog.TableDescriptor, error) {
var targetDescs []catalog.TableDescriptor

fetchSpans := func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
spans = nil
targetDescs = nil
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
Expand All @@ -159,12 +182,32 @@ func fetchSpansForTargets(
if err != nil {
return err
}
spans = append(spans, tableDesc.PrimaryIndexSpan(execCfg.Codec))
targetDescs = append(targetDescs, tableDesc)
}
return nil
}
if err := sql.DescsTxn(ctx, execCfg, fetchSpans); err != nil {
return nil, err
}
return spans, nil
return targetDescs, nil
}

// constrainSpansByExpression parses filterStr as a SQL expression and asks the
// execution context to constrain the primary index span of descr by it,
// returning whatever spans that call produces.
//
// An empty filterStr, or one that does not parse as a SQL expression, yields
// an InvalidParameterValue error that names the primary_key_filter option.
func constrainSpansByExpression(
	ctx context.Context, execCtx sql.JobExecContext, filterStr string, descr catalog.TableDescriptor,
) ([]roachpb.Span, error) {
	if len(filterStr) == 0 {
		return nil, pgerror.Newf(pgcode.InvalidParameterValue,
			"option %s must not be empty", changefeedbase.OptPrimaryKeyFilter,
		)
	}

	parsedFilter, parseErr := parser.ParseExpr(filterStr)
	if parseErr != nil {
		return nil, pgerror.Wrapf(parseErr, pgcode.InvalidParameterValue,
			"filter expression %s must be a valid SQL expression", changefeedbase.OptPrimaryKeyFilter)
	}

	// A fresh semantic context is passed alongside the job's evaluation
	// context to the span-constraining call.
	semaCtx := tree.MakeSemaContext()
	evalCtx := &execCtx.ExtendedEvalContext().Context
	return execCtx.ConstrainPrimaryIndexSpanByExpr(ctx, descr, evalCtx, &semaCtx, parsedFilter)
}
34 changes: 34 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,18 @@ func createChangefeedJobRecord(
return nil, err
}

if filterExpr, isSet := details.Opts[changefeedbase.OptPrimaryKeyFilter]; isSet {
policy := changefeedbase.SchemaChangePolicy(details.Opts[changefeedbase.OptSchemaChangePolicy])
if policy != changefeedbase.OptSchemaChangePolicyStop {
return nil, errors.Newf("option %s can only be used with %s=%s",
changefeedbase.OptPrimaryKeyFilter, changefeedbase.OptSchemaChangePolicy,
changefeedbase.OptSchemaChangePolicyStop)
}
if err := validatePrimaryKeyFilterExpression(ctx, p, filterExpr, targetDescs); err != nil {
return nil, err
}
}

if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil {
return nil, err
}
Expand Down Expand Up @@ -886,6 +898,28 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
return details, nil
}

// validatePrimaryKeyFilterExpression checks that filterExpr is usable as the
// primary_key_filter option for the changefeed targets in descriptors.
//
// The option is only supported for a single target, so the call fails unless
// descriptors holds exactly one entry and that entry is a table descriptor.
// The expression is then validated by constraining the table's primary index
// span with it through constrainSpansByExpression; the resulting spans are
// discarded and only the error, if any, is returned.
func validatePrimaryKeyFilterExpression(
	ctx context.Context,
	execCtx sql.JobExecContext,
	filterExpr string,
	descriptors map[tree.TablePattern]catalog.Descriptor,
) error {
	// Reject zero targets as well as several: with an empty map there is no
	// table to validate against, and a nil descriptor must not be passed on.
	if len(descriptors) != 1 {
		return pgerror.Newf(pgcode.InvalidParameterValue,
			"option %s can only be used with 1 changefeed target (found %d)",
			changefeedbase.OptPrimaryKeyFilter, len(descriptors),
		)
	}

	var tableDescr catalog.TableDescriptor
	for _, d := range descriptors {
		// Use a checked assertion so that a non-table descriptor produces an
		// error instead of a panic.
		td, isTable := d.(catalog.TableDescriptor)
		if !isTable {
			return pgerror.Newf(pgcode.InvalidParameterValue,
				"option %s can only be used with a table target (found %T)",
				changefeedbase.OptPrimaryKeyFilter, d,
			)
		}
		tableDescr = td
	}

	_, err := constrainSpansByExpression(ctx, execCtx, filterExpr, tableDescr)
	return err
}

type changefeedResumer struct {
job *jobs.Job
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5998,6 +5998,55 @@ func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) {
t.Run(`sinkless`, sinklessTest(testFn))
}

// TestChangefeedPrimaryKeyFilter checks the primary_key_filter option on
// CREATE CHANGEFEED: it must be combined with schema_change_policy='stop', it
// is limited to a single target, and only rows whose primary key satisfies the
// filter are emitted.
func TestChangefeedPrimaryKeyFilter(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	runFilteredFeed := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
		runner := sqlutils.MakeSQLRunner(db)
		for _, ddl := range []string{
			"CREATE TABLE foo (a INT PRIMARY KEY, b string)",
			"CREATE TABLE bar (a INT PRIMARY KEY, b string)",
		} {
			runner.Exec(t, ddl)
		}
		runner.Exec(t, "INSERT INTO foo SELECT * FROM generate_series(1, 20)")

		// Without schema_change_policy='stop' the option is rejected.
		runner.ExpectErr(t, "can only be used with schema_change_policy=stop",
			`CREATE CHANGEFEED FOR foo, bar WITH primary_key_filter='a < 5 OR a > 18'`)

		// With the required policy, more than one target is still rejected.
		runner.ExpectErr(t, `option primary_key_filter can only be used with 1 changefeed target`,
			`CREATE CHANGEFEED FOR foo, bar WITH schema_change_policy='stop', primary_key_filter='a < 5 OR a > 18'`)

		filteredFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH schema_change_policy='stop', primary_key_filter='a < 5 OR a > 18'`)
		defer closeFeed(t, filteredFeed)

		// Of the rows already in the table, only those matching the filter
		// are expected.
		existingMatches := []string{
			`foo: [1]->{"after": {"a": 1, "b": null}}`,
			`foo: [2]->{"after": {"a": 2, "b": null}}`,
			`foo: [3]->{"after": {"a": 3, "b": null}}`,
			`foo: [4]->{"after": {"a": 4, "b": null}}`,
			`foo: [19]->{"after": {"a": 19, "b": null}}`,
			`foo: [20]->{"after": {"a": 20, "b": null}}`,
		}
		assertPayloads(t, filteredFeed, existingMatches)

		// Write keys 0 through 21; keys 5-18 fall outside the filter and must
		// not show up.
		for key := 0; key < 22; key++ {
			runner.Exec(t, "UPSERT INTO foo VALUES ($1, $2)", key, strconv.Itoa(key))
		}

		upsertMatches := []string{
			`foo: [0]->{"after": {"a": 0, "b": "0"}}`,
			`foo: [1]->{"after": {"a": 1, "b": "1"}}`,
			`foo: [2]->{"after": {"a": 2, "b": "2"}}`,
			`foo: [3]->{"after": {"a": 3, "b": "3"}}`,
			`foo: [4]->{"after": {"a": 4, "b": "4"}}`,
			`foo: [19]->{"after": {"a": 19, "b": "19"}}`,
			`foo: [20]->{"after": {"a": 20, "b": "20"}}`,
			`foo: [21]->{"after": {"a": 21, "b": "21"}}`,
		}
		assertPayloads(t, filteredFeed, upsertMatches)
	}

	t.Run(`enterprise`, enterpriseTest(runFilteredFeed))
	t.Run(`cloudstorage`, cloudStorageTest(runFilteredFeed))
	t.Run(`kafka`, kafkaTest(runFilteredFeed))
}

func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
mm := mon.NewMonitorWithLimit(
"test-mm", mon.MemoryResource, budget,
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
OptOnError = `on_error`
OptMetricsScope = `metrics_label`
OptVirtualColumns = `virtual_columns`
OptPrimaryKeyFilter = `primary_key_filter`

OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
OptVirtualColumnsNull VirtualColumnVisibility = `null`
Expand Down Expand Up @@ -210,6 +211,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptOnError: sql.KVStringOptRequireValue,
OptMetricsScope: sql.KVStringOptRequireValue,
OptVirtualColumns: sql.KVStringOptRequireValue,
OptPrimaryKeyFilter: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
Expand All @@ -229,7 +231,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics)
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptPrimaryKeyFilter)

// SQLValidOptions is options exclusive to SQL sink
var SQLValidOptions map[string]struct{} = nil
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (t *testServerShim) ExecutorConfig() interface{} { panic(unsuppor
func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
Expand Down
Loading

0 comments on commit 97e72ba

Please sign in to comment.