Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: use new parquet library #103293

Merged
merged 2 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
Expand Down Expand Up @@ -154,9 +153,6 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_fraugster_parquet_go//:parquet-go",
"@com_github_fraugster_parquet_go//parquet",
"@com_github_fraugster_parquet_go//parquetschema",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
Expand Down Expand Up @@ -317,7 +313,6 @@ go_test(
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_fraugster_parquet_go//:parquet-go",
"@com_github_gogo_protobuf//types",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_lib_pq//:pq",
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7668,7 +7668,6 @@ func TestChangefeedPredicateWithSchemaChange(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long under race")
defer TestingSetIncludeParquetMetadata()()

setupSQL := []string{
`CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`,
Expand Down
43 changes: 26 additions & 17 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,18 +423,31 @@ var NoLongerExperimental = map[string]string{
DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3,
}

// OptionsSet is a set of changefeed option strings.
type OptionsSet map[string]struct{}

// InitialScanOnlyUnsupportedOptions contains options that are not supported
// with the initial scan only option.
var InitialScanOnlyUnsupportedOptions = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps)

// ParquetFormatUnsupportedOptions contains options that are not supported with
// the parquet format.
//
// OptKeyInValue is disallowed because parquet files have no concept of key
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
// TODO(sherman): At the moment we disallow altering both the initial_scan_only
// and the end_time option. However, there are instances in which it should be
// allowed to alter either of these options. We need to support the alteration
// of these fields.
var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan,
var AlterChangefeedUnsupportedOptions OptionsSet = makeStringSet(OptCursor, OptInitialScan,
OptNoInitialScan, OptInitialScanOnly, OptEndTime)

// AlterChangefeedOptionExpectValues is used to parse alter changefeed options
Expand Down Expand Up @@ -1039,16 +1052,21 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
if err != nil {
return err
}
validateInitialScanUnsupportedOptions := func(errMsg string) error {
for o := range InitialScanOnlyUnsupportedOptions {

// validateUnsupportedOptions returns an error if any of the supplied
// options are present in the statement options. The error string should be
// the string representation of the option (ex. "key_in_value", or
// "initial_scan='only'").
validateUnsupportedOptions := func(unsupportedOptions OptionsSet, errorStr string) error {
for o := range unsupportedOptions {
if _, ok := s.m[o]; ok {
return errors.Newf(`cannot specify both %s='only' and %s`, OptInitialScan, o)
return errors.Newf(`cannot specify both %s and %s`, errorStr, o)
}
}
return nil
}
if scanType == OnlyInitialScan {
if err := validateInitialScanUnsupportedOptions(fmt.Sprintf("%s='only'", OptInitialScan)); err != nil {
if err := validateUnsupportedOptions(InitialScanOnlyUnsupportedOptions,
fmt.Sprintf("%s='only'", OptInitialScan)); err != nil {
return err
}
} else {
Expand All @@ -1058,17 +1076,8 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
}
// Right now parquet does not support any of these options
if s.m[OptFormat] == string(OptFormatParquet) {
if isPredicateChangefeed {
// Diff option is allowed when using predicate changefeeds with parquet format.
for o := range InitialScanOnlyUnsupportedOptions {
if _, ok := s.m[o]; ok && o != OptDiff {
return errors.Newf(`cannot specify both format='%s' and %s`, OptFormatParquet, o)
}
}
} else {
if err := validateInitialScanUnsupportedOptions(string(OptFormatParquet)); err != nil {
return err
}
if err := validateUnsupportedOptions(ParquetFormatUnsupportedOptions, fmt.Sprintf("format=%s", OptFormatParquet)); err != nil {
return err
}
}
for o := range s.m {
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestOptionsValidations(t *testing.T) {
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"key_column": "b"}, false, "requires the unordered option"},
{map[string]string{"diff": "", "format": "parquet"}, true, ""},
}

for _, test := range tests {
Expand Down
16 changes: 7 additions & 9 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,6 @@ func TestParquetEncoder(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
defer TestingSetIncludeParquetMetadata()()

tests := []struct {
name string
changefeedStmt string
Expand All @@ -1089,22 +1087,22 @@ func TestParquetEncoder(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x STRING, y INT, z FLOAT NOT NULL, a BOOL)`)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x STRING, y INT, z FLOAT NOT NULL, a BOOL, c INT[])`)
defer sqlDB.Exec(t, `DROP TABLE FOO`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'Alice', 3, 0.5032135844230652, true), (2, 'Bob',
2, CAST('nan' AS FLOAT),false),(3, NULL, NULL, 4.5, NULL)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'Alice', 3, 0.5032135844230652, true, ARRAY[]), (2, 'Bob',
2, CAST('nan' AS FLOAT),false, NULL),(3, NULL, NULL, 4.5, NULL, ARRAY[1,NULL,3])`)
foo := feed(t, f, test.changefeedStmt)
defer closeFeed(t, foo)

assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": true, "i": 1, "x": "Alice", "y": 3, "z": 0.5032135844230652}}`,
`foo: [2]->{"after": {"a": false, "i": 2, "x": "Bob", "y": 2, "z": "NaN"}}`,
`foo: [3]->{"after": {"a": null, "i": 3, "x": null, "y": null, "z": 4.5}}`,
`foo: [1]->{"after": {"a": true, "c": [], "i": 1, "x": "Alice", "y": 3, "z": 0.5032135844230652}}`,
`foo: [2]->{"after": {"a": false, "c": null, "i": 2, "x": "Bob", "y": 2, "z": "NaN"}}`,
`foo: [3]->{"after": {"a": null, "c": [1, null, 3], "i": 3, "x": null, "y": null, "z": 4.5}}`,
})

sqlDB.Exec(t, `UPDATE foo SET x='wonderland' where i=1`)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": true, "i": 1, "x": "wonderland", "y": 3, "z": 0.5032135844230652}}`,
`foo: [1]->{"after": {"a": true, "c": [], "i": 1, "x": "wonderland", "y": 3, "z": 0.5032135844230652}}`,
})

sqlDB.Exec(t, `DELETE from foo where i=1`)
Expand Down
18 changes: 8 additions & 10 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,6 @@ func makeFeedFactoryWithOptions(
userDB, cleanup := getInitialDBForEnterpriseFactory(t, s, db, options)
f.(*cloudFeedFactory).configureUserDB(userDB)
return f, func() {
TestingSetIncludeParquetMetadata()()
cleanup()
}
case "enterprise":
Expand Down Expand Up @@ -1081,12 +1080,20 @@ func cdcTestNamedWithSystem(
testLabel = fmt.Sprintf("%s/%s", sinkType, name)
}
t.Run(testLabel, func(t *testing.T) {
// Even if the parquet format is not being used, enable metadata
// in all tests for simplicity.
testServer, cleanupServer := makeServerWithOptions(t, options)
knobs := testServer.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.EnableParquetMetadata = true

feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options)
feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t)
defer cleanupServer()
defer cleanupSink()
defer cleanupCloudStorage()

testFn(t, testServer, feedFactory)
})
}
Expand Down Expand Up @@ -1277,15 +1284,6 @@ func waitForJobStatus(
})
}

// TestingSetIncludeParquetMetadata adds the option to turn on adding metadata
// to the parquet file which is used in testing.
func TestingSetIncludeParquetMetadata() func() {
includeParquetTestMetadata = true
return func() {
includeParquetTestMetadata = false
}
}

// ChangefeedJobPermissionsTestSetup creates entities and users with various permissions
// for tests which test access control for changefeed jobs.
//
Expand Down
69 changes: 58 additions & 11 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package changefeedccl

import (
"io"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -22,13 +23,13 @@ type parquetWriter struct {
datumAlloc []tree.Datum
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row, sink io.Writer, opts ...parquet.Option,
) (*parquetWriter, error) {
columnNames := make([]string, len(row.ResultColumns())+1)
columnTypes := make([]*types.T, len(row.ResultColumns())+1)
// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row and the number of cols in the schema.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
numCols := len(row.ResultColumns()) + 1

columnNames := make([]string, numCols)
columnTypes := make([]*types.T, numCols)

idx := 0
if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
Expand All @@ -37,27 +38,49 @@ func newParquetWriterFromRow(
idx += 1
return nil
}); err != nil {
return nil, err
return nil, 0, err
}

columnNames[idx] = parquetCrdbEventTypeColName
columnTypes[idx] = types.String

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
return nil, 0, err
}
return schemaDef, numCols, nil
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row,
sink io.Writer,
knobs *TestingKnobs, /* may be nil */
opts ...parquet.Option,
) (*parquetWriter, error) {
schemaDef, numCols, err := newParquetSchemaDefintion(row)
if err != nil {
return nil, err
}

writerConstructor := parquet.NewWriter
if includeParquetTestMetadata {

if knobs.EnableParquetMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
return nil, err
}

// To use parquet test utils for reading datums, the writer needs to be
// configured with additional metadata.
writerConstructor = parquet.NewWriterWithReaderMeta
}

writer, err := writerConstructor(schemaDef, sink, opts...)
if err != nil {
return nil, err
}
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, len(columnNames))}, nil
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil
}

// addData writes the updatedRow, adding the row's event type. There is no guarantee
Expand All @@ -70,7 +93,7 @@ func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) e
return w.inner.AddRow(w.datumAlloc)
}

// Close closes the writer and flushes any buffered data to the sink.
// close closes the writer and flushes any buffered data to the sink.
// Callers must check the returned error because buffered rows may only be
// written out during Close.
func (w *parquetWriter) close() error {
	return w.inner.Close()
}
Expand All @@ -88,3 +111,27 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())
return nil
}

// addParquetTestMetadata returns the supplied writer options extended with
// "keyCols" and "allCols" metadata entries, which cdc test feed factories
// require in order to interpret the emitted parquet files.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
	var primaryKeyColNames []string
	if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error {
		primaryKeyColNames = append(primaryKeyColNames, col.Name)
		return nil
	}); err != nil {
		return opts, err
	}
	opts = append(opts, parquet.WithMetadata(map[string]string{
		"keyCols": strings.Join(primaryKeyColNames, ","),
	}))

	var valueColNames []string
	if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
		valueColNames = append(valueColNames, col.Name)
		return nil
	}); err != nil {
		return opts, err
	}
	// The event type column is written alongside every row, so it belongs to
	// the value column set as well.
	valueColNames = append(valueColNames, parquetCrdbEventTypeColName)
	opts = append(opts, parquet.WithMetadata(map[string]string{
		"allCols": strings.Join(valueColNames, ","),
	}))
	return opts, nil
}
Loading