changefeedccl: Support filter over primary key span. #80499

Merged
merged 2 commits into from
Apr 29, 2022

Conversation

miretskiy
Contributor

Extend changefeeds to support a filtering expression over the primary key
span via the primary_key_filter option.

When specified, this option is treated as an SQL filter expression
over primary key columns. The filter is then used to constrain
the spans watched by the changefeed to only those that satisfy
the expression. Use of this filter is restricted to
changefeeds over a single table.

Release Notes (enterprise change): The primary_key_filter option can be used
to restrict the span watched by a changefeed to only the portion that
satisfies the filtering predicate.
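
To illustrate what constraining the watched span means, here is a minimal Go sketch. It is not the code in this PR. It assumes a single integer primary key column and models spans as half-open integer ranges, whereas real spans are `roachpb.Span` values over encoded keys. `Span`, `Cmp`, and `constrain` are hypothetical names invented for this example.

```go
package main

// Span is a half-open range [Start, End) of integer primary key values.
// Assumption: integers stand in for the encoded keys used by real spans.
type Span struct {
	Start, End int64
}

// Cmp is one conjunct of a filter over the primary key column, e.g. a > 10.
type Cmp struct {
	Op    string // one of ">", ">=", "<", "<="
	Value int64
}

// constrain narrows the full primary index span by ANDing all conjuncts.
// The returned bool is false when the conjuncts leave an empty span.
// Unknown operators are ignored, and overflow at the int64 limits is not handled.
func constrain(full Span, conjuncts []Cmp) (Span, bool) {
	s := full
	for _, c := range conjuncts {
		switch c.Op {
		case ">":
			if c.Value+1 > s.Start {
				s.Start = c.Value + 1
			}
		case ">=":
			if c.Value > s.Start {
				s.Start = c.Value
			}
		case "<":
			if c.Value < s.End {
				s.End = c.Value
			}
		case "<=":
			if c.Value+1 < s.End {
				s.End = c.Value + 1
			}
		}
	}
	return s, s.Start < s.End
}
```

Under these assumptions, a filter such as `a > 10 AND a < 20` over a table span of `[0, 1000)` leaves only `[11, 20)` to watch.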

@miretskiy miretskiy requested a review from a team as a code owner April 25, 2022 18:47
@miretskiy miretskiy requested review from gh-casper, ajwerner and a team and removed request for a team April 25, 2022 18:47
@miretskiy miretskiy force-pushed the constrain branch 2 times, most recently from 9f5c5de to 2ee8e79 Compare April 25, 2022 20:01
Contributor

@HonoreDB HonoreDB left a comment

Very cool! I like this iterative approach. Product question, but maybe this should just be "WHERE" and the not implemented error is "where clauses for changefeeds currently must be satisfiable using only primary key values"?

Also, reiterating what I said in a line comment: this should be part of a ChangefeedTargetSpecification, not a WITH option. It doesn't look like this would complicate the code at all, and this is semantically the same type of thing as a family (in fact, at the KV level a family kind of is a primary key constraint).

Reviewed 2 of 2 files at r1, 5 of 5 files at r2, 3 of 3 files at r3, 4 of 4 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @HonoreDB, and @miretskiy)


-- commits line 26 at r4:
primary_key_filter


pkg/ccl/changefeedccl/alter_changefeed_stmt.go line 674 at r4 (raw file):

	for _, d := range descs {
		primarySpans = append(primarySpans, d.(catalog.TableDescriptor).PrimaryIndexSpan(p.ExtendedEvalContext().Codec))

Should de-duplicate here.
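
A minimal sketch of the kind of de-duplication suggested here, assuming duplicates arise when the same table contributes its primary index span more than once. `Span` and `dedupeSpans` are hypothetical stand-ins for this example, not the types or helpers used in the PR.

```go
package main

// Span is a simplified key span. Assumption: string keys stand in for the
// byte-slice keys of a real span so that Span can be used as a map key.
type Span struct {
	Key, EndKey string
}

// dedupeSpans returns spans with exact duplicates removed, preserving the
// order in which each distinct span was first seen.
func dedupeSpans(spans []Span) []Span {
	seen := make(map[Span]struct{}, len(spans))
	out := make([]Span, 0, len(spans))
	for _, s := range spans {
		if _, ok := seen[s]; ok {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}
	return out
}
```

This only removes identical spans; it does not merge spans that merely overlap.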


pkg/ccl/changefeedccl/changefeed_dist.go line 123 at r4 (raw file):

		}

		if filterExpr, isSet := details.Opts[changefeedbase.OptPrimaryKeyFilter]; isSet {

Does it simplify anything other than syntax to have this restriction? I'd think we could do

CREATE CHANGEFEED FOR TABLE foo [FAMILY fname] [WHERE expr], TABLE baz [FAMILY fname] [WHERE expr] and have the predicate be part of the ChangefeedTargetSpecification rather than a global option. In fact I'd argue for this syntax anyway. It's more extendable into the multi-target case later, and makes it clearer that we're not doing joins.


pkg/sql/constraint.go line 112 at r2 (raw file):

	if ic.Constraint().IsContradiction() {
		return nil, errors.Newf("filter %q is a contradiction", filter)

Maybe also error on tautologies more complicated than "true"? Like if .IsUnconstrained() return nil, errors.Newf("filter %q is vacuous, use 'true' or omit constraint for the full table").
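
The two checks under discussion (contradiction and vacuous filter) might look roughly like the sketch below. It assumes spans are modeled as half-open integer ranges and that the constrained span has already been computed. `Span` and `checkFilter` are hypothetical names for this example; the PR itself relies on `IsContradiction()` and, as suggested, `IsUnconstrained()`.

```go
package main

import "fmt"

// Span is a half-open range [Start, End) of integer primary key values.
// Assumption: integers stand in for encoded keys.
type Span struct {
	Start, End int64
}

// checkFilter rejects a filter whose constrained span is empty (a
// contradiction) or identical to the full table span (vacuous).
func checkFilter(filter string, full, constrained Span) error {
	switch {
	case constrained.Start >= constrained.End:
		return fmt.Errorf("filter %q is a contradiction", filter)
	case constrained == full:
		return fmt.Errorf(
			"filter %q is vacuous, use 'true' or omit constraint for the full table", filter)
	}
	return nil
}
```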

Contributor Author

@miretskiy miretskiy left a comment

Answer to the above question inline. Let me know what you think

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, and @HonoreDB)


-- commits line 26 at r4:

Previously, HonoreDB (Aaron Zinger) wrote…

primary_key_filter

Done.


pkg/ccl/changefeedccl/alter_changefeed_stmt.go line 674 at r4 (raw file):

Previously, HonoreDB (Aaron Zinger) wrote…

Should de-duplicate here.

Hmm... Good point; I also removed a bunch of arguments that are no longer needed.


pkg/ccl/changefeedccl/changefeed_dist.go line 123 at r4 (raw file):

Previously, HonoreDB (Aaron Zinger) wrote…

Does it simplify anything other than syntax to have this restriction? I'd think we could do

CREATE CHANGEFEED FOR TABLE foo [FAMILY fname] [WHERE expr], TABLE baz [FAMILY fname] [WHERE expr] and have the predicate be part of the ChangefeedTargetSpecification rather than a global option. In fact I'd argue for this syntax anyway. It's more extendable into the multi-target case later, and makes it clearer that we're not doing joins.

I agree that we could do this; I have chosen not to at this point. Here is why:

The main reason is a walk-before-you-run approach. I honestly don't know yet how multiple WHERE
clauses would play out. I eliminated the possibility of multiple tables almost immediately.
The code doesn't have any support for scoping or aliasing (yet), so these expressions would necessarily
need to get more complex, which is something I wanted to avoid, at least initially.

The second option is to have the WHERE clause as you have it: per family. At this point, it's not clear to me
why you'd want the same changefeed over multiple families, each with a different clause. Perhaps it makes sense, but it just wasn't clear to me at this point.

Finally, and perhaps more importantly, I want to leave full WHERE clause support until later, when we have full predicate/filtering support. We don't have a full picture yet of what we would want to support.
For example, WHERE clauses allow sub-selects, and it's an open question whether we should allow them.
Regardless, putting in a full WHERE clause now would make it hard to take back later, so I wanted to postpone
this decision until we get a better picture.

To be clear, and to reiterate, I don't think what you propose is a bad idea. It's just that I'm afraid to commit
right now.


pkg/sql/constraint.go line 112 at r2 (raw file):

filter %q is vacuous
Good idea; added this check + expanded unit tests.

Contributor

@HonoreDB HonoreDB left a comment

Responded inline. Also, what happens on schema changes? Could just call validatePrimaryKeyFilterExpression in the ValidateTargets method the schema feed calls, I think.

Reviewed 2 of 7 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @HonoreDB, and @miretskiy)


pkg/ccl/changefeedccl/changefeed_dist.go line 123 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I agree that we could do this; I have chosen not to at this point. Here is why:

The main reason is a walk-before-you-run approach. I honestly don't know yet how multiple WHERE
clauses would play out. I eliminated the possibility of multiple tables almost immediately.
The code doesn't have any support for scoping or aliasing (yet), so these expressions would necessarily
need to get more complex, which is something I wanted to avoid, at least initially.

The second option is to have the WHERE clause as you have it: per family. At this point, it's not clear to me
why you'd want the same changefeed over multiple families, each with a different clause. Perhaps it makes sense, but it just wasn't clear to me at this point.

Finally, and perhaps more importantly, I want to leave full WHERE clause support until later, when we have full predicate/filtering support. We don't have a full picture yet of what we would want to support.
For example, WHERE clauses allow sub-selects, and it's an open question whether we should allow them.
Regardless, putting in a full WHERE clause now would make it hard to take back later, so I wanted to postpone
this decision until we get a better picture.

To be clear, and to reiterate, I don't think what you propose is a bad idea. It's just that I'm afraid to commit
right now.

I agree that having a global predicate over all tables would add complexity. Whether we should call this "primary_key_filter" or "where" is a product question; I'm weakly in favor of "where" but I agree that will create inexorable pressure toward more and more predicate support.

What I feel more strongly about is that the syntax should be per target, not a with option. Your ConstrainPrimaryIndexSpanByExpr method already works for this--it takes a table desc and an expr, validates that the expr can be evaluated using only the primary key columns of the table desc, and returns a set of spans. It doesn't need to know if it's being called for multiple tables, each with their own expr. And downstream we're just passing spans, right?

I don't know of a specific use case for having different predicates per family, but different predicates per table in a multi-table feed seems guaranteed to be a request.

@miretskiy
Contributor Author

miretskiy commented Apr 25, 2022 via email

@miretskiy
Contributor Author

What I feel more strongly about is that the syntax should be per target, not a with option. Your ConstrainPrimaryIndexSpanByExpr method already works for this

I understand the sentiment. And I also understand that ConstrainPrimaryIndexSpanByExpr would work fine.
I would like to postpone this, nonetheless, for the following additional reasons:

  1. Editing sql.y would almost certainly doom the possibility of a backport (not a strong reason, but a reason nonetheless).
  2. Most importantly: currently, targets are defined as TABLE x FAMILY y. Extending that to support arbitrary clauses is a significant departure from the existing syntax, one so significant that it probably ought to be tackled on its own.
  3. Not only that, adding a WITH clause, and potentially having multiple WITH clauses, is not without pitfalls (i.e., it might produce ambiguous grammars).
  4. I want to delay this until full predicates. Basically, with the full predicate demo, I feel that the changefeed statement stopped looking like a one-off bespoke system and became more SQL-like. That's a good thing. I have not had a chance to try extending this to support per-family clauses. However, what I would really like is not to make things worse and more confusing. If there is a way to build a new SQL statement that looks and behaves like regular SQL, I'd love to give it a try.
  5. Finally, this option going in as is does not preclude changing it. We still have 6 months or so before 22.2.

Contributor

@sherman-grewal sherman-grewal left a comment

Just caught up on this thread, another benefit with using an option over editing the SQL syntax is that the ALTER CHANGEFEED statement would work with this option without having to make any major changes. That being said, I do see the benefits of editing the syntax instead of introducing a new option.

On that note, have you tested that changefeeds will filter when a user introduces the option through ALTER CHANGEFEED? Perhaps this change warrants a test added to alter_changefeed_test.go

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @HonoreDB, @miretskiy, and @rytaft)

Collaborator

@mgartner mgartner left a comment

Also, what happens on schema changes?

I'm also curious how schema changes will be handled, especially PK changes.

Reviewed 2 of 5 files at r2, 1 of 3 files at r3, 7 of 7 files at r6, 4 of 5 files at r7, 1 of 1 files at r8, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @miretskiy, and @rytaft)


pkg/sql/constraint_test.go line 62 at r6 (raw file):

	sqlDB := sqlutils.MakeSQLRunner(db)
	sqlDB.Exec(t, `CREATE TABLE foo (a INT, b int, c STRING, CONSTRAINT "pk" PRIMARY KEY (a, b))`)

nit: might not hurt to throw a secondary index on c and test that something like c = 10 doesn't produce any spans.

Another "fun" test would be filters that reduce to simpler spans, like a > 10 AND a > 5 and a > 10 OR a > 5.
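
To make the expected simplification concrete, here is a small Go sketch under the assumption that `a` is an integer key with values in `[0, 1000)`, so `a > 10` is the span `[11, 1000)` and `a > 5` is `[6, 1000)`. `Span`, `intersect`, and `union` are hypothetical helpers for this example, not the optimizer's constraint code.

```go
package main

import "sort"

// Span is a half-open range [Start, End) of integer key values.
type Span struct {
	Start, End int64
}

// intersect models AND: the overlap of two spans. The bool is false when
// the spans do not overlap.
func intersect(a, b Span) (Span, bool) {
	s := a
	if b.Start > s.Start {
		s.Start = b.Start
	}
	if b.End < s.End {
		s.End = b.End
	}
	return s, s.Start < s.End
}

// union models OR: it sorts a copy of the spans by start key and merges
// any that overlap or touch, returning the minimal set of spans.
func union(spans []Span) []Span {
	sorted := append([]Span(nil), spans...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Start < sorted[j].Start })
	var out []Span
	for _, s := range sorted {
		if n := len(out); n > 0 && s.Start <= out[n-1].End {
			if s.End > out[n-1].End {
				out[n-1].End = s.End
			}
			continue
		}
		out = append(out, s)
	}
	return out
}
```

With these definitions, `a > 10 AND a > 5` reduces to the single span for `a > 10`, and `a > 10 OR a > 5` reduces to the single span for `a > 5`.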

Contributor

@sherman-grewal sherman-grewal left a comment

Reviewed 2 of 5 files at r2, 1 of 3 files at r3, 7 of 7 files at r6, 4 of 5 files at r7, 1 of 1 files at r8, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @miretskiy, and @rytaft)


pkg/ccl/changefeedccl/changefeed_dist.go line 198 at r8 (raw file):

	ctx context.Context, execCtx sql.JobExecContext, filterStr string, descr catalog.TableDescriptor,
) ([]roachpb.Span, error) {
	if filterStr == "" {

Nit: Can we move these validations to validateDetails? That way we can catch these invalid options when a user tries to perform a CREATE/ALTER changefeed statement (instead of when the changefeed starts to run).

Also extend TestChangefeedErrors to include examples of invalid filter statements.

Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @miretskiy, @rytaft, and @sherman-grewal)


pkg/ccl/changefeedccl/changefeed_dist.go line 198 at r8 (raw file):

Previously, sherman-grewal (Sherman) wrote…

Nit: Can we move these validations to validateDetails? That way we can catch these invalid options when a user tries to perform a CREATE/ALTER changefeed statement (instead of when the changefeed starts to run).

I tried doing this (i.e., having the entire validation inside validateDetails). But that resulted in duplicated code when I actually need to compute the constrained spans, something I didn't really like. So I wound up with this function, which only handles the span constraints, and a validation helper built on top (which deals with empty string values, etc.). I hope that's okay with you.

@miretskiy
Contributor Author

@sherman-grewal addressed your questions/comments:

On that note, have you tested that changefeeds will filter when a user introduces the option through ALTER CHANGEFEED? Perhaps this change warrants a test added to alter_changefeed_test.go

Test added.

@miretskiy miretskiy requested a review from mgartner April 27, 2022 16:50
Contributor Author

@miretskiy miretskiy left a comment

I'm also curious how schema changes will be handled, especially PK changes.

@mgartner currently the story is: the changefeed will fail when this happens.

I think we will have to work harder in the upcoming release to make sure we can detect and prevent some of those
schema changes (for example, by having changefeeds have descriptors, so that schema changes can return an indication to the end user that their schema change may break the changefeed).

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @mgartner, @miretskiy, @rytaft, and @sherman-grewal)


pkg/ccl/changefeedccl/changefeed_dist.go line 198 at r8 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I tried doing this (i.e., having the entire validation inside validateDetails). But that resulted in duplicated code when I actually need to compute the constrained spans, something I didn't really like. So I wound up with this function, which only handles the span constraints, and a validation helper built on top (which deals with empty string values, etc.). I hope that's okay with you.

Also, one more reason: to validate the expression, I need access to the planner and the target descriptors, which validateDetails doesn't have. However, the call to validate the filter happens when the job record is created, so the ALTER statement gets this validation too.

I have added a test where you try to alter with invalid expression.


pkg/sql/constraint_test.go line 62 at r6 (raw file):

Previously, mgartner (Marcus Gartner) wrote…

nit: might not hurt to throw a secondary index on c and test that something like c = 10 doesn't produce any spans.

Another "fun" test would be filters that reduce to simpler spans, like a > 10 AND a > 5 and a > 10 OR a > 5.

Great idea. Added.

Collaborator

@mgartner mgartner left a comment

The optimizer-related code LGTM.

Reviewed 13 of 13 files at r10, 8 of 8 files at r11, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @gh-casper, @mgartner, @miretskiy, @rytaft, and @sherman-grewal)

@miretskiy
Contributor Author

@HonoreDB as discussed offline: I added a check (and updated tests) to ensure that the schema change policy is set to "stop" when using the primary key filter option.
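
A rough sketch of such an option check, assuming options arrive as a string map keyed by option name. `validateOpts` and the error text are hypothetical and written for this example only; the actual check in the PR may be structured differently.

```go
package main

import "fmt"

const (
	optPrimaryKeyFilter   = "primary_key_filter"
	optSchemaChangePolicy = "schema_change_policy"
)

// validateOpts rejects a primary key filter unless the schema change policy
// is "stop", so the changefeed stops on a schema change instead of
// continuing with a filter that may no longer match the primary key.
func validateOpts(opts map[string]string) error {
	if _, ok := opts[optPrimaryKeyFilter]; !ok {
		return nil // No filter requested: nothing to check.
	}
	if policy := opts[optSchemaChangePolicy]; policy != "stop" {
		// Hypothetical error message, not the one emitted by the PR.
		return fmt.Errorf("option %s requires %s='stop', got %q",
			optPrimaryKeyFilter, optSchemaChangePolicy, policy)
	}
	return nil
}
```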

Extend changefeeds to support a filtering expression over the primary key
span via the `primary_key_filter` option.

When specified, this option is treated as an SQL filter expression
over primary key columns.  The filter is then used to constrain
the spans watched by the changefeed to only those that satisfy
the expression.  Use of this filter is restricted to
changefeeds over a single table.

Release Notes (enterprise change): The `primary_key_filter` option can be used
to restrict the span watched by a changefeed to only the portion that
satisfies the filtering predicate.
@miretskiy
Contributor Author

bors r+

@craig
Contributor

craig bot commented Apr 29, 2022

Build succeeded:

@craig craig bot merged commit 97e72ba into cockroachdb:master Apr 29, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 23, 2022
Introduce `cdceval` package -- a library for expression evaluation
for CDC.

Changefeed users have long requested the ability to emit only a
subset of columns. They have also requested the ability to filter
out unwanted events (for example, filter out deletions).

This library aims to accomplish those goals.  However, instead of
focusing on narrow use cases, which would usually be addressed via
the addition of a new `WITH` option (as done in
cockroachdb#80499), this library
aims to provide support for general expression evaluation.

The `cdceval` library provides the following functionality:
  * Ability to evaluate predicates (filters) so that events may be
    filtered.
  * Ability to evaluate projection expressions (`select *`, `select a,
    b,c`, or even `select a + b - c as math_column`)
  * Ability to evaluate virtual computed columns (currently not
    implemented in this PR).

The `cdceval` library reuses existing parsing and evaluation libraries, but
adapts them for the CDC use case.  CDC events are row-level events, and as
such, CDC expressions only make sense in the context of a single
row/single table.  In addition, because CDC events have at-least-once
semantics, the emitted events must not depend on volatile state.
In summary, any expression is supported, with these restrictions:
  * Volatile functions are not supported.
  * Stable functions, such as `now()`, `current_timestamp()`, etc., are
    modified so that they return stable values, namely the event's MVCC
    timestamp.
  * Multi-row functions (aggregates, windowing functions) are
    disallowed.

`cdceval` also defines a few custom, CDC-specific functions, such as:
  * `cdc_prev()`: Returns the previous row values as a JSONB object.
  * `cdc_is_delete()`: Returns true if the row was deleted.
  * Others -- see `functions.go`

Follow-up PRs will add a "front end" to this library to enable creation
and management of predicated changefeeds.

Release Notes: None
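
As a rough illustration of the ideas in this commit message (row-level predicates, projections, and stable functions pinned to the event), here is a minimal Go sketch. It is not the `cdceval` API: `Event`, `evaluate`, and `stableNow` are hypothetical names, and predicates and projections are modeled as plain Go functions rather than parsed SQL expressions.

```go
package main

// Event is a simplified row-level CDC event.
// Assumption: integer columns only, and the MVCC timestamp as raw nanoseconds.
type Event struct {
	Row     map[string]int64 // current column values
	Deleted bool             // what cdc_is_delete() would report
	MVCC    int64            // MVCC timestamp of the event
}

// stableNow models a "stable" function such as now(): it returns the event's
// MVCC timestamp rather than wall-clock time, so re-evaluating the same event
// (possible under at-least-once delivery) yields the same value.
func stableNow(ev Event) int64 {
	return ev.MVCC
}

// evaluate applies a predicate and, if the event passes, a projection.
// Both see only the single event, matching the single-row restriction.
func evaluate(
	ev Event,
	pred func(Event) bool,
	proj func(Event) map[string]int64,
) (map[string]int64, bool) {
	if !pred(ev) {
		return nil, false
	}
	return proj(ev), true
}
```

For example, a predicate that drops deletions combined with a projection computing `a + b - c as math_column` would emit one derived column per surviving event.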
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 24, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 24, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 24, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 24, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 25, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 26, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 27, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 29, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 30, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 31, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request May 31, 2022
craig bot pushed a commit that referenced this pull request May 31, 2022
81676: changefeedccl: Predicates and projections in CDC. r=miretskiy a=miretskiy

Introduce `cdceval` package -- a library for expression evaluation
for CDC.

Changefeed users have long requested the ability to emit only a
subset of columns. They have also requested the ability to filter
out unwanted events (for example, deletions).

This library aims to accomplish those goals.  However, instead of
focusing on narrow use cases, which would usually be addressed by
adding a new `WITH` option (as done in
#80499), this library
aims to provide support for general expression evaluation.

The `cdceval` library provides the following functionality:
  * Ability to evaluate predicates (filters) so that events may be
    filtered.
  * Ability to evaluate projection expressions (`select *`, `select a,
    b,c`, or even `select a + b - c as math_column`)
  * Ability to evaluate virtual computed columns (not implemented in
    this PR).

The `cdceval` library reuses existing parsing and evaluation libraries, but
adapts them for the CDC use case.  CDC events are row-level events, and as
such, CDC expressions only make sense in the context of a single
row/single table.  In addition, because CDC events have at-least-once
semantics, the emitted events must not depend on volatile state.
In summary, any expression is supported, with the following restrictions:
  * Volatile functions are not supported.
  * Stable functions, such as `now()`, `current_timestamp()`, etc., are
    modified so that they return stable values -- namely, the event's MVCC
    timestamp.
  * Multi row functions (aggregates, windowing functions) are
    disallowed.
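
The restrictions above can be pictured as a small admission check. The sketch below is illustrative only and is not the `cdceval` implementation: the `registry` map, `FuncInfo`, and `Check` are hypothetical names, and the function classifications are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Volatility is a simplified stand-in for a SQL function volatility class.
type Volatility int

const (
	Immutable Volatility = iota
	Stable
	Volatile
)

// FuncInfo is a hypothetical description of a SQL function.
type FuncInfo struct {
	Volatility Volatility
	MultiRow   bool // aggregate or window function
}

// registry is a toy catalog; the entries are assumptions for illustration.
var registry = map[string]FuncInfo{
	"lower":  {Volatility: Immutable},
	"now":    {Volatility: Stable},
	"random": {Volatility: Volatile},
	"sum":    {Volatility: Immutable, MultiRow: true},
}

// ErrUnsupported marks functions that may not appear in a CDC expression.
var ErrUnsupported = errors.New("function not supported in CDC expression")

// Check reports whether name may be used in a CDC expression. The first
// result is true when the function is stable and would therefore need to be
// overridden to return a value derived from the event's MVCC timestamp.
func Check(name string) (overridden bool, err error) {
	info, ok := registry[name]
	switch {
	case !ok:
		return false, fmt.Errorf("%w: unknown function %q", ErrUnsupported, name)
	case info.Volatility == Volatile:
		return false, fmt.Errorf("%w: %q is volatile", ErrUnsupported, name)
	case info.MultiRow:
		return false, fmt.Errorf("%w: %q operates over multiple rows", ErrUnsupported, name)
	}
	return info.Volatility == Stable, nil
}

func main() {
	for _, name := range []string{"lower", "now", "random", "sum"} {
		overridden, err := Check(name)
		fmt.Println(name, overridden, err)
	}
}
```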

`cdceval` also defines a few custom, CDC-specific functions, such as:
  * `cdc_prev()`: Returns the previous row values as a JSONB object.
  * `cdc_is_delete()`: Returns true if the row was deleted.
  * Others -- see `functions.go`

Follow-up PRs will add a "front end" to this library to enable creation
and management of predicated changefeeds.

Release Notes: None

82146: util/quantile: import quantile library r=matthewtodd a=matthewtodd

For upcoming outliers work in #79451, we'll re-use the biased streaming
quantiles implementation underlying the prometheus client library's
[summary][1] type.

But in order to monitor and possibly constrain our memory usage, we'll
need a way to measure the size of each `quantile.Stream`. That
functionality is not available upstream, and contributions are
explicitly [not being accepted][2] (and the [original upstream][3], from
bmizerany, lacks further functionality and is similarly inactive), so we
vendor the library here, unmodified from its [v1.0.1][4], in advance of
adding the methods we need.

[1]: https://prometheus.io/docs/practices/histograms/
[2]: beorn7/perks#5 (comment)
[3]: https://github.com/bmizerany/perks
[4]: https://github.com/beorn7/perks/tree/v1.0.1

Release note: None

Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: Matthew Todd <todd@cockroachlabs.com>
craig bot pushed a commit that referenced this pull request Jun 26, 2022
82562: changefeedccl: Projections and Filters in CDC. r=miretskiy a=miretskiy

Add a variant of CHANGEFEED statement that allows specification
of predicates and projections.

```
CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...
```

This changefeed variant can target at most 1 table (and 1 column
family) at a time. The expressions used as the projections and
filters can be almost any supported expression with some restrictions:
  * Volatile functions not allowed.
  * Sub-selects not allowed.
  * Aggregate and window functions (i.e. functions operating over many
    rows) not allowed.
  * Some stable functions, notably functions which return MVCC
    timestamp, are overridden to return MVCC timestamp of the event.

In addition, some CDC specific functions are provided:
  * cdc_is_delete: returns true if the event is a deletion event.
  * cdc_prev: returns JSON representation of the previous row state.
  * cdc_updated_timestamp: returns event update timestamp (usually MVCC
    timestamp, but can be different if e.g. undergoing schema changes)
Additional CDC-specific functions will be added in follow-on PRs.
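
Conceptually, a predicate decides whether an event is emitted and a projection decides what is emitted. The Go sketch below illustrates only that idea. `Event`, `Predicate`, `Projection`, and `Apply` are hypothetical names, not types from this PR, and rows are reduced to integer columns for brevity.

```go
package main

import "fmt"

// Event is a hypothetical, simplified stand-in for a changefeed row event.
type Event struct {
	Row     map[string]int // current column values
	Deleted bool           // true for deletion events
}

// Predicate decides whether an event should be emitted.
type Predicate func(Event) bool

// Projection computes the emitted columns for an event.
type Projection func(Event) map[string]int

// Apply keeps the events accepted by keep and projects each survivor.
func Apply(events []Event, keep Predicate, project Projection) []map[string]int {
	var out []map[string]int
	for _, ev := range events {
		if keep(ev) {
			out = append(out, project(ev))
		}
	}
	return out
}

// notDelete mirrors the idea behind `WHERE NOT cdc_is_delete()`.
func notDelete(ev Event) bool { return !ev.Deleted }

// itemsAvailable mirrors a projection with a computed expression.
func itemsAvailable(ev Event) map[string]int {
	return map[string]int{
		"warehouseID":    ev.Row["warehouseID"],
		"itemsAvailable": ev.Row["totalItems"] - ev.Row["orderedItems"],
	}
}

func main() {
	events := []Event{
		{Row: map[string]int{"warehouseID": 1, "totalItems": 10, "orderedItems": 3}},
		{Row: map[string]int{"warehouseID": 2}, Deleted: true},
	}
	for _, row := range Apply(events, notDelete, itemsAvailable) {
		fmt.Println(row)
	}
}
```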

A few examples:

* Emit all but the deletion events:
```
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT * FROM table
WHERE NOT cdc_is_delete()
```

* Emit all events that modified `important_col` column:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *, cdc_prev() AS previous
FROM important_table
WHERE important_col != cdc_prev()->'important_col'
```

* Emit a few columns, as well as computed expressions:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
FROM warehouse
WHERE region='US/east';
```

When a filter expression is specified, the changefeed now consults the
optimizer so that the set of spans scanned by the changefeed can be
restricted based on the predicate.

For example, given the following table and a changefeed:
```
CREATE TABLE warehouse (
  region STRING,
  warehouseID int,
  ....
  PRIMARY KEY (region, warehouseID)
);

CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM warehouse
WHERE region='US/east';
```

This changefeed scans only the table spans that contain the `US/east`
region (and ignores all other table spans).
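
To see why an equality on a leading primary key column narrows the scan, consider a toy key encoding. This sketch assumes keys are plain strings of the form `/warehouse/<region>/<warehouseID>`, which is not the real key encoding. `Span`, `prefixEnd`, and `spanForRegion` are hypothetical helpers written for this example.

```go
package main

import (
	"bytes"
	"fmt"
)

// Span is a half-open key range [Start, End). A nil End means unbounded.
type Span struct{ Start, End []byte }

// prefixEnd returns the smallest key that sorts after every key beginning
// with prefix, or nil if no such key exists (prefix is all 0xff bytes).
func prefixEnd(prefix []byte) []byte {
	end := append([]byte(nil), prefix...)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] != 0xff {
			end[i]++
			return end[:i+1]
		}
	}
	return nil
}

// spanForRegion builds the span covering all rows whose leading primary key
// column equals region, under the toy encoding described above.
func spanForRegion(region string) Span {
	start := []byte("/warehouse/" + region + "/")
	return Span{Start: start, End: prefixEnd(start)}
}

// Contains reports whether key falls inside the span.
func (s Span) Contains(key []byte) bool {
	if bytes.Compare(key, s.Start) < 0 {
		return false
	}
	return s.End == nil || bytes.Compare(key, s.End) < 0
}

func main() {
	s := spanForRegion("US/east")
	for _, k := range []string{
		"/warehouse/US/east/42",
		"/warehouse/US/west/7",
		"/warehouse/EU/north/1",
	} {
		fmt.Println(k, s.Contains([]byte(k)))
	}
}
```

Only keys under the `US/east` prefix fall inside the span, so the rest of the table never needs to be scanned.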

---

For foundational work, see:

- #81676
- #81249
- #80499

Addresses:
- #56949
- #31214


---

Release Notes (enterprise):
CHANGEFEED statement now supports general expressions -- predicates and projections.
Projections allow customers to emit only the data that they care about,
including computed columns, while predicates (i.e. filters) allow them
to restrict the data that's emitted only to those events that match the
filter.

```
CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()
```


Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>