From 4632032fa7b47c43e699134f69573ba630824ac0 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 22 Apr 2022 16:36:32 -0400 Subject: [PATCH 1/2] sql: Introduce `SpanConstrainer` interface. Add a small `SpanConstrainer` interface to enable direct usage of optimizer to constrain a span based on filtering expression. The users of this functionality, such as changefeeds (or rangefeed) would be able to leverage optimizer to constrain the span that they need to watch -- thus improving flexibility and efficiency. Release Notes: None --- pkg/sql/BUILD.bazel | 5 + pkg/sql/constraint.go | 136 ++++++++++++++++++++++++ pkg/sql/constraint_test.go | 204 ++++++++++++++++++++++++++++++++++++ pkg/sql/job_exec_context.go | 17 +++ pkg/sql/planhook.go | 1 + 5 files changed, 363 insertions(+) create mode 100644 pkg/sql/constraint.go create mode 100644 pkg/sql/constraint_test.go diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 8d0ec58be709..3af6e7865321 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "conn_executor_savepoints.go", "conn_fsm.go", "conn_io.go", + "constraint.go", "control_jobs.go", "control_schedules.go", "copy.go", @@ -342,8 +343,10 @@ go_library( "//pkg/sql/opt/exec", "//pkg/sql/opt/exec/execbuilder", "//pkg/sql/opt/exec/explain", + "//pkg/sql/opt/idxconstraint", "//pkg/sql/opt/indexrec", "//pkg/sql/opt/memo", + "//pkg/sql/opt/norm", "//pkg/sql/opt/optbuilder", "//pkg/sql/opt/xform", "//pkg/sql/optionalnodeliveness", @@ -490,6 +493,7 @@ go_test( "conn_executor_savepoints_test.go", "conn_executor_test.go", "conn_io_test.go", + "constraint_test.go", "copy_file_upload_test.go", "copy_in_test.go", "copy_test.go", @@ -651,6 +655,7 @@ go_test( "//pkg/sql/roleoption", "//pkg/sql/row", "//pkg/sql/rowenc", + "//pkg/sql/rowenc/keyside", "//pkg/sql/rowenc/rowencpb", "//pkg/sql/rowenc/valueside", "//pkg/sql/rowexec", diff --git a/pkg/sql/constraint.go b/pkg/sql/constraint.go new file mode 100644 index 000000000000..10548f7a6800 --- /dev/null +++ b/pkg/sql/constraint.go @@ -0,0 +1,136 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/opt" + "github.com/cockroachdb/cockroach/pkg/sql/opt/idxconstraint" + "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/opt/norm" + "github.com/cockroachdb/cockroach/pkg/sql/opt/optbuilder" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/span" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// SpanConstrainer is an interface for constraining spans. +type SpanConstrainer interface { + // ConstrainPrimaryIndexSpanByExpr constrains primary index span using + // specified filter expression. + // Returns constrained spans that fully satisfy the expression. + // If the expression cannot be fully satisfied, returns an error. + // Expression is required to specify constraints over unqualified + // primary key columns only. The expression must be boolean expression. + // If the expression is a contradiction, returns an error. + ConstrainPrimaryIndexSpanByExpr( + ctx context.Context, + desc catalog.TableDescriptor, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, + filter tree.Expr, + ) ([]roachpb.Span, error) +} + +// ConstrainPrimaryIndexSpanByExpr implements SpanConstrainer +func (p *planner) ConstrainPrimaryIndexSpanByExpr( + ctx context.Context, + desc catalog.TableDescriptor, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, + filter tree.Expr, +) ([]roachpb.Span, error) { + var oc optCatalog + oc.init(p) + oc.reset() + + tbl, err := newOptTable(desc, oc.codec(), nil /* stats */, emptyZoneConfig) + if err != nil { + return nil, err + } + + var nf norm.Factory + nf.Init(evalCtx, &oc) + nf.Metadata().AddTable(tbl, &tree.TableName{}) + + b := optbuilder.NewScalar(ctx, semaCtx, evalCtx, &nf) + if err := b.Build(filter); err != nil { + return nil, err + } + + root := nf.Memo().RootExpr().(opt.ScalarExpr) + if root.DataType() != types.Bool { + return nil, pgerror.Newf(pgcode.DatatypeMismatch, + "expected boolean expression, found expression of type %s", root.DataType()) + } + + fe := memo.FiltersExpr{nf.ConstructFiltersItem(root)} + fe = nf.CustomFuncs().SimplifyFilters(fe) + fe = nf.CustomFuncs().ConsolidateFilters(fe) + + if fe.IsTrue() { + return []roachpb.Span{desc.PrimaryIndexSpan(oc.codec())}, nil + } + if fe.IsFalse() { + return nil, errors.Newf("filter %q is a contradiction", filter) + } + + primary := desc.GetPrimaryIndex() + indexCols := make([]opt.OrderingColumn, len(primary.IndexDesc().KeyColumnIDs)) + var notNullIndexCols opt.ColSet + for i, colID := range primary.IndexDesc().KeyColumnIDs { + if primary.GetKeyColumnDirection(i) == descpb.IndexDescriptor_ASC { + indexCols[i] = opt.OrderingColumn(colID) + } else { + indexCols[i] = opt.OrderingColumn(-colID) + } + notNullIndexCols.Add(opt.ColumnID(colID)) + } + + const consolidate = true + var ic idxconstraint.Instance + + ic.Init( + fe, nil, indexCols, notNullIndexCols, nil, + consolidate, evalCtx, &nf, nil, + ) + + if !ic.RemainingFilters().IsTrue() { + err = errors.Newf( + "primary key span %s cannot be fully constrained by expression %q", + desc.PrimaryIndexSpan(oc.codec()), filter) + if len(indexCols) > 1 { + // Constraints over composite keys are hard. Give a bit of a hint. + err = errors.WithHint(err, + "try constraining prefix columns of the composite key with equality or an IN clause") + } + return nil, err + } + + if ic.Constraint().IsContradiction() { + return nil, errors.Newf("filter %q is a contradiction", filter) + } + if ic.Constraint().IsUnconstrained() { + return nil, errors.Newf("filter %q is a tautology; use 'true' or omit constraint", filter) + } + + var sb span.Builder + sb.Init(evalCtx, oc.codec(), desc, desc.GetPrimaryIndex()) + return sb.SpansFromConstraint(ic.Constraint(), span.NoopSplitter()) +} diff --git a/pkg/sql/constraint_test.go b/pkg/sql/constraint_test.go new file mode 100644 index 000000000000..9200a07620aa --- /dev/null +++ b/pkg/sql/constraint_test.go @@ -0,0 +1,204 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func mkPkKey(t *testing.T, tableID descpb.ID, vals ...int) roachpb.Key { + t.Helper() + + // Encode index id, then each value. + key, err := keyside.Encode( + keys.SystemSQLCodec.TablePrefix(uint32(tableID)), + tree.NewDInt(tree.DInt(1)), encoding.Ascending) + + require.NoError(t, err) + for _, v := range vals { + d := tree.NewDInt(tree.DInt(v)) + key, err = keyside.Encode(key, d, encoding.Ascending) + require.NoError(t, err) + } + + return key +} + +func TestSpanConstrainer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + params, _ := tests.CreateTestServerParams() + s, db, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT, b int, c STRING, CONSTRAINT "pk" PRIMARY KEY (a, b), INDEX (c))`) + fooDesc := desctestutils.TestingGetTableDescriptor( + kvDB, keys.SystemSQLCodec, "defaultdb", "public", "foo") + + ctx := context.Background() + execCfg := s.ExecutorConfig().(ExecutorConfig) + p, cleanup := NewInternalPlanner("test", kv.NewTxn(ctx, kvDB, s.NodeID()), + security.RootUserName(), &MemoryMetrics{}, &execCfg, sessiondatapb.SessionData{}, + ) + defer cleanup() + + primarySpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + pkStart := primarySpan.Key + pkEnd := primarySpan.EndKey + fooID := fooDesc.GetID() + + sc := p.(SpanConstrainer) + evalCtx := eval.MakeTestingEvalContext(s.ClusterSettings()) + semaCtx := tree.MakeSemaContext() + for _, tc := range []struct { + filter string + expectErr string + expectSpans []roachpb.Span + }{ + { + filter: "5 > 1", + expectSpans: []roachpb.Span{primarySpan}, + }, + { + filter: "0 != 0", + expectErr: "is a contradiction", + }, + { + filter: "a IS NULL", + expectErr: "is a contradiction", + }, + { + filter: "a > 3 AND a < 3", + expectErr: "is a contradiction", + }, + { + filter: "a >=3 or a < 3", + expectErr: "is a tautology", + }, + { + filter: "5", + expectErr: "expected boolean expression", + }, + { + filter: "no_such_column = 'something'", + expectErr: `column "no_such_column" does not exist`, + }, + { + filter: "true", + expectSpans: []roachpb.Span{primarySpan}, + }, + { + filter: "false", + expectErr: "is a contradiction", + }, + { + filter: "a > 100", + expectSpans: []roachpb.Span{{Key: mkPkKey(t, fooID, 101), EndKey: pkEnd}}, + }, + { + filter: "a > 10 AND a > 5", + expectSpans: []roachpb.Span{{Key: mkPkKey(t, fooID, 11), EndKey: pkEnd}}, + }, + { + filter: "a > 10 OR a > 5", + expectSpans: []roachpb.Span{{Key: mkPkKey(t, fooID, 6), EndKey: pkEnd}}, + }, + { + filter: "a > 100 AND a <= 101", + expectSpans: []roachpb.Span{{Key: mkPkKey(t, fooID, 101), EndKey: mkPkKey(t, fooID, 102)}}, + }, + { + filter: "a > 100 and a < 200", + expectSpans: []roachpb.Span{{Key: mkPkKey(t, fooID, 101), EndKey: mkPkKey(t, fooID, 200)}}, + }, + { + filter: "a > 100 or a <= 99", + expectSpans: []roachpb.Span{ + {Key: pkStart, EndKey: mkPkKey(t, fooID, 100)}, + {Key: mkPkKey(t, fooID, 101), EndKey: pkEnd}, + }, + }, + { + filter: "b < 42", + expectErr: "cannot be fully constrained", + }, + { + filter: "c = 'ten'", + expectErr: "cannot be fully constrained", + }, + { + filter: "a < 42 OR (a > 100 AND b > 11)", + expectErr: "cannot be fully constrained", + }, + { + filter: "a > 2 AND b > 5 AND a > 2", + expectErr: "cannot be fully constrained", + }, + { + // Same as above, but now with tuples. + filter: "a < 42 OR ((a, b) > (100, 11))", + expectSpans: []roachpb.Span{ + {Key: pkStart, EndKey: mkPkKey(t, fooID, 42)}, + // Remember: tuples use lexicographical ordering so the start key is + // /Table/104/1/100/12 (i.e. a="100" and b="12" (because 100/12 lexicographically follows 100). + {Key: mkPkKey(t, fooID, 100, 12), EndKey: pkEnd}, + }, + }, + { + filter: "(a, b) > (2, 5)", + expectSpans: []roachpb.Span{{Key: mkPkKey(t, fooID, 2, 6), EndKey: pkEnd}}, + }, + { + filter: "a IN (5, 10, 20) AND b < 25", + expectSpans: []roachpb.Span{ + {Key: mkPkKey(t, fooID, 5), EndKey: mkPkKey(t, fooID, 5, 25)}, + {Key: mkPkKey(t, fooID, 10), EndKey: mkPkKey(t, fooID, 10, 25)}, + {Key: mkPkKey(t, fooID, 20), EndKey: mkPkKey(t, fooID, 20, 25)}, + }, + }, + } { + t.Run(tc.filter, func(t *testing.T) { + filterExpr, err := parser.ParseExpr(tc.filter) + require.NoError(t, err) + + spans, err := sc.ConstrainPrimaryIndexSpanByExpr(ctx, fooDesc, &evalCtx, &semaCtx, filterExpr) + if tc.expectErr != "" { + require.Regexp(t, tc.expectErr, err) + require.Nil(t, spans) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectSpans, spans) + } + }) + } +} diff --git a/pkg/sql/job_exec_context.go b/pkg/sql/job_exec_context.go index 5b15fb3f37b5..80f5489da5eb 100644 --- a/pkg/sql/job_exec_context.go +++ b/pkg/sql/job_exec_context.go @@ -11,11 +11,16 @@ package sql import ( + "context" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -67,6 +72,17 @@ func (e *plannerJobExecContext) SpanConfigReconciler() spanconfig.Reconciler { } func (e *plannerJobExecContext) Txn() *kv.Txn { return e.p.Txn() } +// ConstrainPrimaryIndexSpanByExpr implements SpanConstrainer +func (e *plannerJobExecContext) ConstrainPrimaryIndexSpanByExpr( + ctx context.Context, + desc catalog.TableDescriptor, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, + filter tree.Expr, +) ([]roachpb.Span, error) { + return e.p.ConstrainPrimaryIndexSpanByExpr(ctx, desc, evalCtx, semaCtx, filter) +} + // JobExecContext provides the execution environment for a job. It is what is // passed to the Resume/OnFailOrCancel/OnPauseRequested methods of a jobs's // Resumer to give that resumer access to things like ExecutorCfg, LeaseMgr, @@ -77,6 +93,7 @@ func (e *plannerJobExecContext) Txn() *kv.Txn { return e.p.Txn() } // (though note that ExtendedEvalContext may transitively include methods that // close over/expect a txn so use it with caution). type JobExecContext interface { + SpanConstrainer SemaCtx() *tree.SemaContext ExtendedEvalContext() *extendedEvalContext SessionData() *sessiondata.SessionData diff --git a/pkg/sql/planhook.go b/pkg/sql/planhook.go index bd8291dfe841..63b0887bd783 100644 --- a/pkg/sql/planhook.go +++ b/pkg/sql/planhook.go @@ -78,6 +78,7 @@ func (p *planner) RunParams(ctx context.Context) runParams { // that gets passed back due to this inversion of roles. type PlanHookState interface { resolver.SchemaResolver + SpanConstrainer RunParams(ctx context.Context) runParams SemaCtx() *tree.SemaContext ExtendedEvalContext() *extendedEvalContext From e8622486eb59524e5003ce8203e583f23327f38a Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sat, 23 Apr 2022 12:54:18 -0400 Subject: [PATCH 2/2] changfeedccl: Support filter over primary key span. Extend changfeeds to support a filtering expression over primary key span via `primary_key_filter` option. When specified, this option is treated as an SQL filter expression over primary key columns. This filter can then be used to constrain the spans that's watched by changefeed only to those that satisfy the expression. The use of this filter restricted only to the changefeeds over a single table. Release Notes (enterprise change): `primary_key_filter` option can be used to restrict the span watched by changefeed only to the portion that satisfies the filtering predicate. --- .../changefeedccl/alter_changefeed_stmt.go | 39 ++++------ .../changefeedccl/alter_changefeed_test.go | 71 +++++++++++++++++++ pkg/ccl/changefeedccl/cdctest/testfeed.go | 5 ++ pkg/ccl/changefeedccl/changefeed_dist.go | 57 +++++++++++++-- pkg/ccl/changefeedccl/changefeed_stmt.go | 34 +++++++++ pkg/ccl/changefeedccl/changefeed_test.go | 49 +++++++++++++ .../changefeedccl/changefeedbase/options.go | 4 +- .../changefeedccl/helpers_tenant_shim_test.go | 1 - pkg/ccl/changefeedccl/testfeed_test.go | 35 +++++++++ 9 files changed, 260 insertions(+), 35 deletions(-) diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index f801bd96c414..8c5cc888d6fd 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -391,11 +391,7 @@ func generateNewTargets( for _, targetDesc := range newTableDescs { existingTargetDescs = append(existingTargetDescs, targetDesc) } - existingTargetSpans, err := fetchSpansForDescs(ctx, p, opts, statementTime, existingTargetDescs) - if err != nil { - return nil, nil, hlc.Timestamp{}, nil, err - } - + existingTargetSpans := fetchSpansForDescs(p, existingTargetDescs) var newTargetDescs []catalog.Descriptor for _, target := range v.Targets { desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName) @@ -415,10 +411,7 @@ func generateNewTargets( newTargetDescs = append(newTargetDescs, desc) } - addedTargetSpans, err := fetchSpansForDescs(ctx, p, opts, statementTime, newTargetDescs) - if err != nil { - return nil, nil, hlc.Timestamp{}, nil, err - } + addedTargetSpans := fetchSpansForDescs(p, newTargetDescs) // By default, we will not perform an initial scan on newly added // targets. Hence, the user must explicitly state that they want an @@ -484,10 +477,7 @@ func generateNewTargets( } } if len(droppedTargetDescs) > 0 { - droppedTargetSpans, err := fetchSpansForDescs(ctx, p, opts, statementTime, droppedTargetDescs) - if err != nil { - return nil, nil, hlc.Timestamp{}, nil, err - } + droppedTargetSpans := fetchSpansForDescs(p, droppedTargetDescs) removeSpansFromProgress(newJobProgress, droppedTargetSpans) } } @@ -670,20 +660,17 @@ func removeSpansFromProgress(prevProgress jobspb.Progress, spansToRemove []roach } func fetchSpansForDescs( - ctx context.Context, - p sql.PlanHookState, - opts map[string]string, - statementTime hlc.Timestamp, - descs []catalog.Descriptor, -) ([]roachpb.Span, error) { - targets := make([]jobspb.ChangefeedTargetSpecification, len(descs)) - for i, d := range descs { - targets[i] = jobspb.ChangefeedTargetSpecification{TableID: d.GetID()} + p sql.PlanHookState, descs []catalog.Descriptor, +) (primarySpans []roachpb.Span) { + seen := make(map[descpb.ID]struct{}) + for _, d := range descs { + if _, isDup := seen[d.GetID()]; isDup { + continue + } + seen[d.GetID()] = struct{}{} + primarySpans = append(primarySpans, d.(catalog.TableDescriptor).PrimaryIndexSpan(p.ExtendedEvalContext().Codec)) } - - spans, err := fetchSpansForTargets(ctx, p.ExecCfg(), targets, statementTime) - - return spans, err + return primarySpans } func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) { diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 1b4e1fbd237d..85922524f35b 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1277,3 +1277,74 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn, feedTestNoTenants)) } + +func TestAlterChangefeedUpdateFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + defer closeFeed(t, testFeed) + + sqlDB.Exec(t, `INSERT INTO foo SELECT *, 'initial' FROM generate_series(1, 5)`) + assertPayloads(t, testFeed, []string{ + `foo: [1]->{"after": {"a": 1, "b": "initial"}}`, + `foo: [2]->{"after": {"a": 2, "b": "initial"}}`, + `foo: [3]->{"after": {"a": 3, "b": "initial"}}`, + `foo: [4]->{"after": {"a": 4, "b": "initial"}}`, + `foo: [5]->{"after": {"a": 5, "b": "initial"}}`, + }) + + feed, ok := testFeed.(cdctest.EnterpriseTestFeed) + require.True(t, ok) + + require.NoError(t, feed.TickHighWaterMark(f.Server().Clock().Now())) + require.NoError(t, feed.Pause()) + + // Try to set an invalid filter (column b is not part of primary key). + sqlDB.ExpectErr(t, "cannot be fully constrained", + fmt.Sprintf(`ALTER CHANGEFEED %d SET schema_change_policy='stop', primary_key_filter='b IS NULL'`, feed.JobID())) + + // Set filter to emit a > 4. We expect to see update row 5, and onward. + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET schema_change_policy='stop', primary_key_filter='a > 4'`, feed.JobID())) + require.NoError(t, feed.Resume()) + + // Upsert 10 new values -- we expect to see only 5-10 + sqlDB.Exec(t, `UPSERT INTO foo SELECT *, 'updated' FROM generate_series(1, 10)`) + assertPayloads(t, testFeed, []string{ + `foo: [5]->{"after": {"a": 5, "b": "updated"}}`, + `foo: [6]->{"after": {"a": 6, "b": "updated"}}`, + `foo: [7]->{"after": {"a": 7, "b": "updated"}}`, + `foo: [8]->{"after": {"a": 8, "b": "updated"}}`, + `foo: [9]->{"after": {"a": 9, "b": "updated"}}`, + `foo: [10]->{"after": {"a": 10, "b": "updated"}}`, + }) + + // Pause again, clear out filter and verify we get expected values. + require.NoError(t, feed.TickHighWaterMark(f.Server().Clock().Now())) + require.NoError(t, feed.Pause()) + + // Set filter to emit a > 4. We expect to see update row 5, and onward. + sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d UNSET primary_key_filter`, feed.JobID())) + require.NoError(t, feed.Resume()) + + sqlDB.Exec(t, `UPSERT INTO foo SELECT *, 'new value' FROM generate_series(1, 10)`) + assertPayloads(t, testFeed, []string{ + `foo: [1]->{"after": {"a": 1, "b": "new value"}}`, + `foo: [2]->{"after": {"a": 2, "b": "new value"}}`, + `foo: [3]->{"after": {"a": 3, "b": "new value"}}`, + `foo: [4]->{"after": {"a": 4, "b": "new value"}}`, + `foo: [5]->{"after": {"a": 5, "b": "new value"}}`, + `foo: [6]->{"after": {"a": 6, "b": "new value"}}`, + `foo: [7]->{"after": {"a": 7, "b": "new value"}}`, + `foo: [8]->{"after": {"a": 8, "b": "new value"}}`, + `foo: [9]->{"after": {"a": 9, "b": "new value"}}`, + `foo: [10]->{"after": {"a": 10, "b": "new value"}}`, + }) + } + + t.Run(`kafka`, kafkaTest(testFn)) +} diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index c589e9a9937a..3a1835be3d9c 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // TestFeedFactory is an interface to create changefeeds. @@ -76,4 +77,8 @@ type EnterpriseTestFeed interface { FetchRunningStatus() (string, error) // Details returns changefeed details for this feed. Details() (*jobspb.ChangefeedDetails, error) + // HighWaterMark returns feed highwatermark. + HighWaterMark() (hlc.Timestamp, error) + // TickHighWaterMark waits until job highwatermark progresses beyond specified threshold. + TickHighWaterMark(minHWM hlc.Timestamp) error } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index dd6c124ed6c3..81080a5503b1 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -17,8 +17,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -110,10 +114,28 @@ func distChangefeedFlow( spansTS = spansTS.Next() } var err error - trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS) + var tableDescs []catalog.TableDescriptor + tableDescs, err = fetchTableDescriptors(ctx, execCfg, AllTargets(details), spansTS) if err != nil { return err } + + if filterExpr, isSet := details.Opts[changefeedbase.OptPrimaryKeyFilter]; isSet { + if len(tableDescs) > 1 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "option %s can only be used with 1 changefeed target (found %d)", + changefeedbase.OptPrimaryKeyFilter, len(tableDescs), + ) + } + trackedSpans, err = constrainSpansByExpression(ctx, execCtx, filterExpr, tableDescs[0]) + if err != nil { + return err + } + } else { + for _, d := range tableDescs { + trackedSpans = append(trackedSpans, d.PrimaryIndexSpan(execCfg.Codec)) + } + } } var checkpoint jobspb.ChangefeedProgress_Checkpoint @@ -130,17 +152,18 @@ func distChangefeedFlow( ctx, execCtx, jobID, details, trackedSpans, initialHighWater, checkpoint, resultsCh, distflowKnobs) } -func fetchSpansForTargets( +func fetchTableDescriptors( ctx context.Context, execCfg *sql.ExecutorConfig, targets []jobspb.ChangefeedTargetSpecification, ts hlc.Timestamp, -) ([]roachpb.Span, error) { - var spans []roachpb.Span +) ([]catalog.TableDescriptor, error) { + var targetDescs []catalog.TableDescriptor + fetchSpans := func( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { - spans = nil + targetDescs = nil if err := txn.SetFixedTimestamp(ctx, ts); err != nil { return err } @@ -159,12 +182,32 @@ func fetchSpansForTargets( if err != nil { return err } - spans = append(spans, tableDesc.PrimaryIndexSpan(execCfg.Codec)) + targetDescs = append(targetDescs, tableDesc) } return nil } if err := sql.DescsTxn(ctx, execCfg, fetchSpans); err != nil { return nil, err } - return spans, nil + return targetDescs, nil +} + +func constrainSpansByExpression( + ctx context.Context, execCtx sql.JobExecContext, filterStr string, descr catalog.TableDescriptor, +) ([]roachpb.Span, error) { + if filterStr == "" { + return nil, pgerror.Newf(pgcode.InvalidParameterValue, + "option %s must not be empty", changefeedbase.OptPrimaryKeyFilter, + ) + } + + filterExpr, err := parser.ParseExpr(filterStr) + if err != nil { + return nil, pgerror.Wrapf(err, pgcode.InvalidParameterValue, + "filter expression %s must be a valid SQL expression", changefeedbase.OptPrimaryKeyFilter) + } + + semaCtx := tree.MakeSemaContext() + return execCtx.ConstrainPrimaryIndexSpanByExpr( + ctx, descr, &execCtx.ExtendedEvalContext().Context, &semaCtx, filterExpr) } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 88f9fdac3f65..011328aa0159 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -425,6 +425,18 @@ func createChangefeedJobRecord( return nil, err } + if filterExpr, isSet := details.Opts[changefeedbase.OptPrimaryKeyFilter]; isSet { + policy := changefeedbase.SchemaChangePolicy(details.Opts[changefeedbase.OptSchemaChangePolicy]) + if policy != changefeedbase.OptSchemaChangePolicyStop { + return nil, errors.Newf("option %s can only be used with %s=%s", + changefeedbase.OptPrimaryKeyFilter, changefeedbase.OptSchemaChangePolicy, + changefeedbase.OptSchemaChangePolicyStop) + } + if err := validatePrimaryKeyFilterExpression(ctx, p, filterExpr, targetDescs); err != nil { + return nil, err + } + } + if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { return nil, err } @@ -886,6 +898,28 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails return details, nil } +func validatePrimaryKeyFilterExpression( + ctx context.Context, + execCtx sql.JobExecContext, + filterExpr string, + descriptors map[tree.TablePattern]catalog.Descriptor, +) error { + if len(descriptors) > 1 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "option %s can only be used with 1 changefeed target (found %d)", + changefeedbase.OptPrimaryKeyFilter, len(descriptors), + ) + } + + var tableDescr catalog.TableDescriptor + for _, d := range descriptors { + tableDescr = d.(catalog.TableDescriptor) + } + + _, err := constrainSpansByExpression(ctx, execCtx, filterExpr, tableDescr) + return err +} + type changefeedResumer struct { job *jobs.Job } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 364f29de5ebd..f8c2a5ee25c1 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -5998,6 +5998,55 @@ func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) } +func TestChangefeedPrimaryKeyFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY, b string)") + sqlDB.Exec(t, "CREATE TABLE bar (a INT PRIMARY KEY, b string)") + sqlDB.Exec(t, "INSERT INTO foo SELECT * FROM generate_series(1, 20)") + + sqlDB.ExpectErr(t, "can only be used with schema_change_policy=stop", + `CREATE CHANGEFEED FOR foo, bar WITH primary_key_filter='a < 5 OR a > 18'`) + + sqlDB.ExpectErr(t, `option primary_key_filter can only be used with 1 changefeed target`, + `CREATE CHANGEFEED FOR foo, bar WITH schema_change_policy='stop', primary_key_filter='a < 5 OR a > 18'`) + + feed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH schema_change_policy='stop', primary_key_filter='a < 5 OR a > 18'`) + defer closeFeed(t, feed) + + assertPayloads(t, feed, []string{ + `foo: [1]->{"after": {"a": 1, "b": null}}`, + `foo: [2]->{"after": {"a": 2, "b": null}}`, + `foo: [3]->{"after": {"a": 3, "b": null}}`, + `foo: [4]->{"after": {"a": 4, "b": null}}`, + `foo: [19]->{"after": {"a": 19, "b": null}}`, + `foo: [20]->{"after": {"a": 20, "b": null}}`, + }) + + for i := 0; i < 22; i++ { + sqlDB.Exec(t, "UPSERT INTO foo VALUES ($1, $2)", i, strconv.Itoa(i)) + } + + assertPayloads(t, feed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "0"}}`, + `foo: [1]->{"after": {"a": 1, "b": "1"}}`, + `foo: [2]->{"after": {"a": 2, "b": "2"}}`, + `foo: [3]->{"after": {"a": 3, "b": "3"}}`, + `foo: [4]->{"after": {"a": 4, "b": "4"}}`, + `foo: [19]->{"after": {"a": 19, "b": "19"}}`, + `foo: [20]->{"after": {"a": 20, "b": "20"}}`, + `foo: [21]->{"after": {"a": 21, "b": "21"}}`, + }) + } + + t.Run(`enterprise`, enterpriseTest(testFn)) + t.Run(`cloudstorage`, cloudStorageTest(testFn)) + t.Run(`kafka`, kafkaTest(testFn)) +} + func startMonitorWithBudget(budget int64) *mon.BytesMonitor { mm := mon.NewMonitorWithLimit( "test-mm", mon.MemoryResource, budget, diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 5f758a152b3c..1cafed810c75 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -71,6 +71,7 @@ const ( OptOnError = `on_error` OptMetricsScope = `metrics_label` OptVirtualColumns = `virtual_columns` + OptPrimaryKeyFilter = `primary_key_filter` OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` OptVirtualColumnsNull VirtualColumnVisibility = `null` @@ -210,6 +211,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{ OptOnError: sql.KVStringOptRequireValue, OptMetricsScope: sql.KVStringOptRequireValue, OptVirtualColumns: sql.KVStringOptRequireValue, + OptPrimaryKeyFilter: sql.KVStringOptRequireValue, } func makeStringSet(opts ...string) map[string]struct{} { @@ -229,7 +231,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptSchemaChangeEvents, OptSchemaChangePolicy, OptProtectDataFromGCOnPause, OptOnError, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, - OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics) + OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptPrimaryKeyFilter) // SQLValidOptions is options exclusive to SQL sink var SQLValidOptions map[string]struct{} = nil diff --git a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go index 7624e8db8970..7dd4b3e0cb24 100644 --- a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go +++ b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go @@ -66,7 +66,6 @@ func (t *testServerShim) ExecutorConfig() interface{} { panic(unsuppor func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) } func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) } diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index b29fa47eae62..e4ecabd90774 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach-go/v2/crdb" @@ -339,6 +340,40 @@ func (f *jobFeed) Details() (*jobspb.ChangefeedDetails, error) { return payload.GetChangefeed(), nil } +// HighWaterMark implements FeedJob interface. +func (f *jobFeed) HighWaterMark() (hlc.Timestamp, error) { + var details []byte + if err := f.db.QueryRow( + `SELECT progress FROM system.jobs WHERE id=$1`, f.jobID, + ).Scan(&details); err != nil { + return hlc.Timestamp{}, err + } + var progress jobspb.Progress + if err := protoutil.Unmarshal(details, &progress); err != nil { + return hlc.Timestamp{}, err + } + h := progress.GetHighWater() + var hwm hlc.Timestamp + if h != nil { + hwm = *h + } + return hwm, nil +} + +// TickHighWaterMark implements the TestFeed interface. +func (f *jobFeed) TickHighWaterMark(minHWM hlc.Timestamp) error { + return testutils.SucceedsWithinError(func() error { + current, err := f.HighWaterMark() + if err != nil { + return err + } + if minHWM.Less(current) { + return nil + } + return errors.New("waiting to tick") + }, 10*time.Second) +} + // FetchTerminalJobErr retrieves the error message from changefeed job. func (f *jobFeed) FetchTerminalJobErr() error { var errStr string